Polling a website with Rx

    Question

  • just trying to get my head around Rx

    I am using Rx to poll a website every 2 seconds

    var results = new List<MyDTO>();
    var cx = new WebserviceAPI( ... );
    var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
    var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });

    (The webservice API exposes a getItemsAsync/getItemsCompleted event handler type mechanism from which I am creating an observable).

    When the web site returns, I am unpacking the "business part of" the response into an IEnumerable of DTOs

    public IObservable<IEnumerable<MyDTO>> GetDataAsync()
    {
        var o = Observable.FromEventPattern<getItemsCompletedEventHandler, getItemsCompletedEventArgs>(
            h => _webService.getItemsCompleted += h,
            h => _webService.getItemsCompleted -= h);

        return o.Select(c => from itm in c.EventArgs.Result.ItemList
                             select new MyDTO()
                             {
                                 ...
                             });
    }

    My reasoning was that, given that all the data was just there in the string, it made sense to pack it up there and then into an IEnumerable ... but now I'm not sure if that is right!

    If the website takes longer than 2 secs to respond I am finding that MSTest is crashing out. When debugging, the error being generated is

    "There was an error during asynchronous processing. Unique state object is required for multiple asynchronous simultaneous operations to be outstanding"

    with the inner exception

    "Item has already been added. Key in dictionary: 'System.Object' Key being added: 'System.Object'"

    I am supposing that the problem is one of reentrancy in that the next call is starting and returning data before the previous call has finished populating the data.

    So I'm not sure whether

    1. I have put the thing together quite right
    2. I should be throttling the connection in some way so as to avoid re-entrancy.
    3. I should use a different intermediate data structure (or mechanism) instead of an IEnumerable

    I would appreciate some guidance.

    EDIT 1: So I have changed the web call to include a unique state object

    public void StartGetDataAsync()
    {
        ...
        // was: _webService.getItemsAsync(request);
        _webService.getItemsAsync(request, Guid.NewGuid());
    }

    and that made it work. But I am still unsure if that is the right way to do it.

    Wednesday, May 23, 2012 9:51 AM

Answers

  • Hi Simon,

    I don't have a lot of experience with making numerous async web calls, but I am sure I can help.

    First I think we can clean up the code a bit. I get the idea that you want to poll a webservice every 2 seconds. I will also make the assumption that you are happy to actually wait for the request, i.e. if the request takes 3 seconds, you are then happy to wait the 3 seconds and then wait 2 more seconds before firing off your next request. If, however, any request takes too long (10 seconds?) then maybe retry or notify the consumer that there is a problem.

    I would also suggest that, if you can control the WebserviceAPI, you do away with the events and the StartGetDataAsync() method. You can make the whole thing Rx, and this should reduce your code a lot and hopefully make it pretty simple.

    Based on those assumptions I would write the code like this:

    Assume you rewrite the GetData method so that the request and its response are encapsulated in one sequence:

    public IObservable<IEnumerable<MyDTO>> GetData()
    {
        return Observable.Create<IEnumerable<MyDTO>>(observer =>
        {
            var items = Observable.FromEventPattern<getItemsCompletedEventHandler, getItemsCompletedEventArgs>(
                h => _webService.getItemsCompleted += h,
                h => _webService.getItemsCompleted -= h);
    
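            // Take(1) completes the sequence after the first response, so that
            // Concat and Repeat downstream have a completion to react to.
            var subscription = items.Select(c => c.EventArgs.Result.ItemList.Select(i => new MyDTO(...)))
                                    .Take(1)
                                    .Subscribe(observer);
            StartGetDataAsync();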
    
            return subscription;
        });
    }

    Now if we call the code, we also want to call it again 2 seconds after the response comes back. If we instead fire another request every 2 seconds, the requests we are sending will eventually overlap. That is bad.

    webRequest.GetData()
       .Concat(Observable.Empty<IEnumerable<MyDTO>>().Delay(TimeSpan.FromSeconds(2)))
       .Repeat()

    Now this will call GetData(), push the values to the subscriber, wait for the result sequence to complete, and then concat the empty sequence that is delayed by 2 seconds. Once that delay has completed, we repeat: sending off the request again, waiting for the response and then holding off for another 2 seconds before repeating again.

    The fun part comes when you want to put Timeouts in there too :)

    But thanks to Rx this is not so hard. We pick a timeout we are happy with and then just say .Timeout(). If we want to retry when a timeout happens, we just add a Retry. Easy.

    webRequest.GetData()
        .Concat(Observable.Empty<IEnumerable<MyDTO>>().Delay(TimeSpan.FromSeconds(2)))
        .Repeat()
        .Timeout(TimeSpan.FromSeconds(5))
        .Retry()

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com



    • Edited by LeeCampbell Wednesday, May 23, 2012 5:26 PM
    • Proposed as answer by LeeCampbell Wednesday, May 23, 2012 5:26 PM
    • Marked as answer by Simon Woods Thursday, May 24, 2012 9:04 AM
    Wednesday, May 23, 2012 5:18 PM

All replies

  • Lee

    That is excellent!

    I really need to digest it some more, as it is obvious when comparing what you are suggesting to my original that you have managed to shift it to be much more functional and less imperative. Do you have any advice about the way you think about the problem and to help make the paradigm shift?! I'm obviously still thinking in terms of ... get a timer pulsing then make the call then process the results then etc etc .... I can sort of see hints of the way to loop with functional programming (ie recursion ... do the simple case and build up from there) in what you are proposing.

    Many many thx again.

    S

    Wednesday, May 23, 2012 6:02 PM
  • I think your initial code was ok-ish for the requirements you had given yourself. The code I proposed is actually for different requirements, but requirements I think you actually want :)

    Try to think of your problem domain as a series of values, i.e. a sequence, and then think about how you can compose that. It seems you first went straight to the timer, but your requirement was actually more based around GetData(). Knowing the operators that help combine sequences and the operators that help with time-based operations helps too. My way of solving the problem is to map the problem to a marble diagram, see what it looks like and then (due to a familiarity with the methods) apply the methods to the code that would give me the effect of the marble diagram, e.g.

    G=GetData sequence

    D=Delay sequence

    G --0|
    D    ----|
    G        -----0|
    D              ----|
    G                  ---0|
    D                      ----|
    

    Note how I have intentionally made the GetData sequences variable length, because we don't know how long the network call will take. From looking at this I see a pattern of GetData, wait for a constant period, repeat. So I start a sequence with a call to GetData() and then Concat an Observable.Timer(2.Seconds()). But this doesn't work, because Timer returns an IObservable<long>, which is not the same type as GetData() returns. So I apply a Select operator to ignore the value of the timer, but this code turns into a mess. What I really want is an empty sequence that completes in 2 seconds. It seems far less code to just concat a delayed Observable.Empty. That is the hard part; I then just have to call Repeat. The Timeout/Retry stuff is easy from here.
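
    As a rough sketch of the two pause options described above, assuming the System.Reactive package is referenced: the class and method names (PollingSketch, PauseViaTimer, PauseViaEmpty, PollWithGap) and the scheduler parameter are made up for illustration and are not part of the original code. It relies on Delay also shifting the completion of the empty sequence, which is how current Rx.NET versions behave as far as I know.

    ```csharp
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public static class PollingSketch
    {
        // Option 1: Timer yields a long, so its value has to be projected away
        // before the pause can be concatenated onto a sequence of T.
        public static IObservable<T> PauseViaTimer<T>(TimeSpan gap, IScheduler scheduler)
        {
            return Observable.Timer(gap, scheduler)
                             .SelectMany(_ => Observable.Empty<T>());
        }

        // Option 2: an empty sequence of T whose completion is delayed by the gap.
        public static IObservable<T> PauseViaEmpty<T>(TimeSpan gap, IScheduler scheduler)
        {
            return Observable.Empty<T>().Delay(gap, scheduler);
        }

        // Hypothetical helper: run the request, pause, then resubscribe, forever.
        // The request sequence must complete, otherwise the pause never starts.
        public static IObservable<T> PollWithGap<T>(
            IObservable<T> request, TimeSpan gap, IScheduler scheduler)
        {
            return request.Concat(PauseViaEmpty<T>(gap, scheduler)).Repeat();
        }
    }
    ```

    Usage might look like PollingSketch.PollWithGap(webRequest.GetData(), TimeSpan.FromSeconds(2), Scheduler.Default). Passing the scheduler in is only a convenience so the timing can be driven by a virtual clock in tests.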

    HTH

    Lee

    P.S. Mark the question as answered if you are happy, so others don't have to worry about this post.


    Lee Campbell http://LeeCampbell.blogspot.com

    Thursday, May 24, 2012 7:36 AM
  • Thx again, Lee. I've marked your initial response as The answer.

    Could you explain a bit more (or perhaps in slightly different terms as I don't think I am quite getting it!) why you concat an empty observable with a delay rather than just repeating  the original observable with a delay ... 

    eg

    webRequest.GetData()
       .Delay(TimeSpan.FromSeconds(2))
       .Repeat()

    Is there something else going on which I am missing? Is it because the delay doesn't necessarily start when the GetData request has finished and so using concat means that both streams have to have completed before the delay can take place ... something like that?

    Thx again.

    S


    • Edited by Simon Woods Thursday, May 24, 2012 9:17 AM More details
    Thursday, May 24, 2012 9:13 AM
  • That is a fair question. To be honest, that is what I did initially. Luckily I put some debugging/logging around my little mockup, which reminded me that Delay does not defer the execution of the subscription; it just delays giving you the values.

    So if we did what you suggest, the first web request would be sent immediately, the result would come back, and then Rx would just hold the value for 2 seconds before giving it to us. This is not really what we want. We want the data without delay; we just want to hold off before sending our next request. Again, when that data comes back we want to process it immediately.

    Does that make sense?

    If you apply Delay to a sequence, you will always have stale data (by the amount of the delay). With an empty sequence this does not matter, as it just acts as a buffer between repeat calls. If we delay the GetData() sequence, we just end up with 2-second-old data :( I assume that, as you are polling the service, you want fairly timely data, which you wouldn't get.
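
    To see the difference without waiting on a real clock, here is a small sketch that runs both compositions against a virtual-time scheduler. It assumes the System.Reactive package; DelayVersusPause, Run and the fake one-second request are hypothetical stand-ins for illustration, not the real GetData().

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public static class DelayVersusPause
    {
        // Logs when a request is "sent" (subscribed) and when its value reaches
        // the consumer, in virtual seconds. delayTheData picks the composition.
        public static List<string> Run(bool delayTheData)
        {
            var clock = new HistoricalScheduler(
                new DateTimeOffset(2012, 5, 24, 0, 0, 0, TimeSpan.Zero));
            var start = clock.Now;
            var log = new List<string>();
            var gap = TimeSpan.FromSeconds(2);

            // Stand-in for GetData(): "sends" on subscribe, answers one second later.
            var request = Observable.Defer(() =>
            {
                log.Add($"sent at {(clock.Now - start).TotalSeconds}s");
                return Observable.Timer(TimeSpan.FromSeconds(1), clock)
                                 .Select(_ => "data");
            });

            var polling = delayTheData
                ? request.Delay(gap, clock).Repeat()
                : request.Concat(Observable.Empty<string>().Delay(gap, clock)).Repeat();

            using (polling.Subscribe(
                _ => log.Add($"received at {(clock.Now - start).TotalSeconds}s")))
            {
                // Advance the virtual clock; nothing here waits in real time.
                clock.AdvanceBy(TimeSpan.FromSeconds(7.5));
            }
            return log;
        }

        public static void Main()
        {
            Console.WriteLine("Delay on the data:   " + string.Join(", ", Run(true)));
            Console.WriteLine("Pause after request: " + string.Join(", ", Run(false)));
        }
    }
    ```

    With Delay applied to the data, each "received" entry should trail its "sent" entry by the response time plus the 2-second delay. With the concatenated pause, the value should arrive as soon as the fake response does, and only the next request is held back.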

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Thursday, May 24, 2012 10:44 AM
  • OK. That is clear now.

    Once again, thank you, Lee

    S

    Thursday, May 24, 2012 12:52 PM