locked
Using a buffer block with streaminsight RRS feed

  • Question

  • I am attempting to connect a buffer block to a consumer in Stream Insight.  
      Given this code sample.

    var buffer = new BufferBlock<string>();    // 20 random items.    Array.ForEach(new Fixture().CreateMany<string>(20).ToArray(), _ => buffer.Post(_));     // which are indeed there...    Assert(buffer.Count == 20);    // create the source / sink and link.        var source = from i in Application.DefineObservable(() => buffer.AsObservable())    select (string)i; var query = from e in source

    .ToPointStreamable(

    e => PointEvent.CreateInsert(DateTimeOffset.Now, e),

    AdvanceTimeSettings.StrictlyIncreasingStartTime)

    select e             

    var sink = Application.DefineObserver(() => Observer.Create<string>(Console.WriteLine));        // nothing happens...    query.Bind(sink).Run();

      


    Console.WriteLine is never called.

    I have tried it hosted in linqpad (using the streaminsight linqpad driver), and in my host application.

    edit

    Further to this, I have discovered that using a simple `Observable.Interval` works just fine. If I change the source to this:

    var source = Application.DefineObservable(() => Observable
       .Interval(TimeSpan.FromSeconds(1)))
       .ToPointStreamable(
          x => PointEvent.CreateInsert(
             DateTimeOffset.Now,
             Convert.ToString(x)),
        AdvanceTimeSettings.IncreasingStartTime);
    
    // this now works perfectly... 
    // and in parallel if executed more than once...
    source.Bind(sink).Run();

    Then it all works just fine, both in linqpad, and in the application.

    There seems to be an inexplicable incompatiblity between BufferBlock<T>().AsObservable() and the streaminsight engine?

    Particularly as buffer.AsObservable().Subscribe(Console.WriteLine); writes out all 20 strings perfectly.

    edit: 21/10/2015

    The team should consider open sourcing, so we can debug these scenarios. I have no responses since posting this, and I do think it is a reasonable question.

    edit: 27/10/2015

    It would really be helpful if this could be acknowledged as a bug or incorrect approach to solving my problem.


    • Edited by Jimmy Main Thursday, October 29, 2015 9:00 AM
    Monday, October 12, 2015 6:29 AM