Observing a queue using Rx

    Question

  • Hi

    I am trying to do the following using Rx.
    I have a queue to which I add elements.
    I want to launch a "big process" on the queue elements when the number of elements reaches a certain threshold, or when a certain amount of time has passed since the first element was added, whichever comes first. The queue is reset between each "big process" launch.

    For the moment, this is what I wrote:

     public class ObservableQueue : Subject<int>
      {
        private ConcurrentQueue<int> list = new ConcurrentQueue<int>();
    
        public void Add(int i)
        {
          Console.WriteLine(" Add " + i + " to queue.");
          if (list.Count == 0)
          {
            Console.WriteLine("First call");
            this.Buffer(TimeSpan.FromSeconds(3)).Subscribe(
              enumerable =>
              {
                Console.WriteLine("Delay expired : Sum is " + Sum(list));
                list = new ConcurrentQueue<int>();
                this.OnCompleted();
              });
    
            this.Buffer(5).Subscribe(
            enumerable =>
            {
              Console.WriteLine("Queue is full : Sum is " + Sum(list));
              list = new ConcurrentQueue<int>();
              this.OnCompleted();
            });
          }
    
          Console.WriteLine("Adding " + i);
          list.Enqueue(i);
          OnNext(i);
        }
      }
    I am facing several problems.
    The second subscription, the one with Buffer(5), is never called more than once, whereas the Buffer(TimeSpan) one repeats after each "big process" (here the Sum()).
    I don't know how to handle the "whichever of the two fires first" event.
    I tried the following, but it does not produce anything.
        public void Add(int i)
        {
          if (this.list.Count == 0)
          {
            Console.WriteLine("First call");
            IObservable<IEnumerable<int>> observable1 = this.Buffer(TimeSpan.FromSeconds(3));
            IObservable<IEnumerable<int>> observable2 = this.Buffer(5);
    
            observable1.Amb(observable2).Subscribe(
              enumerable =>
                {
                  Console.WriteLine("Sum is " + Sum(list));
                  list = new ConcurrentQueue<int>();
                  this.OnCompleted();
                });
          }
    
          Console.WriteLine("Adding " + i);
          this.list.Enqueue(i);
          OnNext(i);
        }
    Any help would be appreciated.
    Thanks.
    Thursday, February 18, 2010 7:54 PM

Answers

  • Hi Jufl0,

    maybe you could try something like this:

    class Program
    {
        static void Main(string[] args)
        {
            Subject<int> source = new Subject<int>();
            Observable.Amb(
                source.Buffer(5),
                source.Buffer(TimeSpan.FromSeconds(5))
            ).Take(1)
            .Repeat(Scheduler.Now)
            .Subscribe(e =>
            {
                Console.WriteLine("Received {0} elements, sum: {1}", e.Count(), e.Sum());
            });
    
            while (true)
            {
                int num;
                if (!int.TryParse(Console.ReadLine(), out num))
                    break;
                source.OnNext(num);
            }
            Console.ReadKey();
        }
    }


    If you run the sample and enter numbers, it prints the sum as soon as 5 elements have been entered or 5 seconds have elapsed, whichever happens first.
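
    As an aside, the "whichever happens first" race that Amb expresses can be sketched with plain tasks, outside Rx. This is only an analogy under stated assumptions: `Race` and `FirstOf` are hypothetical names invented for illustration, and `Task.WhenAny` stands in for Amb here; it is not a description of how Rx implements the operator.

```csharp
using System;
using System.Threading.Tasks;

public static class Race
{
    // Hypothetical helper: reports which of the two triggers fired first.
    // Returns "count" if countReached completes before the timeout,
    // otherwise "timeout".
    public static async Task<string> FirstOf(Task countReached, TimeSpan timeout)
    {
        Task timer = Task.Delay(timeout);

        // WhenAny completes as soon as either task completes and
        // hands back the task that won the race.
        Task winner = await Task.WhenAny(countReached, timer);

        return winner == countReached ? "count" : "timeout";
    }
}
```

    Calling `Race.FirstOf(someTask, TimeSpan.FromSeconds(5))` would therefore resolve to "count" only when `someTask` finishes within the 5 seconds, which mirrors the choice Amb makes between the two buffers.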

    Andreas
    Thursday, February 18, 2010 8:54 PM

All replies

  • Hi,

    Here's something similar to Andreas' suggestion, but it uses a thread-safe producer/consumer collection (BlockingCollection<T>) instead.  Rx is only used to consume (observe) asynchronously.  This makes it really easy to add to the collection without being forced to use Rx in other areas of the app.

    Maybe somebody could explain the weird behavior though (Rx team?):

    The onCompleted action (which is supposed to write "Consumer ended.") is never called, even though GetConsumingEnumerable() ends when ints.CompleteAdding() is called.  It's not shown in the code below, but I've even tried calling queue.OnCompleted() explicitly when the producer is done, and onCompleted is still not called.  I expected the code below to work as is.

    Any ideas?  Thanks.

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ReactiveProgramming.Labs
    {
      static class ProducerConsumerLab
      {
        static readonly BlockingCollection<int> ints = new BlockingCollection<int>();
    
        public static void Main()
        {
          using (var cancel = new CancellationTokenSource())
          {
            StartConsuming(cancel);
            StartProducing(cancel);
    
            Console.ReadKey();
            cancel.Cancel();
          }
    
          Console.WriteLine("Canceled");
          Console.ReadKey();
        }
    
        static void StartProducing(CancellationTokenSource cancel)
        {
          Task.Factory.StartNew(() =>
            {
              var rnd = new Random();
    
              while (!cancel.IsCancellationRequested)
              {
                int next = rnd.Next(0, 100);
    
                Console.WriteLine("Adding: {0}", next);
    
                ints.Add(next);
    
                Thread.Sleep(TimeSpan.FromSeconds(rnd.Next(1, 4)));
              }
            },
            cancel.Token)
    
          .ContinueWith(task =>
            {
              ints.CompleteAdding();
    
              Console.WriteLine("Producer ended.");
            });
        }
    
        static void StartConsuming(CancellationTokenSource cancel)
        {
          var queue = ints.GetConsumingEnumerable(cancel.Token)
                          .ToObservable()
                          .Publish();
          
          var whenThreshold = queue.Buffer(5);
          var whenExpired = queue.Buffer(TimeSpan.FromSeconds(5));
    
          var whenThresholdOrExpired = whenThreshold.Amb(whenExpired);
    
          whenThresholdOrExpired.Subscribe(buffer =>
            {
              // string.Join copes with an empty buffer, which the timed Buffer can
              // produce; Substring(0, s.Length - 1) would throw on an empty string.
              Console.WriteLine("Buffer Contains: {0}", string.Join(",", buffer));
            }, 
            () => Console.WriteLine("Consumer ended."));
        }
      }
    }

    - Dave
    • Edited by Dave Sexton Friday, February 19, 2010 2:56 AM Removed ints.Count which was not being written by the subscriber
    Friday, February 19, 2010 2:40 AM
  • Thanks for your answer Andreas.
    The problem with this code is that the Buffer(TimeSpan) window is "fixed": say you do nothing, wait 4s, then add an element; Buffer(TimeSpan) will launch the process 1s later.
    In my case, I would like the Buffer(TimeSpan) timer to start only once the first element has been added to the buffer/queue of 5 elements, then wait 5s and, if no more than 4 elements have been added by then, launch the process.
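
    The rule described here (a batch closes when it reaches a given count, or a given time after its first element arrived, whichever comes first) can be sketched without Rx as a pure function over timestamped inputs. This is a minimal offline simulation, not an Rx operator: `Batcher`, `Split`, and the tuple layout are hypothetical names chosen for illustration, and because there is no real timer, an expired batch is only closed when the next element arrives or the input ends.

```csharp
using System;
using System.Collections.Generic;

public static class Batcher
{
    // Splits timestamped values into batches. A batch closes when it holds
    // maxCount items, or when the next item arrives at or after maxAge past
    // the batch's first item, whichever comes first.
    public static List<List<int>> Split(
        IEnumerable<(TimeSpan At, int Value)> items, int maxCount, TimeSpan maxAge)
    {
        var batches = new List<List<int>>();
        var current = new List<int>();
        var openedAt = TimeSpan.Zero;

        foreach (var (at, value) in items)
        {
            // The age limit is measured from the first item of the open
            // batch, not from a fixed wall-clock grid.
            if (current.Count > 0 && at - openedAt >= maxAge)
            {
                batches.Add(current);
                current = new List<int>();
            }

            if (current.Count == 0)
                openedAt = at; // this item opens a new batch

            current.Add(value);

            if (current.Count == maxCount)
            {
                batches.Add(current);
                current = new List<int>();
            }
        }

        if (current.Count > 0)
            batches.Add(current); // flush whatever is left at end of input

        return batches;
    }
}
```

    With a limit of 5 elements and 5s, an element added at 4s followed by one at 6s stays in the same batch until 9s, rather than being cut off at the 5s mark as with the fixed window.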

    Friday, February 19, 2010 9:36 AM
  • Thanks Dave for your answer too.
    I tried this solution too, but it doesn't really work as I expected.
    For example, if you replace the Thread.Sleep(...) line with Thread.Sleep(TimeSpan.FromMilliseconds(rnd.Next(100, 1000))), the buffer keeps the last 5 values.
    This is the output:

    Adding: 46
    Adding: 48
    Adding: 72
    Adding: 64
    Adding: 50
    Buffer Contains: 46,48,72,64,50
    Adding: 56
    Buffer Contains: 48,72,64,50,56
    Adding: 74
    Buffer Contains: 72,64,50,56,74
    Adding: 82
    Buffer Contains: 64,50,56,74,82
    Adding: 40
    Buffer Contains: 50,56,74,82,40
    Adding: 77
    Buffer Contains: 56,74,82,40,77
    Adding: 41
    Buffer Contains: 74,82,40,77,41

    How would you clear the buffer?
    Friday, February 19, 2010 9:42 AM
    I finally succeeded in implementing it:

     
        public static void Start()
        {
          Subject<int> source = new Subject<int>();
          bool isEmpty = true;
    
          while (true)
          {
            int num;
            if (!int.TryParse(Console.ReadKey().KeyChar.ToString(), out num))
            {
              break;
            }
    
            Console.WriteLine(" added to buffer.");
    
            if (isEmpty)
            //if (source.IsEmpty())
            {
              isEmpty = false;
              IObservable<IEnumerable<int>> amb = source.Buffer(5).Amb(source.Buffer(TimeSpan.FromSeconds(3))).Take(1);
    
              amb.Subscribe(
                e =>
                {
                  Console.WriteLine("Received {0} elements, sum: {1}", e.Count(), e.Sum());
                  isEmpty = true;
                });
            }
    
            source.OnNext(num);
          }
    
          Console.ReadKey();
        }

    Last questions:
    Is it possible to do the same thing without the boolean "isEmpty"?
    Why does a call to source.IsEmpty() (see the commented-out line above) block the process?
    Friday, February 19, 2010 12:01 PM
  • Hi jufl0,

    source.IsEmpty() blocks because it is the extension method bool Observable.IsEmpty<T>(this IObservable<T> source), which waits for an OnNext() or OnCompleted() call. Like Observable.First(), FirstOrDefault(), Last(), and LastOrDefault(), IsEmpty() is a blocking operator. A future release of Rx will probably offer asynchronous aggregation functions. See the following popular thread: Aggregation operators blocking feels wrong...

    I also tried to do what you want with Rx alone, but since the current Rx release has no BufferUntilEnd() or BufferUpTo() operator, I had to write my own (which of course makes the code much longer than your version).

    Here is the code I tried (maybe somebody else has a better idea):

    public static class ObservableExtensions
    {
        public static IObservable<IEnumerable<T>> BufferUpTo<T>(this IObservable<T> source, int count)
        {
            return Observable.CreateWithDisposable<IEnumerable<T>>(o =>
            {
                var buffer = new List<T>();
                return source
                    .Synchronize(buffer)
                    .Subscribe(n =>
                {
                    buffer.Add(n);
                    if (buffer.Count >= count)
                    {
                        o.OnNext(buffer);
                        buffer = new List<T>();
                    }
                },
                o.OnError,
                () =>
                {
                    if (buffer.Count > 0)
                        o.OnNext(buffer);
                    o.OnCompleted();
                });
            });
        }
    }
    
    class Program
    {
        static void Main(string[] args)
        {
            Subject<int> source = new Subject<int>();
            source.TakeUntil(
                Observable.Amb(
                    source.SelectMany(i => Observable.Timer(TimeSpan.FromSeconds(5))
                    .Select(n => new Unit())),
                    source.Skip(4).Select(n => new Unit())
                )
            )
            .BufferUpTo(5)
            .Repeat(Scheduler.Now)
            .Subscribe(e =>
            {
                Console.WriteLine("Received {0} elements, sum: {1}", e.Count(), e.Sum());
            });
    
            while (true)
            {
                int num;
                if (!int.TryParse(Console.ReadLine(), out num))
                    break;
                source.OnNext(num);
            }
            Console.ReadKey();
        }
    }
    Saturday, February 20, 2010 3:36 PM
  • Hi,

    > if you replace the line with Thread.Sleep(...) with Thread.Sleep(TimeSpan.FromMilliseconds(rnd.Next(100, 1000))),
    > the buffer will keep the last 5 values.


    Sorry, I didn't notice that Buffer(5) wasn't returning values in my test case because I set the delay for too long.  So only Buffer(TimeSpan) was returning values.

    The problem is as you suggested; Buffer(5) maintains the total size of the buffer - it doesn't automatically reset after it becomes full.

    There's a really simple fix though to get my sample working.  Use Andreas' technique of Take(1).Repeat(Scheduler.Now) as follows:

    var whenThresholdOrExpired = whenThreshold.Amb(whenExpired)
                                              .Take(1)
                                              .Repeat(Scheduler.Now);
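
    The difference between a buffer that keeps the last five values and one that resets after each emission can be illustrated with plain enumerables. This is only a sketch of the two behaviors, assuming the overlapping output quoted earlier in the thread; `Windows`, `Sliding`, and `Tumbling` are hypothetical names, not Rx operators.

```csharp
using System.Collections.Generic;

public static class Windows
{
    // Overlapping windows: once `size` items have arrived, every new item
    // yields the most recent `size` items.
    public static IEnumerable<int[]> Sliding(IEnumerable<int> source, int size)
    {
        var window = new Queue<int>();
        foreach (var item in source)
        {
            window.Enqueue(item);
            if (window.Count > size)
                window.Dequeue(); // drop the oldest item
            if (window.Count == size)
                yield return window.ToArray();
        }
    }

    // Non-overlapping chunks: the buffer is cleared after each emission.
    // A trailing partial chunk is not emitted in this sketch.
    public static IEnumerable<int[]> Tumbling(IEnumerable<int> source, int size)
    {
        var chunk = new List<int>(size);
        foreach (var item in source)
        {
            chunk.Add(item);
            if (chunk.Count == size)
            {
                yield return chunk.ToArray();
                chunk.Clear();
            }
        }
    }
}
```

    For the input 46, 48, 72, 64, 50, 56, 74 and a size of 5, `Sliding` yields three overlapping windows while `Tumbling` yields a single chunk of the first five values, which is the resetting behavior the original question asks for.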

    - Dave
    Saturday, February 20, 2010 10:49 PM
  • Thanks everyone. I will keep digging into Rx to see what I can do.
    Monday, February 22, 2010 3:36 PM
  • Monday, May 03, 2010 7:14 PM