Delaying insignificant events
-
July 8, 2012 14:54
Hi,
I would like to throttle down an event feed, by delaying "insignificant" events. In particular, I have a stock quote feed where each event is a PricingNode with a Symbol and a Price. I'd like to throttle down the feed rate by filtering or delaying insignificant ticks. Let's say that a tick is insignificant if for that symbol, on a percent or absolute basis, its difference from the previous published tick is below a threshold. Filtering the stream is easy enough. Here's how I did it:
public static IObservable<PricingNode> Transform(this IObservable<PricingNode> priceFeed)
{
    return priceFeed.GroupBy(p => p.Symbol).Select(g =>
        g.Scan(new PricingNode(),
               (lastPublished, current) =>
                   SignificantPriceChange(lastPublished, current)
                       ? current
                       : lastPublished).DistinctUntilChanged()).Merge();
}

private static bool SignificantPriceChange(PricingNode px1, PricingNode px2)
{
    decimal price1 = px1.Price;
    decimal price2 = px2.Price;
    const decimal threshold = .001m;
    if (price1 == 0 || price2 == 0) return true;
    if (price1 == price2) return false;
    return (Math.Abs(price1 - price2) > threshold * price1);
}

This gives us a 0.1% threshold. What I would like to do is modify the above Transform method to delay, instead of throwing out, insignificant ticks.
Let's say I have the following input stream:
Time 0 1 2 3 4 5 6
Input (IBM, 191.41) (IBM, 191.42)
Let's say I define my delay as 5 time units. Then I'd like to produce the following output:
Output (IBM, 191.41) (IBM, 191.42)
If I get a significant tick after an insignificant one, then I should publish the significant one immediately and throw out the insignificant one:
Time 0 1 2 3 4 5 6
Input (IBM, 191.41) (IBM, 191.42) (IBM, 180.01)
Output (IBM, 191.41) (IBM, 180.01)
If I get multiple insignificant ticks during the delay period, I should publish the last one at the end of the delay period:
Time 0 1 2 3 4 5 6
Input (IBM, 191.41) (IBM, 191.42) (IBM, 191.43) (IBM, 191.42) (IBM, 191.44) (IBM, 191.40)
Output (IBM, 191.41) (IBM, 191.40)
Conceptually, the insignificant ticks go onto a delayed feed. Each new insignificant tick replaces the element on the delayed feed, but does not reset the delay timer. If we get a significant tick during the delay period, we flush the delayed feed.
I'm not sure how to go about doing this. My intuition tells me that I need to use a SelectMany operator, but I'm not clear on how.
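To pin down the rules above, here is a minimal sketch of them as a plain state machine for a single symbol. It is not an Rx solution and is not taken from the thread: the class `DelayedTickFilter`, its members, and the use of integer time units are all hypothetical, and "flush" is read as "discard the delayed tick", as in the second example. Significance is measured against the last published price, using the same comparison as SignificantPriceChange.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the thread): models the requested behaviour
// for one symbol, with explicit integer timestamps instead of Rx timers.
public sealed class DelayedTickFilter
{
    private readonly long _delay;
    private readonly decimal _threshold;
    private decimal? _lastPublished;  // last price actually published
    private decimal? _pending;        // insignificant tick waiting on the delayed feed
    private long _dueTime;            // when the pending tick must be published

    // Every published tick is recorded as (time, price).
    public List<Tuple<long, decimal>> Output { get; } = new List<Tuple<long, decimal>>();

    public DelayedTickFilter(long delay, decimal threshold)
    {
        _delay = delay;
        _threshold = threshold;
    }

    // Feed a tick observed at the given time. Times must be non-decreasing.
    public void OnTick(long time, decimal price)
    {
        AdvanceTo(time); // publish a pending tick whose delay has already elapsed

        if (IsSignificant(price))
        {
            _pending = null;          // a significant tick discards the delayed one
            Publish(time, price);
        }
        else
        {
            if (!_pending.HasValue) _dueTime = time + _delay; // timer starts once, never resets
            _pending = price;         // the newest insignificant tick replaces the old one
        }
    }

    // Move the clock forward; publishes the pending tick if its delay has elapsed.
    public void AdvanceTo(long time)
    {
        if (_pending.HasValue && time >= _dueTime)
        {
            decimal price = _pending.Value;
            _pending = null;
            Publish(_dueTime, price);
        }
    }

    private bool IsSignificant(decimal price)
    {
        if (!_lastPublished.HasValue) return true; // first tick is always published
        decimal last = _lastPublished.Value;
        if (last == 0 || price == 0) return true;
        return Math.Abs(last - price) > _threshold * last;
    }

    private void Publish(long time, decimal price)
    {
        _lastPublished = price;
        Output.Add(Tuple.Create(time, price));
    }
}

public static class Demo
{
    // Replays the third example: six ticks at times 0..5 with a delay of 5.
    public static List<Tuple<long, decimal>> Run()
    {
        var filter = new DelayedTickFilter(delay: 5, threshold: 0.001m);
        decimal[] prices = { 191.41m, 191.42m, 191.43m, 191.42m, 191.44m, 191.40m };
        for (int t = 0; t < prices.Length; t++) filter.OnTick(t, prices[t]);
        filter.AdvanceTo(10); // let the delay period run out
        return filter.Output;
    }
}
```

With these assumptions, `Demo.Run()` yields two entries: 191.41 at time 0, and 191.40 at time 6 (five units after the first insignificant tick at time 1).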
Anyone have any ideas?
Thanks,
Ranj
All replies
-
July 9, 2012 15:20
I have not solved this yet, but I thought I would post the tests that reflect your requirements. Maybe someone else will beat me to the solution...
[TestMethod]
public void Test1()
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Tick>();
    var ticks = testScheduler.CreateColdObservable(
        new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
        new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42)))
    );

    //Apply the Operator here...
    ticks.Subscribe(observer);
    testScheduler.Start();

    Assert.AreEqual(2, observer.Messages.Count);
    Assert.AreEqual(1, observer.Messages[0].Time);
    Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
    Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
    Assert.AreEqual(5000, observer.Messages[1].Time);
    Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
    Assert.AreEqual(191.42, observer.Messages[1].Value.Value.Price);
}

[TestMethod]
public void Test2()
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Tick>();
    var ticks = testScheduler.CreateColdObservable(
        new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
        new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
        new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 180.01)))
    );

    //Apply the Operator here...
    ticks.Subscribe(observer);
    testScheduler.Start();

    Assert.AreEqual(2, observer.Messages.Count);
    Assert.AreEqual(1, observer.Messages[0].Time);
    Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
    Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
    Assert.AreEqual(2000, observer.Messages[1].Time);
    Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
    Assert.AreEqual(180.01, observer.Messages[1].Value.Value.Price);
}

[TestMethod]
public void Test3()
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Tick>();
    var ticks = testScheduler.CreateColdObservable(
        new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
        new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
        new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 191.43))),
        new Recorded<Notification<Tick>>(3000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
        new Recorded<Notification<Tick>>(4000, Notification.CreateOnNext(new Tick("IBM", 191.44))),
        new Recorded<Notification<Tick>>(5000, Notification.CreateOnNext(new Tick("IBM", 191.40)))
    );

    //Apply the Operator here...
    ticks.Subscribe(observer);
    testScheduler.Start();

    Assert.AreEqual(2, observer.Messages.Count);
    Assert.AreEqual(1, observer.Messages[0].Time);
    Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
    Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
    Assert.AreEqual(5000, observer.Messages[1].Time);
    Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
    Assert.AreEqual(191.40, observer.Messages[1].Value.Value.Price);
}

I will keep working on the solution. Sounds interesting...
HTH
Lee
Lee Campbell http://LeeCampbell.blogspot.com
-
July 9, 2012 17:55
Great test cases. Should the time for the first tick be 0 (instead of 1)? I would also change the last test case slightly. Let's say the delay for insignificant ticks is set to 5 s. Then we should get a message at 0 s and at 6 s.
[TestMethod]
public void Test3()
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Tick>();
    var ticks = testScheduler.CreateColdObservable(
        new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
        new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
        new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 191.43))),
        new Recorded<Notification<Tick>>(3000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
        new Recorded<Notification<Tick>>(4000, Notification.CreateOnNext(new Tick("IBM", 191.44))),
        new Recorded<Notification<Tick>>(5000, Notification.CreateOnNext(new Tick("IBM", 191.40)))
    );

    //Apply the Operator here...
    ticks.Subscribe(observer);
    testScheduler.Start();

    Assert.AreEqual(2, observer.Messages.Count);
    Assert.AreEqual(0, observer.Messages[0].Time);
    Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
    Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
    Assert.AreEqual(6000, observer.Messages[1].Time);
    Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
    Assert.AreEqual(191.40, observer.Messages[1].Value.Value.Price);
}
-
July 11, 2012 14:18
Here's something I threw together in LINQPad; I think it's in the general ballpark. Sorry Lee, Thread.Sleep was my preferred scheduler in this case!

void Main()
{
    var source = new Subject<PricingNode>();

    var q = (from tick in source.GroupBy(p => p.Ticker)
             from pair in tick.Scan(new PricePair(), (pair, current) => new PricePair(pair, current))
             // Significant ticks are published immediately; insignificant ones wait for
             // the due time unless another tick for the same symbol arrives first.
             from next in pair.IsSignificantChange
                 ? Observable.Return(pair.Current)
                 : Observable.Timer(pair.DueTime.Value).Select(_ => pair.Current).TakeUntil(tick)
             select pair).Select(p => p.Current).Timestamp();

    q.Dump();

    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.511m));
    Thread.Sleep(1000);
    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.512m));
    Thread.Sleep(1000);
    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.513m));
    Thread.Sleep(1000);
    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.514m));
    Thread.Sleep(1000);
    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.515m));
    Thread.Sleep(1000);
    source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.516m));
}

public class PricePair
{
    public PricePair() { }

    // Keeps the last published price as the reference and carries over an
    // already running due time, so the delay timer is not reset.
    public PricePair(PricePair pair, PricingNode current)
        : this(pair.IsSignificantChange ? pair.Current : pair.Last, current)
    {
        if (!IsSignificantChange && pair.DueTime != null)
        {
            DueTime = pair.DueTime;
        }
    }

    public PricePair(PricingNode last, PricingNode current)
    {
        Last = last;
        Current = current;
        IsSignificantChange = SignificantPriceChange();
        if (!IsSignificantChange)
        {
            DueTime = new DateTimeOffset(DateTime.Now.AddSeconds(5));
        }
        else
        {
            DueTime = null;
        }
    }

    public PricingNode Last { get; private set; }
    public PricingNode Current { get; private set; }
    public DateTimeOffset? DueTime { get; private set; }
    public bool IsSignificantChange { get; private set; }

    private bool SignificantPriceChange()
    {
        if (Last == null || Current == null)
        {
            return true;
        }
        decimal price1 = Last.Price;
        decimal price2 = Current.Price;
        const decimal threshold = .001m;
        if (price1 == 0 || price2 == 0) return true;
        if (price1 == price2) return false;
        return (Math.Abs(price1 - price2) > threshold * price1);
    }
}

public class PricingNode
{
    public PricingNode(string ticker, decimal price)
    {
        Ticker = ticker;
        Price = price;
    }

    public string Ticker { get; private set; }
    public decimal Price { get; private set; }
}

// Define other methods and classes here

- Marked as answer by Ranjith Zachariah, July 13, 2012 2:45
-
July 13, 2012 2:48
This solution works. Thank you!
For some reason, Lee's test cases using the TestScheduler don't work with this solution, in particular any test case that requires a delayed output. However, testing this solution with console output, I see that it produces correct results.
-
July 13, 2012 8:00
If you pass the scheduler into Observable.Timer, it should work, i.e.:

var q = (from tick in source.GroupBy(p => p.Ticker)
         from pair in tick.Scan(new PricePair(), (pair, current) => new PricePair(pair, current))
         from next in pair.IsSignificantChange
             ? Observable.Return(pair.Current)
             : Observable.Timer(pair.DueTime.Value, scheduler).Select(_ => pair.Current).TakeUntil(tick)
         select pair).Select(p => p.Current).Timestamp();
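One caveat that may matter when testing: PricePair above computes DueTime from DateTime.Now, which is the wall clock, while a TestScheduler runs on its own virtual clock starting at zero. Passing the scheduler to Observable.Timer is probably not enough on its own, because the absolute due time would still lie far in the virtual future. Below is a minimal sketch, assuming the Rx and Microsoft.Reactive.Testing packages are referenced, of taking the due time from the scheduler's clock instead. The class and method names are hypothetical and only illustrate the idea; they are not part of the solution posted above.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical illustration (not from the thread): an absolute due time taken
// from the scheduler's clock is honoured in virtual time by the TestScheduler.
public static class VirtualTimeTimerSketch
{
    // Returns the virtual time, in ticks, at which the timer value is recorded.
    public static long TimerFiresAt(long delayTicks)
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<long>();

        // Use scheduler.Now rather than DateTime.Now, so the due time is
        // expressed on the same clock that the timer is scheduled on.
        DateTimeOffset dueTime = scheduler.Now.AddTicks(delayTicks);

        Observable.Timer(dueTime, scheduler).Subscribe(observer);
        scheduler.Start(); // run the virtual clock until nothing is left to do

        return observer.Messages[0].Time;
    }
}
```

In PricePair, the equivalent change would be to hand the scheduler (or its current time) to the constructor and compute DueTime from scheduler.Now.AddSeconds(5); in production code the same parameter can be Scheduler.Default.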

