Observable.Generate memory leak, or something I'm doing wrong?


  • Thursday 12 July 2012 15:43
     

    Hi,

    I'm having an issue with Observable.Generate.

    The following is my code:

    Observable.Generate<double, double>(
            0,                                                        // initial state: sampleNo
            sampleNo => !EndNow,                                      // condition: keep going until EndNow is set
            sampleNo => (sampleNo >= SampleRate) ? 0 : sampleNo + 1,  // iterate: advance sampleNo, wrapping at SampleRate
            sampleNo => GenerateSample(sampleNo),                     // result selector: produce one sample
            //sampleNo => new TimeSpan((long)(100000 / SampleRate)),
            NewThreadScheduler.Default)
        .Buffer(1024)
        .Subscribe(data =>   // push each 1024-sample buffer to both channel subjects
        {
            subjLeft.OnNext(data.ToArray());
            subjRight.OnNext(data.ToArray());
        });

    Working version (a long-running Task instead of Observable.Generate):

    Task.Factory.StartNew(() =>
    {
        int sampleNo = 0;
        while (!EndNow)
        {
            sampleNo = (sampleNo >= SampleRate) ? 0 : sampleNo + 1;
            var handler = dataevnt;   // copy first, so a concurrent unsubscribe can't null it between check and call
            if (handler != null)
                handler.Invoke(this, new doubleargs() { data = GenerateSample(sampleNo) });
        }
    }, TaskCreationOptions.LongRunning);

    Observable.FromEventPattern<doubleargs>(h => dataevnt += h,
                                            h => dataevnt -= h)
        .Select(ev => ev.EventArgs.data)
        .Buffer(1024)
        .Subscribe(data =>   // push each 1024-sample buffer to both channel subjects
        {
            subjLeft.OnNext(data.ToArray());
            subjRight.OnNext(data.ToArray());
        });

    The Observable.Generate version causes memory usage to grow to 2 GB in a matter of minutes, whereas the Task version hovers between 25k and 38k. Any ideas what could be going on with the Observable.Generate version?
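    For comparison, the dataevnt event in the working version only exists to get samples from the loop into Rx. A minimal sketch of the same long-running loop wrapped directly with Observable.Create follows, assuming Rx 2.0 or later (System.Reactive) for the overload that passes a CancellationToken. SampleSource and its parameters are hypothetical names, and disposing the subscription stands in for the EndNow flag. It is only an alternative way to write the working version, not a diagnosis of the Generate problem.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper (not from the thread): exposes the blocking sample loop
// directly as an IObservable<double>, without the intermediate dataevnt event.
public static class SampleSource
{
    public static IObservable<double> Create(Func<int, double> generateSample, int sampleRate)
    {
        return Observable.Create<double>((observer, cancellationToken) =>
            // Run the loop on a dedicated thread, as TaskCreationOptions.LongRunning did in the post.
            Task.Factory.StartNew(() =>
            {
                int sampleNo = 0;
                // Disposing the subscription cancels the token, which replaces the EndNow flag.
                while (!cancellationToken.IsCancellationRequested)
                {
                    sampleNo = (sampleNo >= sampleRate) ? 0 : sampleNo + 1;
                    observer.OnNext(generateSample(sampleNo));
                }
                // Rx sends OnCompleted when the returned Task completes, so it is not called here.
            }, TaskCreationOptions.LongRunning));
    }
}
```

    Usage might look like SampleSource.Create(n => GenerateSample(n), SampleRate).Buffer(1024).Subscribe(...); disposing the returned subscription ends the loop.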

All replies

  • Friday 13 July 2012 14:33
     
     

    I didn't see anything wrong with your code; my only suspicion was that the NewThreadScheduler might be the culprit. However, when I ran this code on my machine in a console app, the Observable.Generate version had the same memory characteristics and actually used less CPU.

    Could something be missing from the code you provided? I assume that subjLeft and subjRight are Subject<double[]> in both cases, and that the leaking version isn't using a ReplaySubject<double[]> or similar.
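    The ReplaySubject question matters because an unbounded ReplaySubject keeps every value it has received so it can replay them to later subscribers, while a plain Subject forwards each value and keeps nothing. With 1024-sample buffers of double, each retained array is about 8 KB (1024 × 8 bytes), so that kind of retention would show up as steady memory growth. A small sketch of the difference, assuming System.Reactive; SubjectRetention is a hypothetical helper:

```csharp
using System;
using System.Reactive.Subjects;

// Sketch: counts how many earlier buffers a late subscriber receives,
// to compare a plain Subject with an unbounded ReplaySubject.
public static class SubjectRetention
{
    public static int ValuesSeenByLateSubscriber(ISubject<double[]> subject, int bufferCount, int bufferSize)
    {
        // Push some buffers before anyone is subscribed.
        for (int i = 0; i < bufferCount; i++)
            subject.OnNext(new double[bufferSize]);

        // A ReplaySubject replays everything it has retained during Subscribe;
        // a Subject has retained nothing, so the late subscriber sees no values.
        int seen = 0;
        IDisposable subscription = subject.Subscribe(_ => seen++);
        subscription.Dispose();
        return seen;
    }
}
```

    Called with new Subject<double[]>() it returns 0; called with new ReplaySubject<double[]>() it returns bufferCount, because every array is still referenced by the subject.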


    Lee Campbell http://LeeCampbell.blogspot.com

  • Friday 13 July 2012 21:57
     
     

    Hi Lee,

    Thanks for the reply.

    subjLeft and subjRight are just Subject<double[]>.

    There is a whole pipeline after these, with some custom operators. However, I would have thought that if the rest of the pipeline were to blame, changing only this code would not fix the issue.

    I'm not using a ReplaySubject anywhere in the pipeline (though I am using Publish().RefCount()); again, those are in place regardless of which version of the source I use.

    Strangely, I just tried it on my home computer and there were no memory issues! Hmm.

    I'll try it again at work on Monday.

     

  • Saturday 14 July 2012 08:49
     
     

    So there is still some mystery; I'd be interested to hear what the cause turns out to be.

    As a side note, why do you have the two subjects? Would it not be easier to Publish the stream and simply subscribe to it twice? Or, if the subjects also get values from other places, merge those sources with this published stream.

    In general I find subjects to be a code smell. You now have to know both how they are consumed and how they are published, and because a subject can have multiple writers, the sequence can be harder to follow. Having read-only sequences (IObservable) and write-only consumers (IObserver / the Subscribe extension methods) gives your query good locality of concerns. Ideally your query would compose all of the sequences together in one place, so other developers can clearly see the intent of the code.
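    A minimal sketch of the "Publish the stream and subscribe to it twice" suggestion, assuming System.Reactive. PublishedChannels.Route and its parameters are hypothetical names standing in for subjLeft/subjRight. Publish() shares a single Buffer pipeline between both channels, and calling Connect() only after both subscriptions means neither channel misses the first buffers:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Sketch: route one published stream to two channel observers instead of two subjects.
public static class PublishedChannels
{
    public static IDisposable Route(IObservable<double> samples, int bufferSize,
                                    IObserver<double[]> left, IObserver<double[]> right)
    {
        // One shared Buffer pipeline, however many channels subscribe to it.
        IConnectableObservable<IList<double>> shared = samples.Buffer(bufferSize).Publish();

        // Each channel takes its own copy of every buffer, as the two ToArray() calls did.
        IDisposable leftSubscription = shared.Select(buffer => buffer.ToArray()).Subscribe(left);
        IDisposable rightSubscription = shared.Select(buffer => buffer.ToArray()).Subscribe(right);

        // Start the source only once both channels are subscribed.
        IDisposable connection = shared.Connect();

        // Disposing the result tears down the connection and both subscriptions.
        return new CompositeDisposable(connection, leftSubscription, rightSubscription);
    }
}
```

    Publish().RefCount() is an alternative when the channels are subscribed from different places, but it connects as soon as the first subscriber arrives, so a second subscriber can miss early values from a hot source.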

    My $0.02

    Lee



  • Sunday 15 July 2012 18:13
     
     

    Hi Lee,

    I'm only using the subjects at the sources; I don't expose them to the outside world. The semantics I was trying to achieve were those of a StartableObservable rather than a ConnectableObservable, so that all subscribing and connecting would be done before the observable started pushing values.

    The reason for having two subjects is that this is an audio application, so there is one per channel. This was to match how I had written other types of source. I've since changed how I implemented those other sources too, so that argument no longer holds!

    I'm currently having deadlock issues in both the signal generator and the WaveOut observer, so I'm re-examining the architecture from the ground up and should get rid of the subjects on the source side at least.