Answered Combining latest on groups of observables

  • May 15, 2012 2:12
     
     

    Let's say I have a stream of asset valuations. I want to get the total value of all my assets. Each asset's value might change several times, and whenever it does, I should update my total valuation to include the new asset value. For example, if I have this stream as my input

    new Asset { Name = "IBM", Value = 1000 }
    new Asset {Name = "BBT", Value = 2500}
    new Asset { Name = "MSFT", Value = 2500 }
    new Asset { Name = "IBM", Value = 2500 }
    new Asset { Name = "MSFT", Value = 2000 }

    I should get this stream as my output:

    1000
    3500
    6000
    7500
    7000

    Any ideas? Here's my failed attempt.

    private static void Main(string[] args)
    {
        var subject = new Subject<Asset>();

        subject.Subscribe(Console.WriteLine);

        // This is obviously wrong
        var totalValue = subject
            .GroupBy(i => i.Name)
            .Merge()
            .Select(a => a.Value)
            .Scan((acc, d) => acc + d);

        totalValue.Subscribe(Console.WriteLine);

        subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
        subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
        subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });

        Console.ReadKey();

        // Desired output
        // 1000
        // 3500
        // 6000
        // 7500
        // 7000

        // Actual output
        // 1000
        // 3500
        // 6000
        // 8500
        // 10500
    }

    public class Asset
    {
        public string Name { get; set; }
        public decimal Value { get; set; }

        public override string ToString()
        {
            return string.Format("[{0} {1}]", Name, Value);
        }
    }


All replies

  • May 15, 2012 9:18
     
      Has code

    There's probably a more elegant solution involving a ReplaySubject, but this solves your problem with a dictionary, captured by the closure, that holds the latest asset for each name:

    var latestValue = new Dictionary<string, Asset>();
    
    subject
       .Do(a => latestValue[a.Name] = a)
       .Select(_ => latestValue.Values.Select(a => a.Value))
       .Subscribe(latestValues => Console.WriteLine(latestValues.Sum()));

    This produces the correct result.

    If you'd prefer a solution that doesn't involve mutable data, this thread might help: http://social.msdn.microsoft.com/Forums/vi-VN/rx/thread/4c4bab36-0818-4945-a5cc-0c74eb265e3b


    • Edited by ssboisen, May 15, 2012 9:19
  • May 15, 2012 12:55
     
     
    Thanks. That certainly is a solution. I was hoping for a solution that just involves operations on the input stream and doesn't make use of extra storage. ReplaySubject sounds intriguing. I've seen that link before, but I'm not clear on how I can apply it to a scenario where I want CombineLatest on groups of observables. Hmmm.
  • May 15, 2012 14:15
     
     Answered · Has code

    Hi Ranjith,

    I spent a bit of time on this and then thought: you actually only need the "change in" values, i.e. the delta. That makes summing easy, because you never have to re-add the latest value of every asset; you just accumulate the deltas.

    First group your assets by key. Then for each group you can use the Scan method to take each value, compare it to the last (or zero if it is the first value for the group) and then push the delta.
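    To make the arithmetic concrete, here is a minimal sketch in plain C# (deliberately not Rx) that applies the same delta idea to the five updates from the question. The names `DeltaSketch` and `RunningTotals` are made up for illustration, and the dictionary only stands in for the per-group state that Scan would carry in the real pipeline.

```csharp
using System;
using System.Collections.Generic;

public static class DeltaSketch
{
    // Hypothetical helper: returns the running total after each update,
    // built by adding the change (delta) in that asset's value.
    public static List<decimal> RunningTotals(IEnumerable<(string Name, decimal Value)> updates)
    {
        // Last seen value per name; stands in for each group's Scan state.
        var last = new Dictionary<string, decimal>();
        var totals = new List<decimal>();
        decimal total = 0m;

        foreach (var (name, value) in updates)
        {
            // 'previous' is 0 when this is the first value for the name.
            last.TryGetValue(name, out var previous);
            total += value - previous; // add only the delta
            last[name] = value;
            totals.Add(total);
        }

        return totals;
    }

    public static void Main()
    {
        var updates = new[]
        {
            ("IBM", 1000m), ("BBT", 2500m), ("MSFT", 2500m),
            ("IBM", 2500m), ("MSFT", 2000m)
        };

        // Deltas are 1000, 2500, 2500, 1500, -500.
        Console.WriteLine(string.Join(", ", RunningTotals(updates)));
        // prints "1000, 3500, 6000, 7500, 7000"
    }
}
```

    The fourth update is where the original attempt went wrong: IBM moving from 1000 to 2500 should add 1500, not 2500.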

    So grouping is easy:

    subject.GroupBy(i => i.Name)

    Next we need to apply the delta logic to each of the inner grouped sequences:

    grp =>
        grp.Select(asset => asset.Value) // Only want the value
           .Scan(
               new { Delta = 0m, Value = 0m },
               // For each new value, push the new value and the change since the last value
               (last, next) => new { Delta = next - last.Value, Value = next })
           .Select(changes => changes.Delta) // Ignore the value, just push the delta

    Now we will have gone from IObservable<Asset> to IObservable<IGroupedObservable<string,Asset>> and now to IObservable<IObservable<decimal>>. We only want IObservable<decimal>, so we flatten it all with a .Merge call. At that point we are still only getting a sequence of deltas, which we need to roll up into a running total, so we go back to good old Scan.

    .Merge().Scan(0m, (last, current)=>last+current);


    And when we put it all together we get

    var totals = subject
    	.GroupBy(i => i.Name)
    	.Select(grp=>
    		grp.Select(asset=>asset.Value)
    			.Scan(new {Delta=0m, Value=0m}, (last, next)=>new {Delta=next-last.Value, Value=next})
    			.Select(changes=>changes.Delta)
    	)
    	.Merge()
    	.Scan(0m, (last, current)=>last+current);

    Output is:

    1000
    3500
    6000
    7500
    7000
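    For anyone who wants to run this end to end, here is a minimal sketch that wires the pipeline to the five updates from the question. It assumes the System.Reactive NuGet package is referenced; the `TotalsDemo` class and its `Run` method are hypothetical names added for illustration, and `Asset` is trimmed down from the question's class.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Asset
{
    public string Name { get; set; }
    public decimal Value { get; set; }
}

public static class TotalsDemo
{
    // Hypothetical helper: pushes the question's updates through the
    // pipeline and returns every total that was emitted.
    public static List<decimal> Run()
    {
        var subject = new Subject<Asset>();
        var seen = new List<decimal>();

        var totals = subject
            .GroupBy(a => a.Name)
            .Select(grp => grp
                .Select(a => a.Value)
                // Per group: remember the last value and emit the change.
                .Scan(new { Delta = 0m, Value = 0m },
                      (last, next) => new { Delta = next - last.Value, Value = next })
                .Select(change => change.Delta))
            .Merge()                                     // flatten the groups
            .Scan(0m, (total, delta) => total + delta);  // running total of deltas

        using (totals.Subscribe(total => seen.Add(total)))
        {
            subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
            subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
            subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
            subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
            subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

    Because Subject pushes synchronously, each OnNext call has produced its total by the time it returns, so the list is complete when Run returns.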

    That was fun!

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com


    • Proposed as answer by LeeCampbell, May 15, 2012 14:16
    • Marked as answer by Ranjith Zachariah, May 16, 2012 4:29
    • Edited by LeeCampbell, May 16, 2012 17:30: Use anon type instead of Tuple. I think it makes it more descriptive
  • May 16, 2012 4:30
     
     
    Genius! Thank you!