Filtering inside GroupedObservable

    Question

  • How can I filter inside a grouped observable, while keeping it a grouped observable?

    The problem is that all the observable operators return a plain IObservable<T>, so I lose the grouping (the IGroupedObservable type and its Key).

    Example:

    var elements = new [] 
    {
        Tuple.Create("a", 1),
        Tuple.Create("b", 1),
        Tuple.Create("a", 1),
        Tuple.Create("a", 2),
        Tuple.Create("b", 2),
    };
    
    var stream = elements.ToObservable();
    var query = stream
        .GroupBy(t => t.Item1, t => t.Item2)
        .Select(grp => grp.DistinctUntilChanged());

    The query I'm looking for should return IObservable<IGroupedObservable<string, int>>, but instead I get IObservable<IObservable<int>> and lose the group information.

    I can write something like this:

    var stream = elements.ToObservable();
    var query = stream
        .GroupBy(t => t.Item1)
        .SelectMany(grp => grp.DistinctUntilChanged(t => t.Item2))
        .GroupBy(t => t.Item1, t => t.Item2);

    But it feels like a hack, and I don't like having to group the stream twice.

    I'd love to hear your thoughts.


    Omer Mor


    • Edited by Omer Mor, 28 March 2012 8:25
    28 March 2012 8:24

All replies

  • Hi Omer,

    You could create an extension method (untested):

    public static partial class Observable3
    {
    	public static IGroupedObservable<TKey, TElement> DistinctUntilChanged<TKey, TElement>(
    		this IGroupedObservable<TKey, TElement> source)
    	{
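    			// Cast to IObservable<TElement> so the call binds to Rx's operator instead of recursing into this overload.
    			return new AnonymousGroupedObservable<TKey, TElement>(source.Key, ((IObservable<TElement>)source).DistinctUntilChanged());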
    	}
    }
    
    public sealed class AnonymousGroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
    {
    	public TKey Key
    	{
    		get
    		{
    			return key;
    		}
    	}
    
    	private readonly TKey key;
    	private readonly IObservable<TElement> source;
    
    	public AnonymousGroupedObservable(TKey key, IObservable<TElement> source)
    	{
    		this.key = key;
    		this.source = source;
    	}
    
    	public IDisposable Subscribe(IObserver<TElement> observer)
    	{
    		return source.Subscribe(observer);
    	}
    }

    - Dave


    http://davesexton.com/blog

    28 March 2012 14:54
  • Thanks Dave,

    that might work, but it's:

    1. too specific for the DistinctUntilChanged case (what about Sample, Throttle, etc), and 
    2. looks like a patch (no offense).

    Right now, modeling my data as an observable of grouped observables feels cumbersome, which is sad, because I find the design very elegant.

    Am I missing something? Is there a better way to handle groups?

    Or should I ditch this design?

    I'd love to hear from the Rx team as well (Bart?).


    Omer Mor


    • Edited by Omer Mor, 28 March 2012 21:43
    28 March 2012 21:42
  • Hi Omer,

    You could conceivably create an entire library of LINQ operators for IGroupedObservable<K,E>.  The point of my extension is simple: transfer the key between operations.  I don't know of a better way than this.
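
    One way to sketch the key-transfer idea without writing a separate overload per operator (DistinctUntilChanged, Sample, Throttle, and so on) is a single helper that takes the operator as a delegate. This is only a sketch under assumptions: the names Pipe, GroupedObservable and GroupedObservableExtensions are hypothetical and not part of Rx, and the code assumes Rx's System.Reactive.Linq namespace and a C# 6 or later compiler.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical wrapper (not part of Rx): pairs a key with any observable sequence.
public sealed class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
{
    private readonly IObservable<TElement> source;

    public GroupedObservable(TKey key, IObservable<TElement> source)
    {
        Key = key;
        this.source = source;
    }

    public TKey Key { get; }

    public IDisposable Subscribe(IObserver<TElement> observer)
    {
        return source.Subscribe(observer);
    }
}

public static class GroupedObservableExtensions
{
    // Hypothetical helper: applies any operator to the group's elements
    // and re-attaches the original key to the result.
    public static IGroupedObservable<TKey, TResult> Pipe<TKey, TElement, TResult>(
        this IGroupedObservable<TKey, TElement> source,
        Func<IObservable<TElement>, IObservable<TResult>> op)
    {
        return new GroupedObservable<TKey, TResult>(source.Key, op(source));
    }
}

public static class Program
{
    public static void Main()
    {
        var elements = new[]
        {
            Tuple.Create("a", 1),
            Tuple.Create("b", 1),
            Tuple.Create("a", 1),
            Tuple.Create("a", 2),
            Tuple.Create("b", 2),
        };

        // The static type keeps the grouping: each inner sequence still exposes Key.
        IObservable<IGroupedObservable<string, int>> query = elements
            .ToObservable()
            .GroupBy(t => t.Item1, t => t.Item2)
            .Select(grp => grp.Pipe(xs => xs.DistinctUntilChanged()));

        // Print every value together with the key of the group it belongs to.
        query.Subscribe(grp => grp.Subscribe(x => Console.WriteLine(grp.Key + ": " + x)));
    }
}
```

    With the sample data from the question, the repeated ("a", 1) should be filtered out while each group keeps its key. Any other operator fits the same way, for example grp.Pipe(xs => xs.Throttle(TimeSpan.FromSeconds(1))). The trade-off is that every call site needs the extra Pipe wrapper rather than calling the operator directly on the group.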

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, 28 March 2012 23:22 (fixed type params)
    28 March 2012 23:21