none
ReplaySubject causes memory leak

    Question

  • I have a program, which I've simplified to the following:

     

    public static void Main(string[] args)
    {
            ReplaySubject<string> subject = new ReplaySubject<string>(10);
    
            subject.Subscribe(str => Console.WriteLine(str));
    
            Task.Factory.StartNew(() =>
             {
                    while (true)
                    {
                        subject.OnNext("Testing...");
                    }
            });
    
            Console.ReadKey();
    }

     

    Here, I have a loop that continually publishes messages (without any thread sleeping or interval to better show my point). This works. However, if I switch the order of the Subscribe and the beginning of the publishing, memory usage goes through the roof.

            public static void Main(string[] args)
            {
                ReplaySubject<string> subject = new ReplaySubject<string>(10);
    
                Task.Factory.StartNew(() =>
                    {
                        while (true)
                        {
                            subject.OnNext("Testing...");
                        }
                    });
    
                subject.Subscribe(str => Console.WriteLine(str));
    
                Console.ReadKey();
            }

     

    How can I subscribe after I begin publishing without getting a memory leak?

     

    Thanks!

    Nick Ramirez

    Tuesday, January 31, 2012 6:49 PM

All replies

  • Update:

    Maybe Rx doesn't play well with TPL? Using Observable.Interval instead fixed the memory leak.

     

     

    Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.TaskPool).Subscribe((t) =>
    
                    {  
    
                      subject.OnNext("Testing...");
                    });

     

    Nick

    Wednesday, February 01, 2012 1:14 AM
  • Hi Nick

    I believe that your test has one problem which is the Console.WriteLine

    The write line occurs on a single thread and it taking more time than the
    string allocations.

    Even those test wasn’t correct your observation was quit correct.
    I was running the same test using the replay subject and the QueueSubject
    (which is part of my Rx.Contrib open source project  http://rxcontrib.codeplex.com/)
    and I have found that the replay subject is leaking while the queue subject
    has the following benefits:
    lower memory footprint
    much faster execution
    and doesn’t leak

    Currently the queue subject does not have the exact behavior of the replay subject
    but I consider having such behavior and also adding a threshold behavior (like the one of the BlockingCollection)

    You can try the following test

            private static long _counter = 0;

            public static void Main(string[] args)

            {

                Console.WindowWidth = 50;

                Console.BufferWidth = 50;

     

                Console.WriteLine("press any key");

                Console.ReadKey();

                ISubject<string> subject = QueueSubject<string>.Create();

                //ISubject<string> subject = new ReplaySubject<string>(10);

     

                Task.Factory.StartNew(() =>

                {

                    while (true)

                    {

                        subject.OnNext(".");

                    }

                });

     

                subject.Subscribe(str =>

                    {

                        long i = Interlocked.Increment(ref _counter);

                        if (i % 100000 == 0)

                            Console.Write(str);

                    });

     

                Console.ReadKey();

            }

     

     

    I will release a new version of Rx Contrib in the near future
    I was testing it against the upcoming version (with rx reference to v1.1.11111)

    I will also write a blog series of how to use Rx Contrib in the near future

    see my blog:

    http://blogs.microsoft.co.il/blogs/bnaya/


    Bnaya Eshet
    • Edited by Bnaya Eshet Wednesday, February 01, 2012 7:07 AM
    Wednesday, February 01, 2012 7:04 AM