Rx and simple metronome - stop after x ticks

    Question

  • Hi all,

    I just started learning Rx and I've already stumbled upon a simple case that I cannot solve. I'm porting a simple music application to use Rx (if you're a Win8 user you can find it here: http://apps.microsoft.com/windows/pl-pl/app/rhythm-trainer/86626ba4-21ab-49eb-8dfd-9b1221286a87). I want to implement a Metronome class with the following contract:

    Play(TimeSpan interval) - start ticks with given interval

    Play(int barCount) - if already playing, play barCount additional bars and then stop

    Stop() - stop immediately.

    Here's my implementation (simplified):

            private readonly Subject<TimeSpan> _startSubject;
            private readonly Subject<int> _stopRequestedSubject;
            private readonly Subject<long> _stopSubject;
            private readonly IObservable<long> _metronome;
    
            public Metronome(IScheduler scheduler)
            {
                _startSubject = new Subject<TimeSpan>();
                _stopSubject = new Subject<long>();
                _stopRequestedSubject = new Subject<int>();
    
                _metronome =
                    _startSubject
                    .Select(x => Observable.Interval(x, scheduler).TakeUntil(_stopSubject))
                    .Concat()
                    .Repeat();
    
                _stopRequestedSubject
                    .SelectMany(_metronome.Skip)
                    .Repeat()
                    .Subscribe(r => _stopSubject.OnNext(r));
    
                _metronome.Subscribe(DoTick);
            }
    
            private void DoTick(long tickNumber)
            {
    
            }
    
            public void Start( double bpm)
            {
                var delay = TimeSpan.FromMilliseconds(60000.0d / (bpm));
                _startSubject.OnNext(delay);
            } 
    
            public void Stop()
            {
                _stopRequestedSubject.OnNext(4);
            }

    The problem with the above is that Stop(int barCount) does not stop the metronome after barCount ticks have played.

    Could you point out where the problem is in that code, or suggest another solution that satisfies the described requirements?

    Thanks for the help!

    Artur


    Friday, October 04, 2013 8:13 PM

Answers

  • Hi Artur,

    1. The _metronome observable is hot.  A symptom is that when you subscribe, you may have already missed notifications.
    2. The _metronome observable is active.  Each time that you subscribe you get an observable that duplicates significant computation side-effects.

     

    You're incorrectly assuming that, because _metronome is hot, it's also passive and therefore shared by multiple subscriptions.

    See my blog post on Hot and Cold Observables for an explanation of these terms.

    How this applies to your query specifically:

    1. You initiate the query by notifying _startSubject and it creates a new Interval observable.  Then, you notify _stopRequestedSubject, causing it to subscribe to _metronome, though because _metronome is hot the _startSubject notification and the projected Interval observable were missed.  As a result, _stopRequestedSubject doesn't see any ticks from Interval.  It simply waits for _startSubject to be notified again, thus _stopSubject is never notified.
    2. Each observer gets its own _metronome observable with a Select that projects a new Interval, which means that _metronome is active.  As a result, using the same reference to the _metronome observable within multiple queries and/or subscribing to it directly multiple times causes multiple Interval observables to be created whenever _startSubject is notified.
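
    Both effects can be reproduced without any timers. The following is a minimal sketch, assuming System.Reactive is referenced; HotActiveDemo, Run and the x * 10 projection are illustrative names and values standing in for your Interval projection, not part of your code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotActiveDemo
{
    public static (int projections, List<int> early, List<int> late) Run()
    {
        var start = new Subject<int>();
        var projections = 0;

        // Every subscription to `query` gets its own Select, so the projection
        // (a stand-in for creating an Interval) runs once per subscriber.
        var query = start.Select(x => { projections++; return x * 10; });

        var early = new List<int>();
        var late = new List<int>();

        using (query.Subscribe(early.Add))
        {
            start.OnNext(1);            // only `early` is subscribed (hot: `late` misses this)

            using (query.Subscribe(late.Add))
            {
                start.OnNext(2);        // both are subscribed (active: projection runs twice)
            }
        }

        return (projections, early, late);
    }
}
```

    Run() returns projections == 3, early == [10, 20] and late == [20]: the late subscriber never sees the first notification, and the second notification runs the projection once per subscriber.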

    The first point is easily proven by the fact that calling Stop has no effect.

    The second point is proven by calling Start once again after calling Stop.  This creates two additional Interval observables: one for your direct _metronome subscription and one for the subscription in the _stopRequestedSubject query.  Now there are three active Interval observables.

    Calling Start again after Stop also triggers the _stopRequestedSubject query as expected, so the first Interval does in fact stop; however, the second Interval is activated due to your use of the Concat operator.  You must call Stop a second time to stop the second Interval; however, as shown, calling Stop has no effect until you call Start again, which creates two additional Interval observables; therefore, it's a never-ending cycle.

    Ignoring those problems for a moment, your query's semantics with Concat are that you must call Stop each time that you call Start.  This is probably not the desired behavior of your class anyway.  It's probably safest for callers if you specify a contract that disallows calling Start multiple times without calling Stop in between.  It will also simplify your query.

    You can generally debug these issues by placing the Do operator in logical places to see if/when segments of the query are being notified.  For instance, the second point is proven by adding Do to _metronome in two places:

    var _metronome = _startSubject
        .Do(_ => Console.WriteLine("Creating Interval"))
        ...;
    _stopRequestedSubject
        .SelectMany(_metronome.Do(_ => Console.WriteLine("Skipping")).Skip)
        ...;

    You'll see that calling Stop has absolutely no effect on the printed output.  Then, calling Start again causes "Creating Interval" to be printed twice.  Also, "Skipping" begins printing until the first Interval is stopped; however, the second Interval is also activated.  Calling Stop a second time does nothing unless you follow it with another call to Start, thus proving that the query is unstoppable.

    Repeat has no effect in any of your queries since apparently you're not calling OnCompleted on any of the subjects.
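
    A rough way to check this, assuming System.Reactive is referenced (RepeatDemo and CountSubscriptions are hypothetical names used only for this sketch): wrap the subject in Defer to count how often Repeat subscribes to it.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RepeatDemo
{
    public static int CountSubscriptions()
    {
        var subject = new Subject<int>();
        var subscriptions = 0;

        // Defer runs its factory once per subscription, which lets us count
        // how many times Repeat subscribes to the underlying subject.
        var counted = Observable.Defer(() =>
        {
            subscriptions++;
            return subject.AsObservable();
        });

        using (counted.Repeat().Subscribe(_ => { }))
        {
            // None of these notifications is OnCompleted, so Repeat has
            // nothing to react to and never resubscribes.
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);
        }

        return subscriptions;
    }
}
```

    CountSubscriptions() returns 1, which is the same number you'd get without Repeat.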

    Furthermore, you're probably using too many subjects in the first place.  See my blog post on To Use Subject or Not To Use Subject for details.

    I'd start designing this class by exposing the Start/Stop contract that I mentioned previously.  Also, it's important to realize that your implementation is not thread-safe, so I'm going to assume that's acceptable and design the following example without regard to thread-safety either.

    class Metronome
    {
        public bool IsRunning
        {
            get
            {
                Contract.Ensures(Contract.Result<bool>() == running);
    
                return running;
            }
        }
    
        private bool running;
    
        public void Start(TimeSpan rate)
        {
            Contract.Requires(rate > TimeSpan.Zero);
            Contract.Requires(!IsRunning);
            Contract.Ensures(IsRunning);
    
            running = true;
        }
    
        public void Stop()
        {
            Contract.Requires(IsRunning);
            Contract.Ensures(!IsRunning);
    
            running = false;
        }
    }

    (I'm using Code Contracts for my examples, though of course you could use if...throw instead if you'd prefer.)
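
    For comparison, here is a sketch of the same skeleton with if...throw guards instead of Code Contracts. The class name GuardedMetronome, the exception types and the messages are my own choices for illustration.

```csharp
using System;

public sealed class GuardedMetronome
{
    public bool IsRunning { get; private set; }

    public void Start(TimeSpan rate)
    {
        // Mirrors Contract.Requires(rate > TimeSpan.Zero).
        if (rate <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(rate), "The rate must be greater than zero.");

        // Mirrors Contract.Requires(!IsRunning).
        if (IsRunning)
            throw new InvalidOperationException("The metronome is already running.");

        IsRunning = true;
    }

    public void Stop()
    {
        // Mirrors Contract.Requires(IsRunning).
        if (!IsRunning)
            throw new InvalidOperationException("The metronome is not running.");

        IsRunning = false;
    }
}
```

    The postconditions (Contract.Ensures) have no direct if...throw equivalent; here they simply follow from the assignments to IsRunning.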

    Finally, since you apparently want to be stateful, I'd fill in the implementation by storing only the subscription to an observable and I'd recreate that observable every time Start is called.

    using System;
    using System.Diagnostics.Contracts;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    
    namespace RxLabs.Net45
    {
        public sealed class Metronome : IDisposable
        {
            public bool IsRunning
            {
                get
                {
                    Contract.Ensures(Contract.Result<bool>() == running);
    
                    return running;
                }
            }
    
            public bool IsStopping
            {
                get
                {
                    Contract.Ensures(Contract.Result<bool>() == (stopAfterCount != ignoreStopAfterCount));
    
                    return stopAfterCount != ignoreStopAfterCount;
                }
            }
    
            private const int ignoreStopAfterCount = -1;
    
            private readonly SerialDisposable subscription = new SerialDisposable();
            private bool running;
            private int stopAfterCount = ignoreStopAfterCount;
    
            public void Start(double beatsPerMinute)
            {
                Contract.Requires(beatsPerMinute > 0);
                Contract.Requires(!IsRunning);
                Contract.Ensures(IsRunning);
    
                Start(TimeSpan.FromSeconds(60 / beatsPerMinute));
            }
    
            public void Start(TimeSpan rate)
            {
                Contract.Requires(rate > TimeSpan.Zero);
                Contract.Requires(!IsRunning);
                Contract.Ensures(IsRunning);
    
                stopAfterCount = ignoreStopAfterCount;
    
                long? stopAfterCountLastTick = null;
    
                running = true;
    
                try
                {
                    subscription.Disposable = Observable
                        .Interval(rate)
                        .Do(number => stopAfterCountLastTick = stopAfterCount == 0 ? number : (long?)null)
                        .TakeWhile(_ => stopAfterCount == ignoreStopAfterCount || --stopAfterCount > ignoreStopAfterCount)
                        .Concat(Observable.If(
                            () => stopAfterCountLastTick.HasValue,
                            Observable.Defer(() => Observable.Return(stopAfterCountLastTick.Value))))
                        .Finally(() => running = false)
                        .Subscribe(Tick);
                }
                catch
                {
                    running = false;
                    throw;
                }
            }
    
            public void Stop()
            {
                Contract.Requires(IsRunning);
                Contract.Ensures(!IsRunning);
    
                subscription.Disposable = Disposable.Empty;
            }
    
            public void Stop(int afterCount)
            {
                Contract.Requires(afterCount >= 0);
                Contract.Requires(IsRunning);
                Contract.Requires(!IsStopping);
    
                if (afterCount == 0)
                {
                    Stop();
                }
                else
                {
                    Contract.Assert(afterCount > 0);
    
                    stopAfterCount = afterCount - 1;
                }
            }
    
            private void Tick(long number)
            {
                Contract.Requires(number >= 0);
    
                // TODO: Handle tick
                Console.WriteLine(number);
            }
    
            public void Dispose()
            {
                subscription.Dispose();
            }
        }
    }

    As you can see, this code is pretty simple though it leaves you in a strange situation where you must somehow expose ticks to callers.  You could use a .NET event or another observable, though being in this situation suggests that perhaps encapsulating the query in a class wasn't the best approach to begin with.

    Perhaps a simple factory method would be better: 

    using System;
    using System.Diagnostics.Contracts;
    using System.Reactive.Linq;
    using System.Threading;
    
    namespace RxLabs.Net45
    {
        public static class Metronome
        {
            public static IObservable<long> Create(double beatsPerMinute, out Action<int> stopAfterCount)
            {
                Contract.Requires(beatsPerMinute > 0);
                Contract.Ensures(Contract.Result<IObservable<long>>() != null);
                Contract.Ensures(Contract.ValueAtReturn(out stopAfterCount) != null);
    
                return Create(TimeSpan.FromSeconds(60 / beatsPerMinute), out stopAfterCount);
            }
    
            public static IObservable<long> Create(TimeSpan rate, out Action<int> stopAfterCount)
            {
                Contract.Requires(rate > TimeSpan.Zero);
                Contract.Ensures(Contract.Result<IObservable<long>>() != null);
                Contract.Ensures(Contract.ValueAtReturn(out stopAfterCount) != null);
    
                const int ignoreStopAfterCount = -1;
    
                var stop = ignoreStopAfterCount;
    
                stopAfterCount = afterCount =>
                {
                    Contract.Requires(afterCount > 0, "The stopAfterCount argument must be greater than 0.");
    
                    if (Interlocked.CompareExchange(ref stop, afterCount - 1, ignoreStopAfterCount) != ignoreStopAfterCount)
                    {
                        throw new InvalidOperationException("The stopAfterCount action cannot be called again until the observable has stopped.");
                    }
                };
    
                var query = Observable.Defer(() =>
                {
                    long? stopAfterCountLastTick = null;
    
                    stop = ignoreStopAfterCount;
    
                    return Observable
                        .Interval(rate)
                        .Do(number => stopAfterCountLastTick = stop == 0 ? number : (long?)null)
                        .TakeWhile(_ => stop == ignoreStopAfterCount || --stop > ignoreStopAfterCount)
                        .Concat(Observable.If(
                            () => stopAfterCountLastTick.HasValue,
                            Observable.Defer(() => Observable.Return(stopAfterCountLastTick.Value))));
                });
    
                // Ensure thread-safety for the stop variable when there are multiple subscriptions.
                return query.Publish().RefCount();
            }
        }
    }
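
    To see how the Do/TakeWhile/Concat chain counts down, here is a deterministic sketch of the same chain, assuming System.Reactive is referenced. It swaps Interval for Observable.Range so that it runs synchronously, and it sets the counter from inside the observer instead of through the stopAfterCount action; CountdownDemo, Run and the local names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CountdownDemo
{
    public static List<long> Run()
    {
        const int ignore = -1;
        var stop = ignore;
        long? lastTick = null;
        var seen = new List<long>();

        // Range stands in for Interval so the run is synchronous and repeatable.
        var ticks = Observable.Range(0, 100).Select(i => (long)i);

        ticks
            // Remember the tick on which the counter reaches zero; TakeWhile
            // drops that tick, so Concat re-emits it below.
            .Do(n => lastTick = stop == 0 ? n : (long?)null)
            // Pass everything while no stop is pending; otherwise count down.
            .TakeWhile(_ => stop == ignore || --stop > ignore)
            .Concat(Observable.If(
                () => lastTick.HasValue,
                Observable.Defer(() => Observable.Return(lastTick.Value))))
            .Subscribe(n =>
            {
                seen.Add(n);

                // Same effect as calling stopAfterCount(3) right after tick 2.
                if (n == 2) stop = 3 - 1;
            });

        return seen;
    }
}
```

    Run() returns 0, 1, 2, 3, 4, 5: three more ticks arrive after the stop request on tick 2, and the last of them comes from the Concat branch rather than from TakeWhile.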

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Saturday, October 05, 2013 8:17 PM Fixed some grammar; formatting
    • Marked as answer by Artur Tadrała Tuesday, October 08, 2013 10:38 AM
    Saturday, October 05, 2013 8:13 PM

All replies

  • Hi Dave

    First of all, thank you for the detailed answer! Now I understand why my code does not work, but honestly I need some time to fully grasp the Rx philosophy.

    As for state in the Metronome class, my idea was to describe the metronome's behavior declaratively with Rx queries and remove as much state as possible. The previous implementation was very imperative and stateful (using events and timers). I planned to wire the UI to service classes (one of which is the Metronome class) by injecting IObservable instances and wiring them up with queries (they would replace the Subject<> instances), possibly through constructor parameters (e.g. ReactiveCommands from http://www.reactiveui.net/).

    I plan to expose ticks to consumers by simply having the Metronome class implement IObservable.

    I'll post my solution again once I've digested all your suggestions.

    My primary goal is to learn Rx and grasp the idea of reactive programming, so feel free to destroy my ideas. ;)

    Thanks!

    Artur

     

    Sunday, October 06, 2013 7:43 PM