Bug in BufferWithTime

  • Question

  • var source = new Subject<int>();

    source.BufferWithTime(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    source.OnNext(4);
    source.OnNext(5);
    source.OnNext(6);

    I'm seeing variable output across runs:

    2
    2
    3
    4
    5
    6

    vs

    1
    2
    3
    4
    5
    6

    vs

    1
    1
    2
    3
    4
    5
    6


    James Miles http://enumeratethis.com
    Wednesday, January 26, 2011 12:03 PM

All replies

  • Thanks for the catch.  We have fixed it.
    Friday, January 28, 2011 9:28 PM
  • Hey James

    Did you see any examples of out-of-order messages? Or just duplicates?

    Monday, January 31, 2011 10:10 AM
  • I've seen duplicates & missing. I'd guess that out of order is possible.
    James Miles http://enumeratethis.com
    Monday, January 31, 2011 10:14 AM
  • Thanks for the catch.  We have fixed it.

    Hi Wes.

    Just double-checked with James and he is seeing missing items (potentially out of order as well).  Do you have a release date in mind for this? We're currently relying on this method in production, and missing items is a rather major concern. :)

    Monday, January 31, 2011 10:44 AM
  • FYI

    Enumerable.Range(1, 100000)
        .Except(
            Observable.Range(1, 100000)
                .BufferWithTime(TimeSpan.FromSeconds(1))
                .SelectMany(w => w)
                .BufferWithCount(int.MaxValue)
                .Single())   // any items left over here were dropped by BufferWithTime

    James Miles http://enumeratethis.com
    Monday, January 31, 2011 10:56 AM
  • We are targeting the end of this week.  Will that work for you?

    • Proposed as answer by Ray Booysen Tuesday, February 1, 2011 9:06 PM
    Monday, January 31, 2011 8:11 PM
  • We are targeting the end of this week.  Will that work for you?

    Perfect
    Monday, January 31, 2011 10:01 PM
  • Would I be correct in assuming this bug also affects BufferWithTimeOrCount?

    Also, I am seeing an issue where calling Run on, or calling .ToEnumerable().ToList() on, the output of any of the Observable.Buffer* operators causes the code to hang:

    Subject<int> subject = new Subject<int>();

    var subscription = subject
        .BufferWithTime(TimeSpan.FromSeconds(1))
        .SubscribeOn(Scheduler.NewThread)
        .Subscribe(
            messages =>
            {
                var list = messages.ToEnumerable().ToList(); // this line always hangs
                Console.WriteLine("we did it!");
                //var list = new List<Message>();
                //messages.Run(message => list.Add(message)); // this Run call also hangs

                //messageSource.OnNext(list);
            });

    for (int i = 0; i < 20; i++)
        subject.OnNext(i);
    • Edited by JeroMiya Wednesday, February 2, 2011 11:03 PM more concrete example without using my source
    Wednesday, February 2, 2011 10:55 PM
  • There is a similar problem (not the same) with BufferWithTimeOrCount.  That has been fixed as well and will be released very soon.

    As to the blocking issue, we know about it, and users should be careful to avoid it.  In fact, what you describe is a way for a user to create a deadlock in observable sequences.  I will describe under what circumstances this can happen and what users can do to avoid it.

    If an observable sequence is a sequence of observable sequences then the subscriber must do two things:

    (1)  Subscribe to the inner sequence within the message sent from the outer subscription

    (2)  Must not block messages from the outer subscription

    For example, if you use Buffer*, GroupBy, Window, Join, or GroupJoin then you get IO<IO<T>> (shorthand for IObservable<IObservable<T>>).  In order to get all of the results you must follow (1).  This is generally quite easy since all operators follow (1) by default.  Here is an example of what a user can do in the final subscription:

    xss.Subscribe(xs => xs.Subscribe(x => F(x)))

    Second, you must not block sending more windows.  In particular, it is bad to do this:

    xss.Subscribe(xs => xs.ToEnumerable().ToList())

    The ToEnumerable().ToList() conversion blocks until it has all of the members of xs.  Suppose xss is produced by GroupBy; then in order to get all of the members of xs, the rest of the outer sequence must be examined, but you are currently blocking the rest of the sequence (contradiction!).

    There are a number of ways to get around this, but perhaps the easiest is to use an aggregate function.  In the next release, we have already included a ToList operator that takes an observable sequence and returns an observable sequence that includes a single list.  Then to get the desired result simply do the following:

    xss.SelectMany(xs => xs.ToList()).Subscribe(xs => { foreach (var x in xs) ... })

    We also plan to change Buffer* back to return IObservable<IList<T>> and instead include Window* that returns IObservable<IObservable<T>>.  In fact, Buffer* will just delegate to Window* and add the necessary SelectMany.
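    The planned layering can be sketched roughly as follows.  This is a hypothetical sketch, not the actual implementation: it assumes a WindowWithTime operator returning IObservable<IObservable<T>> and the new ToList operator described above.

    // Sketch only: Buffer* expressed as Window* plus the SelectMany
    // described in the post above.
    public static IObservable<IList<T>> BufferWithTime<T>(
        this IObservable<T> source, TimeSpan timeSpan)
    {
        return source
            .WindowWithTime(timeSpan)                // IObservable<IObservable<T>>
            .SelectMany(window => window.ToList());  // one IList<T> per closed window
    }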

    Wednesday, February 2, 2011 11:15 PM
    Perhaps I should add that the only problem with not following (1) is that if you subscribe to an inner observable later, you may miss some of its values.  That is because, while the entire observable is cold, the inner observable is hot *in the context of* the outer observable's message.  It is guaranteed that you won't miss any messages if you subscribe within the message as I describe above.  The only alternative would be to make every inner observable replayable, but that would be a huge memory hit; if people need that, they can add replay behavior to the inner observables themselves, since that satisfies (1) & (2).
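    Adding that replay behavior yourself might look roughly like this (a sketch only; it assumes xss is the outer IObservable<IObservable<int>> from the discussion and that Replay/Connect behave as in this release):

    // Sketch: connect each inner window inside the outer message so no
    // values are missed, while keeping a cached copy for later subscribers.
    IObservable<IObservable<int>> replayable = xss.Select(xs =>
    {
        var cached = xs.Replay();  // cache the inner sequence
        cached.Connect();          // subscribe now, within the outer message: satisfies (1)
        return (IObservable<int>)cached;
    });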

    Wednesday, February 2, 2011 11:21 PM
  • Thank you very much Wes! However, while this solves the deadlock, it causes a race condition:

    var parsedMessages = UnparsedMessages.SelectAsync(
        (msg, cancellationToken) => ParseMessage(msg, cancellationToken));

    subscription = parsedMessages
        .BufferWithTimeOrCount(TimeSpan.FromSeconds(1), 100)
        .Subscribe(
            messages =>
            {
                var list = new List<Message>();
                messages.Subscribe(
                    message => list.Add(message),
                    () =>
                    {
                        if (list.Count > 0)
                            messageSource.OnNext(list);
                    });
            });
    

    I don't think the above code is guaranteed to deliver the List<Message> to messageSource in the same order as the windows are received from the IO returned by BufferWithTimeOrCount. The second inner IO could get processed and sent before the first batch is finished accumulating. In this case I *want* the inner OnNext lambda to block the delivery of the next IO<Message>. The order of the messages is important.

    I'm not sure how to get around this since I can't queue up IObservable<T> subscriptions and "wait" on them in the proper order. Calling ToTask() on the inner IO, adding the tasks to a queue, and waiting on each task in order triggers the deadlock, even if I'm waiting for them in a different thread. Any ideas?

    Also, quick question: why do BufferWithTime and BufferWithTimeOrCount trigger an OnNext every interval even when there were no messages from the source IObservable? It seems like they should only send an IObservable down the pipe if data came in during that interval. It's not a terrible inconvenience; I was just curious whether there is deeper reasoning behind it.

    Thursday, February 3, 2011 2:57 PM
  • The next release will provide an operator called ToList.

    IObservable<IList<T>> ToList<T>(this IObservable<T> source)

    When you use this in combination with a windowing operator it will pump out the contents of a window when the window closes.  So the list will appear in the order of the closing time of the window.  This is similar to how it worked with the original Buffer* implementations.

    You can achieve the result of pumping windows out in the order they appear.  The new release will also have a Concat operator:

    IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)

    Using these together (along with Replay) you can do the following:

    source.WindowWithTimeOrCount(TimeSpan.FromSeconds(1), 100).Select(w => w.ToList().Replay()).Concat()
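    As a sketch of how this addresses the ordering question above (assuming WindowWithTimeOrCount, ToList, Replay, and Concat behave as described in this thread; parsedMessages and messageSource are from the earlier post), the consuming side might look like:

    // Sketch: each window's single list is cached via Replay, and Concat
    // drains the windows strictly in the order they were opened.
    parsedMessages
        .WindowWithTimeOrCount(TimeSpan.FromSeconds(1), 100)
        .Select(w => w.ToList().Replay())  // one cached IList<Message> per window
        .Concat()                          // preserve window order
        .Subscribe(list =>
        {
            if (list.Count > 0)
                messageSource.OnNext(list);  // lists arrive in window-open order
        });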

    Friday, February 4, 2011 9:35 PM
  • Thanks for the catch.  We have fixed it.

    Hi Wes

    I'm seeing weird behaviour with the latest release.  With the above example, BufferWithTime never OnNexts an IList<T> with values (Count is always zero), and BufferWithCount duplicates values again.

    With BufferWithCount I see results where we OnNext 2 items, 2 items, 2 items, and then the entire sequence again (6 items).

    Monday, February 14, 2011 9:15 AM
  • I was able to repro what you are seeing.  The reason you are seeing that behavior is that the latest release creates windows and subscribes to the source on the scheduler, which can cause that to happen much later than when Subscribe returns.  So you should only see the difference if you immediately start pumping values into a hot observable.

    I changed the code to subscribe immediately, although this means that the first message from a Window operator will always come during subscription.  In light of this, I think we will discuss what should happen here, and any change will be included in the next release.

    • Proposed as answer by Ray Booysen Monday, February 14, 2011 10:33 PM
    Monday, February 14, 2011 5:51 PM