none
How to manage a dynamic collections of signals using Rx ?

    Question

  • Hi,

    What would a combinator that manages a collections of observables look like in Rx ?

    Requirements:

    - Starts with a initial set of observables

    - When a signal arrives to add an observable, the new observable becomes part of the merged output

    - When a signal arrives to remove an observable, the old observable gets removed from the merged output.

    Is that what a Yampa Dynamic collection is ? And how would you implement it in Rx C# ?

    Cheers

     

    Sunday, May 22, 2011 3:27 PM

All replies

  • Hi, 

    It's not clear to me what your expectations are based on your description of the problem.  Could you provide a more detailed spec and a method signature?

    How close is the following to what you had in mind?

    (Note that this code has not been tested)

    public static class ObservableEx
    {
    	public static IObservable<TSource> Merge<TSource>(
    		IObservable<IObservable<TSource>> add,
    		IObservable<IObservable<TSource>> remove)
    	{
    		return Observable.Create<TSource>(
    			observer =>
    			{
    				object gate = new object();
    				bool addCompleted = false;
    
    				var subscriptions = new Dictionary<IObservable<TSource>, IDisposable>();
    
    				Action<IObservable<TSource>> tryRemove = observable =>
    					{
    						bool completeNow;
    
    						lock (gate)
    						{
    							IDisposable s = null;
    
    							if (subscriptions.ContainsKey(observable))
    							{
    								s = subscriptions[observable];
    								s.Dispose();
    
    								subscriptions.Remove(observable);
    							}
    
    							completeNow = s != null
    								&& addCompleted
    								&& subscriptions.Count == 0;
    						}
    
    						if (completeNow)
    							observer.OnCompleted();
    					};
    
    				var addSubscription = add.Subscribe(
    					observable =>
    					{
    						lock (gate)
    						{
    							subscriptions.Add(observable, observable.Subscribe(
    								observer.OnNext,
    								observer.OnError,
    								() => tryRemove(observable)));
    						}
    					},
    					observer.OnError,
    					() =>
    					{
    						bool completeNow;
    
    						lock (gate)
    						{
    							addCompleted = true;
    							completeNow = subscriptions.Count == 0;
    						}
    
    						if (completeNow)
    							observer.OnCompleted();
    					});
    
    				var removeSubscription = remove.Subscribe(tryRemove, observer.OnError);
    
    				return new CompositeDisposable(addSubscription, removeSubscription, Disposable.Create(() =>
    					{
    						lock (gate)
    						{
    							foreach (var pair in subscriptions)
    							{
    								pair.Value.Dispose();
    							}
    						}
    					}));
    			});
    	}
    }
    

     

    - Dave


    http://davesexton.com/blog
    Sunday, May 22, 2011 4:38 PM
  • Hi,

     

    Isn't this the same as above ? And is this what a Yampa dpSwitch is ?

     

    public static class ObservableEx

        {

            public static IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> add, IObservable<IObservable<TSource>> remove)

            {

                return from x in add

                       from z in x.TakeUntil(remove.Where(x_ => x_ == x))

                       select z;

            }

        }

     

     

    Monday, May 23, 2011 10:03 AM
  • Hi,

    Yes, it appears to have the same behavior and it's certainly succinct; although, you'll probably want to ensure that remove is hot using Publish.

    I don't know what Yampa is though, so maybe somebody else will reply.  Could you provide links to Yampa dbSwitch examples in other languages at least?

    - Dave


    http://davesexton.com/blog
    Monday, May 23, 2011 11:05 AM
  • Hi,

    A more general version of the above. (Just came up with it)

    public static IObservable<TSource> Merge<TItem, TSource>(

                 IObservable<Tuple<TItem, IObservable<TSource>>> add,

                 IObservable<TItem> remove,

                 Func<TItem, TItem, bool> equalityComparer)

            {

                return add.SelectMany(p => p.Item2.TakeUntil(remove.Where(q => equalityComparer(p.Item1, q))));

            }

     

    The Yampa Arcade (Dynamic Collections paragraph 5.5)

    http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.65.8016&rep=rep1&type=pdf 

     

     

     

    Monday, May 23, 2011 11:29 AM
  • I've never heard of this Yampa... just reading this pdf you linked to.

    Why dpSwitch & not pSwitch? how is this "delayed"?


    James Miles http://enumeratethis.com
    Monday, May 23, 2011 12:43 PM
  • Could you just use an IObservable<IObservable<T>>?

    The addition of an IObservable is when the outer OnNexts an inner. The removal of the inner is when it completes.

    var s1 = new Subject<int>();
    var s2 = new Subject<int>();
    var s3 = new Subject<int>();
    
    var streams = new Subject<IObservable<int>>();
    
    streams.Merge().Subscribe(i=>Console.WriteLine(i));
    
    streams.OnNext(s1);
    streams.OnNext(s2);
    
    s1.OnNext(1);
    s2.OnNext(2);
    s1.OnNext(1);
    s2.OnNext(2);
    
    s3.OnNext(3);	//Ignored as no-one is subscribed to it.
    
    streams.OnNext(s3);
    
    s3.OnNext(3);
    s2.OnCompleted();	//Indicate that the s2 stream should be removed?
    
    s1.OnNext(1);		//The outer still publishes values from the remaining inner streams
    s3.OnNext(3);
    

    HTH


    Lee Campbell http://LeeCampbell.blogspot.com
    Tuesday, June 21, 2011 11:18 AM
  • Hi Lee

    But, Observable needs to be added/removed on some action (say stop button or something). The original observable might still be running.

    Cheer

    Amit

    Sunday, February 11, 2018 5:11 PM