create an observable over a consuming enumerable. RRS feed

  • Question

  • Hi,

    I am trying to define an observable over a concurrent data structure.

    var buffer = new BlockingCollection<string>(); Array.ForEach(new Fixture().CreateMany<string>(2).ToArray(), _ => buffer.Add(_)); buffer.Count.Dump("count"); var source = from i in Application.DefineObservable(
    () => buffer.GetConsumingEnumerable().ToObservable(Scheduler.ThreadPool)) select i; var query = from e in source.ToPointStreamable(
    e => PointEvent.CreateInsert(DateTimeOffset.Now, e), AdvanceTimeSettings.StrictlyIncreasingStartTime) select e; var sink = Application.DefineObserver(() => Observer.Create<string>(Console.WriteLine)); Array.ForEach(new Fixture().CreateMany<string>(1).ToArray(), _ => buffer.Add(_)); query.Bind(sink).Run("x"); Array.ForEach(new Fixture().CreateMany<string>(4).ToArray(), _ => buffer.Add(_));

    // produces only two items.


    Only the initial two items ever appear in the stream.

    Is it at all possible to use any of the concurrent collections as observables to stream insight?

    • Edited by Jimmy Main Thursday, October 29, 2015 9:50 AM
    Thursday, October 29, 2015 9:49 AM