Problem with .Timestamp()

  • Question

  • Hello,

    Why is it that this code works fine:

    var fs = new Func<int, string>[] {o => "First: " + o, o => "Second: " + o};
    
    Observable.Range(0, 20 * fs.Length, Scheduler.TaskPool)
    	.GroupBy(o => o % 2, o => o / fs.Length * 10000 / 20)
    	.ToEnumerable()
    	.Zip(fs, (sch, f) => new { f, sch })
    	.ForEach(o => o.sch.Select(o.f).Subscribe(Console.WriteLine));
    	
    Thread.Sleep(1000);
    

    whereas as soon as I introduce .Timestamp(), like so:

    var fs = new Func<int, string>[] {o => "First: " + o, o => "Second: " + o};
    
    Observable.Range(0, 20 * fs.Length, Scheduler.TaskPool)
    	.GroupBy(o => o % 2, o => o / fs.Length * 10000 / 20)
    	.ToEnumerable()
    	.Zip(fs, (sch, f) => new { f, sch = sch.Timestamp()})
    	.ForEach(o => o.sch.Select(v => new { result = o.f(v.Value), Elapsed = (DateTimeOffset.Now - v.Timestamp).TotalMilliseconds }).Subscribe(Console.WriteLine));
    	
    Thread.Sleep(1000);
    

    I do not get a single result back.

    I also tried passing a scheduler to Timestamp to no avail.

    Thanks for any help,

    David

    Wednesday, July 27, 2011 1:11 AM

Answers

  • Hi David,

    The query should work fine if you remove ToEnumerable.  Any particular reason why you added it?

    I was able to repro not just with Timestamp but other Rx operators as well; e.g., Select.

    Furthermore, even the "working" query seems to skip values - once it didn't produce any values at all for me, without Timestamp.  Clearly there's some sort of race condition here.

    When ForEach begins iterating the IEnumerable query, it causes Range to start pushing values as fast as possible on a thread-pool thread.  GroupBy then pushes these values into new observables that are hot and do not replay notifications.  Since your query hasn't actually subscribed to these new observables yet, it's possible that you'll miss all of the values (as I've observed myself).  Apparently for you, adding the Timestamp operator provides enough of a subscription delay that it's causing you to miss all of the values.

    Try keeping your query entirely in the reactive world without switching to IEnumerable and you won't have this problem.
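
    To illustrate what "hot and do not replay" means above, here is a minimal sketch that uses Subject<T> from System.Reactive as a stand-in for the groups that GroupBy produces. The HotDemo class and its member names are made up for illustration; this is not the code from the original query.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class; Subject<T> stands in for a hot group observable.
public static class HotDemo
{
    public static (List<int> early, List<int> late) Run()
    {
        var subject = new Subject<int>();   // hot: pushes to current observers only, no replay
        var early = new List<int>();
        var late = new List<int>();

        using (subject.Subscribe(early.Add))      // subscribed before any value is pushed
        {
            subject.OnNext(1);
            subject.OnNext(2);

            using (subject.Subscribe(late.Add))   // subscribed after 1 and 2 were pushed
            {
                subject.OnNext(3);
            }
        }

        // 'late' never sees the values pushed before it subscribed.
        return (early, late);
    }
}
```

    The late subscriber only receives values pushed after it subscribed, which is the same effect as subscribing to a group some time after GroupBy has already started feeding it.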

    - Dave


    http://davesexton.com/blog
    • Marked as answer by David Grenier Thursday, July 28, 2011 8:53 PM
    Wednesday, July 27, 2011 4:10 AM

All replies

  • Hi David,

    I suspect that the problem with your code is that you are partially jumping from observables to enumerables. You should stay in one world or the other.

    If you change your code like so you still get values:

    var fs = new Func<int, string>[] {o => "First: " + o, o => "Second: " + o};
    
    var fso = fs.ToObservable();
    
    Observable.Range(0, 20 * fs.Length, Scheduler.TaskPool)
    	.GroupBy(o => o % 2, o => o / fs.Length * 10000 / 20)
    	.Zip(fso, (sch, f) => new { f, sch = sch.Timestamp()})
    	.ForEach(o => o.sch.Select(v => new { result = o.f(v.Value), Elapsed = (DateTimeOffset.Now - v.Timestamp).TotalMilliseconds }).Subscribe(Console.WriteLine));
    

    However, the elapsed time is all zero for me, so I further suspect that your query isn't quite right in any case.

     


    James C-S
    Wednesday, July 27, 2011 4:04 AM
  • This is the problem I'm having... I must pass this to the .NET 4.0 charts (wrapped with FSharpChart) and it only accepts enumerables.

    I need to be able to produce two schedules for two different functions and alternate between each of them.

    That is, I may be profiling x functions, calling each of them in turn with the same values (x1(0), x2(0), x3(0), x1(1000), x2(1000), etc.), but I need three distinct observables to subscribe to. I've been trying different approaches so that none of the functions run in parallel, yet each observable produces a value only once the previous function call has completed.

    I'm well aware I'm not explaining myself properly, so let me know if I should start from scratch.

    @James the functions called above do not perform anything computationally intensive, which is why Elapsed is zero. Production code would be different.

    @David Note that I do understand why I'm losing values with ToEnumerable. I guess I should call Publish somewhere and then call Connect after getting the enumerable, but I'm sure there's a simpler way.
    Wednesday, July 27, 2011 3:16 PM
  • Hi David,

    >  I must pass this to the .NET 4.0 charts (wrapped with FSharpChart) and it only accepts enumerables.

    Then what you really need is an interactive query, not a reactive query.

    > I need to be able to produce two schedules for two different functions and alternate between each of them. [snip]

    The term "alternate" implies pulling (interactive) while the term "schedule" implies pushing (reactive).  Therefore, why do you need scheduling at all if your chart control is going to be pulling the data anyway?

    > I've been trying different approaches in order to not have any of the functions running in parallel
    > yet have each observable producing a value only once the previous function call is completed.

    So why not just use an IEnumerable query to begin with?  That's what you're describing.

    - Dave


    http://davesexton.com/blog
    Wednesday, July 27, 2011 11:59 PM
  • The charting supports IObservable and displays the values as they are provided.
    Thursday, July 28, 2011 1:51 AM
  • Hi David,

    > I must pass this to the .NET 4.0 charts (wrapped with FSharpChart) and it only accepts enumerables.

    > The charting supports IObservable and displays the values as they are provided.

    I don't understand.  Aren't these conflicting statements?

    - Dave


    http://davesexton.com/blog
    Thursday, July 28, 2011 2:12 AM
  • You're right but I didn't express myself properly.

    The library accepts an enumerable of line charts, each fed by an observable.

    Anyway, I figure the problem isn't with Rx. I coded another version using subjects, and although it works fine when subscribing a Console.WriteLine statement, nothing happens when it is fed into the line chart.

    Thanks for the help!

    David

    Thursday, July 28, 2011 3:19 PM
  • Hi David,

    Well, the problem that James and I have identified with your query is real.  I was able to repro the unpredictable behavior in a simple console application.  There's definitely a race condition between iterating the interactive part of the query and the concurrency introduced for the hot observables that are generated in the reactive part of the query.

    Note that the TaskPool scheduler in this case uses a single thread-pool thread to create all of the observable groups.  Each group doesn't get its own thread.  If you need parallelization then consider using PLINQ instead.

    Your actual requirements aren't clear to me, but it seems like what you really need is a hot enumerable that contains cold observables.

    Perhaps an appropriate solution is to start with Enumerable.Range and GroupBy and then project each value into a cold observable.
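
    A rough sketch of that idea, assuming System.Reactive is referenced. The ColdSeriesDemo class, its method names, and the small range are made up for illustration, and the element selector is simplified from the original query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical demo class: an IEnumerable whose elements are cold observables.
public static class ColdSeriesDemo
{
    public static List<IObservable<string>> Build()
    {
        var fs = new Func<int, string>[] { o => "First: " + o, o => "Second: " + o };

        return Enumerable.Range(0, 3 * fs.Length)
            // Group on the interactive (pull) side, so nothing is pushed yet.
            .GroupBy(o => o % fs.Length, o => o / fs.Length)
            // ToObservable is cold: every subscriber gets the group's values from the start.
            .Select(g => g.ToObservable().Select(fs[g.Key]))
            .ToList();
    }

    // Blocks until the observable completes and returns everything it produced.
    public static IList<string> Drain(IObservable<string> source)
    {
        return source.ToList().Wait();
    }
}
```

    Because each inner observable is cold, it does not matter how long the consumer waits before subscribing; calling Drain twice on the same element yields the same values both times. Note that this sketch does not interleave the functions; each observable produces its own values independently when subscribed.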

    - Dave


    http://davesexton.com/blog
    Thursday, July 28, 2011 3:51 PM
  • Hi Dave,

    Thanks for all the help.

    I just figured out that the problem was where I was calling Connect... I was trying to call it as late as possible but before the graph was rendered, while I should've done the opposite. That is, call Connect after rendering the graph.

    I also found a more elegant solution without using subjects:

    var fs = new Func<int, string>[] {x => "First: " + x,x => "Second: " + x,x => "Third: " + x};
    
    var obs = Observable.Range(0, 20 * fs.Length, Scheduler.TaskPool).Publish();
    var max = 30000;
    
    fs.Select((f, i) =>
    	obs.Where(o => o % fs.Length == i)
    	.Select(o => o / fs.Length * max / 20)
    	.Timestamp()
    	.Select(o => {f(o.Value); return new { o.Value, Elapsed = (DateTimeOffset.Now - o.Timestamp).TotalMilliseconds };}))
    .ForEach(o => o.Subscribe(Console.WriteLine));
    
    obs.Connect();
    
    Thread.Sleep(2000);
    


    I don't see why, after the GroupBy which returns an observable of observables, calling ToEnumerable on that should change the behavior of the underlying observables... but I guess it should behave like that because the sub-observables are somewhat subscribed to the underlying observable, which isn't an observable anymore.

    Thank you very much!

    David

    Thursday, July 28, 2011 8:29 PM
  • Hi David, 

    > I don't see why, after the groupby which returns an observable of observables, calling toEnumerable on that should change
    > the behavior of the underlying observables

    It doesn't change the behavior of the inner observables - they remain as hot observables.

    There are two rules in particular that apply to your issue.  The first is in the Rx Design Guidelines.  And the second, although it probably should be in the Rx Design Guidelines as well, is actually just a comment from Wes in this forum.

    1. §4.2: Assume observer instances are called in a serialized fashion; i.e., notifications in Rx are single-threaded.  This means that when the outer observable calls OnNext(IObservable<T>), it won't call OnNext again until after it returns; furthermore, it won't start pushing values into the inner observable until after OnNext returns.
    2. Wes's forum comment: (1)  Subscribe to the inner sequence within the message send from the outer subscription; i.e., whenever you have an IObservable<IObservable<T>>, you must subscribe to the inner observables during each call to OnNext.  This guarantees that you will not miss any notifications.
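
    The second rule can be sketched as follows, assuming System.Reactive is referenced. The InnerSubscribeDemo class and the small range are made up for illustration. The original query passes Scheduler.TaskPool to Range; it is omitted here so that the sketch runs synchronously on the calling thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class: subscribe to each group inside the outer OnNext.
public static class InnerSubscribeDemo
{
    public static List<string> Run()
    {
        var fs = new Func<int, string>[] { o => "First: " + o, o => "Second: " + o };
        var results = new List<string>();

        Observable.Range(0, 3 * fs.Length)
            .GroupBy(o => o % fs.Length, o => o / fs.Length)
            // This lambda is the outer OnNext. The inner Subscribe happens before it
            // returns, so the group cannot push a value that the subscriber misses.
            .Subscribe(group =>
                group.Select(fs[group.Key]).Subscribe(results.Add));

        return results;
    }
}
```

    There is no ToEnumerable between GroupBy and the inner Subscribe, so there is no window in which a group can push values before anything is listening.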

    In your original query, when ForEach called GetEnumerator it caused ToEnumerable to subscribe to the outer observable.  Observable.Range started pushing values on a thread-pool thread.  Observable.GroupBy projected these values into hot observables.  ToEnumerable inserted a buffer to ensure that ForEach didn't miss any of the inner observables; however, sometimes your query failed to Subscribe to them synchronously, and since they're hot it sometimes missed values.  Adding even a small delay to the subscription, such as adding an operator like Timestamp to the query, increased the subscription time sufficiently that it missed values being pushed in by the thread-pool thread.

    Your new solution seems much better, although I still wonder if the query should just be changed as I had recommended: an IEnumerable projecting IObservables, instead of an IObservable projecting IObservables and converting the outer sequence into IEnumerable.

    - Dave


    http://davesexton.com/blog

    Friday, July 29, 2011 3:35 AM