I am attempting to connect a buffer block to a consumer in StreamInsight.
Given this code sample:
var buffer = new BufferBlock<string>();
// 20 random items.
Array.ForEach(new Fixture().CreateMany<string>(20).ToArray(), _ => buffer.Post(_));
// which are indeed there...
Assert(buffer.Count == 20);
// create the source / sink and link.
var source = from i in Application.DefineObservable(() => buffer.AsObservable())
select (string)i;
var query = from e in source
.ToPointStreamable(
e => PointEvent.CreateInsert(DateTimeOffset.Now, e),
AdvanceTimeSettings.StrictlyIncreasingStartTime)
select e;
var sink = Application.DefineObserver(() => Observer.Create<string>(Console.WriteLine));
// nothing happens...
query.Bind(sink).Run();
`Console.WriteLine` is never called.
I have tried it hosted in LINQPad (using the StreamInsight LINQPad driver) and in my host application.
edit:
Further to this, I have discovered that using a simple `Observable.Interval` works just fine. If I change the source to this:
var source = Application.DefineObservable(() => Observable
.Interval(TimeSpan.FromSeconds(1)))
.ToPointStreamable(
x => PointEvent.CreateInsert(
DateTimeOffset.Now,
Convert.ToString(x)),
AdvanceTimeSettings.IncreasingStartTime);
// this now works perfectly...
// and in parallel if executed more than once...
source.Bind(sink).Run();
With that change everything behaves as expected, both in LINQPad and in the application.
There seems to be an inexplicable incompatibility between `BufferBlock<T>().AsObservable()` and the
StreamInsight engine, particularly as `buffer.AsObservable().Subscribe(Console.WriteLine);` writes out all 20 strings perfectly.
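For reference, the direct-subscription check can be reproduced outside StreamInsight with a sketch like the one below. It assumes only the TPL Dataflow library is referenced. `PrintingObserver` and the GUID strings are hypothetical stand-ins for the Rx `Subscribe(Console.WriteLine)` overload and the AutoFixture data, so this is an illustration of the check rather than my exact code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: a plain IObserver<string> so the sketch needs no Rx reference.
sealed class PrintingObserver : IObserver<string>
{
    private readonly CountdownEvent _remaining;

    public PrintingObserver(CountdownEvent remaining) { _remaining = remaining; }

    public void OnNext(string value)
    {
        Console.WriteLine(value);
        _remaining.Signal(); // one expected item has arrived
    }

    public void OnError(Exception error) { Console.Error.WriteLine(error); }

    public void OnCompleted() { }
}

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // 20 placeholder items (GUIDs instead of AutoFixture strings).
        for (var i = 0; i < 20; i++)
        {
            buffer.Post(Guid.NewGuid().ToString());
        }

        using (var remaining = new CountdownEvent(20))
        using (buffer.AsObservable().Subscribe(new PrintingObserver(remaining)))
        {
            // The block hands items to the observer asynchronously,
            // so wait (with a timeout) instead of exiting immediately.
            var gotAll = remaining.Wait(TimeSpan.FromSeconds(5));
            Console.WriteLine(gotAll ? "received all items" : "timed out");
        }
    }
}
```

The only difference between this and the failing case is that the observable is consumed directly instead of being passed through `Application.DefineObservable`.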
edit: 21/10/2015
The team should consider open sourcing StreamInsight, so that we can debug these scenarios ourselves. I have had no responses since posting this, and I do think it is a reasonable question.
edit: 27/10/2015
It would really be helpful if this could be acknowledged either as a bug or as an incorrect approach to solving my problem.