Combining latest on groups of observables
-
Tuesday, May 15, 2012 02:12
Let's say I have a stream of asset valuations. I want to get the total value of all my assets. Each asset's value might change several times, and whenever it does, I should update my total valuation to include the new asset value. For example, if I have this stream as my input
new Asset { Name = "IBM", Value = 1000 }
new Asset { Name = "BBT", Value = 2500 }
new Asset { Name = "MSFT", Value = 2500 }
new Asset { Name = "IBM", Value = 2500 }
new Asset { Name = "MSFT", Value = 2000 }
I should get this stream as my output:
1000
3500
6000
7500
7000
Any ideas? Here's my failed attempt.
private static void Main(string[] args)
{
    var subject = new Subject<Asset>();
    subject.Subscribe(Console.WriteLine);

    // This is obviously wrong
    var totalValue =
        subject.GroupBy(i => i.Name).Merge().Select(
            a => a.Value).Scan((acc, d) => acc + d);
    totalValue.Subscribe(Console.WriteLine);

    subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
    subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
    subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
    subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
    subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });
    Console.ReadKey();

    // Desired output
    // 1000
    // 3500
    // 6000
    // 7500
    // 7000

    // Actual output
    // 1000
    // 3500
    // 6000
    // 8500
    // 10500
}

public class Asset
{
    public string Name { get; set; }
    public decimal Value { get; set; }

    public override string ToString()
    {
        return string.Format("[{0} {1}]", Name, Value);
    }
}
All replies
-
Tuesday, May 15, 2012 09:18
There's probably a more elegant solution involving a ReplaySubject, but this solves your problem with a dictionary that is captured by the closure and holds the latest value for each asset name:
var latestValue = new Dictionary<string, Asset>();
subject
    .Do(a => latestValue[a.Name] = a)
    .Select(_ => latestValue.Values.Select(a => a.Value))
    .Subscribe(latestValues => Console.WriteLine(latestValues.Sum()));
This produces the correct result.
Maybe this could help you if you prefer a different solution which doesn't involve mutable data: http://social.msdn.microsoft.com/Forums/vi-VN/rx/thread/4c4bab36-0818-4945-a5cc-0c74eb265e3b
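One way to avoid the captured mutable dictionary, offered only as a sketch and not necessarily what the linked thread proposes, is to carry an immutable dictionary through Scan and sum its values on every update. It assumes the System.Reactive and System.Collections.Immutable packages are referenced; the Program class wrapper is only there to make the snippet self-contained.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Asset
{
    public string Name { get; set; }
    public decimal Value { get; set; }
}

public static class Program
{
    public static void Main()
    {
        var subject = new Subject<Asset>();

        // Scan threads an immutable "latest value per name" map through the stream;
        // SetItem returns a new dictionary instead of mutating the previous one.
        var totals = subject
            .Scan(
                ImmutableDictionary<string, decimal>.Empty,
                (latest, asset) => latest.SetItem(asset.Name, asset.Value))
            .Select(latest => latest.Values.Sum());

        totals.Subscribe(total => Console.WriteLine(total));

        subject.OnNext(new Asset { Name = "IBM", Value = 1000 });
        subject.OnNext(new Asset { Name = "BBT", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2500 });
        subject.OnNext(new Asset { Name = "IBM", Value = 2500 });
        subject.OnNext(new Asset { Name = "MSFT", Value = 2000 });
    }
}
```

This still keeps per-asset state, but the state lives inside the pipeline rather than in a variable shared with the outside world.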
- Edited by ssboisen Tuesday, May 15, 2012 09:19
-
Tuesday, May 15, 2012 12:55
Thanks. That certainly is a solution. I was hoping for a solution that just involves operations on the input stream and doesn't make use of extra storage. ReplaySubject sounds intriguing. I've seen that link before, but I'm not clear on how I can apply it to a scenario where I want CombineLatest on groups of observables. Hmmm.
-
Tuesday, May 15, 2012 14:15
Hi Ranjith,
I spent a bit of time on this and then realised you actually only need the "change in" values, i.e. the delta. That makes it easy to sum things up.
First group your assets by key. Then for each group you can use the Scan method to take each value, compare it to the last (or zero if it is the first value for the group) and then push the delta.
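As a worked example of the delta arithmetic on the sample input from the question, here is a minimal sketch in plain C# without Rx. The lastValue dictionary is only a stand-in for the state that each group's Scan would hold, and the DeltaSketch class name is made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DeltaSketch
{
    public static void Main()
    {
        // Sample input from the question, as (name, value) pairs.
        var updates = new (string Name, decimal Value)[]
        {
            ("IBM", 1000m), ("BBT", 2500m), ("MSFT", 2500m),
            ("IBM", 2500m), ("MSFT", 2000m),
        };

        // Stand-in for the per-group Scan state: last value seen per asset name.
        var lastValue = new Dictionary<string, decimal>();
        decimal total = 0m;

        foreach (var (name, value) in updates)
        {
            // previous is 0 when this is the first value for the asset.
            lastValue.TryGetValue(name, out var previous);
            var delta = value - previous;
            lastValue[name] = value;

            // Summing the deltas gives the running total of latest values.
            total += delta;
            Console.WriteLine($"{name}: delta {delta}, total {total}");
        }
    }
}
```

The deltas come out as 1000, 2500, 2500, 1500 and -500, so the running totals are 1000, 3500, 6000, 7500 and 7000, which matches the desired output.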
So grouping is easy:
subject.GroupBy(i => i.Name)
Next we need to apply the Delta logic to each of the inner grouped sequences
grp =>
    grp.Select(asset => asset.Value) // Only want the value
       .Scan(
           new { Delta = 0m, Value = 0m },
           // For each new value, push the new value and the change since the last value
           (last, next) => new { Delta = next - last.Value, Value = next })
       .Select(changes => changes.Delta) // Ignore value, just push delta
Now we have gone from IObservable<Asset> to IObservable<IGroupedObservable<string, Asset>>, and then to IObservable<IObservable<decimal>>. We only want IObservable<decimal>, so we flatten it all with a .Merge call. At that point we only have a sequence of deltas; we still need to roll them up into a running total, so we go back to good old Scan.
.Merge().Scan(0m, (last, current)=>last+current);
And when we put it all together we get
var totals = subject
    .GroupBy(i => i.Name)
    .Select(grp =>
        grp.Select(asset => asset.Value)
           .Scan(new { Delta = 0m, Value = 0m },
                 (last, next) => new { Delta = next - last.Value, Value = next })
           .Select(changes => changes.Delta))
    .Merge()
    .Scan(0m, (last, current) => last + current);
Output is:
1000
3500
6000
7500
7000
That was fun!
HTH
Lee
Lee Campbell http://LeeCampbell.blogspot.com
- Proposed as answer by LeeCampbell Tuesday, May 15, 2012 14:16
- Marked as answer by Ranjith Zachariah Wednesday, May 16, 2012 04:29
- Edited by LeeCampbell Wednesday, May 16, 2012 17:30 Use anon type instead of Tuple. I think it makes it more descriptive
-
Wednesday, May 16, 2012 04:30
Genius! Thank you!