Introducing SampledSelect - Can it be simplified?

Pertanyaan Introducing SampledSelect - Can it be simplified?

  • 15 Juli 2011 8:07
     
      Memiliki Kode

    I had a need for an operator that turned quite hard to write - SampledSelect.

    The use case is as follows:

    We have a source that pushs video frames.

    We have a filter (selector) that manipulates the frames but the latency it introduces might be longer than the source frame rate.

    We need to pipe the source with the filter, but buffer only the last frame while the filter is still working on an older frame, and drop all older frames, even if they haven't been pushed to the filter at all.

    In short - we want the source to be sampled in the rate dictated by our filter output, so the filter acts both as a selector and a sampler.

    When writing the SampledSelect operator I used the SampledResponsive example by Sergey Aldoukhov (also discussed in this forum).

    The source + test can be found here: https://gist.github.com/1058594

    I'll paste it here for convenience:

    using System;
    using System.Globalization;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using Microsoft.Reactive.Testing;
    using NUnit.Framework;
    
    namespace TestRx
    {
      public class Tester : ReactiveTest
      {
        [Test]
        public void TestSampledSelect()
        {
          var scheduler = new TestScheduler();
          var source = scheduler.CreateHotObservable(
            OnNext(110, "A"),
            OnNext(120, "B"),
            OnNext(130, "C"),
            OnNext(140, "D"),
            // ... delay ...
            OnNext(180, "E"),
            OnNext(190, "F"),
            OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
            // ... delay ...
            OnNext(250, "J"),
            // ... delay ...
            OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M") // burst
            );
    
          var computationTime = TimeSpan.FromTicks(15);
          Func<IObservable<string>, IObservable<string>> computation =
            src => src
                  .Select(str => str + str)
                  .Delay(computationTime, scheduler);
    
          var sampledSource = source.SampledSelect(computation, scheduler);
    
          var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);
    
          results.Messages.ForEach(n => Console.WriteLine(n.Value + ":" + n.Time));
    
          var expectedResults = new[]
                     {
                       OnNext(125, "AA"),
                       OnNext(140, "BB"),
                       // skipping "CC"
                       OnNext(155, "DD"),
                       OnNext(195, "EE"),
                       OnNext(210, "FF"),
                       // skipping "GG"
                       // skipping "HH"
                       OnNext(225, "II"),
                       OnNext(265, "JJ"),
                       OnNext(315, "KK"),
                       // skipping "LL"
                       OnNext(330, "MM"),
                     };
          ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
        }
      }
    
      public static class RxOperators
      {
        public static IObservable<Indexed<T>> Index<T>(this IObservable<T> source)
        {
          return source.Select((value, index) => new Indexed<T>(value, index));
        }
    
        public static IObservable<T> RemoveIndex<T>(this IObservable<Indexed<T>> source)
        {
          return source.Select(indexed => indexed.Value);
        }
    
        public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
          this IObservable<TLeft> leftSource,
          IObservable<TRight> rightSource,
          Func<TLeft, TRight, TResult> selector)
        {
          var coldResult = Observable.Defer(() =>
          {
            int leftIndex = -1, rightIndex = -1;
            var result = leftSource
              .Index()
              .CombineLatest(rightSource.Index(), (left, right) => new {left, right})
              .Where(t => t.left.Index != leftIndex && t.right.Index != rightIndex)
              .Do(t =>
              {
                leftIndex = t.left.Index;
                rightIndex = t.right.Index;
              })
              .Select(t => selector(t.left.Value, t.right.Value));
            return result;
          });
          return coldResult;
        }
    
        public static IObservable<TResult> SampledSelect<T, TResult>(
          this IObservable<T> source,
          Func<IObservable<T>, IObservable<TResult>> selector,
          IScheduler scheduler = null)
        {
          scheduler = scheduler ?? Scheduler.TaskPool;
    
          var hotResult = source.Publish(src =>
          {
            var projectedValues = new Subject<TResult>();
    
            var whenCanFire = projectedValues
              .StartWith(scheduler, default(TResult));
    
            var sampledSource = source
              .CombineVeryLatest(whenCanFire, (value, signal) => value);
    
            var subscription = selector(sampledSource).Subscribe(projectedValues);
            var result = projectedValues.Finally(subscription.Dispose);
    
            return result;
          });
          return hotResult;
        }
      }
    
      [Serializable]
      public struct Indexed<T>
      {
        private readonly int index;
        private readonly T value;
    
        public int Index
        {
          get { return index; }
        }
    
        public T Value
        {
          get { return value; }
        }
    
        public Indexed(T value, int index)
        {
          this.index = index;
          this.value = value;
        }
    
        public bool Equals(Indexed<T> other)
        {
          return other.index == index && Equals(other.value, value);
        }
    
        public override bool Equals(object obj)
        {
          if (ReferenceEquals(null, obj)) return false;
          if (!(obj is Indexed<T>)) return false;
          return Equals((Indexed<T>) obj);
        }
    
        public override int GetHashCode()
        {
          unchecked
          {
            return index*397 ^ ((object) value == null ? 1979 : value.GetHashCode());
          }
        }
    
        public static bool operator ==(Indexed<T> first, Indexed<T> second)
        {
          return first.Equals(second);
        }
    
        public static bool operator !=(Indexed<T> first, Indexed<T> second)
        {
          return !first.Equals(second);
        }
    
        public override string ToString()
        {
          return string.Format(CultureInfo.CurrentCulture, "{0}:{1}", Index, Value);
        }
      }
    }


    I hope this might help someone.

    I'd also like to ask the experts here if this can be further simplified? Try to write a simpler operator that passes the provided test.

    Maybe duration operators could help here.


    Omer Mor




