How to use Rx when "T" is a large, expensive object


  • It seems that the IObservable<T>/IObserver<T> pattern works really well when "T" is small and inexpensive. The producer just creates a new instance each time it has something to send down the chain and shoots it into the pipe. All the observers do their thing, and when the last reference goes out of scope the garbage collector takes care of the cleanup.

    What is not clear to me is how this pattern should work when "T" is really big (like arrays of a million elements) and expensive - something you can't afford to "new" and "dispose" for every message. Clearly you need to pass these things by reference, and you may want to create a pool of them, but in either case you have the same problem - how does the producer know when it can reuse one of these elements? The subscription chain may be arbitrary in length and shape, and some subscribers may be observing on different threads so their use of the data may be asynchronous to the producer's call to "OnNext". How do you know when everyone is done using an element? Is there a pattern for this?

    Friday, October 03, 2014 10:25 PM


  • I can think of two possible strategies that you can use:

    1. Reference counting; e.g., something like:
      abstract class Reference<T> {
          public abstract T Value { get; }
          public abstract IDisposable Acquire();
          public abstract IDisposable Subscribe(Action unreferenced);
      }
    2. Singleton notification; e.g., something like:
      abstract class Disposable<T> : IDisposable {
          public abstract T Value { get; }
          public abstract void Dispose();
          public abstract IDisposable Subscribe(Action disposed);
      }

    (Edit: To be clear, the idea is to use IObservable<Reference<T>> or IObservable<Disposable<T>>.)

    Both strategies require your consumers to respect your contract, both must allow the observable to register for an async callback, and the wrapper classes themselves are relatively cheap.

    #1 is useful if all of your observers respect your T's acquire/release semantics.
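    A minimal sketch of how the reference-counting wrapper in #1 might be implemented, assuming a lock-free counter plus a list of "unreferenced" callbacks. Only the member names come from the outline above; the bodies, the private Once helper and ReferenceDemo are illustrative assumptions, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch of strategy #1: a reference-counted wrapper around an expensive value.
// Member names follow the outline; the bodies are assumptions.
public sealed class Reference<T>
{
    private readonly object _gate = new object();
    private readonly List<Action> _callbacks = new List<Action>();
    private int _count; // number of Acquire handles not yet disposed

    public Reference(T value) => Value = value;

    public T Value { get; }

    // Increments the count; disposing the returned handle decrements it once.
    public IDisposable Acquire()
    {
        Interlocked.Increment(ref _count);
        return new Once(Release);
    }

    // Registers a callback that runs each time the count drops to zero.
    public IDisposable Subscribe(Action unreferenced)
    {
        lock (_gate) _callbacks.Add(unreferenced);
        return new Once(() => { lock (_gate) _callbacks.Remove(unreferenced); });
    }

    private void Release()
    {
        if (Interlocked.Decrement(ref _count) != 0) return;
        Action[] callbacks;
        lock (_gate) callbacks = _callbacks.ToArray();
        foreach (var callback in callbacks) callback();
    }

    // IDisposable that runs its action at most once, even if disposed twice.
    private sealed class Once : IDisposable
    {
        private Action _action;
        public Once(Action action) => _action = action;
        public void Dispose() => Interlocked.Exchange(ref _action, null)?.Invoke();
    }
}

public static class ReferenceDemo
{
    // Returns how many times the "unreferenced" callback ran.
    public static int Run()
    {
        int unreferencedCalls = 0;
        var item = new Reference<double[]>(new double[1_000_000]);
        item.Subscribe(() => unreferencedCalls++);

        IDisposable first = item.Acquire();   // count = 1
        IDisposable second = item.Acquire();  // count = 2
        first.Dispose();                      // count = 1, nothing happens yet
        first.Dispose();                      // same handle again: ignored
        second.Dispose();                     // count = 0, callback runs
        return unreferencedCalls;
    }
}
```

    This sketch does not guard against a caller acquiring again after the count has already reached zero; under the contract being discussed that would be a consumer bug.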

    #2 is useful if you ultimately have a single observer that defines the lifetime of the resource.
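    For comparison, a sketch of #2 under similar assumptions: the single owner calls Dispose, every registered callback is notified once, and a subscriber that arrives after disposal is notified immediately (that last rule is an assumption of this sketch). The Once helper and DisposableDemo are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch of strategy #2: one owner ends the lifetime by calling Dispose,
// and each registered callback hears about it exactly once.
public sealed class Disposable<T> : IDisposable
{
    private readonly object _gate = new object();
    private List<Action> _callbacks = new List<Action>(); // null after Dispose

    public Disposable(T value) => Value = value;

    public T Value { get; }

    public IDisposable Subscribe(Action disposed)
    {
        lock (_gate)
        {
            if (_callbacks != null)
            {
                _callbacks.Add(disposed);
                return new Once(() => { lock (_gate) _callbacks?.Remove(disposed); });
            }
        }
        disposed(); // already disposed: notify a late subscriber immediately
        return new Once(() => { });
    }

    public void Dispose()
    {
        List<Action> callbacks;
        lock (_gate)
        {
            callbacks = _callbacks;
            _callbacks = null; // later calls to Dispose become no-ops
        }
        if (callbacks == null) return;
        foreach (var callback in callbacks) callback();
    }

    // IDisposable that runs its action at most once.
    private sealed class Once : IDisposable
    {
        private Action _action;
        public Once(Action action) => _action = action;
        public void Dispose() => Interlocked.Exchange(ref _action, null)?.Invoke();
    }
}

public static class DisposableDemo
{
    // Returns how many "disposed" notifications were delivered.
    public static int Run()
    {
        int notifications = 0;
        var frame = new Disposable<byte[]>(new byte[4096]);
        frame.Subscribe(() => notifications++);
        frame.Dispose();                         // owner ends the lifetime: 1 notification
        frame.Dispose();                         // ignored
        frame.Subscribe(() => notifications++);  // late subscriber: notified at once
        return notifications;
    }
}
```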

    - Dave

    • Edited by Dave Sexton Saturday, October 04, 2014 3:38 AM Added missing Value property
    Saturday, October 04, 2014 3:34 AM
  • Hey Dave,

    Thanks for your reply. Unfortunately I am not sure that either strategy is sufficient. The problem is that you have some producer of "T" that has access to a pool of "T"s. It grabs a "T" from the pool, fills it with data, builds a Reference<T> wrapper around it and calls "OnNext" to push the item out the door. It has no idea at all of how many subscribers are listening to this, nor when or even if all those subscribers will be done using the instance of "T". Imagine you have a setup like this:

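    var producer = new Producer<Reference<T>>();
    var publisher = producer.Publish();
    var subscriber1 = publisher.Subscribe(OnNextAction);
    var subscriber2 = publisher.ObserveOn(TaskPoolScheduler.Default).Subscribe(SomeOtherOnNextAction);
    publisher.Connect(); // Publish returns an IConnectableObservable<T>; nothing flows until Connect is called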

    When Producer's call to OnNext completes, it has no idea there are two consumers, nor do the OnNext handlers for each subscriber know of each other, so there is no element in the system that knows when both "OnNextAction" and "SomeOtherOnNextAction" are done with "T". The two OnNext handlers cannot simply call Dispose on whatever is returned from Reference.Acquire since the first one done would leave the other with a buffer that could be recycled by the pool while the handler is in progress. The second subscriber's handler may not even have started executing when the first calls Dispose on the item wrapper. Waiting till all references to the wrapper go out of scope and letting the garbage collector deal with the problem (assuming it implements a finalizer that returns the wrapped item to the pool) makes for non-deterministic pool behavior and potentially long recycle times on the pool.

    It is also easy to envision scenarios where one subscriber might be something like a visualizer that you want to throttle to no more than one update per second. Rx provides extension methods to do that sort of throttling, but they know nothing of Reference<T> and the contract it requires - they just drop the unneeded postings and move on. In this case, even foreknowledge of all the potential consumers of T is insufficient, since not all of them will necessarily see any given posting of T.

    What really seems to be needed here is some way to hand the garbage collector a list of objects to consider for collection. When there are no items left in the pool, the next attempt to get a pooled item would perform a "mini-collection" of all the Reference<T> instances the pool has handed out, and hopefully one or more of them would be eligible for disposal so their wrapped T could be returned to the pool. WeakReference does not fit the bill because it only tells you when the GC has already collected the object.

    Any other suggestions?


    Tuesday, October 07, 2014 10:11 PM
  • Hi Ron,

    I don't see this as a problem.  You can't just layer on concurrency-introducing operators over your data if your data has a specific contract that doesn't allow it.  It's not like you have an IObservable<int>, so developers won't be confused.

    For example, if you have an IObservable<IObservable<int>> what can you do with it?  Can you apply Throttle?  How about ObserveOn?

    No, you can't, because the contract of nested observables in Rx is that they are hot.  If you don't subscribe within the call to OnNext, then you may miss notifications.

    Likewise, one of the contracts for IObservable<Reference<T>> is that you must acquire the reference within the call to OnNext.

    However, there's also another contract.  You can't apply operators that could potentially drop the notifications, such as Throttle, because then you'd have no way of releasing the resource.
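    A sketch of what honoring the acquire-within-OnNext contract might look like for an observer that crosses an async boundary. The wrapper is a compact, hypothetical variant of Reference<T> (the callback is passed to the constructor instead of through Subscribe), and BackgroundConsumer and AsyncBoundaryDemo are illustrative names. The demo also assumes the producer holds its own reference while it calls OnNext, which is the scoping approach described in the next reply.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Compact reference-counted wrapper; the "unreferenced" callback is a constructor argument.
public sealed class Reference<T>
{
    private readonly Action _unreferenced;
    private int _count;

    public Reference(T value, Action unreferenced)
    {
        Value = value;
        _unreferenced = unreferenced;
    }

    public T Value { get; }

    public IDisposable Acquire()
    {
        Interlocked.Increment(ref _count);
        return new Handle(this);
    }

    private sealed class Handle : IDisposable
    {
        private Reference<T> _owner;
        public Handle(Reference<T> owner) => _owner = owner;

        // Releases at most once; the release that reaches zero runs the callback.
        public void Dispose()
        {
            var owner = Interlocked.Exchange(ref _owner, null);
            if (owner != null && Interlocked.Decrement(ref owner._count) == 0) owner._unreferenced();
        }
    }
}

// Hypothetical observer: acquires inside OnNext, releases when its background work ends.
public sealed class BackgroundConsumer : IObserver<Reference<int[]>>
{
    private readonly ManualResetEventSlim _start;
    private readonly Action<int[]> _work;

    public BackgroundConsumer(ManualResetEventSlim start, Action<int[]> work)
    {
        _start = start;
        _work = work;
    }

    public Task Pending { get; private set; } = Task.CompletedTask;

    public void OnNext(Reference<int[]> item)
    {
        IDisposable handle = item.Acquire();   // must happen before OnNext returns
        Pending = Task.Run(() =>
        {
            using (handle)                      // released once the async work ends
            {
                _start.Wait();                  // simulate work that starts "later"
                _work(item.Value);
            }
        });
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class AsyncBoundaryDemo
{
    // True if the background work saw a live buffer and the buffer was released afterwards.
    public static bool Run()
    {
        bool released = false, sawLiveBuffer = false;
        var start = new ManualResetEventSlim(false);
        var item = new Reference<int[]>(new[] { 1, 2, 3 }, () => released = true);
        var consumer = new BackgroundConsumer(start, buffer => sawLiveBuffer = !released);

        using (item.Acquire())                  // producer's own reference around OnNext
        {
            consumer.OnNext(item);
        }                                       // producer is done; the consumer still holds one
        start.Set();                            // only now let the background work run
        consumer.Pending.Wait();
        return sawLiveBuffer && released;
    }
}
```

    The background work starts only after the producer has released its reference, yet the buffer stays alive because the consumer's handle was acquired during OnNext.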

    I see this as being very similar to the problems we face with deterministic disposal in imperative-style apps, as can be seen in the subscription model used by Rx.  For example, do you keep the subscription's IDisposable around or not?  It depends upon whether you may need to dispose of the subscription later.  It's the same with the proposed Reference<T> or Disposable<T> data, except that due to the nature of Rx (the monad) an asynchronous boundary is like a speed bump that causes you to drop your coffee.

    It's similar to the old problem with APM (begin/end pairs) with the frustrating fact that the using statement couldn't be used to ensure disposal.  As soon as you're over an async boundary, all bets are off.

    - Dave

    Wednesday, October 08, 2014 1:51 PM
  • Hi Ron,

    I think you may have missed the point about "reference counting".  Calling Acquire was meant to increment a counter internally, so two outstanding references would leave the internal count at 2.  Either holder is free to dispose its reference at any time, causing the count to drop to 1.  Dispose won't cause any side-effects until the count drops to 0, meaning that there are no more references.

    If a query has a beginning and an end, then it's easy to create a scope for the reference.  During the call to OnNext you acquire the first reference (as described in my previous reply), which establishes a global scope.  Now other parts of the query are free to acquire references as well, even across async boundaries.  In the call to Subscribe, the original reference is released.  If any other parts of the query are still holding references, then essentially control over the lifetime of the resource has been transferred to those parts of the system.  When all parts finally release their references, the resource is returned to the pool.

    Admittedly, this scoping pattern requires the Reference<T> to be passed all the way through to the subscription, but that shouldn't be a huge problem.

    For example, you could define a Select method on Reference<T> to allow projections on T while preserving the reference count.  (Note that it can't be an extension method, unless the ref count mechanism has internal accessibility.)
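    One way such a Select could look, sketched under the assumption that the counter lives in a separate internal object (the Lifetime class here is a hypothetical name) that every projection shares. Because Select needs that internal state, it is an instance method rather than an extension method, in line with the note above. SelectDemo also previews the Reference<Tuple<T, int>> shape discussed later in the thread.

```csharp
using System;
using System.Threading;

// Hypothetical shared state: one counter and one cleanup action per resource.
internal sealed class Lifetime
{
    private readonly Action _unreferenced;
    private int _count;

    public Lifetime(Action unreferenced) => _unreferenced = unreferenced;

    public IDisposable Acquire()
    {
        Interlocked.Increment(ref _count);
        return new Handle(this);
    }

    private void Release()
    {
        if (Interlocked.Decrement(ref _count) == 0) _unreferenced();
    }

    private sealed class Handle : IDisposable
    {
        private Lifetime _owner;
        public Handle(Lifetime owner) => _owner = owner;
        public void Dispose() => Interlocked.Exchange(ref _owner, null)?.Release(); // at most once
    }
}

public sealed class Reference<T>
{
    private readonly Lifetime _lifetime;

    public Reference(T value, Action unreferenced) : this(value, new Lifetime(unreferenced)) { }

    private Reference(T value, Lifetime lifetime)
    {
        Value = value;
        _lifetime = lifetime;
    }

    public T Value { get; }

    public IDisposable Acquire() => _lifetime.Acquire();

    // Projects the payload but keeps the same Lifetime, so handles acquired
    // on the projection count against the original resource.
    public Reference<TResult> Select<TResult>(Func<T, TResult> selector) =>
        new Reference<TResult>(selector(Value), _lifetime);
}

public static class SelectDemo
{
    public static bool Run()
    {
        bool returnedToPool = false;
        var raw = new Reference<int[]>(new int[1000], () => returnedToPool = true);

        // Reference<Tuple<int[], int>> rather than Tuple<Reference<int[]>, int>.
        Reference<Tuple<int[], int>> tagged = raw.Select(b => Tuple.Create(b, b.Length));

        IDisposable h1 = raw.Acquire();          // shared count = 1
        IDisposable h2 = tagged.Acquire();       // shared count = 2
        h1.Dispose();
        bool aliveAfterFirst = !returnedToPool;  // the projection still holds it
        h2.Dispose();                            // shared count = 0
        return aliveAfterFirst && returnedToPool && tagged.Value.Item2 == 1000;
    }
}
```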

    Edit: I could also imagine other operators, such as Where, which might drop the reference but ensure that it's released first.  Not sure of the exact semantics, but maybe it should be parameterized to be able to release N references at a time.

    - Dave

    • Edited by Dave Sexton Wednesday, October 08, 2014 2:16 PM Note about Where
    Wednesday, October 08, 2014 2:12 PM
  • Hey Dave,

    Thank you so much for taking the time to respond in-depth like this. In case you can't tell, I am an Rx newbie and I am still having issues trying to sort out how all this stuff hangs together. Is there any sort of book or other material that you could recommend to help me along this learning curve? I really need to try to understand what you have said in your replies, and at the moment the light bulb of understanding is only flickering, dimly at times. I have this feeling that there is a solution for the problem I am trying to solve that is beyond the extent of my current understanding of Rx.

    Thanks again for your patience and insight.


    Wednesday, October 08, 2014 5:54 PM
  • Check out a list of resources on my blog:

    In particular, Intro to Rx is often cited as a great place to start.

    - Dave

    Thursday, October 09, 2014 6:04 PM
  • I should also point out that I don't recall ever seeing your particular question discussed before, at least not to the relative depth that we have.  I think the reason is that it's not really an Rx problem. It's a general problem, similar to those that garbage collection algorithms solve.

    Namely, how do you track/scope the lifetime of resources in a deterministic fashion? That's exactly what IDisposable is for in OOP.

    The reactive world is no different.  Calling Dispose at a later time is reactive.

    There may theoretically be a better native solution than the wrappers around T that I'm offering on top of Rx, but as it stands, it seems like the only alternative to what I've proposed is to abandon many of Rx's principles and do your own thing; i.e., define some out-of-band reference tracking system. Of course, the result is a tight coupling between observables and observers, which I think is what we've both implicitly agreed is best to avoid.

    Either way, it's a hard problem and I doubt whether you'll be able to find a 100% satisfactory solution.  My recommendation is to learn about Rx's contracts and their benefits, then try to "stay within the monad" as per the solutions that I've offered.

    - Dave

    Thursday, October 09, 2014 6:18 PM
  • Hey Dave,

    You're right - this problem is not specific to Rx. However, Rx provides this wonderful mechanism for creating loosely coupled, concurrent, composable systems. If Rx and/or .NET could provide some assistance to ease the resource sharing/lifetime problems, life would be good.

    For example, if Rx had a "baked in" type of "IReference<T>", every extension method in the system could recognize that type and participate in the contract. Anyone who wants to inject a lifetime-controlled object T into the system needs to ensure that it implements IReference<T> (or is wrapped in something that does). Intrinsically, Rx would know that when it calls "OnNext" on any item that has an IReference<T> embedded in it, Rx would call "AddRef" on all the IReference<T> elements, and similarly would call "ReleaseRef" when dropping a reference to such an item. It would be up to the implementer of IReference<T> to determine how to clean up T when the reference count goes to zero.

    In the absence of something like IReference<T>, I am still trying to understand your earlier proposals. In particular, you stated that I could implement a Select (or Where) method on Reference<T> to allow projection (or selection). I'm not sure I understand this statement - it seems that what I need is an extension method for the type "IObservable<Reference<T>>". I have implemented a prototype of this and indeed I can intercept each Reference<T> as it flows by and do the "right" thing. However, if the Reference<T> is embedded in something else, like Tuple<Reference<T>, int>, I now need an implementation of each extension method that is specialized for IObservable<Tuple<Reference<T>, int>>. While doable, this restricts the use of anonymous types in projections involving Reference<T>. Am I just completely misunderstanding your suggestion?


    Friday, October 10, 2014 6:00 PM
  • Hi Ron,

    I don't think that natively Rx can do anything with an IReference<T>.  For one thing, you wouldn't want every operator to be stateful by having to keep a reference to every notification.  And anyway it actually doesn't make sense that before calling OnNext an operator should acquire a ref, because isn't that up to the observer to decide?  But if each operator acquired a ref when observing a value, then when would it be released?  The problem gets pushed to the outermost observer passed to Subscribe, which has knowledge of the application's problem domain and thus can define the lifetime scope of resources, which may be required to exist long after unsubscription.

    Rx provides the mechanism to get the resources asynchronously, but it has no way of knowing how to define the scope of a resource's lifetime.

    I think ultimately it's up to the developer to choose at the application level, similar to how we choose when to apply the using statement, when to hold a reference to an IDisposable and when to call Dispose.

    Note that Rx does offer a Using method as the dual to C#'s imperative using statement.  But I think that's just about as far as Rx can go with resource management.

    > you stated that I could implement a Select (or Where) method on Reference<T> to allow projection (or selection).  [snip]

    Yes, you'll find that you have to vary T through a query, but you don't want to lose the ref count; therefore, you should never have a Tuple<Reference<T>, int>, but instead you should have a Reference<Tuple<T, int>>.  If you started with a Reference<T>, then a Select operator on Reference<T> would allow you to project T into Tuple<T, int>, resulting in Reference<Tuple<T, int>>.

    The semantics of Where are a bit strange.  I didn't fully think it through, and perhaps it's not relevant.

    - Dave

    Friday, October 10, 2014 6:58 PM
  • I think I was not clear in my description of IReference<T> - operators do not have to be stateful. When an observable wants to publish an IReference<T> item it needs to create the "T" - that might involve grabbing something from a pool, or opening an expensive connection to something, etc. That is why the observable needs to set the reference count before pushing it out using "OnNext" - the T exists and at some point needs to be cleaned up. Any operator that receives an IReference<T> does one of two things with it - it either passes it downstream to its subscribers, or it consumes the item. In the case where the item is passed on down the chain, the operator has nothing more to do - the T is still in play and can't be cleaned up. If an operator consumes an item (i.e. does not pass it on), it needs to call ReleaseRef on IReference<T> before letting go of the item. All that is required for this to work is for every operator to recognize when it is dealing with an IReference<T>.
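    To make the proposal concrete, a sketch of the hypothetical IReference<T> in which the producer seeds the count and whoever consumes an item calls ReleaseRef. IReference<T>, PooledReference<T>, the ConcurrentBag-based pool and IReferenceDemo are all assumptions; nothing like this ships with Rx.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical interface from the proposal; Rx has no such type.
public interface IReference<out T>
{
    T Value { get; }
    void AddRef();
    void ReleaseRef();
}

// The producer seeds the count; the final ReleaseRef returns the value to the pool.
public sealed class PooledReference<T> : IReference<T>
{
    private readonly ConcurrentBag<T> _pool;
    private int _count;

    public PooledReference(T value, ConcurrentBag<T> pool, int initialCount)
    {
        Value = value;
        _pool = pool;
        _count = initialCount;
    }

    public T Value { get; }

    public void AddRef() => Interlocked.Increment(ref _count);

    public void ReleaseRef()
    {
        if (Interlocked.Decrement(ref _count) == 0) _pool.Add(Value);
    }
}

public static class IReferenceDemo
{
    public static bool Run()
    {
        var pool = new ConcurrentBag<byte[]>();
        // The producer assumes exactly two consumers, so it starts the count at 2.
        IReference<byte[]> item = new PooledReference<byte[]>(new byte[1024], pool, initialCount: 2);

        item.ReleaseRef();                   // consumer 1 done (or an operator dropped the item)
        bool heldAfterFirst = pool.IsEmpty;  // consumer 2 may still be using it
        item.ReleaseRef();                   // consumer 2 done: back to the pool
        return heldAfterFirst && pool.Count == 1;
    }
}
```

    The weak spot is the initialCount argument: the scheme only works if the producer knows how many consumers will eventually receive each item.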

    You can't wait for the observer(s) to increment a reference count because of the asynchronous nature of the system. For example, if there are two subscribers it is possible that the ref counting sequence will be subscriber1(increment), subscriber1(decrement), subscriber2(increment), subscriber2(decrement) - in other words subscriber1 finishes using the resource before subscriber2 starts using it. As soon as subscriber1 decrements the count, it goes to zero and the resource gets released. That is why the entity that brings the resource into existence has to set the refcount. To do this it needs to know how many subscribers there are (I think this information is available to any observable). Operators like Publish would have to diddle refcounts appropriately since their subscriber count is likely different than that of the original observer (since the Publish method hides multiple subscribers behind a single subscription to the source).

    I did look at the Using method, but that has the wrong semantics for what I want - basically it brings a resource into being for the lifetime of a subscription. I don't want to have to unsubscribe and re-subscribe to maintain the lifetime of my resources.

    Your final point about Reference<T> being the outer wrapper is great - that means I only ever have to write the one extension method to handle IObservable<Reference<T>>, where T may be a composite of the expensive resource and other stuff. It is completely feasible to document that restriction on projections.


    Friday, October 10, 2014 7:35 PM
  • Hi Ron,

    > [snip] the observable needs to set the reference count before pushing it out using "OnNext"

    How does it know how many downstream observers will acquire the reference?  That's exactly what I was trying to avoid with Reference<T>.

    > You can't wait for the observer(s) to increment a reference count because of the asynchronous nature of the system [snip]

    This is not a problem.  See my previous reply about the Reference<T> contracts; e.g., an observer must acquire the reference before returning from OnNext.  It doesn't matter if the observable is hot or cold.

    > I did look at the Using method, but that has the wrong semantics for what I want  [snip]

    Yes, I didn't mean to imply that you may want to use it.  My point is that it ties the lifetime of a resource to the lifetime of a subscription.  I think that's about all that Rx can do natively.

    - Dave

    Friday, October 10, 2014 8:23 PM
  • Hi Ron,

    > it is possible that the ref counting sequence will be subscriber1(increment), subscriber1(decrement), subscriber2(increment), subscriber2(decrement)

    See also §4.2 in the Rx Design Guidelines.

    - Dave

    Friday, October 10, 2014 8:25 PM
  • I was under the naïve assumption that an observable would know how many downstream subscribers there are, but it is becoming evident that is not the case.

    Just to be clear then on the contract for Reference<T>:

    1. the producer creates Reference<T> to wrap the expensive object
    2. the producer calls Subscribe on the wrapper, providing a delegate to be called when the object is unreferenced
    3. the producer calls OnNext on its subscriber
    4. inside the OnNext delegate of each consumer, the consumer calls "Acquire", does what it needs to do, and then calls "Dispose" on the handle returned from "Acquire" (or better yet, wraps the use of "Value" in a "using" statement to automate this)
    5. the "Acquire" method and the Dispose method on the handle it returns implement some sort of reference counting
    6. when the reference count goes to zero, the Dispose method on the IDisposable returned by "Acquire" invokes the delegate the producer supplied when it called Subscribe on the Reference<T> wrapper
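    The six steps can be sketched roughly as follows, with one addition taken from the earlier scoping suggestion: the producer holds its own reference while it calls OnNext, so the count cannot touch zero between two subscribers. Reference<T> is cut down to a single callback, and BufferProducer, SummingObserver and ProducerDemo are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Reference<T> cut down to what the steps need (a single callback).
public sealed class Reference<T>
{
    private Action _unreferenced = () => { };
    private int _count;

    public Reference(T value) => Value = value;
    public T Value { get; }

    // Step 2: register the "unreferenced" callback.
    public void Subscribe(Action unreferenced) => _unreferenced = unreferenced;

    // Steps 4-5: Acquire increments; disposing the handle decrements once.
    public IDisposable Acquire()
    {
        Interlocked.Increment(ref _count);
        return new Handle(this);
    }

    private sealed class Handle : IDisposable
    {
        private Reference<T> _owner;
        public Handle(Reference<T> owner) => _owner = owner;
        // Step 6: the release that reaches zero runs the callback.
        public void Dispose()
        {
            var owner = Interlocked.Exchange(ref _owner, null);
            if (owner != null && Interlocked.Decrement(ref owner._count) == 0) owner._unreferenced();
        }
    }
}

// Hypothetical producer that rents buffers from a pool.
public sealed class BufferProducer
{
    private readonly ConcurrentBag<double[]> _pool = new ConcurrentBag<double[]>();
    private readonly List<IObserver<Reference<double[]>>> _observers =
        new List<IObserver<Reference<double[]>>>();

    public BufferProducer(int buffers, int size)
    {
        for (int i = 0; i < buffers; i++) _pool.Add(new double[size]);
    }

    public int Available => _pool.Count;
    public void Add(IObserver<Reference<double[]>> observer) => _observers.Add(observer);

    public void Produce(double fill)
    {
        if (!_pool.TryTake(out var buffer)) throw new InvalidOperationException("Pool exhausted.");
        for (int i = 0; i < buffer.Length; i++) buffer[i] = fill;
        var item = new Reference<double[]>(buffer);   // step 1
        item.Subscribe(() => _pool.Add(buffer));      // step 2
        using (item.Acquire())  // producer's scope reference, held across step 3
        {
            foreach (var observer in _observers) observer.OnNext(item);
        }
    }
}

// Hypothetical consumer that follows step 4.
public sealed class SummingObserver : IObserver<Reference<double[]>>
{
    public double LastSum { get; private set; }

    public void OnNext(Reference<double[]> item)
    {
        using (item.Acquire())
        {
            double sum = 0;
            foreach (var x in item.Value) sum += x;
            LastSum = sum;
        }
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ProducerDemo
{
    public static bool Run()
    {
        var producer = new BufferProducer(buffers: 1, size: 4);
        var first = new SummingObserver();
        var second = new SummingObserver();
        producer.Add(first);
        producer.Add(second);
        producer.Produce(2.5);  // both observers see the same buffer
        bool ok = first.LastSum == 10 && second.LastSum == 10 && producer.Available == 1;
        producer.Produce(1.0);  // succeeds only because the single buffer came back
        return ok && first.LastSum == 4 && second.LastSum == 4 && producer.Available == 1;
    }
}
```

    Without the producer's scope reference, the first observer's release would drop the count to zero and return the buffer to the pool before the second observer ran; the second observer's release would then return it again, leaving the same array in the pool twice.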

    Part of what I was struggling with before was due to not realizing that if you have a chain of operators a, b and c, each observing the one before it, then a.OnNext calls b.OnNext, which calls c.OnNext, at least in the case where nobody is observing on a different thread. As a result there is no "gap" between a and b where the ref count goes to zero - b and c are done with their OnNext calls before a.OnNext returns. And now your earlier statement about not being able to introduce concurrency operators makes more sense.

    Finally, it is up to the producer to deal with the issues associated with potentially having multiple subscribers to which it sends the same expensive object, and I would need to implement contract-aware versions of any selectors, filters, or concurrency-introducing methods.

    Have I finally understood what you have been saying for a few days now?  :-)


    Friday, October 10, 2014 11:24 PM