Interval Polling from Yahoo Finance

    Question

  • I have started building a StreamInsight 2.1 application to read data from Yahoo Finance.  Currently the application returns my custom object on an interval for a single requested ticker symbol.  The requester hits the API URL, which returns XML.  The XML is parsed to populate a custom object, and that object is returned with its values set.  To do this I create an observable interval for a single symbol.

    Here is the code:

    var inputStream = application
        .DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(5)))
        .ToPointStreamable(
            x => PointEvent.Create(DateTimeOffset.Now, GetSingleQuote("AAPL")),
            AdvanceTimeSettings.IncreasingStartTime);

    What I want to do is request several symbols at once and have them reported into the stream on the interval, much like above.  However, that means returning several PointEvent<customObject> (or customObject) instances at once, because each requested symbol gets its own instance of customObject.

    As a disclaimer, I am a very beginner level programmer when it comes to StreamInsight and also in general.  So I already understand I am in over my head.  But I wanted to ask for some expertise from anyone that might be able to help me come up with a solution.  So please play nice, I am a slow learner.

    Regards,

    Dan

    Monday, April 15, 2013 5:01 PM

All replies

  • What you'll need to do is something like the following:

    Create an observable (see my blog for a ready-made framework to do this). This will actually poll the Yahoo Finance service to get your quotes.

    When you get each quote, publish it from your observable ... one at a time, but in a loop. Now, I don't know what your Yahoo Finance API looks like ... if you have to make a call for each ticker symbol or if you can make a single call and get multiple tickers back (better) but you can abstract those details in your event source.
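
    As a rough sketch of the "poll, then publish each quote one at a time in a loop" idea, using only the base class library: PollingSource, ActionObserver and the fetch delegate are hypothetical names for illustration, not part of StreamInsight, Rx, or the blog framework. The result is an ordinary IObservable<T>, the same kind of object the question's code hands to DefineObservable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Minimal polling source built on the BCL only (no Rx operators).
// TPayload stands in for the thread's "customObject"; 'fetch' is a hypothetical
// delegate that performs ONE request and returns every quote it contained.
public sealed class PollingSource<TPayload> : IObservable<TPayload>
{
    private readonly Func<IEnumerable<TPayload>> _fetch;
    private readonly TimeSpan _interval;

    public PollingSource(Func<IEnumerable<TPayload>> fetch, TimeSpan interval)
    {
        if (fetch == null) throw new ArgumentNullException("fetch");
        _fetch = fetch;
        _interval = interval;
    }

    // Each subscriber gets its own timer; disposing the returned handle stops polling.
    // Sketch only: no error handling, and cycles can overlap if a fetch outlasts the interval.
    public IDisposable Subscribe(IObserver<TPayload> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        return new Timer(_ => PollOnce(observer), null, TimeSpan.Zero, _interval);
    }

    // One polling cycle: fetch the whole batch, then publish the items one at a time.
    public void PollOnce(IObserver<TPayload> observer)
    {
        foreach (TPayload item in _fetch())
        {
            observer.OnNext(item);
        }
    }
}

// Tiny observer that forwards each item to a delegate (handy for trying the source out).
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

    Whether the service needs one call per symbol or returns several symbols per call is hidden inside the fetch delegate, so the publishing loop stays the same either way.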


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Tuesday, April 16, 2013 2:07 PM
  • Can you provide a link to your blog post? 

    The API makes a request to a url with multiple symbols and the request returns an xml document with a quote element for each symbol.  The quote element is the basis for my customObject.  I am trying to avoid using adapters and relying on the StreamInsight framework as much as possible.

    Thanks!

    Tuesday, April 16, 2013 2:39 PM
  • Here is the url that I hit to capture the xml document.

    http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20(%22AAPL%22%2C%22DELL%22%2C%22MSFT%22)&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys

    You can see that there are 3 different symbols being requested within the url.
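
    For illustration, a URL of this shape could be assembled from a list of symbols roughly as follows. The endpoint, query text and env value are taken from the URL above; YqlUrlBuilder and Build are hypothetical names, and the symbols are assumed to be plain ticker strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class YqlUrlBuilder
{
    // Endpoint and env value copied from the URL quoted in this post.
    private const string Endpoint = "http://query.yahooapis.com/v1/public/yql";
    private const string Env = "store://datatables.org/alltableswithkeys";

    // Builds one request URL covering every symbol, so a single poll returns all quotes.
    public static string Build(IEnumerable<string> symbols)
    {
        if (symbols == null) throw new ArgumentNullException("symbols");

        // Quote each symbol and join them, e.g. "AAPL","DELL","MSFT"
        string list = string.Join(",", symbols.Select(s => "\"" + s.Trim().ToUpperInvariant() + "\""));
        if (list.Length == 0) throw new ArgumentException("At least one symbol is required.", "symbols");

        string yql = "select * from yahoo.finance.quotes where symbol in (" + list + ")";

        // EscapeDataString percent-encodes spaces, quotes and commas. It may also encode
        // '*' and the parentheses, which is an equivalent encoding of the same query.
        return Endpoint + "?q=" + Uri.EscapeDataString(yql) + "&env=" + Uri.EscapeDataString(Env);
    }
}
```

    Calling YqlUrlBuilder.Build(new[] { "AAPL", "DELL", "MSFT" }) would request the same three symbols as the URL above.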

    Tuesday, April 16, 2013 3:05 PM
  • The link to my blog is in my signature ... www.devbiker.net. My latest post has a framework to make both input and output easier - it's part of a series that I'm building - and to allow you to either use the 2.1 reactive model or the previous adapter model from the same set of code.

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Tuesday, April 16, 2013 3:35 PM
  • Yes ... so your source would poll this URL, get all 3 updates, deserialize them, package each one as your payload and then enqueue them one at a time.
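
    A minimal sketch of that deserialize-and-enqueue step, using LINQ to XML. The thread only confirms that the response holds one quote element per symbol; the symbol attribute and the LastTradePriceOnly child used here are assumptions about the response layout, and StockQuote, QuoteParser and the enqueue delegate are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical payload type standing in for the thread's "customObject".
public sealed class StockQuote
{
    public string Symbol { get; set; }
    public decimal LastPrice { get; set; }
}

public static class QuoteParser
{
    // Turns one response document into one payload per <quote> element.
    // Assumed layout: <quote symbol="..."> with a <LastTradePriceOnly> child.
    public static List<StockQuote> Parse(string xml)
    {
        XDocument doc = XDocument.Parse(xml);
        return doc.Descendants("quote")
            .Select(q => new StockQuote
            {
                Symbol = (string)q.Attribute("symbol"),
                // A missing price element becomes 0 in this sketch.
                LastPrice = (decimal?)q.Element("LastTradePriceOnly") ?? 0m
            })
            .ToList();
    }

    // Hands the payloads over one at a time; 'enqueue' is whatever publishes a
    // single event (for example an observer's OnNext). Returns the number published.
    public static int EnqueueAll(string xml, Action<StockQuote> enqueue)
    {
        List<StockQuote> quotes = Parse(xml);
        foreach (StockQuote quote in quotes)
        {
            enqueue(quote);
        }
        return quotes.Count;
    }
}
```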

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Tuesday, April 16, 2013 3:36 PM
  • I see you use an input adapter factory.  Isn't that only used with the input adapter model?  Or am I mistaken about how your code implements an Rx-only model?
    Wednesday, April 17, 2013 1:15 PM
  • If you follow the posts, it's a framework that allows use of *both* input adapters and RX with the same codebase. If you want to use RX only, just use the RX-based queries and extensions. Hence ... "Dual Mode".

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Wednesday, April 17, 2013 2:04 PM
  • I understand the premise of your post, but I am having a hard time grasping which parts refer only to the Rx-based model.  Sorry, I'm just a slow learner, and I really do appreciate all of your help.

    If using the Rx-based model, adapter factories are not needed.  Is that a correct statement?  And if that is a correct statement, why would it be necessary to use adapter factories?


    • Edited by DataNerd Wednesday, April 17, 2013 3:15 PM
    Wednesday, April 17, 2013 2:17 PM
  • The framework that I've written for those blog entries does use a factory to create the sinks ... but it's not an AdapterFactory. Why? Well, for all the reasons that you would typically include a factory pattern in your architecture. The implementation does include both the traditional AdapterFactory and my ISourceFactory in the same class but that's really not necessary. Only ISourceFactory is necessary if you are only going to use the RX model.

    Reality is ... for anything beyond a quick-and-dirty sample, you'd want to have a factory for your sinks. And you have the side benefit of being able to use either model from the same source code. Do you have to use a factory with StreamInsight 2.1? Nope, not at all. But, in any non-trivial application, you'd wind up with some kind of factory pattern sooner or later.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Wednesday, April 17, 2013 5:22 PM
  • OK, I understand now.  The usage of the term "factory" was throwing me a little bit at first.  But now I understand it's not necessarily an "Adapter Factory".  It is a factory architecture that is being implemented.

    Thanks!  That explanation brought everything full circle and allowed me to understand your examples as a whole.

    This has helped a bunch!  Thanks again.  I may have a few more questions as I get a deeper understanding.  But I am about 90% of the way to a fully functioning application now.

    Wednesday, April 17, 2013 5:35 PM
  • I have uploaded the code to my GitHub account: https://github.com/AquaNerd/StreamInsightYahoo.git  Take a look at your own risk.  ;)
    • Edited by DataNerd Friday, April 19, 2013 3:32 PM
    Friday, April 19, 2013 3:30 PM
  • There is one issue I am having now with my code.  I have switched everything to poll the quotes table at Yahoo Finance and retrieve data using the YQL API.  However, it intermittently throws an ObjectDisposedException in the StreamEventProducer class.  I'm not sure what is causing it, but a second set of eyes might help find the issue.

    It sometimes happens on startup, as soon as some quotes are received, and sometimes on shutdown.

    Thanks for your help!


    • Edited by DataNerd Friday, April 19, 2013 7:25 PM including more detail
    Friday, April 19, 2013 7:16 PM
  • I've downloaded your app and run it ... didn't get an ObjectDisposedException (but also didn't run it for long).

    You mentioned that it's in the StreamEventProducer class ... do you have a full stack trace?


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Tuesday, April 23, 2013 12:45 PM
  •    at SiDualMode.Base.StreamEventProducer`2.CheckDisposed() in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\Base\StreamEventProducer.cs:line 79
       at SiDualMode.Base.StreamEventProducer`2.PublishEvent(StreamInputEvent`1 newEvent) in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\Base\StreamEventProducer.cs:line 51
       at SiDualMode.InputAdapter.YahooFinanceAdapter.YahooDataProducer.ProduceEvents(Object state) in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\InputAdapter\YahooFinanceAdapter\YahooDataProducer.cs:line 41
       at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.TimerQueueTimer.CallCallback()
       at System.Threading.TimerQueueTimer.Fire()
       at System.Threading.TimerQueue.FireNextTimers()
       at System.Threading.TimerQueue.AppDomainTimerCallback()
    Tuesday, April 23, 2013 1:32 PM
  • Here is the exception detail as well.

    System.ObjectDisposedException was unhandled
      HResult=-2146232798
      Message=Cannot access a disposed object.
    Object name: 'Object has been disposed'.
      Source=SiDualMode
      ObjectName=Object has been disposed
      StackTrace:
           at SiDualMode.Base.StreamEventProducer`2.CheckDisposed() in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\Base\StreamEventProducer.cs:line 79
           at SiDualMode.Base.StreamEventProducer`2.PublishEvent(StreamInputEvent`1 newEvent) in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\Base\StreamEventProducer.cs:line 51
           at SiDualMode.InputAdapter.YahooFinanceAdapter.YahooDataProducer.ProduceEvents(Object state) in d:\GIT Local Repositories\StreamInsightYahoo\SiDualMode\InputAdapter\YahooFinanceAdapter\YahooDataProducer.cs:line 41
           at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
           at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
           at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
           at System.Threading.TimerQueueTimer.CallCallback()
           at System.Threading.TimerQueueTimer.Fire()
           at System.Threading.TimerQueue.FireNextTimers()
           at System.Threading.TimerQueue.AppDomainTimerCallback()
      InnerException:

    Tuesday, April 23, 2013 1:33 PM
  • OK ... here's what's happening.

    The publisher is disposed while you are in the middle of publishing events. You can try to re-order the Dispose() method on the base StreamEventProducer to:

            public void Dispose() {
                //Make sure Dispose is only handled once!
                if (!_disposed) {
                    Dispose(true);
                    _disposed = true;
                    GC.SuppressFinalize(this);
                }
            }

    That may help. It would be even better to acquire a lock before publishing your events. If you do that (by locking on the protected LockObject) ... and rearrange the Dispose method, that should resolve any issues ... you won't finish your Dispose() until you are done publishing.
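
    To make the locking idea concrete, here is a hedged sketch of a producer that publishes under a lock and disposes under the same lock. It is not the framework's StreamEventProducer: LockedProducer, PublishBatch and the publish delegate are hypothetical, and it assumes Dispose() takes the same LockObject as the publishing code. It also returns false after disposal rather than throwing, which is one possible design choice.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a producer base class. Only _disposed, LockObject and
// the Dispose()/Dispose(bool) pair echo names from the thread; the rest is invented.
public class LockedProducer<TPayload> : IDisposable
{
    private readonly Action<TPayload> _publish;
    private bool _disposed;

    protected readonly object LockObject = new object();

    public LockedProducer(Action<TPayload> publish)
    {
        if (publish == null) throw new ArgumentNullException("publish");
        _publish = publish;
    }

    // Publishes a whole batch while holding the lock, so Dispose() cannot
    // complete halfway through the loop.
    public bool PublishBatch(IEnumerable<TPayload> batch)
    {
        lock (LockObject)
        {
            if (_disposed) return false;   // shutting down: drop the batch instead of throwing
            foreach (TPayload item in batch)
            {
                _publish(item);
            }
            return true;
        }
    }

    public void Dispose()
    {
        lock (LockObject)                  // waits for any batch that is mid-publish
        {
            if (!_disposed)                // make sure Dispose is only handled once
            {
                Dispose(true);
                _disposed = true;          // set only after cleanup, as in the reordered method above
                GC.SuppressFinalize(this);
            }
        }
    }

    // Derived classes release timers, connections and so on here.
    protected virtual void Dispose(bool disposing) { }
}
```

    With this arrangement a timer callback that fires during shutdown either finishes its batch before Dispose() proceeds, or sees _disposed and publishes nothing.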

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

    Thursday, April 25, 2013 6:14 PM
  • That appears to work really well.  Thanks for taking a look at it!
    Friday, April 26, 2013 1:51 PM