ThrottleAfterFirst - code review please?

  • Question

  • Hi,

    I needed a version of Throttle that lets the first value through immediately and throttles only subsequent values. The use case is that we use Rx inside a GUI to send selection events between views and want the first click to be instantly responsive, but if the user is scrolling up in a grid or something, we don't want to bombard all the subscribers with intermediate values.

    So I took a look at the existing Throttle() method, using the R# decompiler output as a basis (I really don't think the decompiler worked properly, as it looks like nothing is ever stored into the 'value' variable, which means the code wouldn't work!).

    I'd be really grateful for any feedback on whether my code might have some latent problems. It seems to work OK in the UI, but this type of code is hard...

    The premise is that I keep a counter, which I increment every time I get an OnNext. If the counter is zero, I call OnNext on the observer immediately. I then increment the counter and schedule a timer. When the timer fires, if the count is 1 I know I don't need to call OnNext, as the value was already delivered (it was the first); otherwise I do a couple of checks, call OnNext and reset the counter.

            /// <summary>
            /// Throttles values after the first is published. Useful for getting immediate notification of first action for a responsive UI.
            /// </summary>
            /// <typeparam name="TSource"></typeparam>
            /// <param name="source"></param>
            /// <param name="dueTime"></param>
            /// <param name="scheduler"></param>
            /// <returns></returns>
            public static IObservable<TSource> ThrottleAfterFirst<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
            {
                if (source == null)
                    throw new ArgumentNullException("source");
                if (scheduler == null)
                    throw new ArgumentNullException("scheduler");
     
                return Observable.Create<TSource>(obs =>
                {
                    var gate = new object();
                    ulong count = 0;
                    TSource value = default(TSource);
                    var cancelable = new SerialDisposable();
     
                    var disposable = source.Subscribe(
                    x =>
                    {
                        ulong currentCount;
                        lock (gate)
                        {
                            if (count == 0)
                                obs.OnNext(x);
                            else
                                value = x;
     
                            ++count;
                            currentCount = count;
                        }
     
                        var singleDisp = new SingleAssignmentDisposable();
                        cancelable.Disposable = singleDisp;
                             
                        singleDisp.Disposable = scheduler.Schedule(dueTime, () =>
                        {
                            lock (gate)
                            {
                                if (count == currentCount && count > 1)
                                    obs.OnNext(value);
     
                                count = 0;
                            }
                        });
                    },
                    ex =>
                    {
                        cancelable.Dispose();
                        lock (gate)
                        {
                            obs.OnError(ex);
                        }
                    },
                    () =>
                    {
                        cancelable.Dispose();
                        lock (gate)
                        {
                            if (count > 1)
                                obs.OnNext(value);
                            obs.OnCompleted();
                        }
                    });
                    return new CompositeDisposable(disposable, cancelable);
                });
            }

    Tuesday, October 23, 2012 12:01 PM

All replies

  • The currentCount bit is based on the Throttle code, but I confess I don't think I really grokked what it was there for in Throttle.

    I guess it's there to deal with a race around the timer, whereby it fires just as another one is replacing it via the SerialDisposable. If that's right, I think the count = 0 should perhaps be inside the if statement.

    edit: Just made this change to my code, as otherwise I believe I'd lose a message if the race condition occurs around the timer:

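                                if (count == currentCount)
                                {
                                    // Stale timers do nothing. The latest timer always resets the count,
                                    // even when only the first value arrived (count == 1); otherwise the
                                    // next value after a quiet period would be throttled instead of sent.
                                    if (count > 1)
                                        obs.OnNext(value);
                                    count = 0;
                                }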

    • Edited by Dan Harman Tuesday, October 23, 2012 12:35 PM
    Tuesday, October 23, 2012 12:17 PM
  • Hi,

    Try the following query:

    source.Publish(p => p.Take(1).Concat(p.Throttle(duration)));

    - Dave


    http://davesexton.com/blog

    Tuesday, October 23, 2012 3:07 PM
  • Hi Dave,

    Thanks for replying. Does this approach not cause a dupe of the first value to materialise after duration, as it's going through the throttle too?

    n.b. assuming that is true, a .DistinctUntilChanged() won't work, as we have users re-clicking rows to force other views to go back to the previous selection if they navigated off. At the same time, we do not want to dupe the first selection.

    Dan




    • Edited by Dan Harman Tuesday, October 23, 2012 3:33 PM
    Tuesday, October 23, 2012 3:32 PM
  • Hi Dan,

    > Does this approach not cause a dupe of the first value to materialise after duration as its going through the throttle too?

    Try it out and you tell me ;)

    - Dave


    http://davesexton.com/blog

    Tuesday, October 23, 2012 4:05 PM
  • Nice one Dave! I have not used that operator before.

    @Dan, from the IntelliSense docs:

    "Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence."

    var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
      .Take(10)
      .Concat(
        Observable.Interval(TimeSpan.FromMilliseconds(150))
          .Take(4)
          .Select(i=>i+20));
    
    source.Publish(
      src=>
      {
        return src.Take(1)
            .Concat(
              src.Throttle(TimeSpan.FromMilliseconds(110)));
      })
      .Dump();

    The output is 0, 1, 4, 6, 9, 20, 21, 22, 23

    i.e. the single 0 tells you the source is not subscribed to twice: Interval is a cold observable, so a second subscription would have produced another 0 (hence why you are publishing it) :-)


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Tuesday, October 23, 2012 4:50 PM Added code example
    Tuesday, October 23, 2012 4:48 PM
  • Hi Cryptic like Yoda ;)

    Ok, so I've tried it. Doesn't work!

    However, I'd like to understand how the Take(1) is preventing the first value from hitting the throttle. This is a hot observable, so I don't really understand how that is working. Very keen for an explanation if you wouldn't mind?

    The problem with this approach is that it's not resetting after having throttled a selection event. Everything is throttled after the first click, whereas it needs to reset after the throttling window expires so that subsequent user selections don't get delayed thereafter if they aren't rapidly changing.

    Dan

    Tuesday, October 23, 2012 5:03 PM
  • Hi Dan,

    Sorry for being "cryptic".  I was trying to help you help yourself.  You know, teach a man to fish...  :)

    > Ok, so I've tried it. Doesn't work!

    You mean that you are getting a duplicate value or that it doesn't meet your additional unspecified requirements?  Please post your test code.  A short but complete example that reproduces the problem would be great.

    > However I'd like to understand how the Take(1) is preventing the first value hitting the throttle?

    Concat defers the subscription to the observable passed to the right.  Consider its semantics: When the observable on the left completes, continue the sequence by subscribing to the observable on the right.

    > This is a hot observable so I don't really understand how that is working. [snip]

    The source is a hot observable either because it's already hot or because you've used the Publish operator as I've shown.

    First, Concat subscribes to the Take(1) query.  After a value is observed, the Take operator calls OnCompleted.  That causes Concat to subscribe to the Throttle query.  Given that the source is hot, the first value has already passed, thus it will not be observed by Throttle.
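
    To make that sequence of events concrete, here is a minimal sketch using a Subject as the hot source. The ConcatDemo class and the "first:"/"rest:" tags are illustrative assumptions, and a plain Select stands in for p.Throttle(duration) so that no timing is involved:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Illustrative only: ConcatDemo is a made-up name, not part of Rx.
public static class ConcatDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>(); // hot: late subscribers do not see earlier values

        // Tag each branch so the log shows which one delivered a value.
        var first = source.Take(1).Select(x => "first:" + x);
        var rest = source.Select(x => "rest:" + x); // stand-in for p.Throttle(duration)

        // Concat subscribes to 'rest' only after 'first' has completed.
        using (first.Concat(rest).Subscribe(log.Add))
        {
            source.OnNext(0);
            source.OnNext(1);
            source.OnNext(2);
        }

        return log;
    }
}
```

    Run() returns first:0, rest:1, rest:2. The 0 never reaches the second branch, because Concat subscribes to it only while the Subject is already delivering that value.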

    > The problem with this approach is that its not resetting after having throttled a selection event.
    > Everything is throttled after the first click.  Whereas it needs to reset [snip]

    Well that's a bit different than your original specification:

    "I needed a version of Throttle that let the first value through immediately, and only throttled subsequent values."

    However, I believe your new requirement can be accomplished using existing operators, if I've understood you correctly.

    For example, try the following:

    source.Publish(p => p.Take(1).Concat(p.Throttle(duration).Take(1)).Repeat());

    - Dave


    http://davesexton.com/blog

    Tuesday, October 23, 2012 5:53 PM
  • Hi guys,

    Sorry for seemingly abandoning this thread. Just had a release to deal with!

    I really like the approach you are suggesting, and sorry for not explaining the reqs well enough.

    One thing that concerns me with the 

    source.Publish(p => p.Take(1).Concat(p.Throttle(duration).Take(1)).Repeat());

    approach is that we may drop a message during the resubscribe in the concat since the source is a hot observable of user generated clicks/button press events?

    Also that by doing this concat we are always forcing a throttle after the first OnNext, so we end up having every 2nd operation delayed by throttle if they are sufficiently spaced.

    What I'm trying to do here is replicate throttle, but have it push through the first notification immediately, then get to its throttling business only for so long as the throttle interval is set.

    So say we have an interval of 2 units. I expect the following marbles:

    s| - 0 - 1 - 2 - -
    r| - 0 - - - - - 2
    
    s| - 0 - - 1 - - 2 - - -
    r| - 0 - - 1 - - 2 - - -
    
    s| - 0 - - 1 - 2 - 3 - -
    r| - 0 - - 1 - - - - - 3
    

    Wednesday, October 31, 2012 2:29 PM
  • It seems Dave's original suggestion aligns well with the original requirements and your latest reiteration of the problem and desired behavior. As for messages dropped by the Concat, this shouldn't be a problem. E.g. when Take(1) gets its first message, it will synchronously propagate OnNext and OnCompleted to its observer, which is Concat. In response to the OnCompleted, the Concat operator will subscribe to the next source. Notice all of this switching happens during the source's OnNext callback... The source is blocked and cannot send new notifications during this time.

    By the way, a good way to state the desired intent is by using the Rx test framework as outlined in Bart's post on the subject. This way you can simply run the tests against solutions posted on the forum and reply with any test assert failures that fire when the specification isn't met.
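
    A rough sketch of such a test, assuming the Microsoft.Reactive.Testing package (TestScheduler, ReactiveTest). The class name, the helper methods, the 20-tick dueTime and the click times are all made up for illustration; the expectation encodes "a second click long after the first is not delayed":

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical spec class; deriving from ReactiveTest gives access to the OnNext(ticks, value) helper.
public class ThrottleAfterFirstSpec : ReactiveTest
{
    // Records what a candidate operator emits for two clicks that are far apart in virtual time.
    public static IList<Recorded<Notification<int>>> Record(
        Func<IObservable<int>, TimeSpan, TestScheduler, IObservable<int>> candidate)
    {
        var scheduler = new TestScheduler();
        var dueTime = TimeSpan.FromTicks(20); // assumed throttle interval

        // Hot source: click 0 at tick 210, click 1 at tick 500, well outside the interval.
        var clicks = scheduler.CreateHotObservable(
            OnNext(210, 0),
            OnNext(500, 1));

        // Start() subscribes at tick 200 and disposes at tick 1000 by default.
        var observer = scheduler.Start(() => candidate(clicks, dueTime, scheduler));
        return observer.Messages;
    }

    // The specification: both clicks come out at the tick they went in.
    public static void SecondClickIsNotDelayed(
        Func<IObservable<int>, TimeSpan, TestScheduler, IObservable<int>> candidate)
    {
        Record(candidate).AssertEqual(
            OnNext(210, 0),
            OnNext(500, 1));
    }
}
```

    A candidate is passed in as a lambda, for example (src, due, s) => src.Publish(p => p.Take(1).Concat(p.Throttle(due, s).Take(1)).Repeat()). AssertEqual reports a failure if the recorded messages differ from the expected ones, which is what you would post back to the thread.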

    • Edited by Rx team Monday, November 5, 2012 3:35 AM Broken link
    Monday, November 5, 2012 3:34 AM
  • Hi,

    Thanks for the explanation of why there isn't a race.

    I have done some Rx tests before and confess I was trying to avoid them in this case, but it seems they might be necessary, as I really don't think the behaviour of Dave's solution is the same as what I'm describing! This is because the throttle takes the 2nd value from the stream and will stay around until it has emitted 1 value, even if that is 2 hours later. So that means my 2nd mouse click is always going to be throttled.

    i.e. I think it's going to do this, where the throttle interval is 2 units again:

    s| - 0 - - - - - - 1 - 
    r| - 0 - - - - - - - - 1

    which means delaying the user's 2nd mouse click by the throttle amount, however long after the first click it arrives. It should be doing this:

    s| - 0 - - - - - - 1 - 
    r| - 0 - - - - - - 1 -

    Potentially some kind of timeout around the throttle might address this, I guess, although I wouldn't want it to OnError, and it would need to not fire if the throttle is currently throttling messages.

    There is also a subtle difference with Dave's solution: since the Throttle does not see the first value, it doesn't start the throttle window for subsequent messages immediately, only once it gets a 2nd message (which also ties into the previous point).



    • Edited by Dan Harman Tuesday, November 6, 2012 6:26 PM
    Tuesday, November 6, 2012 6:14 PM
  • If we had a throttle-type operator that completed after its interval had expired, as long as no data was coming through, then that would make this a lot easier, I think. It would also be less divergent from the Rx Throttle code than my operator, so there would be less chance of introducing subtle threading problems.

    We could even start feeding this new type of throttle at the same time as the first value arrives, if the input observable were wrapped with a sequence number and a DistinctUntilChanged were used to throw away any first values coming out of the throttle (which would already have been delivered by the Take(1)).
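
    For comparison, a different route to the marbles above, built only from existing operators rather than a new throttle: open a window on the first value, close it once the source has been quiet for dueTime, and emit the first value of each window immediately plus the last one (if any followed) when the window closes. This is an untested sketch, not a reviewed implementation; the class name is made up, and it assumes the Window overload that takes a closing selector:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical sketch; not the operator posted at the top of the thread.
public static class ThrottleAfterFirstSketch
{
    public static IObservable<T> ThrottleAfterFirst<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return source.Publish(p =>
            // Each window stays open until the source has been quiet for dueTime.
            p.Window(() => p.Throttle(dueTime, scheduler))
             .SelectMany(w =>
                 // The first value of a window passes straight through. Any later values
                 // are collapsed to the last one, emitted when the window closes.
                 w.Take(1).Concat(w.TakeLast(1))));
    }
}
```

    As in the Take(1).Concat(...) discussion earlier in the thread, the TakeLast branch is subscribed while the first value is still being delivered, so it should not see that value again. How a value arriving at exactly the moment a window closes is handled has not been checked.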





    • Edited by Dan Harman Tuesday, November 6, 2012 6:41 PM
    Tuesday, November 6, 2012 6:39 PM