Semua Balasan

  • 15 Juli 2011 18:57
     
     

    Hi Omer,

    Check out the BufferIntrospective operator in Rxx:

    http://rxx.codeplex.com/SourceControl/changeset/view/61975#1055700

    Here's a lab that illustrates its usage (see BufferExperiment):

    http://rxx.codeplex.com/SourceControl/changeset/view/61975#980347

    - Dave


    http://davesexton.com/blog
  • 15 Juli 2011 19:00
     
     

    Hi Omer,

    To clarify, you can get the desired behavior by always retrieving the last element in the buffer:

    var sample = buffer[buffer.Length - 1]

    - Dave


    http://davesexton.com/blog
  • 16 Juli 2011 20:01
     
      Memiliki Kode

    Hi Dave,

    I failed to use BufferIntrospective in my tests. I suspect it might be due to limitations of using the TestScheduler.

    Here is my test:

     

     public class Tester : ReactiveTest
     {
      [Test]
      public void TestRxx()
      {
       var scheduler = new TestScheduler();
       var source = scheduler.CreateHotObservable(
        OnNext(110, "A"),
        OnNext(120, "B"),
        OnNext(130, "C"),
        OnNext(140, "D"),
        // ... delay ...
        OnNext(180, "E"),
        OnNext(190, "F"),
        OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
        // ... delay ...
        OnNext(250, "J"),
        // ... delay ...
        OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M") // burst
        );
    
       var computationTime = TimeSpan.FromTicks(15);
    
       var sampledSource = source
        .BufferIntrospective(scheduler)
        .SelectMany(buffer => Observable
              .Return(buffer.Last())
              .Select(str => str + str)
              .Do(x => Console.WriteLine(">s {0}@{1}", x, scheduler.Now.Ticks))
              .Delay(computationTime, scheduler)
              .Do(x => Console.WriteLine(">e {0}@{1}", x, scheduler.Now.Ticks))
              .Take(1)
              );
    
    
       var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);
    
       var expectedResults = new[]
             {
              OnNext(125, "AA"),
              OnNext(140, "BB"),
              // skipping "CC"
              OnNext(155, "DD"),
              OnNext(195, "EE"),
              OnNext(210, "FF"),
              // skipping "GG"
              // skipping "HH"
              OnNext(225, "II"),
              OnNext(265, "JJ"),
              OnNext(315, "KK"),
              // skipping "LL"
              OnNext(330, "MM"),
             };
       ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
      }
     }
    
    

    Please see if you can spot usage errors.

     


    Omer Mor
  • 17 Juli 2011 8:38
     
      Memiliki Kode
    //solution with the new Async CTP / Rx Experimental
    var sampledSource = Observable.Create<string>(async observer =>
    {
      var ready = new Subject<int>();
      //take latest after ready or if empty then take the next new
      var oNext = source.TakeUntil(ready).TakeLast(1).Concat(source).Take(1);
      var tNext = oNext.ToTask();
      while (true)
      {
        ready.OnNext(0);
        string str = await tNext;
        tNext = oNext.ToTask();
        observer.OnNext(await computation(Observable.Return(str)));
      }
    
    });
    
    

  • 17 Juli 2011 12:36
     
      Memiliki Kode
    //three lines solution
    var latest = new BehaviorSubject<string>(null);
    source.Subscribe(latest);
    var sampledSource = computation(latest.Where(x => x != null).Take(1).Do(_ => latest.OnNext(null))).Repeat();
    
    

    • Ditandai sebagai Jawaban oleh Omer Mor 17 Juli 2011 18:40
    • Tanda sebagai Jawaban dihapus oleh Omer Mor 25 Juli 2011 19:23
    •  
  • 17 Juli 2011 18:39
     
      Memiliki Kode

    Brilliant Steffen - very elegant solution!

    I updated the gist source with your solution, written as an Rx operator with the appropriate subscription disposal and cold-to-hot inner publishing.

    Here's the complete source for the test + operator:

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using Microsoft.Reactive.Testing;
    using NUnit.Framework;
    
    namespace TestRx
    {
      public class Tester : ReactiveTest
      {
        [Test]
        public void TestSampledSelect()
        {
          var scheduler = new TestScheduler();
          var source = scheduler.CreateHotObservable(
            OnNext(110, "A"),
            OnNext(120, "B"),
            OnNext(130, "C"),
            OnNext(140, "D"),
            // ... delay ...
            OnNext(180, "E"),
            OnNext(190, "F"),
            OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
            // ... delay ...
            OnNext(250, "J"),
            // ... delay ...
            OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M") // burst
            );
    
          var computationTime = TimeSpan.FromTicks(15);
          Func<IObservable<string>, IObservable<string>> computation =
            src => src
                  .Select(str => str + str)
                  .Delay(computationTime, scheduler);
    
          var sampledSource = source.SampledSelect(computation);
    
          var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);
    
          var expectedResults = new[]
                     {
                       OnNext(125, "AA"),
                       OnNext(140, "BB"),
                       // skipping "CC"
                       OnNext(155, "DD"),
                       OnNext(195, "EE"),
                       OnNext(210, "FF"),
                       // skipping "GG"
                       // skipping "HH"
                       OnNext(225, "II"),
                       OnNext(265, "JJ"),
                       OnNext(315, "KK"),
                       // skipping "LL"
                       OnNext(330, "MM"),
                     };
          ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
        }
      }
    
      public static class RxOperators
      {
        public static IObservable<TResult> SampledSelect<T, TResult>(
          this IObservable<T> source,
          Func<IObservable<T>, IObservable<TResult>> selector)
        {
          var hotResult = source.Publish(src =>
          {
            var latest = new BehaviorSubject<Tuple<T>>(null);
            var subscription = src
              .Select(value => Tuple.Create(value))
              .Subscribe(latest);
    
            var sampledSource = selector(
              latest
                .Where(tuple => tuple != null)
                .Select(tuple => tuple.Item1)
                .Take(1)
                .Do(_ => latest.OnNext(null)))
              .Repeat();
            var result = sampledSource.Finally(subscription.Dispose);
            return result;
          });
          return hotResult;
        }
    
      }
    }
    


     


    Omer Mor
  • 25 Juli 2011 19:44
     
      Memiliki Kode

    Unfortunatly I found out there are problems with the Steffen's solution, and apparently my test is not as rock-solid as I thought.

    There are 2 problems:

    1. It mistreats sequence termination: OnComplete & OnError.
    2. It doesn't work in the real world, i.e when not using the TestScheduler.

    I'll work on writing a test for the 2nd problem (I found it out in a real-world app).

    The 1st problem is easier to fix the test for:

     

      [Test]
      public void TestSampledSelect()
      {
       var scheduler = new TestScheduler();
       var source = scheduler.CreateHotObservable(
        OnNext(110, "A"),
        OnNext(120, "B"),
        OnNext(130, "C"),
        OnNext(140, "D"),
        // ... delay ...
        OnNext(180, "E"),
        OnNext(190, "F"),
        OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
        // ... delay ...
        OnNext(250, "J"),
        // ... delay ...
        OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M"), // burst
        // complete
        OnCompleted<string>(302),
        OnNext(303, "N")
        );
    
       var computationTime = TimeSpan.FromTicks(15);
       Func<IObservable<string>, IObservable<string>> computation =
        src => src
           .Select(str => str + str)
           .Delay(computationTime, scheduler);
    
       var sampledSource = source.SampledSelect(computation);
    
       var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);
    
       var expectedResults = new[]
             {
              OnNext(125, "AA"),
              OnNext(140, "BB"),
              // skipping "CC"
              OnNext(155, "DD"),
              OnNext(195, "EE"),
              OnNext(210, "FF"),
              // skipping "GG"
              // skipping "HH"
              OnNext(225, "II"),
              OnNext(265, "JJ"),
              OnNext(315, "KK"),
              // skipping "LL"
              OnCompleted<string>(315)
             };
       ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
      }
    
    

     

    The test currently gives the following error:

     

    TestSampledSelect : FailedMicrosoft.VisualStudio.TestTools.UnitTesting.AssertFailedException : Assert.Fail failed. 
    Expected: [OnNext(AA)@125, OnNext(BB)@140, OnNext(DD)@155, OnNext(EE)@195, OnNext(FF)@210, OnNext(II)@225, OnNext(JJ)@265, OnNext(KK)@315, OnCompleted()@330]
    Actual..: [OnNext(AA)@125, OnNext(BB)@140, OnNext(DD)@155, OnNext(EE)@195, OnNext(FF)@210, OnNext(II)@225, OnNext(JJ)@265, OnNext(KK)@315]
    
    at Microsoft.VisualStudio.TestTools.UnitTesting.Assert.HandleFail(String assertionName, String message, Object[] parameters)
    at Microsoft.VisualStudio.TestTools.UnitTesting.Assert.Fail(String message, Object[] parameters)
    at Microsoft.Reactive.Testing.ReactiveAssert.AreElementsEqual(IEnumerable`1 expected, IEnumerable`1 actual)
    at TestRx.Tester.TestSampledSelect() in SampledSelect.cs: line 66 

    Can anyone here please help me fix this?

     


    Omer Mor
  • 27 Juli 2011 6:41
     
      Memiliki Kode
    //For this problem TPL Dataflow may better suited as Rx.
    //A solution could look like this Dataflow pattern:
    
    var source = new BroadcastBlock<string>(null);
    var computationBlock = new ActionBlock<string>(computation1, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
    source.LinkTo(computationBlock);
    
    

  • 27 Juli 2011 17:56
     
     
    Interesting, I'll give it a try. However, on the surface of it, it seems like a good use case for Rx, and I'd still like to know how best can I write this operator.
    Omer Mor
  • 28 Juli 2011 0:11
     
     

    Hi Omer,

    The BufferIntrospective solution should work fine, although you're correct that apparently it doesn't work with TestScheduler.  Although I haven't looked into it yet, I suspect that it's because the operator is built around the idea that the scheduler will introduce real-time concurrency.  The introspective component of the operator tracks this time, but since the TestScheduler doesn't actually add any time your test fails.

    Perhaps you could try it in a small console application instead of a unit test.

    I'll take a look at the implementation and see if there's any way to make it work with virtual time.

    We'll also consider adding a SampleIntrospective operator to Rxx to help simplify your query.

    - Dave


    http://davesexton.com/blog
  • 08 Maret 2012 20:36
     
     

    Hi Omer,

    The reason your unit test doesn't work is because Delay doesn't block observations.  I've described the problem in a bit more detail in the following thread:

    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2519be4c-c704-4a1c-9f95-63c5289c226b

    I've just added the SampleIntrospective operator (at the bottom) to Rxx along with a unit test similar to yours; however, to adjust for scheduling offsets in your unit test I had to increase a few of the ticks for notifications by 1 and also increased most of the expected results by 1 or 2.  This is required due to race conditions and the internal scheduling of actions 1 tick late, which causes the TestScheduler to increase the clock by 1 tick before notifying observers.  This isn't really predictable, so I simply chose the correct offsets to match the actual results.  But if you follow along with the logic from your old unit test, you'll see that it's relatively the same - only off by 1 or 2 ticks with each notification.

    - Dave


    http://davesexton.com/blog

  • 09 Maret 2012 14:30
     
     

    Thanks Dave,

    I love what you did with your SampleIntrospective operator. I don't like the no-deterministic nature of the test, but I can live with it (after all the tests are not the goal).

    I saw this thread which talks basically about the same problem (fast producer/slow consumer): http://social.msdn.microsoft.com/Forums/ar-SA/rx/thread/29a6b31c-254e-4daa-99b2-16e5b12ecdbc. They came up with a bunch of solutions as well. This means the fast producer/slow consumer is a real problem in this domain, and I with the Rx team would address it in the framework. I believe the TPL Dataflow framework does.

    Maybe someone can sum all the different approaches we came up for this problem (SampledSelect, SampleIntrospective, ObserveLatestOn, Bundler, BufferOn), maybe even compare and contrast them with each other.


    Omer Mor

  • 09 Maret 2012 15:26
     
     

    Hi Omer,

    The BufferOn operator I created in the post you're referring to is VERY similar to the *Introspective operators in Rxx.

    I'm not sure if there is a difference in behavior between WindowIntrospective & BufferOn (Dave wrote introspective & I wrote BufferOn).

    Have you tried switching the two operators in/out to see if your tests behave differently?

    *Update*

    Actually there is a difference. Dave's version returns IObservable<IObservable<T>>. I'm not 100% sure, but when I wrote BufferOn I decided it was not possible to return IO<IO<T>> and have the operator behave correctly in all scenarios.


    James Miles http://enumeratethis.com

  • 09 Maret 2012 16:28
     
     

    Hi James,

    I haven't checked the operators yet.

    The problem I was trying to solve is already in production. Now I'm just curios and would want the community to have a single, definitive, "textbook" solution.


    Omer Mor

  • 09 Maret 2012 17:25
     
     

    Hi Omer,

    > I don't like the no-deterministic nature of the test

    Agreed.  So I just looked at it closely and realized that it's actually deterministic.  Every notification is off by 1.  The reason why some of the expected notifications appear to be off by 2 is because they are observed relative to the time of the previous notification, which is off by 1, so the effect is actually being compounded.

    I think each notification is off by 1 simply because SampleIntrospective uses the specified scheduler internally to schedule each output notification "immediately".  In other words, it schedules the first notification to run immediately on the specified scheduler, and then it does the same for each internal buffer when the previous observation completes.  TestScheduler reacts to this "immediate" scheduling of an action by incrementing its clock by 1.  Perhaps it's meant to simulate side-effects of Windows thread scheduling, which may cause delays when switching contexts.

    - Dave


    http://davesexton.com/blog

  • 09 Maret 2012 17:52
     
     

    Hi James, Omer, 

    Rxx actually contains two different but related introspection operators.

    The Introspect operator pairs each notification with an observable that represents the duration of its observation.  I don't recall ever seeing this operator in any discussion in this forum.

    The WindowIntrospective operator is more of a generalization on the need to aggregate notifications while the previous notification is being observed.  I agree that there have been variations on this forum, but I feel that Rxx's design is probably as slim and generalized as it's going to get.  I found a way to turn overlapping notifications into windows by using only a queue and a scheduler.  The BufferIntrospective operator simply calls WindowIntrospective and ToList, which reveals a similarity in design to Rx's Window and Buffer operators.  The SampleIntrospective operator simply calls WindowIntrospective as well, though there may be room for optimization here; e.g., to avoid buffering data except for the last notification.  We can look into that for a future release.  I suspect the solution is going to be similar to the Window overloads in Rx, by accepting parameter(s) that support the opening and closing of introspection windows; i.e., sliding vs. hopping.

    Of course, my opinion on the matter is biased ;)

    - Dave


    http://davesexton.com/blog

  • 09 Maret 2012 18:15
     
      Memiliki Kode

    Hi Dave,

    Firstly, I think BufferOn & WindowIntrospective are trying to do the same thing.

    Secondly, I agree that WindowIntrospective is the better API. In fact, I originally intended (and tried) to implement BufferOn like this! At the time though, I came to the conclusion (I'm now thinking incorrectly!) that there was fundamental flaw with an "introspective" operator that yielded a window like this (returning IO<IO<T>).

    It has been a long time since I wrote BufferOn though, so I'll have to think about this some more.

    The implementations of BufferOn & WindowIntrospective have some similarities.

    BufferOn (I've just ported to 2.0)

        public class SchedulerQueue : IDisposable
        {
            private readonly MultipleAssignmentDisposable disposable;
            private bool hasFaulted;
            private bool isAcquired;
            private readonly Queue<Action> queue;
            private readonly IScheduler scheduler;
    
            public SchedulerQueue(IScheduler scheduler)
            {
                queue = new Queue<Action>();
                disposable = new MultipleAssignmentDisposable();
                this.scheduler = scheduler;
            }
    
            public void Dispose()
            {
                disposable.Dispose();
            }
    
            public void EnsureActive()
            {
                bool flag = false;
                lock (queue)
                {
                    if (!hasFaulted && (queue.Count > 0))
                    {
                        flag = !isAcquired;
                        isAcquired = true;
                    }
                }
                if (flag)
                {
                    disposable.Disposable = scheduler.Schedule(self =>
                    {
                        Action action;
                        lock (queue)
                        {
                            if (queue.Count > 0)
                            {
                                action = queue.Dequeue();
                            }
                            else
                            {
                                isAcquired = false;
                                return;
                            }
                        }
                        try
                        {
                            action();
                        }
                        catch (Exception exception)
                        {
                            lock (queue)
                            {
                                queue.Clear();
                                hasFaulted = true;
                            }
                            ExceptionDispatchInfo.Capture(exception).Throw();
                        }
                        self();
                    });
                }
            }
    
            public void Enqueue(Action action)
            {
                lock (queue)
                {
                    queue.Enqueue(action);
                }
            }
        }
    
        public static class Ex
        {
            public static IObservable<TResult> BufferOn<TSource, TResult>(
                this IObservable<TSource> source, 
                Func<IObservable<TSource>, IObservable<TResult>> selector,
                IScheduler scheduler)
            {
                return Observable.Create<TResult>(observer =>
                {
                    var active = false;
                    var gate = new object();
                    var mutable = new MultipleAssignmentDisposable();
                    var current = default(Subject<TSource>);
                    var queue = new SchedulerQueue(scheduler);
    
                    Action<IObservable<TSource>> queryWindow = window =>
                    {
                        mutable.Disposable = selector(window).Subscribe(
                            n =>
                            {
                                queue.Enqueue(() => observer.OnNext(n));
                                queue.EnsureActive();
                            },
                            ex =>
                            {
                                queue.Enqueue(() => observer.OnError(ex));
                                queue.EnsureActive();
                            },
                            () =>
                            {
                                queue.Enqueue(() =>
                                {
                                    lock (gate)
                                    {
                                        if (current != null)
                                        {
                                            current.OnCompleted();
                                            current = null;
                                        }
                                        else
                                        {
                                            active = false;
                                        }
                                    }
                                });
                                queue.EnsureActive();
                            });
                    };
    
                    var disposable = source.Subscribe(n =>
                    {
                        lock (gate)
                        {
                            if (!active)
                            {
                                active = true;
                                queryWindow(Observable.Return(n));
                            }
                            else
                            {
                                if (current == null)
                                {
                                    current = new Subject<TSource>();
                                    queryWindow(current);
                                }
                                current.OnNext(n);
                            }
                        }
                    }, ex =>
                    {
                        lock (gate)
                        {
                            if (current != null)
                            {
                                current.OnError(ex);
                            }
                            queue.Enqueue(() => observer.OnError(ex));
                        }
                        queue.EnsureActive();
                    }, () =>
                    {
                        lock (gate)
                        {
                            if (current != null)
                            {
                                current.OnCompleted();
                            }
                            queue.Enqueue(observer.OnCompleted);
                        }
                        queue.EnsureActive();
                    });
                    return new CompositeDisposable(disposable, mutable);
                });
            }
        }

    I like the idea of separating the queue out like this.


    James Miles http://enumeratethis.com


  • 09 Maret 2012 18:58
     
      Memiliki Kode

    Hi Dave,

    I'm comparing WindowIntrospective with BufferOn.

    I have some unit tests for WindowIntrospective that are failing. Do you agree that these tests should pass?

    using System;
    using System.Linq;
    using System.Collections.Generic;
    using System.Reactive.Concurrency;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using Microsoft.Reactive.Testing;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    
    namespace UnitTestProject1
    {
        [TestClass]
        public class BufferOnTests
        {
            [TestMethod]
            public void OnNext_while_active_and_window_open()
            {
                var scheduler = new TestScheduler();
                var source = new Subject<object>();
                var expectedInWindow1 = new object();
                var expectedInWindow2A = new object();
                var expectedInWidnow2B = new object();
                var results = new List<IList<object>>();
    
                source.WindowIntrospective(scheduler).SelectMany(w => w.ToList()).Subscribe(results.Add);
    
                source.OnNext(expectedInWindow1);
                source.OnNext(expectedInWindow2A);
                source.OnNext(expectedInWidnow2B);
                scheduler.Start();
    
                Assert.AreEqual(2, results.Count);
                Assert.AreEqual(expectedInWindow1, results[0].Single());
                Assert.IsTrue(results[1].SequenceEqual(new[] { expectedInWindow2A, expectedInWidnow2B }));
            }
    
            [TestMethod]
            public void OnNext_while_active_and_window_closed()
            {
                var scheduler = new TestScheduler();
                var source = new Subject<object>();
                var expectedInWindow1 = new object();
                var expectedInWindow2 = new object();
                var results = new List<IList<object>>();
    
                source.WindowIntrospective(scheduler).SelectMany(w => w.ToList()).Subscribe(results.Add);
    
                source.OnNext(expectedInWindow1);
                source.OnNext(expectedInWindow2);
    
                scheduler.Start();
    
                Assert.AreEqual(2, results.Count);
                Assert.AreEqual(expectedInWindow1, results[0].Single());
                Assert.AreEqual(expectedInWindow2, results[1].Single());
            }
        }
    
        public static class Ex
        {
            /// <summary>
            /// Generates a sequence of windows where each window contains all values that were observed from 
            /// the <paramref name="source"/> while the values in the previous window were being observed.
            /// </summary>
            /// <typeparam name="TSource">The object that provides notification information.</typeparam>
            /// <param name="source">The observable sequence from which to create introspection windows.</param>
            /// <param name="scheduler">Schedules when windows are observed as well as the values in each window.</param>
            /// <returns>The source observable sequence buffered into introspection windows.</returns>
            public static IObservable<IObservable<TSource>> WindowIntrospective<TSource>(
                this IObservable<TSource> source,
                IScheduler scheduler)
            {
                var observable = Observable.Create<IObservable<TSource>>(
                    observer =>
                    {
                        var queue = new Queue<TSource>();
                        var window = new Subject<TSource>();
    
                        bool pendingDrain = false;
                        bool sourceCompleted = false;
                        object gate = new object();
    
                        var sourceSubscription = new SingleAssignmentDisposable();
                        var drainSchedule = new SerialDisposable();
                        var schedules = new CompositeDisposable(drainSchedule);
    
                        Action ensureDraining = () =>
                        {
                            if (pendingDrain)
                                return;
    
                            pendingDrain = true;
    
                            drainSchedule.Disposable =
                                scheduler.Schedule(self =>
                                {
                                    Queue<TSource> currentQueue;
    
                                    lock (gate)
                                    {
                                        currentQueue = queue;
                                        queue = new Queue<TSource>();
                                    }
    
                                    currentQueue.ForEach(window.OnNext);
    
                                    window.OnCompleted();
    
                                    bool loop, completeNow;
    
                                    lock (gate)
                                    {
                                        pendingDrain = queue.Count > 0;
                                        completeNow = !pendingDrain && sourceCompleted;
    
                                        if (completeNow)
                                        {
                                            loop = false;
                                        }
                                        else
                                        {
                                            window = new Subject<TSource>();
    
                                            // Must push the new window before unlocking the gate to avoid a race condition when pendingDrain is false.
                                            observer.OnNext(window);
    
                                            // Ensure pendingDrain is read again after making a call to OnNext, in case of re-entry.
                                            loop = pendingDrain;
                                        }
                                    }
    
                                    if (completeNow)
                                    {
                                        observer.OnCompleted();
                                    }
                                    else if (loop)
                                    {
                                        self();
                                    }
                                });
                        };
    
                        schedules.Add(
                            scheduler.Schedule(() =>
                            {
                                observer.OnNext(window);
    
                                sourceSubscription.Disposable = source.Subscribe(
                                    value =>
                                    {
                                        lock (gate)
                                        {
                                            queue.Enqueue(value);
    
                                            ensureDraining();
                                        }
                                    },
                                    ex => schedules.Add(scheduler.Schedule(() => observer.OnError(ex))),
                                    () =>
                                    {
                                        bool completeNow = false;
    
                                        lock (gate)
                                        {
                                            sourceCompleted = true;
                                            completeNow = !pendingDrain;
                                        }
    
                                        if (completeNow)
                                        {
                                            schedules.Add(scheduler.Schedule(() =>
                                            {
                                                window.OnCompleted();
    
                                                observer.OnCompleted();
                                            }));
                                        }
                                    });
                            }));
    
                        return new CompositeDisposable(sourceSubscription, drainSchedule, schedules);
                    });
    
                return observable;
            }
        }
    }


    James Miles http://enumeratethis.com

  • 09 Maret 2012 19:28
     
     

    I can see the tests don't "line up" with WindowIntrospective's behavior (which I think is correct).


    James Miles http://enumeratethis.com

  • 09 Maret 2012 19:32
     
     

    Hi James,

    I'm running your tests now, but my first impression is that there are two problems:

    1. There's no concurrency in your tests.  Introspection is meant to buffer notifications while the previous buffer is being observed, but without concurrency each notification is just going to get its own window.
    2. The operator schedules the first window instead of pushing it through on the subscription thread.  This is conflicting with your tests, but it may not be the tests' fault.  I'm considering whether this behavior should be removed from the operator.  What are your thoughts?

    - Dave


    http://davesexton.com/blog

  • 09 Maret 2012 19:44
     
     

    Hi James,

    After removing the scheduling of the first window your tests behave correctly (though they still fail).  It pushes values before allowing any scheduled actions to execute (i.e., OnNext is called before Start), so the result is a single window that contains all of the notifications.  I believe this is the correct behavior, though perhaps not test worthy ;)

    I'm not sure that I want to make this change to the real codebase though.  The reason that I chose to schedule the first window is because that's the semantics of the operator.  It schedules observations.  The first window is no different.  Do you agree?

    - Dave


    http://davesexton.com/blog

    • Diedit oleh Dave Sexton 09 Maret 2012 19:47 Clarifiaction
    •