Buffering/Throttling/Sampling/Taking/???

  • Question

  • So I'm still wrapping my brain around some of the Rx basics, and I'm hoping the group here can point me in the right direction.  

    Basically, I have a client systray app that is going to listen to a service for some incoming requests (pub/sub type operation).  The handling of each of these requests will ultimately incur quite a bit of overhead on the client itself.  As such, my brain is telling me that I need to "throttle" the incoming messages somehow to ensure it doesn't exceed the abilities of the client.  

    For example, I might only ever want to be actively processing three of these messages concurrently, after which the processing of any further incoming messages needs to be blocked until one of the current messages has completed.

    This sounds like a good candidate to use Rx, but I'm not exactly sure how best to "throttle" these messages so I only process 3 at the same time and have the others wait.  I've read quite a bit about how Rx's implementation of "throttle" doesn't exactly line up with what most people expect, and I'm hoping to avoid over-engineering the blocking aspect if Rx has some nice tricks already available that I'm not aware of.

    Can somebody point me in the general direction I need to be heading with this? 

    Thanks :)

    Saturday, December 4, 2010 12:52 AM


  • Hi Joe,

    See the following discussions about controlling concurrency:


    Basically, the solution that you need is probably orthogonal to the operators that you've mentioned; for example, you could perhaps create a custom IScheduler that limits how much work runs at once.
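
    To show the shape of "at most three at a time, the rest wait", here is a minimal sketch in plain Python rather than Rx. It is not a custom IScheduler; it swaps in a counting semaphore as the gate. The names handle, process_all, run_demo and MAX_CONCURRENT are made up for illustration, and the sleep is only a stand-in for the expensive processing.

```python
import asyncio

MAX_CONCURRENT = 3  # assumed limit, taken from the example in the question


async def handle(message, gate, stats):
    # Wait here until one of the MAX_CONCURRENT slots is free.
    async with gate:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the expensive processing
        stats["active"] -= 1
        return f"done:{message}"


async def process_all(messages):
    # The semaphore is the gate: it hands out at most MAX_CONCURRENT slots.
    gate = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"active": 0, "peak": 0}
    # All handlers are started, but only MAX_CONCURRENT get past the gate at once.
    results = await asyncio.gather(*(handle(m, gate, stats) for m in messages))
    return results, stats["peak"]


def run_demo(count=10):
    # Returns the results in arrival order plus the highest concurrency seen.
    return asyncio.run(process_all(range(count)))
```

    Messages that arrive while all three slots are busy simply queue behind the gate, which is the blocking behaviour described in the question. In current Rx releases, the Merge overload that takes a maxConcurrent argument may give a similar effect without a custom scheduler, but treat that as a pointer to check rather than a tested recipe.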

    There are many different ways that each of those operators could work, so Rx only provides a few of them; presumably, the most common.

    Buffer*: Keeps track of values by count or time.  When the specified count is reached or the specified time span elapses, the current buffer is observed.

    Throttle: Keeps a buffer of 1 value, replacing previous values, until the specified period of silence elapses (i.e., no more values), after which the last value buffered is observed.

    Sample: Keeps a buffer of 1 value, replacing previous values, until the specified sampling period elapses, after which the last value buffered is observed.

    Take*: Takes the specified number of values from the beginning or end of the sequence, or until another observable produces a value, or until some predicate returns false, and then calls OnCompleted.
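
    The difference between these is easier to see on a concrete timeline. The following is a rough model in plain Python, not the Rx implementation: events are (time, value) pairs with integer times, and the function names and the sample data are assumptions made for illustration. Only the simplest variant of each operator is modelled (Buffer by count, Take by count), and the model assumes the source stays open after the last value.

```python
def buffer_by_count(events, count):
    """Buffer: collect values and emit them as a list once `count` have arrived."""
    out, current = [], []
    for _, value in events:
        current.append(value)
        if len(current) == count:
            out.append(current)
            current = []
    if current:
        out.append(current)  # flush the partial buffer at the end
    return out


def throttle(events, quiet):
    """Throttle: emit a value only if nothing newer arrives within `quiet` time units."""
    out = []
    for i, (t, value) in enumerate(events):
        next_t = events[i + 1][0] if i + 1 < len(events) else None
        # The value survives if the next one comes after the quiet period
        # (or never comes); it is then emitted when that period ends.
        if next_t is None or next_t - t >= quiet:
            out.append((t + quiet, value))
    return out


def sample(events, period, end):
    """Sample: on every tick, emit the latest value if one arrived since the last tick."""
    out, i, latest, fresh = [], 0, None, False
    tick = period
    while tick <= end:
        # Absorb everything that arrived up to and including this tick.
        while i < len(events) and events[i][0] <= tick:
            latest, fresh = events[i][1], True
            i += 1
        if fresh:
            out.append((tick, latest))
            fresh = False
        tick += period
    return out


def take(events, count):
    """Take: pass through the first `count` values, then stop."""
    return [value for _, value in events[:count]]


# Hypothetical timeline: a burst, a pause, a short burst, a long pause, one value.
events = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e"), (30, "f")]

buffered = buffer_by_count(events, 4)   # [['a', 'b', 'c', 'd'], ['e', 'f']]
throttled = throttle(events, 5)         # [(7, 'c'), (16, 'e'), (35, 'f')]
sampled = sample(events, 5, end=35)     # [(5, 'c'), (10, 'd'), (15, 'e'), (30, 'f')]
taken = take(events, 2)                 # ['a', 'b']
```

    Note that none of these limits how many values are being processed at once: Throttle and Sample discard values, Buffer only groups them, and Take ends the sequence early.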

    - Dave

    • Proposed as answer by James Miles Thursday, December 30, 2010 10:43 AM
    • Marked as answer by fixedpoint Tuesday, January 11, 2011 12:25 AM
    Sunday, December 5, 2010 1:30 AM