none
How to combine n observables dynamically with CombineLatest semantics? RRS feed

  • Question

  • Hi,

    I have a collection of observables that generate state changes for so-called channels. And I have a channel set that should monitor those channels. Suppose I have three channels. The semantics I'd like to get is: if any channel is up, the channel set is up, if all channels are down, the channel set is down.

    Each channel generates an event when its state changes, so I have three Observable<ChannelState> instances. I want to combine these using CombineLatest semantics. So for example, when any channel changes state, I'd like to see an IEnumerable<ChannelState> with the three latest channel states. I can then write code like:

    if (channelStates.Any(cs => cs.ChannelState == ChannelState.Up))
      _channelSet.ChannelSetState = ChannelSetState.Up;
    else{
      _channelSet.ChannelSetState = ChannelSetState.Down;
    

    The tricky part is that channels can be added and removed at runtime. So I could start out with two channels and one minute later a third channel is added. And another minute later the first channel is removed. I suppose something like this should be possible, I just can't figure it out yet.

    So I'm looking for a function that accepts a (mutable) list of Observable<ChannelState> instances and returns an IObservable<IEnumerable<ChannelState>>.

     

    Kind regards,

    Ronald Wildenberg

    Monday, July 25, 2011 2:31 PM

Answers

  • Hi Ronald,

    Here's one way to perform an N-ary CombineLatest.  It's designed with considerations for performance, memory consumption (written in terms of IEnumerable, without requiring an initial buffer for a total count) and to avoid stack overflows.  See this related discussion:

    http://social.msdn.microsoft.com/Forums/en-ca/rx/thread/daaa84db-b560-4eda-871e-e523098db20c

    public static partial class ObservableEx
    {
    	public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources)
    	{
    		return Observable.Create<IList<TSource>>(
    			observer =>
    			{
    				object gate = new object();
    				var disposables = new CompositeDisposable();
    				var list = new List<TSource>();
    				var hasValueFlags = new List<bool>();
    				var actionSubscriptions = 0;
    				bool hasSources, hasValueFromEach = false;
    
    				using (var e = sources.GetEnumerator())
    				{
    					bool subscribing = hasSources = e.MoveNext();
    
    					while (subscribing)
    					{
    						var source = e.Current;
    						int index;
    
    						lock (gate)
    						{
    							actionSubscriptions++;
    
    							list.Add(default(TSource));
    							hasValueFlags.Add(false);
    
    							index = list.Count - 1;
    
    							subscribing = e.MoveNext();
    						}
    
    						disposables.Add(
    							source.Subscribe(
    								value =>
    								{
    									IList<TSource> snapshot;
    
    									lock (gate)
    									{
    										list[index] = value;
    
    										if (!hasValueFromEach)
    										{
    											hasValueFlags[index] = true;
    
    											if (!subscribing)
    											{
    												hasValueFromEach = hasValueFlags.All(b => b);
    											}
    										}
    
    										if (subscribing || !hasValueFromEach)
    										{
    											snapshot = null;
    										}
    										else
    										{
    											snapshot = list.ToList().AsReadOnly();
    										}
    									}
    
    									if (snapshot != null)
    									{
    										observer.OnNext(snapshot);
    									}
    								},
    								observer.OnError,
    								() =>
    								{
    									bool completeNow;
    
    									lock (gate)
    									{
    										actionSubscriptions--;
    
    										completeNow = actionSubscriptions == 0 && !subscribing;
    									}
    
    									if (completeNow)
    									{
    										observer.OnCompleted();
    									}
    								}));
    					}
    				}
    
    				if (!hasSources)
    				{
    					observer.OnCompleted();
    				}
    
    				return disposables;
    			});
    	}
    }
    

    Although, based on your description it seems like you need to react to a changing list of observables, which means that you'll need to add another layer on top of this combinator.

    We've proposed adding extensions such as these to Rxx.  Please let me know if the following works for you (see below for a usage example).

    public sealed class ObservableCollectionSubject<T> : ObservableCollection<T>, ISubject<NotifyCollectionChangedEventArgs, IList<T>>
    {
    	private readonly object gate = new object();
    	private readonly List<IObserver<IList<T>>> observers = new List<IObserver<IList<T>>>();
    	private bool suppressChangeEvents;
    	private volatile bool isStopped;
    	private Exception fault;
    
    	public IDisposable Subscribe(IObserver<IList<T>> observer)
    	{
    		IList<T> snapshot = null;
    
    		if (!isStopped)
    		{
    			lock (gate)
    			{
    				if (!isStopped)
    				{
    					observers.Add(observer);
    
    					snapshot = Snapshot();
    				}
    			}
    		}
    
    		if (snapshot == null)
    		{
    			if (fault != null)
    			{
    				observer.OnError(fault);
    			}
    			else
    			{
    				observer.OnCompleted();
    			}
    
    			return Disposable.Empty;
    		}
    		else
    		{
    			observer.OnNext(snapshot);
    
    			return Disposable.Create(() =>
    			{
    				if (!isStopped)
    				{
    					lock (gate)
    					{
    						if (!isStopped)
    						{
    							observers.Remove(observer);
    						}
    					}
    				}
    			});
    		}
    	}
    
    	public void OnNext(NotifyCollectionChangedEventArgs value)
    	{
    		ChangeAndNotify(
    			change: () =>
    			{
    				suppressChangeEvents = true;
    
    				try
    				{
    					Update(value);
    				}
    				finally
    				{
    					suppressChangeEvents = false;
    				}
    			},
    			notify: () =>
    			{
    				if (value.Action == NotifyCollectionChangedAction.Add
    					|| value.Action == NotifyCollectionChangedAction.Remove
    					|| value.Action == NotifyCollectionChangedAction.Reset)
    				{
    					OnPropertyChanged(new PropertyChangedEventArgs("Count"));
    				}
    
    				OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
    
    				OnCollectionChanged(value);
    			});
    	}
    
    	public void OnError(Exception error)
    	{
    		Complete(error);
    	}
    
    	public void OnCompleted()
    	{
    		Complete();
    	}
    
    	protected override void InsertItem(int index, T item)
    	{
    		ChangeAndNotify(() => base.InsertItem(index, item));
    	}
    
    	protected override void RemoveItem(int index)
    	{
    		ChangeAndNotify(() => base.RemoveItem(index));
    	}
    
    	protected override void SetItem(int index, T item)
    	{
    		ChangeAndNotify(() => base.SetItem(index, item));
    	}
    
    	protected override void MoveItem(int oldIndex, int newIndex)
    	{
    		ChangeAndNotify(() => base.MoveItem(oldIndex, newIndex));
    	}
    
    	protected override void ClearItems()
    	{
    		ChangeAndNotify(base.ClearItems);
    	}
    
    	private IList<T> Snapshot()
    	{
    		return new List<T>(this).AsReadOnly();
    	}
    
    	private void Update(NotifyCollectionChangedEventArgs e)
    	{
    		int index;
    
    		switch (e.Action)
    		{
    			case NotifyCollectionChangedAction.Add:
    				index = e.NewStartingIndex;
    
    				if (index < 0)
    				{
    					index = Count;
    				}
    
    				foreach (T item in e.NewItems)
    				{
    					Insert(index++, item);
    				}
    				break;
    			case NotifyCollectionChangedAction.Remove:
    				foreach (T item in e.OldItems)
    				{
    					Remove(item);
    				}
    				break;
    			case NotifyCollectionChangedAction.Replace:
    				index = e.OldStartingIndex;
    
    				if (index < 0)
    				{
    					index = IndexOf((T) e.OldItems[0]);
    				}
    
    				for (var i = 0; i < e.OldItems.Count; i++, index++)
    				{
    					this[index] = (T) e.NewItems[i];
    				}
    				break;
    			case NotifyCollectionChangedAction.Reset:
    				Clear();
    
    				foreach (T item in e.NewItems)
    				{
    					Add(item);
    				}
    				break;
    			case NotifyCollectionChangedAction.Move:
    				index = e.NewStartingIndex;
    
    				for (int from = e.OldStartingIndex; from < e.OldItems.Count; from++, index++)
    				{
    					Move(from, index);
    				}
    				break;
    		}
    	}
    
    	private void Complete(Exception error = null)
    	{
    		List<IObserver<IList<T>>> observersToNotify = null;
    
    		if (!isStopped)
    		{
    			lock (gate)
    			{
    				if (!isStopped)
    				{
    					isStopped = true;
    					fault = error;
    
    					observersToNotify = new List<IObserver<IList<T>>>(observers);
    
    					observers.Clear();
    				}
    			}
    		}
    
    		if (observersToNotify != null)
    		{
    			foreach (var observer in observersToNotify)
    			{
    				if (error == null)
    				{
    					observer.OnCompleted();
    				}
    				else
    				{
    					observer.OnError(error);
    				}
    			}
    		}
    	}
    
    	private void ChangeAndNotify(Action change, Action notify = null)
    	{
    		List<IObserver<IList<T>>> observersToNotify = null;
    		IList<T> snapshot = null;
    
    		if (!isStopped)
    		{
    			lock (gate)
    			{
    				if (!isStopped)
    				{
    					change();
    
    					if (!suppressChangeEvents)
    					{
    						snapshot = Snapshot();
    						observersToNotify = new List<IObserver<IList<T>>>(observers);
    					}
    				}
    			}
    		}
    
    		if (notify != null)
    		{
    			notify();
    		}
    
    		if (observersToNotify != null)
    		{
    			foreach (var observer in observersToNotify)
    			{
    				observer.OnNext(snapshot);
    			}
    		}
    	}
    
    	protected override void OnPropertyChanged(PropertyChangedEventArgs e)
    	{
    		if (!suppressChangeEvents)
    		{
    			base.OnPropertyChanged(e);
    		}
    	}
    
    	protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
    	{
    		if (!suppressChangeEvents)
    		{
    			base.OnCollectionChanged(e);
    		}
    	}
    }
    


    Usage example:

    using System;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Collections.Specialized;
    using System.ComponentModel;
    using System.Linq;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace Rxx.Labs.Reactive
    {
    	public class ObservableCollectionSubjectLab
    	{
    		public enum ChannelState
    		{
    			Up,
    			Down
    		}
    
    		static void Main()
    		{
    			var channel1 = new BehaviorSubject<ChannelState>(ChannelState.Down);
    			var channel2 = new BehaviorSubject<ChannelState>(ChannelState.Down);
    			var channel3 = new BehaviorSubject<ChannelState>(ChannelState.Down);
    
    			var collection = new ObservableCollectionSubject<IObservable<ChannelState>>()
    				{
    					channel1, 
    					channel2
    				};
    
    			var query = collection.Select(ObservableEx.CombineLatest).Switch();
    
    			query.Subscribe(states =>
    				{
    					Console.WriteLine("States: {0}", states.Aggregate(
    						string.Empty,
    						(acc, cur) => acc + ", " + cur,
    						acc => acc.Length > 0 ? acc.Substring(2) : acc));
    				},
    				() => Console.WriteLine("Completed"));
    
    			collection.Add(channel3);
    
    			channel2.OnNext(ChannelState.Up);
    			channel3.OnNext(ChannelState.Up);
    			channel1.OnNext(ChannelState.Up);
    
    			collection.Remove(channel2);
    
    			channel1.OnNext(ChannelState.Down);
    			channel3.OnNext(ChannelState.Down);
    
    			collection.Remove(channel1);
    
    			channel3.OnCompleted();
    
    			collection.OnCompleted();
    
    			Console.ReadKey();
    		}
    	}
    }
    

    Example output:

    States: Down, Down
    States: Down, Down, Down
    States: Down, Up, Down
    States: Down, Up, Up
    States: Up, Up, Up
    States: Up, Up
    States: Down, Up
    States: Down, Down
    States: Down
    Completed

    - Dave


    http://davesexton.com/blog
    • Marked as answer by rwwilden Thursday, August 28, 2014 9:59 AM
    Tuesday, July 26, 2011 5:55 AM