How to manage a dynamic collection of signals using Rx?

Question
-
Hi,
What would a combinator that manages a collection of observables look like in Rx?
Requirements:
- Starts with an initial set of observables
- When a signal arrives to add an observable, the new observable becomes part of the merged output
- When a signal arrives to remove an observable, that observable is removed from the merged output
Is that what a Yampa dynamic collection is? And how would you implement it in Rx with C#?
Cheers
Sunday, May 22, 2011 3:27 PM
All replies
-
Hi,
It's not clear to me what your expectations are based on your description of the problem. Could you provide a more detailed spec and a method signature?
How close is the following to what you had in mind?
(Note that this code has not been tested)
public static class ObservableEx
{
    public static IObservable<TSource> Merge<TSource>(
        IObservable<IObservable<TSource>> add,
        IObservable<IObservable<TSource>> remove)
    {
        return Observable.Create<TSource>(
            observer =>
            {
                object gate = new object();
                bool addCompleted = false;
                var subscriptions = new Dictionary<IObservable<TSource>, IDisposable>();

                Action<IObservable<TSource>> tryRemove = observable =>
                {
                    bool completeNow;
                    lock (gate)
                    {
                        IDisposable s = null;
                        if (subscriptions.ContainsKey(observable))
                        {
                            s = subscriptions[observable];
                            s.Dispose();
                            subscriptions.Remove(observable);
                        }
                        completeNow = s != null && addCompleted && subscriptions.Count == 0;
                    }
                    if (completeNow)
                        observer.OnCompleted();
                };

                var addSubscription = add.Subscribe(
                    observable =>
                    {
                        lock (gate)
                        {
                            subscriptions.Add(observable, observable.Subscribe(
                                observer.OnNext,
                                observer.OnError,
                                () => tryRemove(observable)));
                        }
                    },
                    observer.OnError,
                    () =>
                    {
                        bool completeNow;
                        lock (gate)
                        {
                            addCompleted = true;
                            completeNow = subscriptions.Count == 0;
                        }
                        if (completeNow)
                            observer.OnCompleted();
                    });

                var removeSubscription = remove.Subscribe(tryRemove, observer.OnError);

                return new CompositeDisposable(addSubscription, removeSubscription, Disposable.Create(() =>
                {
                    lock (gate)
                    {
                        foreach (var pair in subscriptions)
                        {
                            pair.Value.Dispose();
                        }
                    }
                }));
            });
    }
}
- Dave
http://davesexton.com/blog
Sunday, May 22, 2011 4:38 PM
-
Hi,
Isn't this the same as the above? And is this what a Yampa dpSwitch is?
public static class ObservableEx
{
    public static IObservable<TSource> Merge<TSource>(
        IObservable<IObservable<TSource>> add,
        IObservable<IObservable<TSource>> remove)
    {
        return from x in add
               from z in x.TakeUntil(remove.Where(x_ => x_ == x))
               select z;
    }
}
Monday, May 23, 2011 10:03 AM
-
Hi,
Yes, it appears to have the same behavior and it's certainly succinct; although, you'll probably want to ensure that remove is hot using Publish.
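A minimal sketch of that Publish suggestion, assuming the System.Reactive package and keeping the signature from the query version above. The parameter name sharedRemove is only illustrative. The selector overload of Publish shares a single subscription to remove among all the TakeUntil calls, so a cold remove sequence is not re-subscribed once per added observable:

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableEx
{
    // Same shape as the query-based Merge above; only the handling of
    // `remove` differs. This is a sketch, not a tested implementation.
    public static IObservable<TSource> Merge<TSource>(
        IObservable<IObservable<TSource>> add,
        IObservable<IObservable<TSource>> remove)
    {
        // Publish(selector) connects to `remove` once per subscription to the
        // result and hands every inner TakeUntil the same shared sequence.
        return remove.Publish(sharedRemove =>
            from x in add
            // Stop forwarding x as soon as the same instance arrives on remove
            // (reference equality, as in the original query).
            from z in x.TakeUntil(sharedRemove.Where(r => r == x))
            select z);
    }
}
```

Removal still depends on the remove sequence delivering the very same observable instance that was passed to add.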
I don't know what Yampa is though, so maybe somebody else will reply. Could you at least provide links to Yampa dpSwitch examples in other languages?
- Dave
http://davesexton.com/blog
Monday, May 23, 2011 11:05 AM
-
Hi,
A more general version of the above (I just came up with it):
public static IObservable<TSource> Merge<TItem, TSource>(
    IObservable<Tuple<TItem, IObservable<TSource>>> add,
    IObservable<TItem> remove,
    Func<TItem, TItem, bool> equalityComparer)
{
    return add.SelectMany(p => p.Item2.TakeUntil(remove.Where(q => equalityComparer(p.Item1, q))));
}
The Yampa Arcade (Dynamic Collections paragraph 5.5)
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.65.8016&rep=rep1&type=pdf
Monday, May 23, 2011 11:29 AM
-
I've never heard of this Yampa... just reading this pdf you linked to.
Why dpSwitch and not pSwitch? How is this "delayed"?
James Miles
http://enumeratethis.com
Monday, May 23, 2011 12:43 PM
-
Could you just use an IObservable<IObservable<T>>?
An inner observable is added when the outer sequence pushes it via OnNext. It is removed when the inner completes.
var s1 = new Subject<int>();
var s2 = new Subject<int>();
var s3 = new Subject<int>();
var streams = new Subject<IObservable<int>>();
streams.Merge().Subscribe(i => Console.WriteLine(i));
streams.OnNext(s1);
streams.OnNext(s2);
s1.OnNext(1);
s2.OnNext(2);
s1.OnNext(1);
s2.OnNext(2);
s3.OnNext(3); // Ignored as no-one is subscribed to it.
streams.OnNext(s3);
s3.OnNext(3);
s2.OnCompleted(); // Indicate that the s2 stream should be removed?
s1.OnNext(1); // The outer still publishes values from the remaining inner streams
s3.OnNext(3);
HTH
Lee Campbell
http://LeeCampbell.blogspot.com
Tuesday, June 21, 2011 11:18 AM
-
Hi Lee
But the observable needs to be added or removed in response to some action (say, a stop button). The original observable might still be running.
Cheers
Amit
Sunday, February 11, 2018 5:11 PM