none
Joining multiple sequences while observing only the last element in them

    Question

  • Here's a bit of a problem that I have. Let's say I have a container that implements this interface:

    /// <summary>
    /// Container for elements of type <typeparamref name="T"/> that exposes
    /// events for items being stored and removed.
    /// </summary>
    interface IElementStore<T>
    {
        /// <summary>Presumably raised when an item has been stored; confirm against the implementation.</summary>
        event EventHandler<StoreEventArgs> ItemStored;

        /// <summary>Presumably raised when an item has been removed; confirm against the implementation.</summary>
        event EventHandler<StoreEventArgs> ItemRemoved;

        /// <summary>Returns an item held by the store.</summary>
        T GetItem();

        /// <summary>Stores <paramref name="item"/> in the container.</summary>
        // Explicit void: a method declaration without a return type does not compile.
        void StoreItem(T item);

        /// <summary>Removes an item from the container.</summary>
        void RemoveItem();
    }

    Assume that I also have a class defined like this:

    // Add-in skeleton from the question. The stated requirement is that Create runs
    // only while both providers hold an item, and Destroy runs once either is empty.
    class Addin
    {
       // Entry point that receives the two stores the add-in depends on.
       // Left empty here: wiring the stores to Create/Destroy is what the question asks for.
       private void Initialize(IElementStore<int> countProvider, IElementStore<double> valueProvider)
       {
       }
    
       // To be called when both providers have something stored; receives one value from each.
       private void Create(int count, double value)
       {
           //Do something
       }
    
       // To be called when at least one provider no longer has an item stored.
       private void Destroy()
       {
           //Do something
       }
    }

    Basically, what I have is an addin that can be loaded and unloaded at any time at runtime. This addin depends on values stored in both providers passed to the Initialize method. The requirement is that the addin should call the Create method only when both providers have something stored in them. Since items in both providers can be removed at any time, the addin must also call the Destroy method if at least one of the providers doesn't have an item stored. And if items are later reinserted into the providers, the addin should once again call the Create method.

    Since I'm rather new to RX, I thought maybe someone could provide a code snippet how to properly write such RX query, preferably, if that's not too much to ask, with some step by step explanation, so that I could also get some educational value out of it, rather than simply copy and paste it.

    Thanks for any help in advance.


    Tuesday, February 18, 2014 1:27 PM

