create an observable over a consuming enumerable.

  • Question

  • Hi,

    I am trying to define an observable over a concurrent data structure.

    var buffer = new BlockingCollection<string>();
    Array.ForEach(new Fixture().CreateMany<string>(2).ToArray(), _ => buffer.Add(_));
    buffer.Count.Dump("count");

    var source = from i in Application.DefineObservable(
                     () => buffer.GetConsumingEnumerable().ToObservable(Scheduler.ThreadPool))
                 select i;

    var query = from e in source.ToPointStreamable(
                    e => PointEvent.CreateInsert(DateTimeOffset.Now, e),
                    AdvanceTimeSettings.StrictlyIncreasingStartTime)
                select e;

    var sink = Application.DefineObserver(() => Observer.Create<string>(Console.WriteLine));

    Array.ForEach(new Fixture().CreateMany<string>(1).ToArray(), _ => buffer.Add(_));
    query.Bind(sink).Run("x");
    Array.ForEach(new Fixture().CreateMany<string>(4).ToArray(), _ => buffer.Add(_));

    // produces only two items.

    Console.ReadLine();

    Only the initial two items ever appear in the stream.

    Is it possible at all to use any of the concurrent collections as observable sources for StreamInsight?


    • Edited by Jimmy Main Thursday, October 29, 2015 9:50 AM
    Thursday, October 29, 2015 9:49 AM