ReplaySubject causes memory leak
-
Tuesday, January 31, 2012 6:49 PM
I have a program, which I've simplified to the following:
public static void Main(string[] args)
{
    ReplaySubject<string> subject = new ReplaySubject<string>(10);
    subject.Subscribe(str => Console.WriteLine(str));
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            subject.OnNext("Testing...");
        }
    });
    Console.ReadKey();
}
Here, I have a loop that continually publishes messages (without any thread sleeping or interval to better show my point). This works. However, if I switch the order of the Subscribe and the beginning of the publishing, memory usage goes through the roof.
public static void Main(string[] args)
{
    ReplaySubject<string> subject = new ReplaySubject<string>(10);
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            subject.OnNext("Testing...");
        }
    });
    subject.Subscribe(str => Console.WriteLine(str));
    Console.ReadKey();
}
How can I subscribe after I begin publishing without getting a memory leak?
Thanks!
Nick Ramirez
All Replies
-
Wednesday, February 01, 2012 1:14 AM
Update:
Maybe Rx doesn't play well with TPL? Using Observable.Interval instead fixed the memory leak.
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.TaskPool).Subscribe((t) => {subject.OnNext("Testing...");});
Nick
-
Wednesday, February 01, 2012 7:04 AM
Hi Nick
I believe that your test has one problem, which is the Console.WriteLine call.
The write occurs on a single thread and takes more time than the
string allocations.
Even though the test wasn't correct, your observation was quite correct.
I ran the same test using the ReplaySubject and the QueueSubject
(which is part of my Rx.Contrib open source project, http://rxcontrib.codeplex.com/)
and found that the ReplaySubject leaks, while the QueueSubject
has the following benefits:
lower memory footprint
much faster execution
no leak
Currently the QueueSubject does not have the exact behavior of the ReplaySubject,
but I am considering adding that behavior, along with a threshold behavior (like that of BlockingCollection).
You can try the following test:
private static long _counter = 0;

public static void Main(string[] args)
{
    Console.WindowWidth = 50;
    Console.BufferWidth = 50;
    Console.WriteLine("press any key");
    Console.ReadKey();
    ISubject<string> subject = QueueSubject<string>.Create();
    //ISubject<string> subject = new ReplaySubject<string>(10);
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            subject.OnNext(".");
        }
    });
    subject.Subscribe(str =>
    {
        long i = Interlocked.Increment(ref _counter);
        if (i % 100000 == 0)
            Console.Write(str);
    });
    Console.ReadKey();
}
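For comparison, here is a minimal sketch of the threshold idea mentioned above, using only BlockingCollection from the BCL. It is not the QueueSubject implementation and it does not involve Rx at all; the class name BoundedBufferSketch and the Run helper are made up for illustration. It only shows how a bounded buffer makes a fast producer wait for a slow consumer, so the backlog cannot grow without limit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only; not part of Rx or Rx.Contrib.
public static class BoundedBufferSketch
{
    // Pushes `total` items through a buffer capped at `capacity` and
    // returns the largest backlog the consumer observed.
    public static int Run(int capacity, int total)
    {
        int maxBacklog = 0;
        using (var buffer = new BlockingCollection<string>(capacity))
        {
            // Producer: Add blocks while the buffer already holds `capacity` items.
            Task producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < total; i++)
                {
                    buffer.Add(".");
                }
                buffer.CompleteAdding();
            });

            // Consumer: deliberately slower than the producer.
            int consumed = 0;
            foreach (string item in buffer.GetConsumingEnumerable())
            {
                maxBacklog = Math.Max(maxBacklog, buffer.Count);
                if (++consumed % 100 == 0)
                {
                    Thread.Sleep(1);
                }
            }
            producer.Wait();
        }
        return maxBacklog;
    }

    public static void Main(string[] args)
    {
        Console.WriteLine("largest backlog: " + Run(10, 1000));
    }
}
```

With a capacity of 10 the reported backlog should never exceed 10, no matter how fast the producer loop runs. The trade-off is that the producer is throttled to the consumer's pace instead of memory absorbing the difference.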
I will release a new version of Rx Contrib in the near future.
I was testing against that upcoming version (with an Rx reference to v1.1.11111).
I will also write a blog series on how to use Rx Contrib in the near future;
see my blog:
http://blogs.microsoft.co.il/blogs/bnaya/
Bnaya Eshet
Edited by Bnaya Eshet Wednesday, February 01, 2012 7:07 AM

