Answered Combining latest on groups of observables

  • 15 May 2012 02:12

    Let's say I have a stream of asset valuations. I want to get the total value of all my assets. Each asset's value might change several times, and whenever it does, I should update my total valuation to include the new asset value. For example, if I have this stream as my input:

    new Asset { Name = "IBM", Value = 1000 }
    new Asset {Name = "BBT", Value = 2500}
    new Asset { Name = "MSFT", Value = 2500 }
    new Asset { Name = "IBM", Value = 2500 }
    new Asset { Name = "MSFT", Value = 2000 }

    I should get this stream as my output:

    1000
    3500
    6000
    7500
    7000

    Any ideas? Here's my failed attempt.

    private static void Main(string[] args)
    {
        var subject = new Subject<Asset>();

        subject.Subscribe(Console.WriteLine);

        // This is obviously wrong
        var totalValue = subject
            .GroupBy(i => i.Name)
            .Merge()
            .Select(a => a.Value)
            .Scan((acc, d) => acc + d);

        totalValue.Subscribe(Console.WriteLine);

        subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
        subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
        subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });

        Console.ReadKey();

        // Desired output
        // 1000
        // 3500
        // 6000
        // 7500
        // 7000

        // Actual output
        // 1000
        // 3500
        // 6000
        // 8500
        // 10500
    }

    public class Asset
    {
        public string Name { get; set; }
        public decimal Value { get; set; }

        public override string ToString()
        {
            return string.Format("[{0} {1}]", Name, Value);
        }
    }


All replies

  • 15 May 2012 09:18

    There is probably a more elegant solution involving a ReplaySubject, but this solves your problem by keeping the latest value of each asset in a dictionary that is captured by the closure:

    var latestValue = new Dictionary<string, Asset>();
    
    subject
       .Do(a => latestValue[a.Name] = a)
       .Select(_ => latestValue.Values.Select(a => a.Value))
       .Subscribe(latestValues => Console.WriteLine(latestValues.Sum()));

    This produces the correct result.

    If you prefer a solution that doesn't involve mutable data, this thread might help: http://social.msdn.microsoft.com/Forums/vi-VN/rx/thread/4c4bab36-0818-4945-a5cc-0c74eb265e3b


    • Edited by ssboisen 15 May 2012 09:19
  • 15 May 2012 12:55

    Thanks. That certainly is a solution. I was hoping for a solution that just involves operations on the input stream and doesn't make use of extra storage. ReplaySubject sounds intriguing. I've seen that link before, but I'm not clear on how I can apply it to a scenario where I want CombineLatest on groups of observables. Hmmm.
  • 15 May 2012 14:15

     Answer

    Hi Ranjith,

    I spent a bit of time on this and then realised that you actually only need the "change in" values, i.e. the delta. That makes things easier: you don't have to re-sum everything, you just add each delta to the running total.

    First group your assets by key. Then for each group you can use the Scan method to take each value, compare it to the last (or zero if it is the first value for the group) and then push the delta.
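
    To make the delta step concrete on its own, here is a minimal sketch in plain C# without Rx. The helper name Deltas is hypothetical (it is not part of Rx or of the code in this thread); it only illustrates the arithmetic that Scan performs per group further down.

```csharp
using System;
using System.Collections.Generic;

public static class DeltaSketch
{
    // Hypothetical helper: yields the change between each value and the one
    // before it, treating the value before the first element as zero.
    public static IEnumerable<decimal> Deltas(IEnumerable<decimal> values)
    {
        var last = 0m;
        foreach (var next in values)
        {
            yield return next - last;
            last = next;
        }
    }

    public static void Main()
    {
        // A single asset (say IBM) valued at 1000, then revalued at 2500.
        foreach (var delta in Deltas(new[] { 1000m, 2500m }))
        {
            Console.WriteLine(delta); // prints 1000, then 1500
        }
    }
}
```

    Adding those two deltas gives 2500, the asset's latest value, which is why summing the deltas of all groups yields the current total.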

    So grouping is easy:

    subject.GroupBy(i => i.Name)

    Next we need to apply the Delta logic to each of the inner grouped sequences

    grp =>
        grp.Select(asset => asset.Value) // Only want the value
           .Scan(
               new { Delta = 0m, Value = 0m },
               (last, next) => new { Delta = next - last.Value, Value = next }) // For each new value, push the new value and the change since the last value
           .Select(changes => changes.Delta) // Ignore the value, just push the delta

    We have now gone from IObservable<Asset> to IObservable<IGroupedObservable<string, Asset>> and then to IObservable<IObservable<decimal>>. We only want IObservable<decimal>, so we flatten it all with a .Merge call. That still gives us only a sequence of deltas, which we need to roll up into a total, so we go back to good old Scan.

    .Merge().Scan(0m, (last, current)=>last+current);


    And when we put it all together we get

    var totals = subject
        .GroupBy(i => i.Name)
        .Select(grp =>
            grp.Select(asset => asset.Value)
               .Scan(new { Delta = 0m, Value = 0m }, (last, next) => new { Delta = next - last.Value, Value = next })
               .Select(changes => changes.Delta)
        )
        .Merge()
        .Scan(0m, (last, current) => last + current);

    Output is

    1000
    3500
    6000
    7500
    7000
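
    For anyone who wants to run this end to end, here is a sketch of a complete console program. It assumes the System.Reactive package is referenced; the class name TotalsSketch and the method name RunningTotal are made up for this example, and the query inside is the delta approach described above.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Asset
{
    public string Name { get; set; }
    public decimal Value { get; set; }
}

public static class TotalsSketch
{
    // Hypothetical helper: maps a stream of valuations to a stream of running totals.
    public static IObservable<decimal> RunningTotal(IObservable<Asset> assets)
    {
        return assets
            .GroupBy(a => a.Name)
            .Select(grp => grp
                .Select(a => a.Value)
                // Per asset: remember the last value and emit the change since then.
                .Scan(new { Delta = 0m, Value = 0m },
                      (last, next) => new { Delta = next - last.Value, Value = next })
                .Select(change => change.Delta))
            .Merge()                                    // flatten the per-asset deltas
            .Scan(0m, (total, delta) => total + delta); // roll the deltas up into a total
    }

    public static void Main()
    {
        var subject = new Subject<Asset>();

        using (RunningTotal(subject).Subscribe(total => Console.WriteLine(total)))
        {
            subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
            subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
            subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
            subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
            subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });
        }
    }
}
```

    The only state kept is what Scan carries along inside the query: the last value per group and the running total.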

    That was fun!

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com


    • Proposed as answer by LeeCampbell 15 May 2012 14:16
    • Marked as answer by Ranjith Zachariah 16 May 2012 04:29
    • Edited by LeeCampbell 16 May 2012 17:30 Use anon type instead of Tuple. I think it makes it more descriptive
  • 16 May 2012 04:30

    Genius! Thank you!