locked
Mutable Observables RRS feed

  • Question

  • Is it possible to filter, project, or otherwise operate on an observable with existing subscribed observers?  As an example, could I have a keyPress observable with several subscribers, and then later modify that observable to filter out certain keys without requiring all of the original subscribers to subscribe to a new observable.  It seems that most of the observable operators return a new observable rather than mutating an existing one, and I'm interested in creating the later behavior.
    Monday, August 1, 2011 8:33 PM

Answers

  • Do you mean like this?

    	var xss = new Subject<IObservable<int>>();
    	
    	var xs = xss.Switch();
    	
    	Func<int, bool> filter = x => x % 2 == 0;
    	
    	var ys =
    		from x in xs
    		where filter(x)
    		select x;
    		
    	ys.Subscribe(Console.WriteLine);
    	
    	xss.OnNext(Observable.Generate(
    		0, x => x < 20,
    		x => x + 1, x => x,
    		x => TimeSpan.FromSeconds(0.5)));
    	
    	Thread.Sleep(5000);
    	
    	filter = x => x % 3 == 0;
    	
    	Thread.Sleep(5000);
    	
    	xss.OnNext(Observable.Range(0, 10).Select(x => -x));
    	
    	Thread.Sleep(10000);
    

    :-)


    James C-S
    • Marked as answer by LiquidAsh Tuesday, August 2, 2011 3:56 AM
    Tuesday, August 2, 2011 3:07 AM

All replies

  • Hi LiquidAsh,

    Rx is generally functional in nature and, although there are lots of side-effects going on, there are few places that are mutable in the way that you are asking. However, side-effects can allow the immutable to behave as if they are mutable. Try something like this:

    	var xs = Observable.Generate(
    		0, x => x < 10,
    		x => x + 1, x => x,
    		x => TimeSpan.FromSeconds(1.0));
    	
    	var evens = false;
    	
    	var ys =
    		from x in xs
    		where evens ? x % 2 == 0 : true
    		select x;
    		
    	ys.Subscribe(Console.WriteLine);
    	
    	Thread.Sleep(5000);
    	
    	evens = true;
    	
    	Thread.Sleep(10000);
    

    Does something like this give you what you need?

    Cheers.


    James C-S
    Monday, August 1, 2011 11:56 PM
  • Thanks for the quick response James C-S.  The one issue in your example, that I'd like to work around (if possible) is needing to know that evens are what you may want to filter in the future.  Instead you may want to filter odds, or powers of two, or any other type of number.  Or maybe you even want to restart the sequence, inject some other numbers, or turn the counting around so it counts down rather than up.  It seems like these could be useful operations on the event stream, but it's unfortunate that there doesn't seem to be a good way of doing this without having to subscribe a bunch of observers to a new observable every time such a change is made.

     

    Tuesday, August 2, 2011 1:12 AM
  • Do you mean like this?

    	var xss = new Subject<IObservable<int>>();
    	
    	var xs = xss.Switch();
    	
    	Func<int, bool> filter = x => x % 2 == 0;
    	
    	var ys =
    		from x in xs
    		where filter(x)
    		select x;
    		
    	ys.Subscribe(Console.WriteLine);
    	
    	xss.OnNext(Observable.Generate(
    		0, x => x < 20,
    		x => x + 1, x => x,
    		x => TimeSpan.FromSeconds(0.5)));
    	
    	Thread.Sleep(5000);
    	
    	filter = x => x % 3 == 0;
    	
    	Thread.Sleep(5000);
    	
    	xss.OnNext(Observable.Range(0, 10).Select(x => -x));
    	
    	Thread.Sleep(10000);
    

    :-)


    James C-S
    • Marked as answer by LiquidAsh Tuesday, August 2, 2011 3:56 AM
    Tuesday, August 2, 2011 3:07 AM
  • Yes thanks!  I was considering re-subscribing and pushing events through a subject, but the Switch makes this much more elegant.  If I'm not mistaken, the filter could actually be added as a .Where() on any Observable passed to xss.OnNext().  One last note, the x=>TimeSpan.FromSeconds(0.5) didn't seem to compile under the stable rx release, but works just fine when commented out(?).  Thanks again for all of your help, this is exactly what I was looking for.
    Tuesday, August 2, 2011 4:07 AM