Hi,
I am trying to define an observable over a concurrent data structure.
var buffer = new BlockingCollection<string>();
Array.ForEach(new Fixture().CreateMany<string>(2).ToArray(), _ => buffer.Add(_));
buffer.Count.Dump("count");
var source = from i in Application.DefineObservable(
() => buffer.GetConsumingEnumerable().ToObservable(Scheduler.ThreadPool))
select i;
var query = from e in source.ToPointStreamable(
e => PointEvent.CreateInsert(DateTimeOffset.Now, e), AdvanceTimeSettings.StrictlyIncreasingStartTime)
select e;
var sink = Application.DefineObserver(() => Observer.Create<string>(Console.WriteLine));
Array.ForEach(new Fixture().CreateMany<string>(1).ToArray(), _ => buffer.Add(_));
query.Bind(sink).Run("x");
Array.ForEach(new Fixture().CreateMany<string>(4).ToArray(), _ => buffer.Add(_));
// produces only two items.
Console.ReadLine();
Only the initial two items ever appear in the stream.
Is it at all possible to use any of the concurrent collections as observables to stream insight?