locked
Is there anyway to insert an item into an observable sequence RRS feed

  • Question

  • I have an observable sequence being generated and I want to be able to insert an item so it is the next item pushed to the subscribers and then the rest continues.

     

    I guess this is a probably possible just can't see the wood for the trees...


    Trying to remember what I learned yesterday
    Thursday, July 28, 2011 8:19 PM

All replies

  • You could create a subject, merge it with the observable and then push value to the subject using onNext.

    Here's an example:

    var s = new Subject<int>();
    
    var obs = Observable.Range(0, 100, Scheduler.TaskPool).Do(_ => Thread.Sleep(100));
    
    var newObs = obs.Merge(s.ObserveOn(Scheduler.TaskPool));
    
    newObs.Subscribe(Console.WriteLine);
    
    Thread.Sleep(400);
    
    s.OnNext(12);
    
    Thread.Sleep(20000);


    Thursday, July 28, 2011 8:58 PM
  • Hi Ollie,

    You could use Expand for this, although it's only available in Rx Experimental at the moment.

    For example, if you wanted to expand a sequence by index you could do the following.  Note that if you need to expand by data and not index, then simply remove the Tuple projection.

     

    var o = GetObservable();
    
    var indexed = o.Select((value, index) => Tuple.Create(index, value));
    
    var inserted = indexed.Expand(item => 
    {
    	if (item.Item1 == 10)
    	{
    		return Observable.Range(1, 100).Select(value => Tuple.Create(10, value));
    	}
    	else
    	{
    		return Observable.Empty<Tuple<int, int>>();
    	}
    })
    .Select(item => item.Item2);
    

     

    - Dave


    http://davesexton.com/blog
    Friday, July 29, 2011 3:57 AM