Answered Dictionary<TKey,TValue>.ToObservable()

  • Wednesday, January 16, 2013 11:57 AM
     

    I want an object to subscribe to a data stream that takes the form of a dictionary. The following test doesn't work:

     [Test]
            public void Can_create_an_observable_dictionary()
            {
    
                Dictionary<string, float> sourceDict = new Dictionary<string, float>();
                Dictionary<string,float> targetDict = new Dictionary<string,float>();
    
                var obs = sourceDict.ToObservable();
    
                var observer = obs.Subscribe(x => targetDict.Add(x.Key,x.Value));
    
                sourceDict.Add("c1", 12.3f);
                sourceDict.Add("c2", 1.5f);
                sourceDict.Add("c3", 2.7f);
    
                //var changedValue = 3.2f;
                //sourceDict["c1"] = changedValue;
    
                Assert.That(targetDict.Count(), Is.EqualTo(3));
                //Assert.That(targetDict["c1"], Is.EqualTo(changedValue));
    
            }

    The commented lines are the next test I would like to pass. How could this be achieved?

All Replies

  • Wednesday, January 16, 2013 5:42 PM
     

    Hi,

    Well, your code doesn't compile.  Note that ToObservable only works on IEnumerable<T>.  Rx does not provide a ToObservable overload that converts from IDictionary<TKey, TValue>.

    Dictionary<TKey, TValue> implements IEnumerable<KeyValuePair<TKey, TValue>>, so ToObservable treats it as a plain sequence; calling Subscribe would result in a synchronous snapshot of the dictionary's contents.  The dictionary is empty in your example, so the subscription would complete immediately without pushing anything.  By the time the test begins to add elements to the source dictionary, the subscription has already completed, so nothing would be added to the target dictionary.  You could verify this behavior by specifying the onCompleted parameter when calling Subscribe to see that it's called before the test begins to add elements to the source dictionary.
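
    The onCompleted check described above can be sketched roughly as follows. This is only an illustration, assuming the Rx package (System.Reactive) is referenced; the SnapshotDemo class and its log entries are made-up names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SnapshotDemo
{
    // Records events in the order they happen so the ordering can be inspected.
    public static List<string> Run()
    {
        var log = new List<string>();
        var sourceDict = new Dictionary<string, float>();

        // ToObservable enumerates the dictionary as it exists at subscription time.
        using (sourceDict.ToObservable().Subscribe(
            pair => log.Add("OnNext " + pair.Key),
            () => log.Add("OnCompleted")))
        {
            log.Add("Subscribe returned");

            // Too late: the snapshot has already been taken.
            sourceDict.Add("c1", 12.3f);
            log.Add("Added c1");
        }

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

    With the default scheduler, "OnCompleted" should be logged before "Subscribe returned", and no "OnNext" entry should appear at all.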

    The reason is that Dictionary<TKey, TValue> does not provide a reactive model; i.e., it does not define any events that indicate when a new item has been added to the dictionary, nor does it implement IObservable<T>.  Thus, the ToObservable operator cannot adapt a dictionary into a reactive model any more than it can adapt an enumerable.

    Instead, consider using the DictionarySubject<TKey, TValue> class from Rxx.  It provides a reactive model on top of a native dictionary.  It implements IObservable<CollectionNotification<KeyValuePair<TKey, TValue>>>, so just call Subscribe without the ToObservable operator.

    Both tests pass in the following example.

    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    
    namespace UnitTests
    {
    	[TestClass]
    	public class DictionaryTests
    	{
    		[TestMethod]
    		public void Can_create_an_observable_dictionary()
    		{
    			DictionarySubject<string, float> sourceDict = new DictionarySubject<string, float>();
    			Dictionary<string, float> targetDict = new Dictionary<string, float>();
    
    			var observer = sourceDict
    				.SelectMany(change => change.ToModifications())
    				.Subscribe(mod => mod.Accept(targetDict));
    
    			sourceDict.Add("c1", 12.3f);
    			sourceDict.Add("c2", 1.5f);
    			sourceDict.Add("c3", 2.7f);
    
    			var changedValue = 3.2f;
    			sourceDict["c1"] = changedValue;
    
    			// MSTest's Assert.AreEqual takes the expected value first, then the actual value.
    			Assert.AreEqual(3, targetDict.Count);
    			Assert.AreEqual(changedValue, targetDict["c1"]);
    		}
    	}
    }

    - Dave


    http://davesexton.com/blog

    • Proposed As Answer by LeeCampbell Wednesday, January 16, 2013 6:03 PM
    • Marked As Answer by jruizaranguren Wednesday, January 16, 2013 7:14 PM
  • Wednesday, January 16, 2013 5:57 PM
     

    I assume that you are using the standard Dictionary<TKey,TValue> type in the BCL/.NET and not your own custom implementation. If that is the case, then it doesn't have any observable behavior. The ToObservable() method you are using is actually an extension method on IEnumerable<T>, which Dictionary<> implements. All this method does is let you move a sequence into the observable world, which is most often useful when you are trying to join data from both the IEnumerable and IObservable worlds.

    An example of this is if we had an array of messageService implementations (maybe Twitter, IM, FB etc) and each of these had a method that allowed you to get an observable sequence of messages. You could loop through each of them and subscribe and manage the lifetime of the subscription explicitly like this...

    void Main()
    {
    	IMessageService[] messageServices = ResolveAll<IMessageService>();
    	CompositeDisposable subscriptions = new CompositeDisposable();
    	foreach (var messageService in messageServices)
    	{
    		var subscription = messageService.MessageStream().Subscribe(OnMessageReceived);
    		subscriptions.Add(subscription);
    	}
    	
    	//later dispose of the subscriptions.
    }
    
    // Define other methods and classes here
    
    public interface IMessageService
    {
    	IObservable<string> MessageStream();
    }
    
    public static T[] ResolveAll<T>()
    {
    	return new T[]{};
    }
    
    public void OnMessageReceived(string message)
    {}


    However, you could make this simpler if the array of IMessageService instances could play nicely with IObservable. Applying ToObservable lets us do exactly that.

    void Main()
    {
    	IMessageService[] messageServices = ResolveAll<IMessageService>();
    	
    	var allMessages = from messageService in messageServices.ToObservable()
    					  from message in messageService.MessageStream()
    					  select message;
    	var subscription = allMessages.Subscribe(OnMessageReceived);
    	
    	
    	//later dispose of the subscriptions.
    }
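
    As a side note, the same flattening can probably be written without query syntax by projecting each service to its stream and merging the results. The sketch below assumes the Rx package is referenced; FakeMessageService and MergeDemo are hypothetical names that exist only to make the example runnable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public interface IMessageService
{
    IObservable<string> MessageStream();
}

// Hypothetical in-memory service, used only to drive the sketch.
public sealed class FakeMessageService : IMessageService
{
    private readonly Subject<string> messages = new Subject<string>();

    public IObservable<string> MessageStream() { return messages; }

    public void Publish(string message) { messages.OnNext(message); }
}

public static class MergeDemo
{
    public static List<string> Run()
    {
        var twitter = new FakeMessageService();
        var im = new FakeMessageService();
        IMessageService[] messageServices = { twitter, im };
        var received = new List<string>();

        // Project each service to its stream, then merge all streams into one.
        IObservable<string> allMessages = messageServices
            .Select(service => service.MessageStream())
            .Merge();

        // A single subscription now covers every service.
        using (allMessages.Subscribe(received.Add))
        {
            twitter.Publish("tweet");
            im.Publish("ping");
        }

        // Published after disposal, so it is not observed.
        twitter.Publish("ignored");
        return received;
    }
}
```

    Either form leaves a single IDisposable to manage instead of one per service.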

    http://introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html#FromIEnumerable

    So enough for the lesson on Rx.

    So, to answer your question: you need to use a type that supports change notification. The easiest way to get this is probably to use one of the Rxx types like DictionarySubject. This type will feel like a normal dictionary but provide the change notification behavior you are looking for.
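
    To make "supports change notification" concrete, here is a minimal sketch built only on the BCL's IObservable<T> and IObserver<T> interfaces. NotifyingDictionary and MirrorObserver are hypothetical types written for this illustration; this is not how Rxx implements DictionarySubject, and it ignores removals, errors, completion and thread safety.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type: a dictionary that pushes every set operation to its observers.
public sealed class NotifyingDictionary<TKey, TValue> : IObservable<KeyValuePair<TKey, TValue>>
{
    private readonly Dictionary<TKey, TValue> items = new Dictionary<TKey, TValue>();
    private readonly List<IObserver<KeyValuePair<TKey, TValue>>> observers =
        new List<IObserver<KeyValuePair<TKey, TValue>>>();

    // Setting a key (new or existing) stores the value and notifies every observer.
    public TValue this[TKey key]
    {
        get { return items[key]; }
        set
        {
            items[key] = value;
            var pair = new KeyValuePair<TKey, TValue>(key, value);
            // Iterate over a copy so observers may unsubscribe during notification.
            foreach (var observer in observers.ToArray())
            {
                observer.OnNext(pair);
            }
        }
    }

    public IDisposable Subscribe(IObserver<KeyValuePair<TKey, TValue>> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action dispose;
        public Unsubscriber(Action dispose) { this.dispose = dispose; }
        public void Dispose() { dispose(); }
    }
}

// Hypothetical observer that mirrors every change into a plain dictionary.
public sealed class MirrorObserver<TKey, TValue> : IObserver<KeyValuePair<TKey, TValue>>
{
    private readonly IDictionary<TKey, TValue> target;
    public MirrorObserver(IDictionary<TKey, TValue> target) { this.target = target; }
    public void OnNext(KeyValuePair<TKey, TValue> pair) { target[pair.Key] = pair.Value; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class NotifyingDictionaryDemo
{
    public static Dictionary<string, float> Run()
    {
        var source = new NotifyingDictionary<string, float>();
        var target = new Dictionary<string, float>();

        using (source.Subscribe(new MirrorObserver<string, float>(target)))
        {
            source["c1"] = 12.3f;
            source["c2"] = 1.5f;
            source["c1"] = 3.2f; // An update to an existing key is pushed too.
        }

        source["c3"] = 2.7f; // Not mirrored: the subscription was disposed.
        return target;
    }
}
```

    The point is only that the notifications have to come from the collection itself; a ready-made type such as DictionarySubject saves you from writing and maintaining this plumbing.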

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Wednesday, January 16, 2013 6:00 PM Add link to ToObservable explanation
    • Proposed As Answer by LeeCampbell Wednesday, January 16, 2013 6:01 PM
    • Marked As Answer by jruizaranguren Wednesday, January 16, 2013 7:14 PM
  • Wednesday, January 16, 2013 6:04 PM
     
     
    Sorry Dave, I stepped on your toes here. Your answer wasn't there when I started writing :-)

    Lee Campbell http://LeeCampbell.blogspot.com

  • Wednesday, January 16, 2013 6:10 PM
     
     

    Hi Lee,

    Not a problem :)  It's always good to have an additional perspective and examples anyway.

    - Dave


    http://davesexton.com/blog

  • Wednesday, January 16, 2013 7:19 PM
     
     

    Thank you both.

    Do you know if there is any plan to include some of the Rxx work in official Rx libraries?


  • Wednesday, January 16, 2013 7:26 PM
     
     

    Hi,

    > Do you know if there is any plan to include some of the Rxx work in official Rx libraries?

    Maybe.  I'm working on it :)

    - Dave


    http://davesexton.com/blog

  • Wednesday, January 16, 2013 7:38 PM
     
     

    Great.

    The dictionary works great, but only with Rx 1.0. I first installed Rx 2.0 together with Rxx 1.0, and it did not compile.

    Anyway, the source code is on CodePlex, so if we only need DictionarySubject we will try to extract just the needed bits.

  • Wednesday, January 16, 2013 7:46 PM
     
     

    Hi,

    Rxx 1.0 is not compatible with Rx 2.0.  Rxx 2.0 hasn't been released yet as a discrete download.

    I recommend building the Rxx 2.0 library yourself.  VS 2012 is required.  The following instructions target VS 2010 but I don't recall there being any differences:

    http://rxx.codeplex.com/wikipage?title=Building

    - Dave


    http://davesexton.com/blog