Caching the result of an asynchronous request with RX?

  • Question

  • Hi there.

    I'm loving RX but I have to say the learning curve is steep.

    How could I go about caching the result of an asynchronous request, so that the server is only hit once, when the first call is made? After the first call, the returned observable can fire the results straight away. Maybe the Replay operator would be a good fit.

    Something like this:

    IObservable<Result> GetResult()
    {
        // Call the server if the result is not in the cache; otherwise return
        // an IObservable that fires the result directly on subscription.
    }

    I should also say that the solution needs to cope with the case where the method is called while the request to the server is still in progress. In that case, the returned observable should fire the results as soon as the request completes.

    Finally, it also needs to be thread safe so that two threads can't happen to both fire a request to the server because of a race condition. I say so because usually caching involves storing the data somewhere in a static member.

    Thanks.

    Monday, March 7, 2011 11:22 PM


All replies

  • Hi,

    This is a job for AsyncSubject<T>.  It caches the value from the last OnNext call and, once the source completes, delivers it to current subscribers and replays it to any later ones.

    If you use Observable.Start, .ToAsync, .FromAsyncPattern or .Prune, they already use AsyncSubject internally so you don't have to do anything yourself to get this behavior.

    For example:

    var start = Observable.FromAsyncPattern(service.BeginGetData, service.EndGetData);
    
    // start the request
    var observable = start();
    
    // listen for the response - we may already have it cached, or not.
    observable.Subscribe(result => Print(result));
    
    // listen for the response (without invoking the service call again)
    observable.Subscribe(result => Print(result));
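
The caching behavior described above can be seen by driving an AsyncSubject<T> by hand. This is only a minimal sketch: it assumes the System.Reactive package is referenced, and the class name AsyncSubjectDemo and the log strings are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class AsyncSubjectDemo
{
    // Returns the notifications seen by an early and a late subscriber.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new AsyncSubject<int>();

        // Subscribes before any value exists (the "request in progress" case).
        subject.Subscribe(v => log.Add("early: " + v));

        subject.OnNext(1);
        subject.OnNext(2);      // Nothing has been delivered to subscribers yet.
        subject.OnCompleted();  // Only now is the last value (2) published.

        // Subscribes after completion (the "already cached" case).
        subject.Subscribe(v => log.Add("late: " + v));

        return log;
    }
}
```

Both subscribers receive only the final value, which is why a single-result service call fits this subject well.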
    

    - Dave


    http://davesexton.com/blog
    • Proposed as answer by fixedpoint Tuesday, March 8, 2011 4:47 AM
    Tuesday, March 8, 2011 4:35 AM
  • Hello.

    Hmm I see.

    So would you put that observable instance in a static member of your service proxy class and then always return that cached instance?

    I'll give that a shot later today.

    Tuesday, March 8, 2011 5:11 AM
  • Hi,

    If you're using one of the operators that I mentioned before, then after you call the service for the first time you can simply store a reference to the observable and subscribe to it at any time.  It will only ever call the service once and will return the same result for every subscriber, regardless of whether the service call has completed yet or not.  That's the work of AsyncSubject<T> internally.

    For example:

    class Service
    {
    	public IObservable<Result> GetResult(object data)
    	{
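    		var service = new ServiceProxy();
    
    		// Two type arguments: the Begin method's input type and the End method's result type.
    		var start = Observable.FromAsyncPattern<object, Result>(service.BeginGetResult, service.EndGetResult);
    
    		// Invoking the delegate starts the request; the returned observable caches its result.
    		return start(data);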
    	}
    }
    

    If you can't call the service for "the first time" due to multi-threading issues, then consider using lazy initialization and a singleton.

    For example, you can encapsulate this logic in a request class and then create a single instance to use in your main application.

    class Request
    {
    	private readonly Lazy<IObservable<Result>> result;
    
    	public Request(Service service, object data)
    	{
    		result = new Lazy<IObservable<Result>>(
    			() => service.GetResult(data), 
    			LazyThreadSafetyMode.ExecutionAndPublication);
    	}
    
    	public IObservable<Result> GetResult()
    	{
    		return result.Value;
    	}
    }
    

    No matter when Request.GetResult is called, it guarantees that only the first call will actually begin the service request.  All subsequent calls will simply get the same observable.
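
The once-only guarantee comes from Lazy<T> with LazyThreadSafetyMode.ExecutionAndPublication. Here is a small sketch that uses only the base class library to count how often the factory runs under concurrent access. The class name LazyOnceDemo and the sleeping factory are stand-ins for the real service call, not part of the code above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LazyOnceDemo
{
    // Returns how many times the factory ran when `callers` threads
    // read lazy.Value concurrently.
    public static int CountFactoryRuns(int callers)
    {
        int runs = 0;

        var lazy = new Lazy<string>(
            () =>
            {
                Interlocked.Increment(ref runs);
                Thread.Sleep(50); // Stand-in for the cost of starting a request.
                return "result";
            },
            LazyThreadSafetyMode.ExecutionAndPublication);

        // Every caller blocks until the single factory run has finished.
        Parallel.For(0, callers, _ =>
        {
            var value = lazy.Value;
        });

        return runs;
    }
}
```

With ExecutionAndPublication, the other callers wait on a lock while the first one runs the factory, so the count stays at one however many threads race.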

    - Dave


    http://davesexton.com/blog
    • Marked as answer by clement_911 Tuesday, March 8, 2011 6:38 AM
    Tuesday, March 8, 2011 5:59 AM
  • This is perfect Dave thanks a lot.

    This is also very awesome, and I'm surprised this stuff isn't in the RX 101 samples.

    • Proposed as answer by phillypeejay Saturday, March 19, 2011 9:01 PM
    • Unproposed as answer by phillypeejay Saturday, March 19, 2011 9:01 PM
    Tuesday, March 8, 2011 6:39 AM
  • Dave's answer is a very nice one. The other thing you could do is "Publish and RefCount" the observable. Both this and Dave's version require a member variable in the (singleton) request class. If you only want one value from the IObservable, then AsyncSubject can conceptually be swapped for Task<T>. If you want to return a stream of values from your IObservable (e.g. from a Comet server, WebSockets, streaming servers etc.), then AsyncSubject is not what you want, as it will only return the last value.

    class Request
    {
    	private readonly IObservable<Result> result;
    
    	public Request(Service service, object data)
    	{
    		result = service.GetResult(data)
    			.Publish()
    			.RefCount();
    	}
    
    	public IObservable<Result> GetResult()
    	{
    		return result;
    	}
    }
    
    

    Check out what hot and cold observables are in this post (http://leecampbell.blogspot.com/2010/08/rx-part-7-hot-and-cold-observables.html). I will need to double check this is all still valid for the latest release of Rx :)
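
For the single-value case, the Task<T> substitution mentioned above might look roughly like this. It is a sketch under assumptions: CachedRequest, its callServer delegate and the ServerCalls counter are hypothetical names used for illustration, and only the base class library is needed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CachedRequest
{
    private readonly Lazy<Task<string>> result;
    private int calls;

    // callServer is a hypothetical delegate that starts the real request.
    public CachedRequest(Func<Task<string>> callServer)
    {
        result = new Lazy<Task<string>>(
            () =>
            {
                Interlocked.Increment(ref calls);
                return callServer();
            },
            LazyThreadSafetyMode.ExecutionAndPublication);
    }

    // Number of times the server delegate was actually invoked.
    public int ServerCalls
    {
        get { return Volatile.Read(ref calls); }
    }

    // Every caller gets the same task, whether it is still running or done.
    public Task<string> GetResult()
    {
        return result.Value;
    }
}
```

Callers that arrive while the request is in flight simply await the same task. One caveat of this design is that a faulted task is cached as well. If an IObservable is still wanted, current Rx releases provide a ToObservable() extension for tasks in System.Reactive.Threading.Tasks.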


    Lee Campbell http://LeeCampbell.blogspot.com
    Wednesday, March 9, 2011 9:54 AM
  • That's interesting although I'm not sure I fully understand.

    What's the advantage of using RefCount here? Is this for lazy loading? What happens if there are no more subscribers for a period of time?

    Your Task suggestion is also interesting. Do you have a sample of how RX and Task play together?

    Cheers.

    Wednesday, March 9, 2011 10:27 PM
  • RefCount ensures that the subscription to the underlying observable is disposed once there are no more subscribers to the shared observable. If you don't need this, you can just call Publish and then Connect. The published observable then subscribes to the underlying observable and stays subscribed regardless of whether there are any subscribers.
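
The difference can be sketched by counting how often the underlying source is subscribed to. This assumes the System.Reactive package is referenced; the class name PublishDemo and the Observable.Never source are placeholders for a real service call.

```csharp
using System;
using System.Reactive.Linq;

static class PublishDemo
{
    // With RefCount, the shared source is released when the last subscriber
    // leaves, so a later subscriber causes a fresh subscription.
    public static int SubscriptionsWithRefCount()
    {
        int count = 0;
        var source = Observable.Defer(() => { count++; return Observable.Never<int>(); });
        var shared = source.Publish().RefCount();

        shared.Subscribe(_ => { }).Dispose(); // Connects, then disconnects.
        shared.Subscribe(_ => { }).Dispose(); // Connects again.
        return count;
    }

    // With an explicit Connect, the source stays subscribed until the
    // connection itself is disposed, whatever the subscribers do.
    public static int SubscriptionsWithConnect()
    {
        int count = 0;
        var source = Observable.Defer(() => { count++; return Observable.Never<int>(); });
        var published = source.Publish();

        using (published.Connect())
        {
            published.Subscribe(_ => { }).Dispose();
            published.Subscribe(_ => { }).Dispose();
        }
        return count;
    }
}
```

For a cache this matters: if the source is cold, a RefCount that drops to zero subscribers would start the work again on the next subscription.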

    I have not got any examples at the moment for Task+Rx but am working on some :)


    Lee Campbell http://LeeCampbell.blogspot.com
    Monday, March 14, 2011 10:16 AM
  • Can you please describe what is meant by "If you're using one of the operators that I mentioned before"?  And where does new ServiceProxy() come from?
    Sunday, February 24, 2019 11:25 PM