How to process very fast incoming WebSocket response data?

  • Question

  • Hi, I am connecting to a stock market live data provider through WebSockets. I receive nearly 40,000 records per minute with live prices for all companies. I need to store all of that data in a text file, so I have to run my code on every response, but doing so makes the process very slow: while processing, I only manage to handle about 5,000 records, far behind the rate at which responses arrive. How can I make the process fast enough to store the data at the receiving speed? Please help me solve this.

    I have written my code below:

    public static string tempFile = @"D:\LiveData.txt";
    private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
    {
        // Append the raw message to the temp file, then re-open the
        // temp file and process it line by line.
        using (System.IO.StreamWriter w = System.IO.File.AppendText(tempFile))
        {
            Log(e.Message, w);
        }
        using (System.IO.StreamReader r = System.IO.File.OpenText(tempFile))
        {
            DumpLog(r);
        }
    }

    public static void Log(string responseMessage, System.IO.TextWriter w)
    {
        w.WriteLine(responseMessage);
    }

    static string tempPath = @"D:\LongTimeHoldingDataFile.txt";

    public static void DumpLog(System.IO.StreamReader r)
    {
        string line;
        while ((line = r.ReadLine()) != null)
        {
            // The long-term file is re-opened for append on every single line.
            using (System.IO.StreamWriter w = System.IO.File.AppendText(tempPath))
            {
                w.WriteLine(line);
            }
            something();
        }
        // Close the reader before rewriting the temp file, then drop its first line.
        r.Close();
        var lines = System.IO.File.ReadAllLines(tempFile).Skip(1);
        System.IO.File.WriteAllLines(tempFile, lines);
    }

    private const string DirPath = @"D:\seprateSymbolPair";
    private const string Separator = @",";
    public static void something()
    {
        System.IO.File.ReadLines(tempFile).ToList()
            .AsParallel()
            .Select(Newtonsoft.Json.JsonConvert.DeserializeObject<List<CryptoObject>>)
            .ForAll(WriteRecord);
    }

    public static void WriteRecord(List<CryptoObject> data)
    {
       foreach (var item in data)
       {
         var fileNames = System.IO.Directory.GetFiles(DirPath, item.pair + ".txt", System.IO.SearchOption.AllDirectories);
         foreach (var fileName in fileNames)
         {
           List<string> teFil = new List<string>();
           try
           {
             var fileLines = System.IO.File.ReadAllLines(fileName).Skip(1).ToList();
             teFil = fileLines;
             writeLineTo(fileName, fileLines, item.pair, item.ap, (int)item.bp);
           }
           catch (Exception ex)
           {
              if (ex.Message.StartsWith("The process cannot access the file"))
              {
                 System.Threading.Thread.Sleep(500);
                 writeLineTo(fileName, teFil, item.pair, item.ap, (int)item.bp);
               }
            }
         }
       }
    }

    public static void writeLineTo(string fileName, List<string> fileLines, string sym, double clo, int vol)
    {
        try
        {
            fileLines.Add(new StringBuilder().Append(sym).Append(Separator)
                .Append(clo).Append(Separator).Append(vol)
                .Append(Environment.NewLine).ToString());
            System.IO.File.WriteAllLines(fileName, fileLines);
        }
        catch (Exception ex)
        {
            if (ex.Message.StartsWith("The process cannot access the file"))
            {
                System.Threading.Thread.Sleep(5);
                fileLines.Add(new StringBuilder().Append(sym).Append(Separator)
                    .Append(clo).Append(Separator).Append(vol)
                    .Append(Environment.NewLine).ToString());
                System.IO.File.WriteAllLines(fileName, fileLines);
            }
        }
    }

    Monday, February 18, 2019 11:42 AM

All replies

  • Okay, some recommendations to make it faster:

    First, avoid temporary files. Every time you receive a message, you are saving it to a temporary file and then opening that temporary file with a StreamReader that reads it line by line.

    Instead, open a StringReader against the received message. Then you can read from the StringReader the same way you were reading from the StreamReader, but you skip the intermediate file; everything is done in memory.

    And then, in DumpLog you are opening a file with AppendText for every line. Opening a file for appending is a slow operation. Instead, open the file once and keep it open, and simply keep writing into it; each write is appended at the end without having to reopen the file. Worse still, the way you use the file now, you close it and immediately read it back. Avoid that: apply the same principle as before and keep the output of the preceding step in memory, without a temporary file. Only write to a file at the very end, when all processing is done.

    If you keep everything in memory without intermediate files, and you keep the destination file open without closing and reopening it, then 40,000 records per minute should be easy to achieve. Even 40,000 per second should be doable without difficulty.
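    The two points above (read the received message in memory with a StringReader, and keep a single destination writer open for the whole session) could be sketched like this. The class and member names are illustrative, not from the original code:

    ```csharp
    using System;
    using System.IO;

    public class MessageLogger : IDisposable
    {
        // Opened once and kept open; every WriteLine simply appends at the end.
        private readonly StreamWriter _writer;

        public MessageLogger(string path)
        {
            _writer = new StreamWriter(path, append: true);
        }

        public void HandleMessage(string message)
        {
            // Read the message line by line entirely in memory -- no temp file.
            using (var reader = new StringReader(message))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    _writer.WriteLine(line);
                }
            }
            _writer.Flush();
        }

        public void Dispose() => _writer.Dispose();
    }
    ```

    The writer is created once when the session starts and only disposed when it ends, so each HandleMessage call is just a buffered append rather than an open/write/close cycle.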

    Monday, February 18, 2019 10:54 PM
    Moderator
  • Hi Jack,
               Thank you very much for your valuable reply, and sorry for the late response.
    Your recommendations are really very good. I applied some changes to the code, as below:

    public static StringBuilder sb = new StringBuilder();
    public static System.IO.StringWriter sw;
    private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
    {
        sw = new System.IO.StringWriter(sb);
        sw.WriteLine(e.Message);
        Reader();
    }

    public static void Reader()
    {
        // Process each buffered message line, then clear the buffer.
        System.IO.StringReader _sr = new System.IO.StringReader(sb.ToString());
        string line;
        while ((line = _sr.ReadLine()) != null)
        {
            WriteRecord(line);
        }
        sb.Remove(0, sb.Length);
    }
    
    public static void WriteRecord(string data)
    {
        List<LiveAMData> ld = JsonConvert.DeserializeObject<List<LiveAMData>>(data);
        List<string> fileLines = new List<string>();
        foreach (var item in ld)
        {
            if (item.sym != null)
            {
                var fileName = @"D:\symbolsDataFile1\" + item.sym + "_Aggregate.txt";
                // Drop the first line, append the new record, rewrite the file.
                fileLines = File.ReadAllLines(fileName).Skip(1).ToList();
                fileLines.Add(item.sym + "," + item.s + "," + item.o + "," + item.h);
                File.WriteAllLines(fileName, fileLines);
            }
        }
    }

    So this is the code I am using right now. We are handling 10,000 files in the WriteRecord method; we need to split each response by symbol and store it in that symbol's file.
    It is working faster than earlier, but after a few minutes it becomes slow again, and because of the slow processing the websocket connection gets closed.

    Is there any option to make the WriteRecord method faster, so that it can run smoothly?
     


    Friday, March 1, 2019 3:48 PM
  • You're reading the entire file in, appending the line in memory, and writing it back out.  Clearly, that's going to get slower and slower and slower as time goes on.

    You need to open the file in "append" mode, as in

        using (StreamWriter sw = File.AppendText(fileName)) 
        {
            sw.WriteLine(item.sym + "," + item.s + "," + item.o + "," + item.h);
        }
    Even that may not be enough.  You may have to start looking at some "big data" streaming technologies to handle that data, especially if it runs 24/7.
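    Combining the append-mode suggestion with the earlier "open once, keep open" advice, one way to handle roughly 10,000 per-symbol files is to cache an open appending writer per symbol. A minimal sketch, assuming one file per symbol named `<symbol>_Aggregate.txt` (the class name and file-naming scheme are illustrative, not from the thread):

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.IO;

    public class SymbolWriterCache : IDisposable
    {
        private readonly string _dir;
        // One open, appending writer per symbol, created on first use.
        private readonly Dictionary<string, StreamWriter> _writers =
            new Dictionary<string, StreamWriter>();

        public SymbolWriterCache(string dir)
        {
            _dir = dir;
            Directory.CreateDirectory(dir);
        }

        public void Append(string symbol, string line)
        {
            if (!_writers.TryGetValue(symbol, out StreamWriter w))
            {
                // Open once in append mode and keep it open for the session.
                w = File.AppendText(Path.Combine(_dir, symbol + "_Aggregate.txt"));
                _writers[symbol] = w;
            }
            w.WriteLine(line);
        }

        public void Dispose()
        {
            foreach (var w in _writers.Values)
            {
                w.Flush();
                w.Dispose();
            }
        }
    }
    ```

    WriteRecord could then call `Append(item.sym, ...)` instead of rewriting whole files, and the cache is flushed and disposed once when the market closes. Whether holding thousands of file handles open is acceptable depends on the environment; if it is not, an LRU cache of writers would be an alternative.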



    Tim Roberts | Driver MVP Emeritus | Providenza & Boekelheide, Inc.

    Saturday, March 2, 2019 1:38 AM
  • Hi

    Is your problem solved? If so, please post "Mark as answer" to the appropriate answer, so that it will help other members to find a solution quickly if they face a similar issue.

    Best Regards,

    Jack


    MSDN Community Support
    Please remember to click "Mark as Answer" the responses that resolved your issue, and to click "Unmark as Answer" if not. This can be beneficial to other community members reading this thread. If you have any compliments or complaints to MSDN Support, feel free to contact MSDNFSF@microsoft.com.


    Monday, March 4, 2019 6:29 AM
    Moderator
  • Hi Tim,
              Thank you for your help.
    I think the handling of the websocket response data becomes slow because of the WriteRecord() method. I need to remove the first line from the text file and append a new line to it. So if I use a StreamWriter as you suggest, should I also use a StreamReader to remove the first line from the file?

    Also, this process will run for nearly 6 to 7 hours while the NASDAQ stock market is open. Would "big data" streaming be helpful for this? I am not familiar with the "big data" concept, so could you please share some guidance on it?

    Thank you,
    Hardik
    Monday, March 4, 2019 10:50 AM
  • Hi Jack,
               As per your guidelines it has become much faster than earlier.
    I have checked the new code with a websocket connection during the live market. After a few minutes the socket connection gets closed, because processing the response data is slower than receiving it; I think the socket buffer becomes full and the connection closes.
         I suspect WriteRecord is still taking more time than it should during the live market. So please can you guide me on how to make the WriteRecord method fast enough to handle the websocket responses at the same speed, without the connection closing?

    Best Regards,
    Hardik Dhankecha

     

    Monday, March 4, 2019 10:56 AM
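
  • [Editorial note] The symptom in the last post — the receive callback blocked by disk I/O until the socket buffer fills and the connection drops — is commonly addressed by decoupling receiving from processing with a producer/consumer queue, so the callback returns immediately. This pattern is not from the thread itself; a minimal sketch using `BlockingCollection` (class and member names are illustrative):

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    public class MessagePipeline : IDisposable
    {
        // The queue decouples the fast receive callback from slow disk I/O.
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
        private readonly Task _consumer;
        private readonly Action<string> _process;

        public MessagePipeline(Action<string> process)
        {
            _process = process;
            // A single background consumer drains the queue in arrival order.
            _consumer = Task.Run(() =>
            {
                foreach (string message in _queue.GetConsumingEnumerable())
                {
                    _process(message);
                }
            });
        }

        // Called from the websocket callback; returns immediately.
        public void Enqueue(string message) => _queue.Add(message);

        public void Dispose()
        {
            _queue.CompleteAdding();   // let the consumer finish the backlog
            _consumer.Wait();
            _queue.Dispose();
        }
    }
    ```

    The websocket handler would call `Enqueue(e.Message)` and nothing else, while `_process` does the per-symbol file writing on the background thread. If the consumer falls behind permanently the queue still grows, so the per-record work must also be kept cheap (appending rather than rewriting files).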