Backpressure issue in Rx.NET


  • I want to read lines from a CSV file and use Rx.NET to transform them.

    I want to buffer the rows into batches of 50 and send one batch update every 250 milliseconds:

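        public static IEnumerable<string> ReadCSV(string filePath)
        {
            // "using" disposes the reader (closing the file) when the
            // enumeration finishes or the consumer stops early.
            using (var reader = new StreamReader(File.OpenRead(filePath)))
            {
                while (!reader.EndOfStream)
                {
                    yield return reader.ReadLine();
                }
            }
        }

        // ToObservable pushes the next line as soon as the previous OnNext returns.
        var rows = ReadCSV("filePath").ToObservable();

        rows
            .Buffer(50)                  // group the rows into lists of 50
            .Zip(Observable.Interval(TimeSpan.FromMilliseconds(250)),
                 (batch, _) => batch)    // Zip queues each list until a tick arrives
            .Subscribe(lines =>
            {
                // do something
            });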

    But memory usage explodes and scales with the file size: with an 80 MB CSV file, the application grows to about 1 GB.

    After some research, here is what I think is happening: Zip waits until both sequences have produced an element before emitting a pair, but the producer (in this case the CSV reader) keeps pushing rows whether or not the previous batch has been processed.

    So Zip keeps storing the buffered lines from the CSV sequence, each batch waiting for its 250 ms tick before it is processed.

    What makes it even worse is that the memory is not released even after all the updates have finished.

    If I remove the Zip, memory usage looks fine (it hovers around 20 MB); the memory seems to be released for each update once "do something" finishes.
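Going by the diagnosis above (Zip waits for both sequences while the CSV reader keeps pushing), one way to bound memory is to let the timer drive the reads instead of the file. A minimal sketch, assuming the System.Reactive NuGet package and .NET 6 or later for `Enumerable.Chunk`; `TimerDrivenCsv` and `Batches` are hypothetical names. It relies on the `Zip` overload whose second argument is an `IEnumerable<T>`: that overload advances the enumerator once per element of the observable, so each tick pulls one batch and nothing piles up in a queue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // System.Reactive NuGet package (assumed)

// Hypothetical helper: the timer, not the file, decides when a batch is read.
public static class TimerDrivenCsv
{
    public static IObservable<string[]> Batches(
        IEnumerable<string> lines, int batchSize, TimeSpan interval)
    {
        // Chunk (System.Linq, .NET 6+) groups lazily, and Zip's IEnumerable
        // overload calls MoveNext once per tick, so at most one batch is
        // read per tick; the sequence completes when the lines run out.
        return Observable.Interval(interval)
            .Zip(lines.Chunk(batchSize), (_, batch) => batch);
    }
}
```

Hooked up to the file, this would look like `TimerDrivenCsv.Batches(File.ReadLines("filePath"), 50, TimeSpan.FromMilliseconds(250)).Subscribe(lines => { /* do something */ });`. `File.ReadLines` streams the file lazily and closes it when enumeration ends. On frameworks older than .NET 6, the `Buffer(int)` operator for `IEnumerable<T>` in the System.Interactive (Ix) package should be able to stand in for `Chunk`, though it yields `IList<string>` rather than `string[]`.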

    Two questions:

    1. Is there a way to tell the observable to pause reading until the previous batch (in my case, the 50 buffered lines) has been processed?

    2. Why is the memory not released after all the updates have been processed, and is there a way to avoid this?

    Wednesday, July 3, 2019 2:05 AM
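For question 1, since `ReadCSV` is already a pull-based `IEnumerable<string>`, one option is to skip the conversion to a push-based observable and consume the file with an ordinary async loop: the next 50 lines are read only after the previous batch has been processed and the 250 ms delay has elapsed, so reading pauses by construction. A minimal sketch, assuming .NET 6 or later for `Enumerable.Chunk`; `PullBatcher` and `ProcessInBatchesAsync` are hypothetical names, and no Rx is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a plain pull loop with no Rx involved.
public static class PullBatcher
{
    // Reads the next batch only after the previous one has been processed,
    // so roughly one batch of lines is held in memory at a time.
    public static async Task<int> ProcessInBatchesAsync(
        IEnumerable<string> lines,
        int batchSize,
        TimeSpan interval,
        Action<string[]> process,
        CancellationToken cancellationToken = default)
    {
        int batchCount = 0;
        // Chunk (System.Linq, .NET 6+) enumerates `lines` only as far as
        // needed to fill the next array.
        foreach (string[] batch in lines.Chunk(batchSize))
        {
            // Wait first, mirroring Zip + Interval, where the first batch
            // is released on the first tick.
            await Task.Delay(interval, cancellationToken);
            process(batch); // "do something"
            batchCount++;
        }
        return batchCount;
    }
}
```

It would be called as `await PullBatcher.ProcessInBatchesAsync(File.ReadLines("filePath"), 50, TimeSpan.FromMilliseconds(250), lines => { /* do something */ });`, where `File.ReadLines` closes the file once enumeration finishes. The trade-off is that processing time adds to the 250 ms interval instead of overlapping with it.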