All replies

  • There are a few concepts here that are useful.

    1. Getting from Events to Observable sequences
    2. Performing a running aggregate with `Scan`
    3. Doing something only when a value changes with DistinctUntilChanged

    First, you want to get a sequence that pushes values indicating whether the ElementStore has a value or not.

    /// <summary>
    /// Builds a sequence of flags telling whether <paramref name="source"/> has
    /// seen more ItemStored events than ItemRemoved events since subscription.
    /// </summary>
    /// <param name="source">Store whose ItemStored/ItemRemoved events are observed.</param>
    /// <returns>One bool per event: true while the running count is above zero.</returns>
    private IObservable<bool> HasValues<T>(IElementStore<T> source)
    {
        // Every ItemStored event contributes +1 to the running count...
        var increments = Observable
            .FromEventPattern<StoreEventArgs>(
                handler => source.ItemStored += handler,
                handler => source.ItemStored -= handler)
            .Select(_ => 1);

        // ...and every ItemRemoved event contributes -1.
        var decrements = Observable
            .FromEventPattern<StoreEventArgs>(
                handler => source.ItemRemoved += handler,
                handler => source.ItemRemoved -= handler)
            .Select(_ => -1);

        // Running sum seeded with 0; nothing is emitted until the first event arrives.
        var runningCount = increments
            .Merge(decrements)
            .Scan(0, (total, delta) => total + delta);

        return runningCount.Select(total => total > 0);
    }

    This code first maps from the event pattern to an Observable sequence. Next, it projects each "Add" to a +1 and each "Remove" to a -1. We then merge these two sequences together and perform a running sum using Scan. What this means is that we will start with a seed value of 0. When an Add is performed, the value 1 is pushed. The running sum will yield 1 (because 0+1 == 1). When another Add is performed, the value 1 is pushed and this is added to the aggregate which is now at 1, so we get 2 (0+1+1==2). When a Remove is performed, a -1 is pushed and we decrement the running counter, e.g. (0+1+1-1==1).

    Finally, in this method, we return a bool flag to indicate whether the running count is greater than 0 (i.e. whether there are any elements in the ElementStore).

    Next, we combine the sequences for the two providers with CombineLatest, which yields true only when both providers currently have values. We only want to know when the result changes to true or to false, i.e. we don't want false, false, false, true, true, false; we want false, true, false. So we use DistinctUntilChanged to do this filtering for us.

    var aHasValue = HasValues(providerA);
    var bHasValue = HasValues(providerB);

    // True only while both stores report a value; consecutive repeats are filtered out.
    var bothPopulated = aHasValue
        .CombineLatest(bHasValue, (a, b) => a && b)
        .DistinctUntilChanged();

    bothPopulated.Subscribe(ready =>
    {
        if (!ready)
        {
            Destroy();
            return;
        }

        // Both stores hold something, so read one item from each.
        Create(providerA.GetItem(), providerB.GetItem());
    });
    

    The full working code as a LinqPad snippet is here

    // LINQPad entry point: drives two stores through a fixed add/remove script
    // so the add-in's Create/Destroy reactions can be seen in the output.
    void Main()
    {
        var a = new ElementStore<int>();
        var b = new ElementStore<double>();

        var addin = new Addin();
        addin.Initialize(a, b);

        // Put three items into a.
        "Adding 1 to a".Dump();
        a.StoreItem(1);
        "Adding 2 to a".Dump();
        a.StoreItem(2);
        "Adding 3 to a".Dump();
        a.StoreItem(3);

        // Put two items into b.
        "Adding 1.1 to b".Dump();
        b.StoreItem(1.1);
        "Adding 2.2 to b".Dump();
        b.StoreItem(2.2);

        // Take all three items back out of a, one at a time.
        for (var removal = 0; removal < 3; removal++)
        {
            "Removing from a".Dump();
            a.RemoveItem();
        }
    }
    
    // Define other methods and classes here
    /// <summary>
    /// Stack-backed <see cref="IElementStore{T}"/>: items are pushed on store,
    /// popped on remove, and the most recently stored item is the one returned.
    /// </summary>
    public class ElementStore<T> : IElementStore<T>
    {
        private readonly Stack<T> _items = new Stack<T>();

        /// <summary>Raised after an item has been pushed.</summary>
        public event EventHandler<StoreEventArgs> ItemStored;

        /// <summary>Raised after an item has been popped.</summary>
        public event EventHandler<StoreEventArgs> ItemRemoved;

        /// <summary>Returns the top item without removing it.</summary>
        /// <exception cref="InvalidOperationException">The stack is empty (thrown by Stack.Peek).</exception>
        public T GetItem()
        {
            return _items.Peek();
        }

        /// <summary>Pushes <paramref name="item"/> and then raises <see cref="ItemStored"/>.</summary>
        public void StoreItem(T item)
        {
            _items.Push(item);
            Notify(ItemStored);
        }

        /// <summary>Pops the top item and then raises <see cref="ItemRemoved"/>.</summary>
        /// <exception cref="InvalidOperationException">The stack is empty (thrown by Stack.Pop).</exception>
        public void RemoveItem()
        {
            _items.Pop();
            Notify(ItemRemoved);
        }

        // Invokes the given delegate, if anyone is subscribed, with this store as sender.
        private void Notify(EventHandler<StoreEventArgs> listeners)
        {
            if (listeners == null)
            {
                return;
            }

            listeners(this, new StoreEventArgs());
        }
    }
    
    
    /// <summary>
    /// Contract for a store of elements of type T that announces stores and removals through events.
    /// </summary>
    interface IElementStore<T>
    {
     /// <summary>Raised by ElementStore after StoreItem has added the item.</summary>
     event EventHandler<StoreEventArgs> ItemStored;
     /// <summary>Raised by ElementStore after RemoveItem has removed an item.</summary>
     event EventHandler<StoreEventArgs> ItemRemoved;
    
     /// <summary>Returns an item from the store; ElementStore returns the most recently stored one without removing it.</summary>
     T GetItem();
     /// <summary>Adds <paramref name="item"/> to the store.</summary>
     void StoreItem(T item);
     /// <summary>Removes an item from the store; ElementStore removes the most recently stored one.</summary>
     void RemoveItem();
    }
    /// <summary>Event arguments for the ItemStored and ItemRemoved events; carries no data of its own.</summary>
    public class StoreEventArgs : EventArgs
    {}
    
    /// <summary>
    /// Add-in that calls Create when both element stores hold an item and
    /// Destroy when at least one of them runs empty again.
    /// Dispose detaches it from the stores' events.
    /// </summary>
    class Addin : IDisposable
    {
        // Live Rx subscription. It is kept so the ItemStored/ItemRemoved handlers can be
        // removed when the add-in is unloaded; discarding it would leave them attached.
        private IDisposable _subscription;

        /// <summary>
        /// Starts watching both stores and reacting to changes in their combined state.
        /// </summary>
        /// <param name="providerA">Store supplying the int passed to Create.</param>
        /// <param name="providerB">Store supplying the double passed to Create.</param>
        public void Initialize(IElementStore<int> providerA, IElementStore<double> providerB)
        {
            // A repeated Initialize replaces the earlier subscription instead of stacking a second one.
            Dispose();

            // NOTE(review): only events raised after this point are counted, so items already
            // in a store before Initialize are not seen; the interface offers no way to query that.
            var aHasValue = HasValues(providerA);
            var bHasValue = HasValues(providerB);

            _subscription = Observable.CombineLatest(
                    aHasValue,
                    bHasValue,
                    (a, b) => a && b)
                // DistinctUntilChanged always forwards its first element. Seeding with false
                // ("nothing created yet") makes an initial false a repeat, so Destroy is never
                // called before the first Create.
                .StartWith(false)
                .DistinctUntilChanged()
                // Drop the seed itself; only real transitions reach the handler.
                .Skip(1)
                .Subscribe(bothHaveValues =>
                {
                    if (bothHaveValues)
                    {
                        Create(providerA.GetItem(), providerB.GetItem());
                    }
                    else
                    {
                        Destroy();
                    }
                });
        }

        /// <summary>
        /// Stops watching the stores. Safe to call more than once or before Initialize.
        /// </summary>
        public void Dispose()
        {
            if (_subscription != null)
            {
                _subscription.Dispose();
                _subscription = null;
            }
        }

        // Invoked on the transition to "both stores have an item".
        private void Create(int count, double value)
        {
            //Do something
            string.Format("--Created with {0} & {1}", count, value).Dump();
        }

        // Invoked on the transition from "both stores have an item" to "at least one is empty".
        private void Destroy()
        {
            //Do something
            "--Destroyed".Dump();
        }

        /// <summary>
        /// Builds a sequence of flags telling whether <paramref name="source"/> has
        /// seen more ItemStored events than ItemRemoved events since subscription.
        /// </summary>
        /// <param name="source">Store whose events are observed.</param>
        /// <returns>One bool per event: true while the running count is above zero.</returns>
        private IObservable<bool> HasValues<T>(IElementStore<T> source)
        {
            var added = Observable.FromEventPattern<StoreEventArgs>(
                h => source.ItemStored += h,
                h => source.ItemStored -= h);
            var removed = Observable.FromEventPattern<StoreEventArgs>(
                h => source.ItemRemoved += h,
                h => source.ItemRemoved -= h);

            // +1 per store, -1 per remove, summed from a seed of 0.
            return Observable.Merge(
                    added.Select(_ => 1),
                    removed.Select(_ => -1))
                .Scan(0, (acc, cur) => acc + cur)
                .Select(count => count > 0);
        }
    }

    This would output the following

    Adding 1 to a
    Adding 2 to a
    Adding 3 to a
    Adding 1.1 to b
    --Created with 3 & 1.1
    Adding 2.2 to b
    Removing from a
    Removing from a
    Removing from a
    --Destroyed
    

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Tuesday, May 13, 2014 4:45 PM