Multiple observables with single subscription

    Question

  • Hi, 

    I have multiple observables with one subscriber, something like the following:

     public class Filter : INotifyPropertyChanged
        {
            public string Val1 { .../*raise prop change event*/ }
            public string Val2{ .../*raise prop change event*/ }
        }

    This filter object can be part of a collection:

    public class FilteringViewModel
        {
            public ObservableCollection<Filter> Filters { get; set; }
        }

    The user can add or remove filters. I want to subscribe to filter changes with a timeout, so that if one filter changes and another filter changes before the timeout elapses, only one event is fired.

    I've played a bit with Multicast and Publish with no luck, something like:

    private void AddEmptyFilter()
            {
               
                var filter = new Filter();
                Filters.Add(filter);
    
                var tmp = Observable.
                         FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged").
                         Where(evt => evt.EventArgs.PropertyName == "Val1" || evt.EventArgs.PropertyName == "Val2").
                         Select(_ => filter).
                    //DistinctUntilChanged(). 
                         Throttle(TimeSpan.FromSeconds(3));
               // tmp.Subscribe(DoWork);
               
                tmp.Multicast(latest);
    }
    
    .
    .
    .
    
    latest = new Subject<RelationFilter>();
    latest.Subscribe(DoWork);

    Please advise.
    • Edited by onemenny, Thursday, March 29, 2012 12:07
    Thursday, March 29, 2012 12:06

All replies

  • Hi,

    The order of operators is very important.  By applying Throttle before Multicast, you are not sharing the throttling.  (Edit: Well, technically you are sharing it, but not in the way that you wanted.  You're creating a separate Throttle query for every filter and then multicasting.  You'd need to multicast first and then apply Throttle only once, on the subject, before subscribing.)

    Try something like the following instead, which I've made a bit more reactive.  (Untested)

    var filterChanges = 
    	from change in Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    		eh => Filters.CollectionChanged += eh, 
    		eh => Filters.CollectionChanged -= eh)
    	where change.EventArgs.Action == NotifyCollectionChangedAction.Add
    		|| change.EventArgs.Action == NotifyCollectionChangedAction.Replace
    	from filter in change.EventArgs.NewItems.Cast<Filter>()
    	select from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
    		where propertyChanged.EventArgs.PropertyName == "Val1"
    			|| propertyChanged.EventArgs.PropertyName == "Val2"
    		select filter;
    
    filterChanges
    	.Merge()
    	.Throttle(TimeSpan.FromSeconds(3))
    	.Subscribe(DoWork);
    
    Filters.Add(new Filter());
    Filters.Add(new Filter());
    Filters.Add(new Filter());
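
    To make the effect of operator order concrete, here is a minimal sketch that compares throttling each source separately with merging first and throttling once.  It assumes the System.Reactive package; the class name ThrottleOrderDemo and the two subjects are hypothetical stand-ins for the per-filter streams, and HistoricalScheduler is used only so that virtual time can be advanced without real waiting.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleOrderDemo
{
    // Returns how many notifications each arrangement produces
    // for two changes that arrive one second apart.
    public static (int PerSource, int Shared) Run()
    {
        var clock = new HistoricalScheduler();      // virtual time
        var window = TimeSpan.FromSeconds(3);
        var first = new Subject<string>();          // stand-in for filter 1 changes
        var second = new Subject<string>();         // stand-in for filter 2 changes
        int perSource = 0, shared = 0;

        // Throttle applied per source, then merged: each source keeps its own timer.
        using (first.Throttle(window, clock)
                    .Merge(second.Throttle(window, clock))
                    .Subscribe(_ => perSource++))
        // Merged first, then throttled once: a single timer covers both sources.
        using (first.Merge(second)
                    .Throttle(window, clock)
                    .Subscribe(_ => shared++))
        {
            first.OnNext("filter 1 changed");
            clock.AdvanceBy(TimeSpan.FromSeconds(1));
            second.OnNext("filter 2 changed");
            clock.AdvanceBy(TimeSpan.FromSeconds(5));
        }

        return (perSource, shared);
    }
}
```

    With these inputs the per-source arrangement notifies once per source, while the merged arrangement notifies once in total, which is the behavior asked for in the question.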

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, Thursday, March 29, 2012 12:45: Clarified first sentence; fixed spelling
    Thursday, March 29, 2012 12:41
  • Hi Dave,

    Thank you very much for the prompt reply. 

    I may have missed something: the user can add a new filter at any time, which triggers AddEmptyFilter.

    So (I changed your code a little) the DoWork method will fire X times, where X is the number of calls to AddEmptyFilter:

     private void AddEmptyFilter()
            {
    		var filterChanges =
                        from change in Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                            eh => Filters.CollectionChanged += eh,
                            eh => Filters.CollectionChanged -= eh)
                        where change.EventArgs.Action == NotifyCollectionChangedAction.Add
                            || change.EventArgs.Action == NotifyCollectionChangedAction.Replace
                        from filter in change.EventArgs.NewItems.Cast<Filter>()
                        select from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                               where propertyChanged.EventArgs.PropertyName == "Val1"
                                   || propertyChanged.EventArgs.PropertyName == "Val2"
                               select filter;
    
                filterChanges
                    .Merge()
                    .Throttle(TimeSpan.FromSeconds(3))
                    .Subscribe(DoWork);
    
                var filter = new Filter();
                Filters.Add(filter);
            }

    Please advise.

    Thursday, March 29, 2012 13:07
  • Hi,

    > I may have missed something: the user can add a new filter at any time, which triggers AddEmptyFilter.

    Yes, the query I posted handles this by listening for changes to the Filters collection.  You probably don't want to put the entire query inside your AddEmptyFilter method.

    > The DoWork method will fire X times, where X is the number of calls to AddEmptyFilter.

    Yes, if you put the query inside AddEmptyFilter.  But is that really what you want?

    If you place the query outside of AddEmptyFilter, then the DoWork method will be called whenever a filter changes and there are no additional changes to any filter for 3 seconds.
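
    As a rough illustration of why the query belongs outside AddEmptyFilter, the sketch below counts how often a handler runs for a single collection change when Subscribe has been called several times.  It assumes the System.Reactive package; SubscribeOnceDemo and CallsPerChange are hypothetical names, a string collection stands in for the Filters collection, and Throttle is left out so that the count is visible immediately.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SubscribeOnceDemo
{
    // Returns how many times the handler runs for ONE Add
    // after Subscribe has been called `subscriptions` times.
    public static int CallsPerChange(int subscriptions)
    {
        var filters = new ObservableCollection<string>();
        var calls = 0;

        var changes = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            eh => filters.CollectionChanged += eh,
            eh => filters.CollectionChanged -= eh,
            Scheduler.Immediate);   // attach handlers synchronously for a deterministic demo

        // Each Subscribe call registers another observer, just as each call to
        // AddEmptyFilter would if the query lived inside that method.
        var handles = new List<IDisposable>();
        for (var i = 0; i < subscriptions; i++)
        {
            handles.Add(changes.Subscribe(_ => calls++));
        }

        filters.Add("filter");

        handles.ForEach(handle => handle.Dispose());
        return calls;
    }
}
```

    Subscribing once, for example in the view model's constructor, keeps the handler count at one no matter how many filters are added later.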

    - Dave


    http://davesexton.com/blog

    Thursday, March 29, 2012 13:24
  • Hi,

    The reason why I got rid of AddEmptyFilter in my example was because I was able to reduce it to this:

    private void AddEmptyFilter()
    {
    	Filters.Add(new Filter());
    }

    In my example, I simply called Filters.Add(new Filter()) directly to illustrate this.

    - Dave


    http://davesexton.com/blog

    Thursday, March 29, 2012 13:26
  • Hi Dave,

    Thank you very much for your answer and clarification.  I have to say that the Rx LINQ expressions confuse me.

    In your answer:

    var filterChanges =
                        from change in Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                            eh => Filters.CollectionChanged += eh,
                            eh => Filters.CollectionChanged -= eh)
                        where change.EventArgs.Action == NotifyCollectionChangedAction.Add
                            || change.EventArgs.Action == NotifyCollectionChangedAction.Replace
                        from filter in change.EventArgs.NewItems.Cast<Filter>()
                        select from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                               where propertyChanged.EventArgs.PropertyName == "Val1"
                                   || propertyChanged.EventArgs.PropertyName == "Val2"
                               select filter;
    
                filterChanges
                    .Merge()
                    .Throttle(TimeSpan.FromSeconds(3))
                    .Subscribe(DoWork);

    DoWork gets called only when a property change is detected on my filters and NOT when a filter is added or removed. I have tried merging it with another expression, or doing something like:

    var filterChanges =
                        from change in Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                            eh => Filters.CollectionChanged += eh,
                            eh => Filters.CollectionChanged -= eh).Throttle(TimeSpan.FromSeconds(3)).Do(DoWork)
                        where change.EventArgs.Action == NotifyCollectionChangedAction.Add
                            || change.EventArgs.Action == NotifyCollectionChangedAction.Replace
                        from filter in change.EventArgs.NewItems.Cast<Filter>()
                        select from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                               where propertyChanged.EventArgs.PropertyName == "Val1"
                                   || propertyChanged.EventArgs.PropertyName == "Val2"
                               select filter;
    
                filterChanges
                    .Merge()
                    .Throttle(TimeSpan.FromSeconds(3))
                    .Subscribe(DoWork);


    Notice the Do method on FromEventPattern. This of course did not work. I thought that the Merge operation should accomplish that.

    I then tried to merge the two "listeners" like so:

     var filterChanges2 = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                            eh => Filters.CollectionChanged += eh,
                            eh => Filters.CollectionChanged -= eh);
    
    
                var filterChanges = from change in filterChanges2
                                    where change.EventArgs.NewItems != null
                                    from filter in change.EventArgs.NewItems.Cast<Filter>()
                                    from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                                    where propertyChanged.EventArgs.PropertyName == "Val1"
                                        || propertyChanged.EventArgs.PropertyName == "Val2"
                                    select filter;
    
                latest = filterChanges.Merge(filterChanges2)...

    But of course the two LINQ selections are of different types, so I had to fall back to something like this:

     var filterChanges2 = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                            eh => Filters.CollectionChanged += eh,
                            eh => Filters.CollectionChanged -= eh);
    
    
                var filterChanges = from change in filterChanges2
                                    where change.EventArgs.NewItems != null
                                    from filter in change.EventArgs.NewItems.Cast<Filter>()
                                    from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                                    where propertyChanged.EventArgs.PropertyName == "Val1"
                                        || propertyChanged.EventArgs.PropertyName == "Val2"
                                    select filter;
    
    
    
                latest = filterChanges2.Cast<object>()
                    .Merge(filterChanges.Cast<object>())
                     .Throttle(TimeSpan.FromSeconds(3))
                     .Subscribe(DoWork);

    That worked for me since I don't care about the event args in the DoWork method.

    One more thing: I noticed that when I use DistinctUntilChanged, it only distinguishes between the properties reported by PropertyChanged, and that I accept :(

    Anyway, your help is much appreciated, and anything that makes my code above more elegant or clearer is welcome.


    • Edited by onemenny, Sunday, April 1, 2012 09:44: found the solution
    Sunday, April 1, 2012 06:36
  • Hi, 

    Removal is a bit trickier.  Your query now will subscribe to property notifications any time a filter is added, removed or moved, which means that a given filter could have more than one subscription to its PropertyChanged event.  Instead, I assume what you really want is to remove the existing subscription when a filter is removed and do nothing when a filter is moved.  Is that correct?

    I didn't realize that you also wanted notifications whenever a filter was added, since that wasn't in your original spec.  Your idea to use Merge is good.

    Also, to indicate that you don't care about the notification type, consider using Unit instead of casting to object.  Casting works fine; it just doesn't express the intent.

    var filtersChanged = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    	eh => Filters.CollectionChanged += eh,
    	eh => Filters.CollectionChanged -= eh);
    
    var filtersRemoved = 
    	from change in filtersChanged
    	where change.EventArgs.Action == NotifyCollectionChangedAction.Remove
    	from filter in change.EventArgs.OldItems.Cast<Filter>()
    	select filter;
    
    var filtersAdded = 
    	from change in filtersChanged
    	where change.EventArgs.Action == NotifyCollectionChangedAction.Add
    	from filter in change.EventArgs.NewItems.Cast<Filter>()
    	select filter;
    
    var filterPropertyChanges = 
    	from filter in filtersAdded
    	from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
    		.TakeUntil(filtersRemoved.Where(removed => removed == filter))
    	where propertyChanged.EventArgs.PropertyName == "Val1"
    		|| propertyChanged.EventArgs.PropertyName == "Val2"
    	select Unit.Default;
    
    latest = filtersAdded
    	.Select(_ => Unit.Default)
    	.Merge(filterPropertyChanges)
    	.Throttle(TimeSpan.FromSeconds(3))
    	.Subscribe(_ => DoWork());

    If you also want to include filtersRemoved in the throttled query so that removing a filter may cause DoWork to be called, then you can simply merge it in like you did previously.  For example:

    latest = new[]
    	{
    		filtersAdded.Select(_ => Unit.Default), 
    		filtersRemoved.Select(_ => Unit.Default), 
    		filterPropertyChanges
    	}
    	.Merge()
    	.Throttle(TimeSpan.FromSeconds(3))
    	.Subscribe(_ => DoWork());
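
    For readers who want to try the query end to end, here is a self-contained sketch of the first variant above.  It assumes the System.Reactive package and makes a few substitutions that are not in the original post: the Filter class is a minimal stand-in with only Val1, FilterThrottleDemo is a hypothetical name, a counter replaces DoWork, the typed FromEventPattern overload replaces the string-based one, and HistoricalScheduler drives Throttle in virtual time.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.ComponentModel;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Minimal stand-in for the Filter class from the question.
public class Filter : INotifyPropertyChanged
{
    private string val1;
    public event PropertyChangedEventHandler PropertyChanged;

    public string Val1
    {
        get { return val1; }
        set
        {
            val1 = value;
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Val1)));
        }
    }
}

public static class FilterThrottleDemo
{
    // Returns the DoWork call count observed after each of three phases.
    public static int[] Run()
    {
        var clock = new HistoricalScheduler();   // virtual time instead of real waiting
        var filters = new ObservableCollection<Filter>();
        var doWorkCalls = 0;

        var filtersChanged = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            eh => filters.CollectionChanged += eh,
            eh => filters.CollectionChanged -= eh,
            Scheduler.Immediate);                // attach handlers synchronously

        var filtersRemoved =
            from change in filtersChanged
            where change.EventArgs.Action == NotifyCollectionChangedAction.Remove
            from filter in change.EventArgs.OldItems.Cast<Filter>()
            select filter;

        var filtersAdded =
            from change in filtersChanged
            where change.EventArgs.Action == NotifyCollectionChangedAction.Add
            from filter in change.EventArgs.NewItems.Cast<Filter>()
            select filter;

        // Listen to each added filter until that same filter is removed.
        var filterPropertyChanges =
            from filter in filtersAdded
            from propertyChanged in Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                    eh => filter.PropertyChanged += eh,
                    eh => filter.PropertyChanged -= eh,
                    Scheduler.Immediate)
                .TakeUntil(filtersRemoved.Where(removed => removed == filter))
            where propertyChanged.EventArgs.PropertyName == "Val1"
            select Unit.Default;

        using (filtersAdded.Select(_ => Unit.Default)
            .Merge(filterPropertyChanges)
            .Throttle(TimeSpan.FromSeconds(3), clock)
            .Subscribe(_ => doWorkCalls++))
        {
            var first = new Filter();
            var second = new Filter();

            // Phase 1: three changes, each within 3 seconds of the previous one.
            filters.Add(first);
            clock.AdvanceBy(TimeSpan.FromSeconds(1));
            first.Val1 = "a";
            clock.AdvanceBy(TimeSpan.FromSeconds(1));
            filters.Add(second);
            clock.AdvanceBy(TimeSpan.FromSeconds(4));
            var afterBurst = doWorkCalls;

            // Phase 2: a removed filter is no longer observed.
            filters.Remove(first);
            first.Val1 = "b";
            clock.AdvanceBy(TimeSpan.FromSeconds(4));
            var afterRemoval = doWorkCalls;

            // Phase 3: a filter still in the collection keeps notifying.
            second.Val1 = "c";
            clock.AdvanceBy(TimeSpan.FromSeconds(4));

            return new[] { afterBurst, afterRemoval, doWorkCalls };
        }
    }
}
```

    The burst of three changes collapses into one call, the change made to the removed filter is ignored, and a later change to the remaining filter produces a second call.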

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, Sunday, April 1, 2012 12:39: Removed "Replace" from filtersAdded since it's not fully supported without also treating it as "Remove"
    • Marked as answer by onemenny, Sunday, April 1, 2012 14:02
    Sunday, April 1, 2012 12:37
  • Hi,

    Note that Replace isn't supported, so I edited my previous post.  To support Replace you'd have to treat it as both Add and Remove, which of course can be done simply by adding the following to both filtersRemoved and filtersAdded:

    || change.EventArgs.Action == NotifyCollectionChangedAction.Replace

    But you'd also need to be careful about the order of subscriptions so that if a filter replaces itself it's treated as being added instead of removed.

    It's not something that you necessarily need to be concerned about.  I'd recommend simply not supporting Replace at all if you don't need it.
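
    To show what treating Replace as both Remove and Add means in practice, here is a small sketch that uses a plain CollectionChanged handler rather than Rx, so it has no package dependency.  The class name ReplaceAsRemoveAndAddDemo is hypothetical and strings stand in for filters; the handler deals with the removed items before the added ones, which is only one possible way to order the two halves.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;

public static class ReplaceAsRemoveAndAddDemo
{
    // Records which items are seen as removed and which as added.
    public static (List<string> Removed, List<string> Added) Run()
    {
        var filters = new ObservableCollection<string> { "old" };
        var removed = new List<string>();
        var added = new List<string>();

        filters.CollectionChanged += (sender, e) =>
        {
            // "Remove" half: Replace also carries the outgoing item in OldItems.
            if (e.Action == NotifyCollectionChangedAction.Remove
                || e.Action == NotifyCollectionChangedAction.Replace)
            {
                removed.AddRange(e.OldItems.Cast<string>());
            }

            // "Add" half: Replace also carries the incoming item in NewItems.
            if (e.Action == NotifyCollectionChangedAction.Add
                || e.Action == NotifyCollectionChangedAction.Replace)
            {
                added.AddRange(e.NewItems.Cast<string>());
            }
        };

        filters[0] = "new";      // the indexer setter raises Replace
        filters.Add("extra");    // raises Add
        filters.Remove("new");   // raises Remove

        return (removed, added);
    }
}
```

    In the Rx query the same idea corresponds to adding the Replace condition to both filtersRemoved and filtersAdded, as described above.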

    - Dave


    http://davesexton.com/blog

    Sunday, April 1, 2012 12:42
  • Thanks Dave,

    This is way more elegant than my way, and yes, I don't care about items being moved within the collection, just "add" and "delete". I wasn't aware of System.Reactive.Unit.

    Thank you for your great help!

    Menny


    www.onemenny.com/blog/

    Sunday, April 1, 2012 14:08
  • Hi again,

    I have been trying to make this more efficient than the proposed solution. Suppose I want the filters collection to be unique, so that when someone adds a new filter it does not trigger my subscription. On the other hand, I want a filter to notify a property change only when the value truly changes (between Val1 and Val2).

    I have been playing with it a bit again and noticed that if I take “filtersAdded” and apply DistinctUntilChanged, it is only distinct once:

     var singleFilter =
                    from filter in filtersAdded.Distinct()
                    from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                        .TakeUntil(filtersRemoved.Where(removed => removed == filter))
                    select filter;
    
                singleFilter
                    .Where(f => !string.IsNullOrEmpty(f.LeftSideText) && !string.IsNullOrEmpty(f.RightSideText) && !string.IsNullOrEmpty(f.SelectedOperator))
                    .Throttle(TimeSpan.FromSeconds(1))
                    .DistinctUntilChanged(new SingleFilter())
                    .ObserveOnDispatcher()
                     .Subscribe(ApplyFilter);
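
    One possible explanation, offered only as a guess since the SingleFilter comparer is not shown: the query emits the same mutable filter instance every time, so DistinctUntilChanged ends up comparing that instance with itself and sees no change after the first notification.  The sketch below contrasts this with projecting a snapshot of the values before comparing.  It assumes the System.Reactive package; MutableFilter and DistinctDemo are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for a filter whose properties are mutated in place.
public class MutableFilter
{
    public string Val1 { get; set; }
    public string Val2 { get; set; }
}

public static class DistinctDemo
{
    // Returns how many notifications pass each DistinctUntilChanged arrangement.
    public static (int ByInstance, int BySnapshot) Run()
    {
        var changes = new Subject<MutableFilter>();
        int byInstance = 0, bySnapshot = 0;

        // Compares the emitted instance with the previous one, which is the same object.
        using (changes.DistinctUntilChanged().Subscribe(_ => byInstance++))
        // Compares immutable snapshots of the values taken at notification time.
        using (changes.Select(f => (f.Val1, f.Val2))
                      .DistinctUntilChanged()
                      .Subscribe(_ => bySnapshot++))
        {
            var filter = new MutableFilter();

            filter.Val1 = "a";
            changes.OnNext(filter);   // first notification

            filter.Val1 = "a";
            changes.OnNext(filter);   // same values again, no real change

            filter.Val2 = "b";
            changes.OnNext(filter);   // a real change
        }

        return (byInstance, bySnapshot);
    }
}
```

    If this is what is happening, a custom comparer would not help either, because both of its arguments would be the same object; taking a snapshot of the compared values first avoids that.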

    I can send a sample solution if needed.

    Many thanks

    Thursday, June 28, 2012 13:49