Batching up values from multiple IObservables and then doing one Scheduler dispatch

  • Question

  • Hi,

    We have a GUI program that needs to show the ticking prices of N different tickers. Our pricing library lets callers specify the ticker they would like to watch and returns an IObservable<Price> per ticker. Currently, we take these price IObservables, add an ObserveOn(WPFDispatcher), and write our GUI update logic inside the OnNext handler for the result.

    This works fine for a few tickers when the tick rate is low. However, if we add many tickers and boost the tick rate, the list of pending GUI updates grows large. Our application does not need to show EVERY tick to the user. We just need to show the latest tick before too much of a delay. The solution we have been playing around with is to queue up notifications as they arrive and then do one (scheduler) dispatch after a short time window has passed. This approach is also flexible enough that multiple IObservables can share the same queue.

    Here is an example of what I mean:

    (The queue implementation has been left out for now. I first wanted to see what you guys think of the overall idea)

      public class BatchedKeyValue<TKey, TValue> : IObservable<IEnumerable<KeyValuePair<TKey, TValue>>> {
       private readonly TimeSpan bufferWindow;
       private readonly IScheduler scheduler;
    
       public BatchedKeyValue(TimeSpan bufferWindow, IScheduler scheduler) {
        this.bufferWindow=bufferWindow;
        this.scheduler=scheduler;
       }
    
       public IDisposable Add(TKey key, IObservable<TValue> values) {
        // wire up to this IObservable with an event handler which will queue up events
        // and set up a timer callback
        return Disposable.Empty;
       }
    
       public IDisposable Subscribe(IObserver<IEnumerable<KeyValuePair<TKey, TValue>>> observer) {
        return Disposable.Empty;
       }
      }

     

    The caller should allocate one batcher and share it with multiple feeder IObservables:

        var batchedKeyValueItems=new BatchedKeyValue<string, Price>(TimeSpan.FromSeconds(2), Scheduler.Dispatcher);
        batchedKeyValueItems.Add("TICKER1", GetPrices("TICKER1"));
        batchedKeyValueItems.Add("TICKER2", GetPrices("TICKER2"));
        batchedKeyValueItems.Add("TICKER3", GetPrices("TICKER3"));
        batchedKeyValueItems.Subscribe(items => {
         // update GUI
        });
    

     

    Other notes

    - Originally, I planned to make this batched thing implement ISubject<TValue> and then subscribe it to the individual value IObservables (let's call these component IObservables). There are a few problems with this approach. For one thing, it is not clear how the "key" for each component would be specified during this subscription process. I need to deliver keys and values on the batched stream, or else I would have no way of figuring out which GUI element takes each price.

    - There seems to be a lot of behavior implemented by this one data structure. It will do delayed scheduling, batching of keys and values, and maybe even coalescing (if we see multiple values for the same key before we get a chance to do a dispatch, then maybe the IEnumerable<KV<TKey,TValue>> should only contain one entry, mapping the key to the "last seen" value). I'm not sure whether there is a reasonable way to decompose this structure into a few separable pieces.

    - I noticed that this batched structure has many things in common with the "responsive, yet throttled" implementation discussed at http://social.msdn.microsoft.com/Forums/en/rx/thread/c1cd78b6-8b76-451f-9281-584bab3f4fec. I am interested in using this pre-existing, single IObservable throttler in my implementation, but I don't think it can be used. For one, my throttling is shared across all component IObservables. I don't want to throttle each component. I want each component's OnNext handler (which my batcher will register to internally) to grab a lock, add its value to a shared structure, start the throttle if it has not already been started, release the lock, and return. I also don't think I can somehow apply the pre-existing throttle implementation "at the end" when my batched values are delivered. The point of the throttle is to wait long enough for multiple values to be batched up. This means that we would need not just a throttle at the end, but also some batching capability before we add our ObserveOn().

    - I'm not completely sure what should happen if the caller tries to invoke Add() multiple times with the same key. Maybe this thing should behave like a dictionary and raise a DuplicateKeyException. Another possibility is that the batcher could keep its subscription to the old values stream and also add a new one. The values would be interleaved (or coalesced, in which case the last writer before the batched dispatch wins).

    - Finally, it is not clear whether this batcher should immediately subscribe to the underlying component IObservables or whether the caller should be forced to add the IObservables with Add() and then invoke some other method, like Connect(), before the underlying subscriptions are made and the batched events start streaming. This would be similar to the pattern for IConnectableObservable.

    What do you think?

     

    Tuesday, November 9, 2010 4:25 PM

Answers

  • Added a queue to serialize outgoing events.

     

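      // Serializes outgoing observer calls: Enqueue stashes an action and
      // EnsureActive starts at most one recursive Schedule loop that drains
      // the queue one action at a time on the supplied scheduler.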
      public class SchedulerQueue : IDisposable
      {
        private readonly MutableDisposable disposable;
        private bool hasFaulted;
        private bool isAcquired;
        private readonly Queue<Action> queue;
        private readonly IScheduler scheduler;
    
        public SchedulerQueue(IScheduler scheduler)
        {
          queue = new Queue<Action>();
          disposable = new MutableDisposable();
          this.scheduler = scheduler;
        }
    
        public void Dispose()
        {
          disposable.Dispose();
        }
    
        public void EnsureActive()
        {
          bool flag = false;
          lock (queue)
          {
            if (!hasFaulted && (queue.Count > 0))
            {
              flag = !isAcquired;
              isAcquired = true;
            }
          }
          if (flag)
          {
            disposable.Disposable = scheduler.Schedule(self =>
            {
              Action action;
              lock (queue)
              {
                if (queue.Count > 0)
                {
                  action = queue.Dequeue();
                }
                else
                {
                  isAcquired = false;
                  return;
                }
              }
              try
              {
                action();
              }
              catch (Exception exception)
              {
                lock (queue)
                {
                  queue.Clear();
                  hasFaulted = true;
                }
                throw exception.PrepareForRethrow();
              }
              self();
            });
          }
        }
    
        public void Enqueue(Action action)
        {
          lock (queue)
          {
            queue.Enqueue(action);
          }
        }
      }
    
      public static class Ex
      {
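        // The first value that arrives while idle becomes a one-element window
        // (Observable.Return); values arriving while the consumer is still busy
        // feed a Subject that becomes the next window. The selector decides how
        // each window collapses into a single TResult.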
        public static IObservable<TResult> BufferOn<TSource, TResult>(
          this IObservable<TSource> source, Func<IObservable<TSource>,
          IObservable<TResult>> selector,
          IScheduler scheduler)
        {
          return Observable.CreateWithDisposable<TResult>(observer =>
          {
            var active = false;
            var gate = new object();
            var mutable = new MutableDisposable();
            var current = default(Subject<TSource>);
            var queue = new SchedulerQueue(scheduler);
    
            Action<IObservable<TSource>> queryWindow = window =>
            {
              mutable.Disposable = selector(window).Subscribe(
                n =>
                {
                  queue.Enqueue(() => observer.OnNext(n));
                  queue.EnsureActive();
                },
                ex =>
                {
                  queue.Enqueue(() => observer.OnError(ex));
                  queue.EnsureActive();
                },
                () =>
                {
                  queue.Enqueue(() =>
                  {
                    lock (gate)
                    {
                      if (current != null)
                      {
                        current.OnCompleted();
                        current = null;
                      }
                      else
                      {
                        active = false;
                      }
                    }
                  });
                  queue.EnsureActive();
                });
            };
    
            var disposable = source.Subscribe(n =>
            {
              lock (gate)
              {
                if (!active)
                {
                  active = true;
                  queryWindow(Observable.Return(n));
                }
                else
                {
                  if (current == null)
                  {
                    current = new Subject<TSource>();
                    queryWindow(current);
                  }
                  current.OnNext(n);
                }
              }
            }, ex =>
            {
              lock (gate)
              {
                if (current != null)
                {
                  current.OnError(ex);
                }
                queue.Enqueue(() => observer.OnError(ex));
              }
              queue.EnsureActive();
            }, () =>
            {
              lock (gate)
              {
                if (current != null)
                {
                  current.OnCompleted();
                }
                queue.Enqueue(observer.OnCompleted);
              }
              queue.EnsureActive();
            });
            return new CompositeDisposable(disposable, mutable);
          });
        }
      }
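
    Usage against the original keyed scenario might look something like this (assuming the Update type from my earlier sample; the Aggregate selector folds each window into a dictionary, so the last price per symbol wins):

      var batched = source.BufferOn(
        window => window.Aggregate(
          new Dictionary<string, Update>(),
          (dic, u) => { dic[u.Symbol] = u; return dic; }),
        Scheduler.Dispatcher);

      batched.Subscribe(dic =>
      {
        // one dispatch per bundle, however many symbols ticked
        foreach (var kvp in dic)
        {
          Console.WriteLine("{0} : {1}", kvp.Key, kvp.Value.Price);
        }
      });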
    

    James Miles http://enumeratethis.com
    Thursday, January 27, 2011 4:12 PM

All replies

  • Hi,

    We had exactly the same problem at my last job. Some guys on my team solved the problem by introducing a variation of ObserveOn, ObserveLatestOn. This simply stores a single notification rather than a queue. From memory it looked something like this.

     

    public static IObservable<TSource> ObserveLatestOn<TSource>(
      this IObservable<TSource> source,
      IScheduler scheduler)
    {
      if (source == null)
      {
        throw new ArgumentNullException("source");
      }
      if (scheduler == null)
      {
        throw new ArgumentNullException("scheduler");
      }
      return Observable.CreateWithDisposable<TSource>(observer =>
      {
        var active = false;
        var gate = new object();
        var cancelable = new MutableDisposable();
        Notification<TSource> current = null;
    
        IDisposable disposable = source.Materialize().Subscribe(n =>
        {
          bool flag;
          lock (gate)
          {
            flag = !active;
            active = true;
            current = n;
          }
          if (flag)
          {
            cancelable.Disposable = scheduler.Schedule(self =>
            {
              Notification<TSource> notification;
              lock (gate)
              {
                notification = current;
                current = null;
              }
              notification.Accept(observer);
              bool flag2;
              lock (gate)
              {
                flag2 = active = current != null;
              }
              if (flag2)
              {
                self();
              }
            });
          }
        });
        return new CompositeDisposable(disposable, cancelable);
      });
    }
    

     

    If you need to key this, you could add a key selector, or simply combine it with GroupBy:

    (Sample program)

      class Program
      {
        static IScheduler myDispatcher = new EventLoopScheduler("MyDispatcher");
    
        static void Main()
        {
          var source = new Subject<Update>();
    
          var query =
            from update in source
            group update by update.Symbol into grp
            select grp.ObserveLatestOn(myDispatcher);
    
          query.Merge()
            .Do(_ => Thread.Sleep(100)) // simulate a slow dispatcher
            .Subscribe(u => Console.WriteLine(u));
    
          // initial updates
          source.OnNext(new Update("EURUSD", 1));
          source.OnNext(new Update("GBPAUD", 1));
          Thread.Sleep(100);
    
          // burst of updates
          source.OnNext(new Update("GBPAUD", 2));
          source.OnNext(new Update("EURUSD", 2));
          source.OnNext(new Update("EURUSD", 3));
          source.OnNext(new Update("GBPAUD", 3));
          source.OnNext(new Update("EURUSD", 4));
          source.OnNext(new Update("GBPAUD", 4));
    
          Console.ReadLine();
        }
    
        struct Update
        {
          public readonly string Symbol;
          public readonly decimal Price;
    
          public Update(string symbol, decimal price)
          {
            Symbol = symbol; Price = price;
          }
    
          public override string ToString()
          {
            return Symbol + " : " + Price;
          }
        }
      }
    

     

    Another approach would be to use .Latest() & have a dispatcher timer updating your prices at controlled intervals.
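
    For example, a rough sketch (untested). I've used MostRecent here, the non-blocking sibling of Latest, because Latest()'s enumerator blocks in MoveNext until a fresh value arrives, which you don't want on a timer tick. This assumes WPF's DispatcherTimer, that Price is a reference type, and UpdateGui/GetPrices are placeholders.

      // MostRecent never blocks: MoveNext always succeeds and Current is the
      // last value seen (or the initial value before anything has arrived).
      var prices = GetPrices("TICKER1").MostRecent(null).GetEnumerator();

      var timer = new DispatcherTimer { Interval = TimeSpan.FromMilliseconds(100) };
      timer.Tick += (s, e) =>
      {
        if (prices.MoveNext() && prices.Current != null)
          UpdateGui(prices.Current); // at most 10 GUI updates per second
      };
      timer.Start();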

    Let us know what you come up with.


    James Miles http://enumeratethis.com
    • Proposed as answer by James Miles Monday, December 13, 2010 9:54 AM
    • Unproposed as answer by ngm_msdn Monday, January 10, 2011 7:18 PM
    • Marked as answer by fixedpoint Wednesday, January 12, 2011 1:19 AM
    • Unmarked as answer by fixedpoint Wednesday, January 12, 2011 1:19 AM
    Wednesday, November 10, 2010 1:35 PM
  • My best solution so far is a "bundler" class, like the one below.


    What problem am I trying to solve?

    We have a high-frequency source and we want to "bundle up" items so that the (slower) consumers can receive bundles. The term "bundle" refers to some mix of buffering (the source stashes new items in some mutable structure rather than delivering them directly to the consumer) and folding/accumulation (the source decides that a new value can be merged with a previously stashed value). When the consumer finally processes the bundle, the bundle will contain only accumulated items.


    Why bundle?

    Bundling is one way to connect different parts of a system that work at different rates. Note that buffering (without accumulation) is insufficient. If a fast producer tries to buffer all items until a slow consumer is ready, then the buffers will grow without bound.

    For bundling to work:

    (1) We must have a quick way of checking whether a new notification can be folded with a previously stashed value. The check is done by the source IObservable, so any slowdown will delay the delivery of new source events. In the Bundler class, a new item can be folded if its key matches some key that we have previously seen.

    (2) We must have a quick way of merging new items with previously seen ones. In my program, new items are Key,Value pairs (actually Key1,Key2,Value, but you can think of Key1 and Key2 as components of a single logical key). We "fold" by replacing the old value associated with that key with the new value.

    (3) Our high frequency source must have the property that many items CAN be folded. If all items in our stream have unique keys, then we are back in a buffering world, and our buffers will grow without bound.


    Concurrency

    In the implementation below, the source IObservable(s) is/are expected to call Add(key1, key2, value). I originally wrote this code assuming that source notifications are serialized, but later realized that the implementation did not rely on this property. This may be useful if you need to subscribe multiple high frequency IObservables to the same bundler. I haven't done any performance tests in a "parallel sources" scenario. It is very fast in my single source sandbox (200K to 500K adds per second when the keys are ints and the consumer takes 5s to process each bundle).

    When a bundle is "ready," it should be forwarded to a consumer. I am going to assume that this consumer is always running concurrently with the source thread. If the consumer did not run concurrently, then it is not clear why we need bundling in the first place. Folding *may* be useful in some single-threaded contexts as a form of deferred processing (making sure we need to do the work before actually doing it later), but this is not the problem I'm trying to solve. I'm going to assume that the processing of a bundle is slow enough that we DON'T WANT to do it on the source thread.

    Finally, I don't *think* that the scheduler below needs to be anything special. Bundle dispatches will be serialized because the isReady flag ensures that a new bundle won't be delivered before a previous one has been processed. You can use Scheduler.ThreadPool if you want.

    In the basic scenario, you allocate an ObservableStash object (wrapper around Bundler and Subject), subscribe observers to Values1, and call Start() when you are ready to accept events.


    Delays (coming soon)

    The implementation below will schedule a new bundle as soon as the consumer (ObservableStash, not observers on Values1) invokes SetReady(). I think for the strict scenario I described above (fast producer, slow consumer), this is fine. However, we have sometimes also encountered scenarios where we want to impose ADDITIONAL DELAYS on the dispatch of bundles. For example, if the consumer is some GUI which will update GUI controls, then we don't want the next bundle to be sent as soon as the previous update is done. Doing so will cause the GUI thread to be very busy processing our updates. The GUI will most likely appear frozen.

    I will post another solution which imposes additional delays. For now, I wanted to present something that is as simple as possible. If your scheduler is the thread pool, you can force delays by making the observer code sleep. You will not slow down the source by doing so. Don't use this approach if your scheduler is the GUI ;-)

    It is also worth noting that the slower the consumer, the more the source will be able to amortize the work that is required to dispatch a new bundle. This work is a bit expensive (allocation of a new keyToValue, Schedule call, etc). This is another good reason why you may want to impose some minimum delay in your consumer (using Thread.Sleep, whatever). It will keep the source thread from being slowed down by dispatches.


    Misc

    Finally, I've written Bundler with generics <TKey1, TKey2, TValue>, but I've noticed that the performance of Dictionary<K,V> will vary widely depending on the type of K. For example, our high frequency source produces items of the form {key,object} where key has two int components. We've found that Dictionary<int,V> is MUCH faster than Dictionary<struct{int,int},V>. This is the main motivation for the nested dictionary structure below. We use ints for TKey1 and TKey2.

      public class Bundler<TKey1, TKey2, TValue> {
       private SpinLock spinLock=new SpinLock();
       private readonly Action<IDictionary<TKey1, Dictionary<TKey2, TValue>>> notify;
       private readonly IScheduler scheduler;
       private bool isReady=false;
    
       public Bundler(Action<IDictionary<TKey1, Dictionary<TKey2, TValue>>> notify, IScheduler scheduler) {
        this.notify=notify;
        this.scheduler=scheduler;
       }
    
       private Dictionary<TKey1, Dictionary<TKey2, TValue>>
        key1ToKey2ToValue=new Dictionary<TKey1, Dictionary<TKey2, TValue>>();
    
       public void Add(TKey1 key1, TKey2 key2, TValue value) {
        Dictionary<TKey1, Dictionary<TKey2, TValue>> nextItem;
        var lockTaken=false;
        try {
         spinLock.Enter(ref lockTaken);
         UnsafeAdd(key1, key2, value);
         if(!isReady) {
          return;
         }
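     // Swap out the filled bundle and start a fresh one; dispatch outside the lock.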
         nextItem=key1ToKey2ToValue;
         key1ToKey2ToValue=new Dictionary<TKey1, Dictionary<TKey2, TValue>>();
         isReady=false;
        } finally {
         if(lockTaken) {
          spinLock.Exit();
         }
        }
    
        scheduler.Schedule(() => notify(nextItem));
       }
    
       private void UnsafeAdd(TKey1 key1, TKey2 key2, TValue value) {
        Dictionary<TKey2, TValue> key2ToValue;
        if(!key1ToKey2ToValue.TryGetValue(key1, out key2ToValue)) {
         key2ToValue=new Dictionary<TKey2, TValue>();
         key1ToKey2ToValue[key1]=key2ToValue;
        }
        key2ToValue[key2]=value;
       }
    
       public void SetReady() {
        var lockTaken=false;
        try {
         spinLock.Enter(ref lockTaken);
         isReady=true;
        } finally {
         if(lockTaken) {
          spinLock.Exit();
         }
        }
       }
      }
    
    
      public class ObservableStash {
       private readonly IScheduler scheduler;
       private readonly Subject<IDictionary<int, Dictionary<int, object>>> values1;
       private readonly Bundler<int, int, object> bundler1;
    
       public ObservableStash(IScheduler scheduler) {
        this.scheduler=scheduler;
        this.values1=new Subject<IDictionary<int, Dictionary<int, object>>>();
        this.bundler1=new Bundler<int, int, object>(k1k2v => {
         values1.OnNext(k1k2v);
         bundler1.SetReady();
        }, scheduler);
       }
    
       public IScheduler Scheduler {
        get { return scheduler; }
       }
    
       public IObservable<IDictionary<int, Dictionary<int, object>>> Values1 {
        get { return values1; }
       }
    
       public void Start() {
        bundler1.SetReady();
       }
    
       public void OnNext(int subjectId, int fieldId, object value) {
        bundler1.Add(subjectId, fieldId, value);
       }
      }
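
    A minimal usage sketch for the basic scenario (the ids and values here are made up):

      var stash=new ObservableStash(Scheduler.ThreadPool);

      stash.Values1.Subscribe(bundle => {
       foreach(var bySubject in bundle)
        foreach(var byField in bySubject.Value)
         Console.WriteLine("{0}/{1} = {2}", bySubject.Key, byField.Key, byField.Value);
      });

      stash.Start();              // initial SetReady(); the next Add schedules a bundle
      stash.OnNext(1, 7, 100.25); // subjectId, fieldId, value
      stash.OnNext(1, 7, 100.50); // same keys: folds over (replaces) the previous value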
    
    

    Wednesday, January 12, 2011 5:13 PM
  • Have you considered just buffering the data ?

    source.BufferWithTime(TimeSpan.FromSeconds(0.1))

    This way your UI would be restricted to 10 updates a second, and you will get 100ms chunks of events.

    That would seem like the "simplest" approach.


    James Miles http://enumeratethis.com
    Wednesday, January 12, 2011 5:28 PM
  • Ahh, BufferWithTime... I can't remember the last time we did battle, but I'm pretty sure I lost.

     

    Older (List-based) BufferWithTime:

    1. BufferWithTime only buffers. It does not fold. Pure buffers are extremely useful for "bursty" source scenarios. That is, when the producer may produce items in quick succession, but the average production rate is still lower than the average consumption rate. However, in my scenario, the source is FASTER (on average or otherwise) than the consumer. The consumer cannot keep up if it tries to process every item, even if these items are grouped into windows. It can only "keep up" because some items can be thrown away (or more generally, folded with previously stashed items).

    The bundler folds items by replacing values with the same key.

    2. BufferWithTime continues to generate "empty windows" during intervals when no items arrive. I never found a good way to make it stop doing this. Adding a Where() operator does not prevent the dispatch to the GUI; the GUI thread still has to run the filter. If we try to filter on some intermediate scheduler (such as Scheduler.ThreadPool), then we risk introducing the unbounded buffer problem (I can explain why in more detail if anyone cares).

     

    Newer (IObservable-based) BufferWithTime:

    I think the new BufferWithTime implementation is even less suitable for what you suggest (although it may be possible to add some ToList() calls or something to make it do what the old implementation did). It will happily construct a new IObservable when a time interval passes, but it forwards all events as soon as they arrive (onto the GUI or whatever).

    I *could* subscribe to this stream and send out a new bundle whenever a new window IObservable is created (I *think* this means that the previous IObservable will no longer produce values), but it is not clear what this buys me. I still need to do all the folding myself. Also, jumping through additional combinator code, such as GroupJoin, just means that my source needs to do more work per event.

     

    Wednesday, January 12, 2011 7:55 PM
  • Ok, I think perhaps GroupBy + Sample might be the way.

    void Main()
    {
    	var source = new Subject<Foo>();
    	
    	var query =
    		from foo in source
    		group foo by foo.Symbol into grp
    		from foo in grp.Sample(TimeSpan.FromSeconds(0.1))
    		select new {foo.Symbol, foo.Price};
    		
    	query.Subscribe(Console.WriteLine);
    	
    	source.OnNext(new Foo{Symbol = "EURUSD", Price = 10});
    	source.OnNext(new Foo{Symbol = "EURUSD", Price = 11});
    	source.OnNext(new Foo{Symbol = "GBPUSD", Price = 5});
    	source.OnNext(new Foo{Symbol = "GBPUSD", Price = 6});
    	Thread.Sleep(200);
    	source.OnNext(new Foo{Symbol = "EURUSD", Price = 12});
    	source.OnNext(new Foo{Symbol = "EURUSD", Price = 13});
    	source.OnNext(new Foo{Symbol = "GBPUSD", Price = 7});
    	source.OnNext(new Foo{Symbol = "GBPUSD", Price = 8});
    	
    	Console.ReadLine();
    }
    
    public class Foo
    {
    	public string Symbol { get; set; }
    	public int Price { get; set; }
    }
    
    What do you reckon?
    James Miles http://enumeratethis.com
    Wednesday, January 12, 2011 11:06 PM
  • This way you are only storing the latest event for a unique symbol, and you are limiting it to 10 per second (per symbol)?


    James Miles http://enumeratethis.com
    Wednesday, January 12, 2011 11:08 PM
  • James,

    Thanks for your groupby + sample idea.

    Good:

    Drops intermediate events which have the same key

    Bad:

    Does not batch across symbols. If we have 100 keys, this translates to 100 IGroupedObservables. Why should we sample each one separately and dispatch 100 separate sample events? If our consumer is a GUI, we don't want 100 GUI dispatches. We only want one.

    Also, I'm not sure how heavy a timer is from a resource point of view, but I think it's probably a bad idea to create per-key timers. There is some freakish sharing happening behind the scenes, so as the number of timers in your library increases, you will take away "quick notification" resources from other timers in the application.

    Thursday, January 13, 2011 4:33 AM
  • Here is a new Bundler implementation which supports additional delays. I could not decide whether callers would want to specify a min delay between successive Schedule() calls or between the COMPLETION of a previously scheduled notify delegate and the next Schedule() call. In the end, I decided to implement both. The caller selects which mode he wants with the DelayKind enum. The minDelay parameter is ignored when DelayKind is None.

    I moved my dictionary swap code to the scheduled method. I don't think this will make any significant difference in performance. The consumer allocates the replacement dictionary now, not the producer. The max producer rate does not seem to be affected.

    SetReady() is now called within Bundler. The container class no longer needs to call it for us.

    Finally, I try to distinguish between the need to call Schedule(action) and Schedule(action, TimeSpan.Zero). I've created another forum thread about why these two calls may behave differently, depending on what scheduler is being used. Things get really weird when dueTime<0.

      public enum DelayKind {
       None,
       SinceSchedule,
       SinceReady,
      }
    
    
      public class Bundler<TKey1, TKey2, TValue> {
       private SpinLock spinLock=new SpinLock();
       private readonly TimeSpan minDelay;
       private readonly DelayKind delayKind;
       private readonly Action<IDictionary<TKey1, Dictionary<TKey2, TValue>>> notify;
       private readonly IScheduler scheduler;
       private bool isReady=false;
       private DateTime scheduleTimestamp=DateTime.MinValue;
       private DateTime readyTimestamp=DateTime.MinValue;
    
       public Bundler(
        TimeSpan minDelay, DelayKind delayKind,
        Action<IDictionary<TKey1, Dictionary<TKey2, TValue>>> notify, IScheduler scheduler) {
        this.minDelay=minDelay;
        this.delayKind=delayKind;
        this.notify=notify;
        this.scheduler=scheduler;
       }
    
       private Dictionary<TKey1, Dictionary<TKey2, TValue>>
        key1ToKey2ToValue=new Dictionary<TKey1, Dictionary<TKey2, TValue>>();
    
       public void Add(TKey1 key1, TKey2 key2, TValue value) {
        var delay=new TimeSpan?();
        var lockTaken=false;
        try {
         spinLock.Enter(ref lockTaken);
         UnsafeAdd(key1, key2, value);
         if(!isReady) {
          return;
         }
         isReady=false;
         if(delayKind!=DelayKind.None) {
          var now=DateTime.Now;
          var timestamp=(delayKind==DelayKind.SinceSchedule) ? scheduleTimestamp : readyTimestamp;
          var since=now-timestamp;
          if(since>=minDelay) {
           scheduleTimestamp=now;
          } else {
           var delay1=minDelay-since;
           scheduleTimestamp=now+delay1;
           delay=delay1;
          }
         }
        } finally {
         if(lockTaken) {
          spinLock.Exit();
         }
        }
        if(delay.HasValue) {
         scheduler.Schedule(OnSchedule, delay.Value);
        } else {
         scheduler.Schedule(OnSchedule);
        }
       }
    
       private void OnSchedule() {
        var nextItems=new Dictionary<TKey1, Dictionary<TKey2, TValue>>();
        var lockTaken=false;
        try {
         spinLock.Enter(ref lockTaken);
         var temp=key1ToKey2ToValue;
         key1ToKey2ToValue=nextItems;
         nextItems=temp;
        } finally {
         if(lockTaken) {
          spinLock.Exit();
         }
        }
        notify(nextItems);
        SetReady();
       }
    
       private void UnsafeAdd(TKey1 key1, TKey2 key2, TValue value) {
        Dictionary<TKey2, TValue> key2ToValue;
        if(!key1ToKey2ToValue.TryGetValue(key1, out key2ToValue)) {
         key2ToValue=new Dictionary<TKey2, TValue>();
         key1ToKey2ToValue[key1]=key2ToValue;
        }
        key2ToValue[key2]=value;
       }
    
       public void SetReady() {
        var now=DateTime.Now;
        var lockTaken=false;
        try {
         spinLock.Enter(ref lockTaken);
         readyTimestamp=now;
         isReady=true;
        } finally {
         if(lockTaken) {
          spinLock.Exit();
         }
        }
       }
      }
    
    
      public class ObservableStash {
       private readonly IScheduler scheduler;
       private readonly Subject<IDictionary<int, Dictionary<int, object>>> values1;
       private readonly Bundler<int, int, object> bundler1;
    
       public ObservableStash(TimeSpan minDelay, DelayKind delayKind, IScheduler scheduler) {
        this.scheduler=scheduler;
        this.values1=new Subject<IDictionary<int, Dictionary<int, object>>>();
        this.bundler1=new Bundler<int, int, object>(minDelay, delayKind, k1k2v => values1.OnNext(k1k2v), scheduler);
       }
    ...
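
    Construction now looks like this (assuming the rest of ObservableStash is unchanged from my previous post, with Start() still doing the initial SetReady()):

      // At most one bundle every 250ms, measured from the previous Schedule() call.
      var stash=new ObservableStash(TimeSpan.FromMilliseconds(250), DelayKind.SinceSchedule, Scheduler.ThreadPool);
      stash.Values1.Subscribe(bundle => { /* update GUI elements from the bundle */ });
      stash.Start();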
    
    

    Thursday, January 13, 2011 5:10 AM
  • Hello again,

    OK, in that case, I think the new BufferWithTime is the way to go. It is actually more powerful, as you can control how the data is buffered. In your case we can buffer into a dictionary, which is exactly what you want.

    Let me know what you think.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    
    namespace ConsoleApplication78
    {
     class Program
     {
      static void Main()
      {
       var source = new Subject<Foo>();
    
       var query =
        from buffer in source.BufferWithTime(TimeSpan.FromSeconds(0.1))
        from dictionary in buffer.Aggregate(new Dictionary<string, Foo>(), (dic, value) =>
        {
         dic[value.Symbol] = value;
         return dic;
        })
        where dictionary.Count > 0
        select dictionary;
    
    
       query.Subscribe(dic =>
       {
        foreach (var kvp in dic) Console.WriteLine("{0} : {1}", kvp.Value.Symbol, kvp.Value.Price);
        Console.WriteLine();
       });
    
       source.OnNext(new Foo { Symbol = "EURUSD", Price = 10 });
       source.OnNext(new Foo { Symbol = "EURUSD", Price = 11 });
       source.OnNext(new Foo { Symbol = "GBPUSD", Price = 5 });
       source.OnNext(new Foo { Symbol = "GBPUSD", Price = 6 });
       Thread.Sleep(200);
       source.OnNext(new Foo { Symbol = "EURUSD", Price = 12 });
       source.OnNext(new Foo { Symbol = "EURUSD", Price = 13 });
       source.OnNext(new Foo { Symbol = "GBPUSD", Price = 7 });
       source.OnNext(new Foo { Symbol = "GBPUSD", Price = 8 });
    
       Console.ReadLine();
      }
     }
    
     public class Foo
     {
      public string Symbol { get; set; }
      public int Price { get; set; }
     }
    }
    

    *Update* looks like there is a memory leak here, just looking into it.


    James Miles http://enumeratethis.com

    • Marked as answer by fixedpoint Friday, January 14, 2011 1:13 AM
    • Unmarked as answer by ngm_msdn Wednesday, January 19, 2011 9:51 PM
    Thursday, January 13, 2011 1:00 PM
  • Seems to be an issue with BufferWithTime / GroupJoin.

    For now we could use Window; this overload doesn't suffer from the problem.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace ConsoleApplication78
    {
      class Program
      {
        static void Main()
        {
          var source = new Subject<Foo>();
    
          var query =
            from buffer in source.Window(() => Observable.Timer(TimeSpan.FromSeconds(0.1)))
            from dictionary in buffer.Aggregate(new Dictionary<string, Foo>(), (dic, value) =>
            {
              dic[value.Symbol] = value;
              return dic;
            })
            where dictionary.Count > 0
            select dictionary;
    
    
          query.Subscribe(dic =>
          {
            
            foreach (var kvp in dic) Console.WriteLine("{0} : {1}", kvp.Value.Symbol, kvp.Value.Price);
            Console.WriteLine();
          });
    
          for (int i = 0; i < 10000; i++ )
          {
            source.OnNext(new Foo { Symbol = "EURUSD", Price = 10 });
            source.OnNext(new Foo { Symbol = "EURUSD", Price = 11 });
            source.OnNext(new Foo { Symbol = "GBPUSD", Price = 5 });
            source.OnNext(new Foo { Symbol = "GBPUSD", Price = 6 });
            source.OnNext(new Foo { Symbol = "EURUSD", Price = 12 });
            source.OnNext(new Foo { Symbol = "EURUSD", Price = 13 });
            source.OnNext(new Foo { Symbol = "GBPUSD", Price = 7 });
            source.OnNext(new Foo { Symbol = "GBPUSD", Price = 8 });
          }
    
          Console.ReadLine();
        }
      }
    
      public class Foo
      {
        public string Symbol { get; set; }
        public int Price { get; set; }
      }
    }
    
    

    James Miles http://enumeratethis.com
    Thursday, January 13, 2011 2:08 PM
  • This should work.  We have a fix for the memory leak and it will be in the next release.
    Friday, January 14, 2011 1:13 AM
  • Thank you Wes for marking an answer to MY question ;-)

    James,

    Thank you for this BufferWithTime/Window + Aggregate idea.


    Good:

    1. Drops intermediate events which have the same keys

    2. Batches across keys


    Bad:


    1. I don't think I can dispatch the latest batch to another scheduler without either introducing the unbounded buffers problem or grabbing that context every 0.1s to close the pending window.

    Dispatch attempt 1

    Add an ObserveOn() operator to switch to the specified scheduler. This risks the unbounded buffer problem if our consumer slows down more than the Timer() delay. The "unbounded buffer" shows up as an unbounded queue of Dictionary<TKey, TValue> which are waiting to be processed.

    Dispatch attempt 2

    Use the consumer IScheduler as the second parameter to Observable.Timer(). I *think* this may avoid the unbounded buffer problem, but it will run every 0.1s, pull the latest Dictionary<TKey,TValue>, and run the Count>0 filter. The filter will drop the dictionary if it is empty, but the scheduler must still run this code whether or not items have arrived.

    The bundler approach avoids these problems by using the isReady flag. The consumer signals the producer that it is ready to accept the next bundle. The bundle is not dispatched (and no IScheduler.Schedule call is made) until this happens.


    2. The max supported producer rate appears to be much lower than the bundler. When I measured both solutions with int keys, the bundler supports 500K-800K add operations per second. This Window+Aggregate approach supports 80K-150K.

    Any solution to this fast producer problem will have some "max supported producer rate," and the solution will only be useful for producers that fall within those bounds. The bundler has a higher max production rate.

    [I'm not advocating that anyone use the bundler just because it measures faster in my sandbox tests. Normally, I would not care about a 5x performance difference in some non-core part of my program, and the simplicity of the Window+Aggregate approach would be a sufficient reason to use it. However, if you measure the Window+Aggregate approach and find that it is too slow, you may want to try something like the bundler solution above.]


    BTW, I found that the default Equals() and GetHashCode() implementations for structs are extremely slow. This was the main reason why int-key dictionaries are MUCH FASTER than struct-key dictionaries (where the struct has two int components and does not override these methods).

    I think if you use a struct for a key, you should make sure that it overrides Equals(object) and GetHashCode(). While you're at it, make sure it implements IEquatable<T> so that EqualityComparer<T> can work its RuntimeType magic.
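
    Something like this (a sketch; the type name and fields are made up to match the subjectId/fieldId scenario):

      public struct CompositeKey : IEquatable<CompositeKey> {
       public readonly int SubjectId;
       public readonly int FieldId;

       public CompositeKey(int subjectId, int fieldId) {
        SubjectId=subjectId;
        FieldId=fieldId;
       }

       // EqualityComparer<CompositeKey>.Default picks this up via IEquatable<T>,
       // avoiding the slow reflection-based ValueType defaults.
       public bool Equals(CompositeKey other) {
        return SubjectId==other.SubjectId && FieldId==other.FieldId;
       }

       public override bool Equals(object obj) {
        return obj is CompositeKey && Equals((CompositeKey)obj);
       }

       public override int GetHashCode() {
        return (SubjectId*397)^FieldId; // cheap mix of the two components
       }
      }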

    Friday, January 14, 2011 7:01 PM
  • Sorry, I'll be more careful about using "propose an answer" first.  I have been going through the forums as fast as possible to catch up with unanswered questions.
    Friday, January 14, 2011 7:13 PM
  • OK, what about something like this;

    (untested)

    public static IObservable<TBuffer> BufferOn<TSource, TBuffer>(
      this IObservable<TSource> source,
      Func<TBuffer> factory,
      Func<TBuffer, TSource, TBuffer> accumulator,
      IScheduler scheduler)
    {
      if (source == null)
      {
        throw new ArgumentNullException("source");
      }
      if (scheduler == null)
      {
        throw new ArgumentNullException("scheduler");
      }
      return Observable.CreateWithDisposable<TBuffer>(observer =>
      {
        var active = false;
        var gate = new object();
        var cancelable = new MutableDisposable();
        var buffering = false;
        TBuffer current = default(TBuffer);
    
        IDisposable disposable = source.Subscribe(n =>
        {
          var flag = false;
          lock (gate)
          {
            if (!active)
            {
              active = true;
              flag = true;
            }
            if (!buffering)
            {
              current = factory();
              buffering = true;
            }
        current = accumulator(current, n);
          }
          if (flag)
          {
        cancelable.Disposable = scheduler.Schedule(self =>
            {
              TBuffer notification;
              lock (gate)
              {
                notification = current;
                buffering = false;
                current = default(TBuffer);
              }
              observer.OnNext(notification);
              bool flag2;
              lock (gate)
              {
                flag2 = active = buffering;
              }
              if (flag2)
              {
                self();
              }
    
            });
          }
        }, observer.OnError, observer.OnCompleted);
        return new CompositeDisposable(disposable, cancelable);
      });
    }
    

     

    This would give us a general purpose buffering solution that is based on how quickly the consumer can process the notifications. It also gives you control of how many items you want to buffer and the data structure you use, so if a dictionary is not performant enough, you can use something else.

    Usage;

    // buffer in a list
    subject.BufferOn(() => new List<int>(), (buffer, item) =>
    {
      buffer.Add(item);
      return buffer;
    }, dispatcher)
    
    // buffer in a dictionary
    subject.BufferOn(() => new Dictionary<int, Foo>(), (dic, item) =>
    {
      dic[item.Id] = item;
      return dic;
    }, scheduler)
    
    // just buffer the latest notification
    subject.BufferOn(() => 0, (buffer, item) => item, scheduler)

    James Miles http://enumeratethis.com
    Saturday, January 15, 2011 3:37 PM
  • Looking at this again. I think this is better;

    public static IObservable<TResult> Buffer<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, IScheduler scheduler)

    where the selector takes an observable of "windows"; a window ends when the previous notification has been dispatched.


    James Miles http://enumeratethis.com

    Tuesday, January 18, 2011 7:55 PM
  • Is there more to this?
    Tuesday, January 18, 2011 8:23 PM
  • Not yet, usage would be;

    // Just interested in latest value
    source.Buffer(window => window.TakeLast(1), scheduler)

    // Put into list
    source.Buffer(window => window.Aggregate(new List<>(), (list, value) => { list.Add(value); return list; }), scheduler)

    // Dictionary
    source.Buffer(window => window.Aggregate(new Dictionary<,>(), (dic, value) => { dic[value.Key] = value; return dic; }), scheduler)

    or anything else you can think of...


    James Miles http://enumeratethis.com
    Tuesday, January 18, 2011 8:53 PM
  • FYI - had some time to prototype this arvo. Seems to be working OK. Will pick it back up tomorrow.

    public static IObservable<TResult> BufferOn<TSource, TResult>(
      this IObservable<TSource> source, Func<IObservable<TSource>,
      IObservable<TResult>> selector,
      IScheduler scheduler)
    {
      return Observable.CreateWithDisposable<TResult>(observer =>
      {
        var active = false;
        var gate = new object();
        var mutable = new MutableDisposable();
        var current = default(Subject<TSource>);
    
        Action<IObservable<TSource>> queryWindow = window =>
        {
          mutable.Disposable = selector(window).Subscribe(
            n => scheduler.Schedule(() => observer.OnNext(n)),
            ex => scheduler.Schedule(() => observer.OnError(ex)),
            () => scheduler.Schedule(() =>
          {
            lock (gate)
            {
              if (current != null)
              {
                current.OnCompleted();
                current = null;
              }
              else
              {
                active = false;
              }
            }
          }));
        };
    
        var disposable = source.Subscribe(n =>
        {
          lock (gate)
          {
            if (!active)
            {
              active = true;
              queryWindow(Observable.Return(n));
            }
            else
            {
              if (current == null)
              {
                current = new Subject<TSource>();
                queryWindow(current);
              }
              current.OnNext(n);
            }
          }
        }, ex =>
        {
          lock (gate)
          {
            if (current != null)
            {
              current.OnError(ex);
            }
            scheduler.Schedule(() => observer.OnError(ex));
          }
        }, () =>
        {
          lock (gate)
          {
            if (current != null)
            {
              current.OnCompleted();
            }
            scheduler.Schedule(observer.OnCompleted);
          }
        });
        return new CompositeDisposable(disposable, mutable);
      });
    }
    

    James Miles http://enumeratethis.com
    Wednesday, January 26, 2011 5:41 PM