Design question - Throttling hits on external systems

    Question

  • Hi,

    I am writing a message broker that taps into IBM MQ queues and folders on the file system. After picking up messages, it materializes them into strongly typed classes and plugs them into Rx Subjects.

    I have built awareness into the messages that allows me to identify which external systems need to be hit to handle them, so I can run queries on the Rx observables and pick out messages that do not target an external system, etc.

    What I want to do next is throttle messages by the external system being hit. For example:

    If I were hitting a CRM system with a certain type of message, and I decided that I wanted to hit that system with 4 concurrent calls max, I would be handling 4 messages at a time only; if I had a 5th message I would have to wait for one of the previous 4 to be done with and then move on to the 5th. The same goes for other types of resources, like external databases, other external web services, etc.

    I have started researching this, and so far the best design approach seems to be writing my own scheduler. The downside is that I would have to write my own internal structures to queue up messages inside the scheduler after they are picked up, and that is the part of the approach I dislike.

    Does anyone have a better way of doing this?

    Thanks,

    David Rodrigues

    Friday, August 16, 2013 2:24 PM

All replies

  • Hi David,

    > I wanted to hit that system with 4 concurrent calls max, I would be handling 4 messages at a time only.

    How are you modeling concurrency at the moment?

    In case you're unaware, each observable sequence in Rx must satisfy a serialization contract (§4.2, Rx Design Guidelines); otherwise, Rx operators may introduce threading bugs.

    Therefore, multiple observables are required if you want to observe notifications concurrently.  Actually, "multiple observables" technically means multiple subscriptions; e.g., you can subscribe multiple times to the same cold observable, or different hot observables, to introduce concurrency between observers.

    So if you have multiple subscriptions, then notifications may be observed concurrently with respect to any two observers, though any particular observer will always observe notifications serially.

    If you only have a single subscription, then your observable cannot safely push notifications concurrently through any Rx operators.

    (Note: The observer that you pass to Subscribe may observe concurrent notifications if you want, as long as you explicitly implement any necessary synchronization and as long as there aren't any Rx operators between where the concurrency is introduced and the observer is passed to Subscribe.  Also note that the Rx Grammar is no longer ensured by Rx's implementation of IObserver<T>.)

    In summary, to model concurrency in Rx you can either:

    1. Subscribe multiple observers to multiple observables that are concurrent with respect to each other
      E.g., xs.Subscribe(a); ys.Subscribe(b);
    2. Subscribe multiple observers to the same cold observable that introduces concurrency
      E.g., xs.Subscribe(a); xs.Subscribe(b);
    3. Project notifications into new observables that introduce concurrency with respect to each other
      E.g., xs.Select(x => Observable.Start(...))
    4. Invoke an asynchronous action in the observer that you pass to Subscribe
      E.g. xs.Subscribe(x => DoSomethingVoidAsync(x))

    > if I had a 5th message I would have to wait for one of the previous 4 to be done with and then move on to the 5th.

    Here are 4 possible solutions to your problem, corresponding to each of the four models listed above.

    1. Multiple observers, multiple observables:
      Since you'll have multiple observers executing concurrently, it's your responsibility to limit concurrency and to ensure synchronization yourself.

      One approach is to use a Semaphore, which would block an observer until the amount of concurrency drops to an acceptable level.  Each time a notification is observed, you would wait on the semaphore.  Once acquired, you would send the message to the CRM.  When the CRM signals that it's done processing the message, you release the semaphore.  (See the sketch after this list.)  Note that blocking an observer blocks the observable, due to the §4.2 contract.  This could potentially cause problems with the observable, depending upon various factors such as the origin of the notifications and the amount of latency introduced by the CRM.

      Alternatively, you could avoid blocking observers by applying the producer/consumer pattern.  A concurrent queue allows each notification to be enqueued without blocking.  Then you can have 4 dedicated concurrent consumers taking from the queue and sending messages to the CRM.  Note that this approach introduces additional concurrency that may be unnecessary, so consider whether the former approach is better for your situation.

      Neither of the above approaches is entirely in the spirit of reactive programming with Rx, since both of them require multiple observers and are explicitly stateful.
    2. Multiple observers, single cold observable
      Same as #1.
    3. Project notifications into new observables
      Given a single serialized observable, you can project its notifications into concurrent observables with Select and then apply the overload of Merge that takes a maxConcurrent argument to limit concurrency.  Merge also safely merges results into a serialized sequence, if the CRM returns any useful information.

      For example:
      IObservable<Message> messages = GetMessages();

      var crm = messages.Where(m => m.Type == "CRM");

      crm.Select(message => Observable.Defer(() => Observable.StartAsync(
              cancel => SendMessageToCRMAsync(message, cancel))))
          .Merge(maxConcurrent: 4)
          .Subscribe(LogResult);

      Note that if you didn't actually care about limiting concurrency, then you could simply replace Select with SelectMany to allow unlimited concurrency while also safely merging results into a serialized sequence; i.e., SelectMany is an unrestricted Select -> Merge.
    4. Invoke an asynchronous action in Subscribe
      Similar to #1.  The conclusion is the same, even though in this case we only have a single observer.  Multiple observers executing concurrently and a single observer executing concurrently for multiple notifications have the same exact problems, so #1 still applies.
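
    For reference, here's a rough sketch of the blocking-semaphore idea from #1, using SemaphoreSlim.  SendMessageToCRM and the crmSourceA/crmSourceB observables are just placeholders, not code from your broker:

    var throttle = new SemaphoreSlim(4);       // at most 4 concurrent CRM calls

    Action<Message> sendWithThrottle = message =>
    {
        throttle.Wait();                       // blocks this observer (and its observable, per §4.2)
        try
        {
            SendMessageToCRM(message);         // placeholder for a synchronous call to the CRM
        }
        finally
        {
            throttle.Release();                // lets a waiting message proceed
        }
    };

    // Each concurrent subscription gets its own observer:
    crmSourceA.Subscribe(sendWithThrottle);
    crmSourceB.Subscribe(sendWithThrottle);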

    #3 is probably the best approach in general, and it's certainly the most reactive approach.

    Notice that I never even mentioned schedulers anywhere in my reply.

    One last thing to consider is why you're limiting concurrency in the first place: If you're trying to prevent the CRM from being flooded with messages, then how do you know that 4 messages at a time is ideal?  What if the CRM changes its latency dynamically?  What if the CRM has no problem handling all of the messages that you send to it?  E.g., it could have its own internal producer/consumer mechanism.  Consider whether it's better to let the CRM handle all messages.

    Alternatively, consider whether the CRM can process batches close to the efficiency of individual messages.  If it's comparable, then you can use the BufferIntrospective operator provided by Rxx to generate heterogeneously sized buffers that grow or shrink dynamically based on the changing latencies of the CRM.  This is perhaps the most reactive solution since it goes two ways: introspection relies on the CRM to signal when it's ready to receive another batch.  (Note that the current implementation of WindowIntrospective should really be renamed and modified slightly to replace BufferIntrospective since it creates windows artificially from buffers.  It's on my TODO list.)

    - Dave


    http://davesexton.com/blog

    Friday, August 16, 2013 10:45 PM
  • Thanks for the great reply Dave!

    > How are you modeling concurrency at the moment?

    I am not at the moment. It's just doing everything synchronously (thus not ready for production yet!).

    > If you're trying to prevent the CRM from being flooded with messages, then how do you know that 4 messages at a time is ideal?

    Through several runs of our performance suite, which creates constant load through web performance tests and integration unit tests that target some of the WCF service layer. I then use another performance project that mimics some of the handling of the CRM messages, scale that up, and track normal load to see when it starts to degrade. This gives a soft limit cap.

    > What if the CRM changes its latency dynamically?

    I am currently tracking CRM response times. Optionally, I can also track WMI counters on the target boxes that are part of the CRM farm, or that own the external resources, if I need much more accurate information. The performance suite also tracks the WMI counters, so I know exactly what "system is stressed" looks like in terms of the major counters (CPU, memory, physical storage).

    > What if the CRM has no problem handling all of the messages that you send to it?

    Potentially I can also track this and go above the soft limit very slowly and continue to track response times or WMI counters.

    > Consider whether it's better to let the CRM handle all messages.

    It's enterprise architecture (at its finest ...); the CRM doesn't want to deal with the specifics of the external systems and their message formats.

    > Alternatively, consider whether the CRM can process batches close to the efficiency of individual messages.

    This would be ideal for the CRM, but I don't own those services and the team that developed them didn't do any batch-related work. For the other resources the broker is targeting this isn't an option either; those are old legacy systems that no one wants to change!

    I went ahead and did a very small implementation of your #3 suggestion, which really does seem like the best design option:

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class Program
    {
        private static readonly Random RandomGen =
            new Random((int) DateTime.Now.Ticks);
    
        static void Main(string[] args)
        {
            var messages = GetMessages();
    
            var crm = messages.Where(m => m.Type == "CRM");
    
            crm.Select(message => Observable.Defer(() => Observable.StartAsync(
                cancel => SendMessageToCRMAsync(message, cancel))))
                .Merge(4)
                .Subscribe(LogResult);
    
            Console.ReadKey(false);
        }
    
        private static Task SendMessageToCRMAsync(
            CrmMessage message,
            CancellationToken cancel)
        {
            var randomNumber = (RandomGen.Next(3) + 3)*1000;
            Console.WriteLine(
                "Sending Message <{0}> to Crm (taking: {1}) ...",
                message.Id,
                randomNumber);
    
            return Task.Delay(randomNumber)
                        .ContinueWith(_ => Console.WriteLine(
                            "Message <{0}> sent to Crm ...",
                            message.Id));
        }
    
        private static IObservable<CrmMessage> GetMessages()
        {
            return Observable.Range(0, 10)
                                .Select(n => new CrmMessage(n));
        }
    
        private static void LogResult(Unit message)
        {
        }
    }
    
    public class CrmMessage
    {
        public int Id { get; set; }
        public string Type { get; set; }
    
        public CrmMessage(int id)
        {
            Id = id;
            Type = "CRM";
        }
    }
    

    So, to solve my problem I would just have to track the subscription, and if the number of concurrent calls changes dynamically I would dispose of it and re-subscribe with a different Merge argument?

    - David Rodrigues

    Monday, August 19, 2013 8:35 AM
  • Hi David,

    > [snip] resubscribe with a different Merge argument?

    Yep, that'll work.

    Though you'll probably want to be sure that you don't miss any messages while re-subscribing, which means one of the following:

    • The source must be cold, which is unlikely.
    • The source is hot so you'll need some kind of buffering strategy, such as Replay, which may be too simplistic due to its unbounded memory usage or its static configuration.  Although, perhaps whether it's too simplistic or not depends on your actual requirements.
    • The source is hot and you want to rely on the §4.2 contract to ensure that re-subscription occurs serially with respect to messages.  This would allow you to implement re-subscription reactively and elegantly as part of the query itself, without missing any messages.  It's easy to implement this query by defining a sequence of maximum concurrency changes ("concurrencies"), projecting each into the Merge query described previously, and then applying TakeUntil/Repeat or, more simply, Switch.

      However, ensuring the §4.2 contract implies synchronization.  If your concurrencies and messages sequences are naturally synchronized, then a simple Switch query works as is – you've got nothing more to do; otherwise, you can use coarse-grained locking by applying Merge with Publish (i.e., flattening them into a single sequence and then breaking them apart again into separate sequences) or by directly applying the Synchronize operator (i.e., by providing a common gate), or you can insert an asynchronous queue, just like the producer/consumer pattern.  The latter is accomplished by marshaling notifications to a single thread of execution via ObserveOn, such as the UI thread or EventLoopScheduler, which provides a dedicated background thread.  Perhaps whether enqueuing or acquiring a lock for every notification is better depends upon the effects of each and a number of other factors.  Normally I'd recommend the former to avoid introducing additional concurrency, though if you plan to have a large quantity of messages then locking every message and potentially blocking pooled threads doesn't seem like the best choice.

    For example:

    var loop = new EventLoopScheduler();
    
    IObservable<IObservable<Unit>> messagesToSend = 
        crm.ObserveOn(loop).Select(message => 
            Observable.Defer(() => 
                Observable.StartAsync(() => SendMessageToCRMAsync(message))));
    
    IObservable<int> maxConcurrencies = GetConcurrencyChanges()
        .StartWith(4)
        .ObserveOn(loop);
    
    maxConcurrencies
        .Select(messagesToSend.Merge)
        .Switch()
        .Subscribe(LogResult);
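
    And here's roughly what the Synchronize alternative mentioned above might look like, sharing a gate object instead of marshaling to the EventLoopScheduler.  This is only a sketch, with the same placeholder helpers (GetConcurrencyChanges, SendMessageToCRMAsync, LogResult) as the example above:

    var gate = new object();

    IObservable<IObservable<Unit>> messagesToSend = 
        crm.Synchronize(gate).Select(message => 
            Observable.Defer(() => 
                Observable.StartAsync(() => SendMessageToCRMAsync(message))));

    IObservable<int> maxConcurrencies = GetConcurrencyChanges()
        .StartWith(4)
        .Synchronize(gate);        // shared gate, so the two sequences never overlap

    maxConcurrencies
        .Select(messagesToSend.Merge)
        .Switch()
        .Subscribe(LogResult);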


    Either way, if the crm sequence is hot, then you'll probably have to give up cancellation of SendMessageToCRMAsync.  You wouldn't want re-subscription to cancel a message that has already been projected into a deferred observable but hasn't been processed yet.  If it did, then that message might be lost forever.  That's why I got rid of the cancel argument in the example above.

    One consequence of giving up cancellation as part of the query is that disposing the subscription no longer notifies SendMessageToCRMAsync that it must be cancelled.  Fire-and-forget may be a better strategy for you anyway; however, if it's not, then you can easily work around this problem by providing an out-of-band cancellation token to SendMessageToCRMAsync.  I highly recommend using CancellationDisposable, which gives you a cancellation token along with an IDisposable that is easily combined with the subscription's IDisposable via CompositeDisposable.  That way you've effectively separated re-subscription from query cancellation, yet you'll still end up with a single IDisposable that represents the entire subscription.
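
    For example, the wiring could look something like this.  It's just a sketch, reusing the placeholders from the previous example and the two-argument SendMessageToCRMAsync from your first console app; the token now comes from the CancellationDisposable rather than from StartAsync:

    var cancellation = new CancellationDisposable();   // Dispose() cancels its Token

    IObservable<IObservable<Unit>> messagesToSend = 
        crm.ObserveOn(loop).Select(message => 
            Observable.Defer(() => 
                Observable.StartAsync(() => 
                    SendMessageToCRMAsync(message, cancellation.Token))));

    var subscription = maxConcurrencies
        .Select(messagesToSend.Merge)
        .Switch()
        .Subscribe(LogResult);

    // A single IDisposable that both unsubscribes and cancels in-flight sends;
    // internal re-subscription (e.g., by Switch) never touches the token.
    var whole = new CompositeDisposable(subscription, cancellation);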

    - Dave


    http://davesexton.com/blog

    Monday, August 19, 2013 1:41 PM
  • Thanks for another great reply!

    > Fire-and-forget may be a better strategy for you anyway

    In my scenario, because I want to make sure that all messages are handled, fire-and-forget is the best fit, so removing the cancellation token is exactly what I'm doing.

    I don't yet fully understand all the Rx operators you're using, but I've tried to put together another working console app with the information in your reply.

    It works fine, except that every time I push a new concurrency value to the concurrencies observable, it sends all the messages to the CRM again from the beginning, re-sending the ones that have already been sent.

    The code for what I have now is:

    using System;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    public class Program
    {
        private static readonly Random RandomGen =
            new Random((int) DateTime.Now.Ticks);
    
        static void Main(string[] args)
        {
            var loop = new EventLoopScheduler();
    
            var concurrentCalls = 4;
            var concurrenciesSubject = new Subject<int>();
    
            var messagesToSend = GetMessages()
                .ObserveOn(loop)
                .Select(message =>
                        Observable.Defer(() =>
                            Observable.StartAsync(() =>
                                SendMessageToCRMAsync(message))));
    
            var maxConcurrencies = concurrenciesSubject
                .StartWith(concurrentCalls)
                .ObserveOn(loop);
    
            maxConcurrencies.Select(messagesToSend.Merge)
                            .Switch()
                            .Subscribe(LogResult);
    
            var key = new ConsoleKeyInfo();
    
            while (key.Key != ConsoleKey.Escape)
            {
                key = Console.ReadKey(false);
                if (key.Key == ConsoleKey.Escape) break;
    
                concurrenciesSubject.OnNext(++concurrentCalls);
    
                Console.WriteLine(
                    "Increasing concurrent calls to {0}",
                    concurrentCalls);
            }
        }
    
        private static Task SendMessageToCRMAsync(CrmMessage message)
        {
            var randomNumber = (RandomGen.Next(3) + 3)*1000;
            Console.WriteLine(
                "Sending Message <{0}> to Crm (taking: {1}) ...",
                message.Id,
                randomNumber);
    
            return Task.Delay(randomNumber)
                        .ContinueWith(_ => Console.WriteLine(
                            "Message <{0}> sent to Crm ...",
                            message.Id));
        }
    
        private static IObservable<CrmMessage> GetMessages()
        {
            return Observable.Range(0, 1000)
                                .Select(n => new CrmMessage(n));
        }
    
        private static void LogResult(Unit message)
        {
        }
    }
    
    public class CrmMessage
    {
        public int Id { get; set; }
        public string Type { get; set; }
    
        public CrmMessage(int id)
        {
            Id = id;
            Type = "CRM";
        }
    }
    

    How do I prevent this code from re-sending messages that were already sent after I push another value to the concurrencies observable?

    Thanks,

    - David Rodrigues

    Monday, August 19, 2013 3:45 PM
  • Hi David,

    > How do I prevent this code from re-sending messages already send after I push another value to the concurrencies Observable?

    Observable.Range is cold, but I'm assuming that your real message sequence is hot.  You can make Range into a hot sequence by applying the Publish operator, but don't forget to call Connect on the IConnectableObservable<T> that it returns, after you've subscribed to the query of course.
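
    For example, in your console app it would look something like this (just a sketch; the rest of the query stays as before):

    var messages = GetMessages().Publish();   // IConnectableObservable<CrmMessage>; hot once connected

    var messagesToSend = messages
        .ObserveOn(loop)
        .Select(message =>
            Observable.Defer(() =>
                Observable.StartAsync(() =>
                    SendMessageToCRMAsync(message))));

    // ... subscribe the maxConcurrencies/Switch query as before ...

    messages.Connect();   // only now does the underlying Range start producing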

    See my hot and cold observables blog post for starter links and, if you're interested, a deep analysis.

    Your console app looks good.

    - Dave


    http://davesexton.com/blog

    Monday, August 19, 2013 4:22 PM
  • Hi David,

    I just tried to throw together a quick sample using Publish and I discovered that re-subscribing to Merge doesn't actually work.  Sorry for misleading you.

    Merge keeps an internal buffer of the observables that it hasn't subscribed to yet.  That's how it controls concurrency.  Disposing of the subscription to Merge means that you'll lose those messages.

    EventLoopScheduler does in fact solve the problem that I mentioned, though it's not really a problem in your console app given that Merge has already buffered all of the messages by the time that you press a key to increment the maximum concurrency value!  Of course, in a real application it may actually prevent message loss from the source, assuming that it's slower than Observable.Range, but it doesn't really matter in your case since it can't prevent the loss of messages from Merge's internal buffer.

    Ultimately, it seems that you'll need a dynamic operator to reactively change the level of concurrency.  Merge is static.

    To be clear, this isn't a problem with Switch per se, it's a problem with Merge in general.  Even if you were to manually track the subscription, dispose and re-subscribe to another Merge query, you're going to lose all of the messages that the previous Merge query had already buffered.

    Here's a dynamic Merge operator that should meet your needs.  Use the + and - keys to update the maximum concurrency value.

    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RxLabs.Net45
    {
      class DynamicMergeLab
      {
        internal static void Main()
        {
          var messagesToSend = GetMessages()
            .Where(m => m.Type == "CRM")
            .Select(message =>
              Observable.Defer(() =>
                Observable.StartAsync(() => SendMessageToCRMAsync(message))));
    
          var maxConcurrencies = new BehaviorSubject<int>(4);
    
          var merged = messagesToSend.Merge(maxConcurrencies);
    
          using (maxConcurrencies.Subscribe(c => Console.WriteLine("Concurrency: {0}", c)))
          using (merged.Subscribe(_ => { }))
          {
            ConsoleKey key;
    
            while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape)
            {
              var currentValue = maxConcurrencies.FirstAsync().Wait();
    
              if (key == ConsoleKey.Add || key == ConsoleKey.OemPlus)
              {
                maxConcurrencies.OnNext(currentValue + 1);
              }
              else if (key == ConsoleKey.Subtract || key == ConsoleKey.OemMinus)
              {
                maxConcurrencies.OnNext(currentValue - 1);
              }
            }
          }
        }
    
        private static int activeMessageCount;
    
        private static async Task SendMessageToCRMAsync(CrmMessage message)
        {
          Console.WriteLine("-> {0,-3}; Thread {1}; Active message count = {2}",
            message.Id,
            Thread.CurrentThread.ManagedThreadId,
            Interlocked.Increment(ref activeMessageCount));
    
          await Task.Delay(TimeSpan.FromSeconds(message.Id % 5));  // simulate latency
    
          Console.WriteLine("<- {0,-3}; Thread {1}; Active message count = {2}",
            message.Id,
            Thread.CurrentThread.ManagedThreadId,
            Interlocked.Decrement(ref activeMessageCount));
        }
    
        private static IObservable<CrmMessage> GetMessages()
        {
          return Observable.Range(0, 1000)
            .Select(n => new CrmMessage(n) { Type = "CRM" });
        }
    
        public class CrmMessage
        {
          public int Id { get; set; }
          public string Type { get; set; }
    
          public CrmMessage(int id)
          {
            Id = id;
            Type = "CRM";
          }
        }
      }
    
      static class ObservableEx24
      {
        public static IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> source, IObservable<int> maxConcurrent)
        {
          return Observable.Create<TSource>(
            observer =>
            {
              var gate = new object();
              var q = new Queue<IObservable<TSource>>();
              var isStopped = false;
              var activeCount = 0;
              var maxConcurrentValue = 1;
              var group = new CompositeDisposable();
              var sourceSubscription = new SingleAssignmentDisposable();
              var maxConcurrentSubscription = new SingleAssignmentDisposable();
    
              group.Add(sourceSubscription);
              group.Add(maxConcurrentSubscription);
    
              Action<IObservable<TSource>> subscribeInner = null;
              subscribeInner = innerSource =>
              {
                var innerSubscription = new SingleAssignmentDisposable();
    
                group.Add(innerSubscription);
    
                innerSubscription.Disposable = innerSource.Subscribe(
                  value =>
                  {
                    lock (gate)
                    {
                      observer.OnNext(value);
                    }
                  },
                  ex =>
                  {
                    lock (gate)
                    {
                      observer.OnError(ex);
                    }
                  },
                  () =>
                  {
                    group.Remove(innerSubscription);
    
                    lock (gate)
                    {
                      if (activeCount <= maxConcurrentValue && q.Count > 0)
                      {
                        subscribeInner(q.Dequeue());
                      }
                      else
                      {
                        activeCount--;
                        if (isStopped && activeCount == 0)
                        {
                          observer.OnCompleted();
                        }
                      }
                    }
                  });
              };
    
              maxConcurrentSubscription.Disposable = maxConcurrent.Subscribe(
                value =>
                {
                  lock (gate)
                  {
                    maxConcurrentValue = Math.Max(1, value);
    
                    // When the maximum concurrency value is raised, try to dequeue and subscribe to as many 
                    // available observables as needed to fill the new bounds.
                    //
                    // When the maximum concurrency value is lowered, do not dispose of any active observables.
                    // 
                    // Lowering the maximum concurrency value does not imply cancelation; it only implies that 
                    // future inner observables will be deferred until the number of active observables drops
                    // sufficiently.  The number of active observables may currently be higher than the current 
                    // maximum concurrency value, though prematurely disposing of active observables may result 
                    // in irreversible data loss.
    
                    while (activeCount < maxConcurrentValue && q.Count > 0)
                    {
                      activeCount++;
                      subscribeInner(q.Dequeue());
                    }
                  }
                },
                ex =>
                {
                  lock (gate)
                  {
                    observer.OnError(ex);
                  }
                });
    
              sourceSubscription.Disposable = source.Subscribe(
                innerSource =>
                {
                  lock (gate)
                  {
                    if (activeCount < maxConcurrentValue)
                    {
                      activeCount++;
                      subscribeInner(innerSource);
                    }
                    else
                    {
                      q.Enqueue(innerSource);
                    }
                  }
                },
                ex =>
                {
                  lock (gate)
                  {
                    observer.OnError(ex);
                  }
                },
                () =>
                {
                  lock (gate)
                  {
                    isStopped = true;
                    if (activeCount == 0)
                    {
                      observer.OnCompleted();
                    }
                    else
                    {
                      sourceSubscription.Dispose();
                    }
                  }
                });
    
              return group;
            });
        }
      }
    }

    - Dave


    http://davesexton.com/blog

    Monday, August 19, 2013 7:59 PM
  • Hi David,

    I've added a work item to Rx:

    https://rx.codeplex.com/workitem/43

    - Dave


    http://davesexton.com/blog

    Monday, August 19, 2013 8:14 PM
  • Thanks! Works like a charm!

    Also learned a couple of new C# things from looking at your code; I didn't know you could stack multiple using statements without nesting them and putting {} around each one.

    Tuesday, August 20, 2013 9:17 AM