Monitoring depth of an Observable

    Question

  • Hello

    Is it possible to monitor the depth of an IObservable without "consuming" it? More precisely, in practical terms: I have a message stream. Messages come through at random times and in random quantities. The observable itself is buffered in various ways to throttle and chunk the messages. It is conceivable that, with message processing delays and a high volume of input, messages will start backing up (part of the question; I think it's possible, but I'm not sure).

    Is there a way to check for that and measure how bad it's getting?

    Wednesday, March 7, 2012 1:29 AM

Answers

  • Hi,

    This is a great question that I don't recall seeing on this forum, at least not worded in this way.

    Yes, it's possible for observers to be slower than observables.  And since well-behaved observables push notifications serially (see the Rx Design Guidelines, §4.2), it's possible that notifications can get "backed up".

    But I wonder, why do you want to monitor for this behavior?  Is it...

    1. acceptable, but you want to be aware of it?
    2. unacceptable, and you need your query to automatically adapt at runtime?
    3. unacceptable, and you need to fix your code to avoid the behavior at runtime based on performance goals against realistic data?

    I think #3 is pretty important.  Do you have specific performance goals in mind or is monitoring just a contingency?  Have you tested your query against realistic data to see if there's any chance of this behavior occurring?  (You may be pleasantly surprised.)

    One way to monitor throughput is to record timestamps when data is generated and then again when data is observed.  The relative differences will show whether observers are slower than observables.  Rx has Timestamp and TimeInterval operators that you may find useful.  Rx also has a Do operator that is useful for logging.  If you're interested, Rxx provides several Trace* extensions designed specifically for logging.
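    As a rough sketch of that timestamp approach (untested; assumes the standard System.Reactive operators, and the `log` sink is a hypothetical placeholder):

```csharp
using System;
using System.Reactive.Linq;

static class ThroughputLogging
{
    // Stamp each value when it is produced, then measure the gap between
    // successive observations downstream.  If the observation gaps grow
    // while the production rate stays constant, observers are falling behind.
    public static IObservable<T> LogThroughput<T>(
        this IObservable<T> source, Action<string> log)
    {
        return source
            .Timestamp()
            .Do(t => log("produced at " + t.Timestamp.ToString("O")))
            .Select(t => t.Value)
            .TimeInterval()
            .Do(i => log("observed " + i.Interval.TotalMilliseconds
                + " ms after the previous value"))
            .Select(i => i.Value);
    }
}
```

    Rxx's Trace* operators wrap a similar pattern with less ceremony.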

    Other possibilities depend upon how exactly you are buffering and throttling the data.  For example, if you're using a simple buffer-by-count, e.g., xs.Buffer(10), then you could Publish the observable first and attach Scan to keep track of the number of values that are being buffered.  Whenever the limit is reached, notify observers and reset the accumulator.

    For example: (Untested)

    public void Foo(IObservable<Data> source)
    {
    	const int bufferSize = 10;
    
    	var published = source.Publish();
    
    	published.Buffer(bufferSize).SelectMany(AsyncProcess).Subscribe(NormalObserver);
    
    	if (MonitorEnabled)
    	{
    		published.Scan(0, (acc, _) => acc == bufferSize ? 1 : acc + 1)
    			.Where(count => count == bufferSize)
    			.Subscribe(MonitorObserver);
    	}
    
    	published.Connect();
    }

    Time-based operators in Rx seem to be forgiving of this behavior and will adapt reactively.  For example, if you forcefully back up Observable.Interval by temporarily blocking an observer for longer than its period, you'll see that only the following notification is observed without delay.  Subsequent notifications keep the original period; i.e., they are automatically time-shifted, starting from the time at which the single backed-up notification is observed.
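    A quick (untested) way to see this behavior for yourself:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class IntervalBackupDemo
{
    static void Main()
    {
        // Block the observer on the third value for longer than the period.
        // The queued fourth value is observed immediately afterward, and the
        // remaining values resume the original 100 ms cadence.
        using (Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(8)
            .Subscribe(x =>
            {
                Console.WriteLine("{0:HH:mm:ss.fff}  {1}", DateTime.Now, x);
                if (x == 2)
                    Thread.Sleep(350);   // simulate a slow observer
            }))
        {
            Thread.Sleep(2000);          // keep the console process alive
        }
    }
}
```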

    I'm not sure about other time-sensitive operators in Rx, but I suspect that they all behave similarly to Interval with respect to slow observers.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, Wednesday, March 7, 2012 3:16 AM (Added SelectMany to example)
    • Marked as answer by Dmitry Orlovsky, Thursday, March 8, 2012 7:17 PM
    Wednesday, March 7, 2012 3:10 AM
  • Hi Dmitry,

    I may have mixed together two ideas in my previous response.

    Calculating deltas is something that you can do regardless of whether concurrency is introduced or not.  This was how I previously suggested to monitor queries that observe notifications synchronously with respect to some observable source; e.g., UI events.  This is also probably the simplest solution.

    However, since AsTimer introduces concurrency, it means that you can get an accurate real-time view of the data in your monitor without having to calculate deltas.  For example, consider publishing the chunked observable before AsTimer and subscribe to that as your monitor.  The monitor will receive notifications that aren't choked by slow observers of the AsTimer query.  Therefore, if you can correlate the data being observed in the monitor with the data being observed in the main observers, adjusting for the 3 second window, then you can determine instantaneously whether an observer is slow or not.  This could be pretty complicated though and I haven't thought it through fully either, so I'm sure there are things I haven't considered.  But if you want to code a system that reacts in real-time to slow observers, without waiting to collect deltas, then perhaps it can be done this way.
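    Sketched against the shape of the OP's query (untested; AsTimer is the Rxx operator, the chunking is simplified, and `source`, `MonitorObserver`, and `NormalObserver` are placeholders as in the earlier example):

```csharp
// Publish the chunked stream *before* AsTimer so the monitor observes it
// directly and is not choked by slow observers downstream of AsTimer.
var chunked = source
    .Buffer(() => Observable.Timer(TimeSpan.FromSeconds(3)))   // simplified chunking
    .SelectMany(chunk => chunk)
    .Publish();

chunked.Timestamp().Subscribe(MonitorObserver);   // real-time view of the data

chunked.AsTimer(TimeSpan.FromMilliseconds(10))
       .Subscribe(NormalObserver);                // main observers

chunked.Connect();
```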

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 4:50 AM

All Replies

  • Hi,

    I forgot an important detail.  Due to the serialized behavior of well-behaved observables, a slow observer will actually prevent an operator such as Buffer(count) from receiving any notifications until the observer returns from the current notification, unless you are introducing concurrency into the query before Subscribe is called; e.g., via SelectMany or an operator such as Throttle.  (I've updated my previous example to include SelectMany for this reason, which allows MonitorObserver to receive notifications concurrently with respect to NormalObserver.)

    If your observers are actually blocking your buffering and throttling in some way, then you may actually have to monitor at the source of the observable instead of within the query.  For example, if your source is a producer (as in producer/consumer) that runs on a background thread and periodically pushes notifications into an observable from a queue, then it's probably best to monitor for slow observers by analyzing the size of the queue vs. the throughput to observers.
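    A minimal (untested) sketch of that producer/queue shape, with monitoring done on the queue itself rather than inside the Rx query:

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

class QueueDepthDemo
{
    static void Main()
    {
        // Hypothetical producer: a background loop drains a queue into a
        // subject.  Backlog is visible as the queue's Count, with no need
        // to instrument the query.
        var queue = new ConcurrentQueue<string>();
        var subject = new Subject<string>();

        Task.Run(() =>
        {
            while (true)
            {
                if (queue.TryDequeue(out var message))
                    subject.OnNext(message);
                else
                    Thread.Sleep(1);
            }
        });

        // Backlog monitor: sample the queue depth once per second.
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => queue.Count)
            .Subscribe(depth => Console.WriteLine("backlog: " + depth));

        Thread.Sleep(Timeout.Infinite);
    }
}
```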

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 3:29 AM
  • Hi Dave 

    Thanks again for your answer. I have to say, though: wow, do you live on this forum? Hehe, but I guess you get that a lot.

    Regarding your questions:

    1. Yes.

    2. No. It would be awesome, but no, because I would imagine any solution like that would complicate the definition to an unmaintainable point.

    3. Maybe. I did run it against a semi-realistic message stream and it fared quite well.

    The top-level stream is an Rxx ReadToEndObservable of a TcpClient's stream; there is a SelectMany in the buffering, as well as AsTimer.

    Re: observers blocking the buffering... I'm not sure how I would achieve that, but here's an excerpt of the buffering (mock prototype, not production):

    source.Buffer(() => Observable.Amb(new[]
        {
            source
                .Select(a => a.Length)
                .Scan((a, c) =>
                {
                    Console.WriteLine("Accum: {0} next: {1}", a, c);
                    a += c;
                    return a;
                })
                .SkipWhile(a => a < 100)
                .Select(i => 0L),
            Observable.Timer(TimeSpan.FromSeconds(3))
        }))
        .SelectMany(chunk => chunk)
        .AsTimer(TimeSpan.FromMilliseconds(10))

    source is an IObservable<string>.

    The reason I was looking for a depth-of-queue solution here is mostly for operations to have an indication that the service is getting hammered and to start thinking about "rolling out more hardware," so to speak, if that makes sense.

    Wednesday, March 7, 2012 3:57 AM
  • Hi,

    The same thing goes for a Windows message loop, which is basically the same as my previous description of a producer and a queue.

    In other words, if your observable is converted from a UI event and you apply a query that doesn't introduce concurrency, and you have an observer that blocks for N seconds, then even the Buffer(count) operator will not receive any notifications until the observer returns; i.e., the UI thread is effectively blocked, so the entire query is choked.

    You won't be able to monitor for slow observers by injecting anything into the query; at least not synchronously.  Instead, you could subscribe another observer for the UI event's observable and update a timestamp in memory for each notification.  Have a background thread periodically analyze the timestamp (e.g., via Observable.Interval) to see whether the UI thread is being blocked by observers.  You could also instrument the query to record the deltas between observations (via the Do operator, for example) to determine which observer has poor throughput.  You could even adapt reactively by having the background monitor cancel slow observers and dispose of their subscriptions; e.g., this could be useful if you have an add-on model in your application and you want to automatically disable add-ons that are blocking the UI thread for an unacceptable amount of time.
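    One (untested) shape of that timestamp idea, with a Subject standing in for the UI event's observable; note that a stale timestamp can also simply mean no events fired, so this only helps for sources with a known minimum rate:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class UiHeartbeatMonitor
{
    static void Main()
    {
        // Stand-in for e.g. Observable.FromEventPattern(...) on a UI event.
        var uiEvents = new Subject<Unit>();

        long lastTicks = DateTime.UtcNow.Ticks;

        // Cheap heartbeat observer: records when the last event got through.
        uiEvents.Subscribe(_ =>
            Interlocked.Exchange(ref lastTicks, DateTime.UtcNow.Ticks));

        // Background monitor: if the heartbeat goes stale while events are
        // known to be firing, the UI thread is likely blocked by an observer.
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.UtcNow
                - new DateTime(Interlocked.Read(ref lastTicks), DateTimeKind.Utc))
            .Where(staleness => staleness > TimeSpan.FromSeconds(5))
            .Subscribe(staleness =>
                Console.WriteLine("UI possibly blocked for " + staleness));

        Thread.Sleep(Timeout.Infinite);
    }
}
```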

    But notice that an appropriate solution to your problem depends upon whether you're introducing concurrency into your query or not.  If so, you can monitor throughput by instrumenting the query at various places, as I've shown in my original example with SelectMany.  If not, then you'll have no choice but to monitor at either end of the query.  For example, you could monitor the source directly, which is probably going to be a message queue of some kind, or you could monitor observers by recording timestamps and analyzing deltas, or both.

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 4:07 AM
  • Interesting!

    JP Cowboy Coders Unite!

    Wednesday, March 7, 2012 4:30 AM
  • Hi Dmitry,

    Thanks for providing an example.  The AsTimer operator introduces concurrency, which means that Buffer is free to receive notifications concurrently with respect to observers.  For this reason, you can follow my previous example to instrument your query before and after Buffer.  However, since it's not just a simple count you can't really use Scan like I did.  Perhaps you should create a separate query on source as your monitor.  You can use the TimeInterval operator to track deltas.  Observers of the monitor query will receive notifications concurrently with respect to observers of the main query, thus you can determine whether any of the main observers are slow by comparing the average delta of the monitor with the average deltas of each main observer.  Deltas that are substantially larger in any of the main observers may indicate that they are processing data too slowly.  Though you'll have to adjust your calculation for the 3 second cap that you're using with Amb.
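    Roughly like this (untested; Interval-based stand-ins replace the OP's source and main query, and the buffer size for averaging is arbitrary):

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

class DeltaComparisonMonitor
{
    static void Main()
    {
        // Stand-ins for the OP's source and the query the main observers see.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(10));
        var mainQuery = source.Buffer(10).SelectMany(chunk => chunk);

        // Compare average gaps: substantially larger deltas on the main
        // query side suggest its observers are processing too slowly.
        var sourceAvg = source.TimeInterval()
            .Select(x => x.Interval.TotalMilliseconds)
            .Buffer(50).Select(b => b.Average());
        var mainAvg = mainQuery.TimeInterval()
            .Select(x => x.Interval.TotalMilliseconds)
            .Buffer(50).Select(b => b.Average());

        sourceAvg.Zip(mainAvg, (src, main) => main - src)
            .Subscribe(lag =>
                Console.WriteLine("average extra delay: {0:F1} ms", lag));

        Thread.Sleep(5000);
    }
}
```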

    > wow, do you live on this forum?

    :)

    My dedication is only slight compared to what I've seen from other contributors, especially on the C# general forums and newsgroups.  It's sad though, because I don't have a life and yet I still produce less ;)

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 4:34 AM
  • Guys

    I think Dave answered Dmitry's query, but I just wanted to draw attention to an area of 'depth' in Rx that I didn't see mentioned elsewhere in this thread.

    The ObserveOn(IScheduler) operator introduces a somewhat hidden queue between the producer and the consumer(s) in order to decouple the two threads involved in the Rx stream.

    We've been caught out by this on a project before, and it's something I find quite often confuses newcomers to Rx.

    We implemented an ObserveLatestOn operator that only stored the latest value, and threw away all previous values. This meant that the consumer would miss values if they were processing them more slowly than values were arriving, but that when they did process a value, they always processed the latest value.

    This had the nice effect of scaling throughput to the performance of the consumers, and was perfect for our situation as each value was a full update.
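    For reference, one well-known shape of such an operator looks roughly like this (not Matt's actual code; an untested sketch):

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableLatestExtensions
{
    // Like ObserveOn, but the hidden queue is replaced by a single slot:
    // a notification that arrives while a previous one is still awaiting
    // delivery simply overwrites it, so the consumer always sees the latest.
    public static IObservable<T> ObserveLatestOn<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            Notification<T> pending = null;
            var gate = new object();

            return source.Materialize().Subscribe(notification =>
            {
                bool deliveryInFlight;
                lock (gate)
                {
                    deliveryInFlight = pending != null;
                    pending = notification;   // overwrite any undelivered value
                }

                if (!deliveryInFlight)
                {
                    scheduler.Schedule(() =>
                    {
                        Notification<T> toDeliver;
                        lock (gate)
                        {
                            toDeliver = pending;
                            pending = null;
                        }
                        toDeliver.Accept(observer);
                    });
                }
            });
        });
    }
}
```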

    Just something to consider..

    Cheers
    Matt

    Wednesday, March 7, 2012 11:16 AM
  • Matt;

      Interesting.  I have a post here in this forum for turning EF queries into observables.  But because .NET seems to prefer IEnumerables, as in ObservableCollections, I found it quite easy to take a newly pushed value and simply add it to the collection.  We know that the Count method on the IEnumerable could satisfy this OP's question, right?

    The concept is pretty simple...  Create a LINQ statement (which normally returns the IEnumerable), convert it to an observable, and then Subscribe.  When a value is pushed, take it and add it to a collection.  There's the count right there...


    JP Cowboy Coders Unite!

    Wednesday, March 7, 2012 1:43 PM
  • Hi Matt,

    That's a good option to have.  In Rxx, the BufferIntrospective operator can be used to accomplish the same thing.  For example:

    var scheduler = Scheduler.ThreadPool;
    var observeLatestOn = source.BufferIntrospective(scheduler).SelectMany(buffer => buffer.TakeLast(1));

    We've also got a work item for a SampleIntrospective operator to simplify this scenario:

    http://rxx.codeplex.com/workitem/21246

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 3:26 PM
  • Hi Mr. Javaman,

    > We know that the COUNT method on the IENUMERABLE could satisfy this ops question right?

    If you have access to an explicit collection or queue, as in the producer of a consumer/producer pattern, then yes, its count can be used along with a time interval to interactively monitor throughput.  However, the OP's code shows that there is no explicit queue in this scenario.  The AsTimer operator maintains the queue internally.

    > Create a LINQ statement (which normally returns the IENUMERABLE), [snip]

    In this case, the query begins as a reactive query, so that's probably not a good option.  Converting to IEnumerable<T> is typically undesirable in a reactive system.  Also consider my point about reactive monitoring - it's possible to create a separate monitoring subscription by publishing a source before the final operator in which concurrency is introduced.  Then attach a side-effect (e.g., using the Do operator) immediately after the final concurrency-introducing operator that updates a shared value with the identity of the last observed element.  Then the monitor can determine, based on the semantics of the query at the point in which concurrency is introduced, whether or not observers are running slower than the source sequence.  You could also attach a reference to the observer itself and then reactively unsubscribe observers that are misbehaving.  No enumerables needed.  :)
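    A sketch of that idea (untested; `Data`, its `Id` property, `AsyncProcess`, and the observer names are hypothetical, echoing the earlier example):

```csharp
// Publish before the concurrency-introducing operator so the monitor sees
// elements independently of slow observers downstream of it.
var published = source.Publish();

long lastObservedId = -1;   // identity of the last element seen downstream

published
    .SelectMany(AsyncProcess)    // concurrency introduced here
    .Do(x => Interlocked.Exchange(ref lastObservedId, x.Id))
    .Subscribe(NormalObserver);

// Monitor: compare the newest published element against the last element
// actually observed downstream; a growing gap indicates a slow observer.
published.Subscribe(x =>
{
    long gap = x.Id - Interlocked.Read(ref lastObservedId);
    if (gap > 100)
        Console.WriteLine("observers lagging by " + gap + " elements");
});

published.Connect();
```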

    - Dave


    http://davesexton.com/blog

    Wednesday, March 7, 2012 3:35 PM
    Thanks, Dave. I'm still green at Rx...

    JP Cowboy Coders Unite!

    Friday, March 9, 2012 12:01 AM