Answered Delaying insignificant events

  • July 8, 2012 14:54
     
     

    Hi,

    I would like to throttle down an event feed, by delaying "insignificant" events. In particular, I have a stock quote feed where each event is a PricingNode with a Symbol and a Price. I'd like to throttle down the feed rate by filtering or delaying insignificant ticks. Let's say that a tick is insignificant if for that symbol, on a percent or absolute basis, its difference from the previous published tick is below a threshold. Filtering the stream is easy enough. Here's how I did it:

    public static IObservable<PricingNode> Transform(this IObservable<PricingNode> priceFeed)
    {
        // Per symbol, carry forward the last published tick and replace it
        // only when the current tick differs significantly from it.
        return priceFeed
            .GroupBy(p => p.Symbol)
            .Select(g => g
                .Scan(new PricingNode(),
                      (lastPublished, current) =>
                          SignificantPriceChange(lastPublished, current)
                              ? current
                              : lastPublished)
                .DistinctUntilChanged()) // suppress repeats of the carried-forward tick
            .Merge();
    }

    private static bool SignificantPriceChange(PricingNode px1, PricingNode px2)
    {
        decimal price1 = px1.Price;
        decimal price2 = px2.Price;
        const decimal threshold = .001m; // 0.1% of the last published price

        // A zero price means there is nothing to compare against yet.
        if (price1 == 0 || price2 == 0) return true;
        if (price1 == price2) return false;

        return (Math.Abs(price1 - price2) > threshold * price1);
    }

    This gives us a 0.1% threshold. What I would like to do is modify the above Transform method to delay, instead of throwing out, insignificant ticks.
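
    To make the threshold concrete, here is a small sketch that applies the same comparison to the prices used in the examples that follow. ThresholdDemo and IsSignificant are names made up for this illustration; the rule itself mirrors SignificantPriceChange.

```csharp
using System;

public static class ThresholdDemo
{
    // Same rule as SignificantPriceChange, taking raw prices (hypothetical helper).
    public static bool IsSignificant(decimal lastPublished, decimal current)
    {
        const decimal threshold = .001m; // 0.1% of the last published price

        if (lastPublished == 0 || current == 0) return true;
        if (lastPublished == current) return false;

        return Math.Abs(lastPublished - current) > threshold * lastPublished;
    }

    public static void Main()
    {
        // 0.1% of 191.41 is 0.19141, so a move of 0.01 is insignificant...
        Console.WriteLine(IsSignificant(191.41m, 191.42m)); // False
        // ...while a move of 11.40 is well above the threshold.
        Console.WriteLine(IsSignificant(191.41m, 180.01m)); // True
    }
}
```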

    Let's say I have the following input stream:

    Time      0               1               2               3               4               5               6
    Input     (IBM, 191.41)   (IBM, 191.42)

    Let's say I define my delay as 5 time units. Then I'd like to produce the following output:

    Output    (IBM, 191.41)                                                                                   (IBM, 191.42)

    If I get a significant tick after an insignificant one, then I should publish the significant one immediately and throw out the insignificant one:

    Time      0               1               2               3               4               5               6
    Input     (IBM, 191.41)   (IBM, 191.42)   (IBM, 180.01)
    Output    (IBM, 191.41)                   (IBM, 180.01)

    If I get multiple insignificant ticks during the delay period, I should publish the last one at the end of the delay period:

    Time      0               1               2               3               4               5               6
    Input     (IBM, 191.41)   (IBM, 191.42)   (IBM, 191.43)   (IBM, 191.42)   (IBM, 191.44)   (IBM, 191.40)
    Output    (IBM, 191.41)                                                                                   (IBM, 191.40)


    Conceptually, the insignificant ticks go onto a delayed feed. Each new insignificant tick replaces the element on the delayed feed, but does not reset the delay timer. If we get a significant tick during the delay period, we flush the delayed feed.
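
    The rule described above can be sketched as a small state machine, independent of Rx. This is only an illustration under stated assumptions: it handles a single symbol, time is an abstract integer clock that the caller advances by hand, and DelayedFeed, OnTick, AdvanceTo and Output are hypothetical names rather than anything from Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch for a single symbol; time is an abstract integer clock.
public sealed class DelayedFeed
{
    private readonly long _delay;
    private readonly Func<decimal, decimal, bool> _isSignificant;
    private decimal? _lastPublished;
    private decimal? _pending; // latest insignificant tick waiting on the delayed feed
    private long _dueTime;     // when the pending tick gets published

    public readonly List<Tuple<long, decimal>> Output = new List<Tuple<long, decimal>>();

    public DelayedFeed(long delay, Func<decimal, decimal, bool> isSignificant)
    {
        _delay = delay;
        _isSignificant = isSignificant;
    }

    // Move the clock forward; publish the pending tick if its timer has expired.
    public void AdvanceTo(long now)
    {
        if (_pending.HasValue && now >= _dueTime)
            Publish(_dueTime, _pending.Value);
    }

    public void OnTick(long now, decimal price)
    {
        AdvanceTo(now); // settle a timer that expired before this tick arrived

        if (!_lastPublished.HasValue || _isSignificant(_lastPublished.Value, price))
        {
            // Significant: publish immediately; Publish also drops any pending tick.
            Publish(now, price);
        }
        else
        {
            // Insignificant: start the timer only if nothing is pending yet,
            // then replace the pending element without resetting the timer.
            if (!_pending.HasValue) _dueTime = now + _delay;
            _pending = price;
        }
    }

    private void Publish(long time, decimal price)
    {
        Output.Add(Tuple.Create(time, price));
        _lastPublished = price;
        _pending = null;
    }
}

public static class DelayedFeedDemo
{
    public static void Main()
    {
        var feed = new DelayedFeed(5, (last, cur) => Math.Abs(last - cur) > .001m * last);
        decimal[] prices = { 191.41m, 191.42m, 191.43m, 191.42m, 191.44m, 191.40m };
        for (int t = 0; t < prices.Length; t++) feed.OnTick(t, prices[t]);
        feed.AdvanceTo(10); // let the delay timer expire

        foreach (var item in feed.Output)
            Console.WriteLine("t={0}: {1}", item.Item1, item.Item2);
    }
}
```

    Fed the third example stream, the sketch records 191.41 at time 0 and 191.40 at time 6. An Rx version would still need GroupBy for the per-symbol split and a scheduler-driven timer in place of AdvanceTo.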

    I'm not sure how to go about doing this. My intuition tells me that I need to use a SelectMany operator, but I'm not clear on how.

    Anyone have any ideas?

    Thanks,
    Ranj

All replies

  • July 9, 2012 15:20
     

    I have not solved this yet, but I thought I would post the tests that reflect your requirements. Maybe someone else will beat me to the solution...

    [TestMethod]
    public void Test1()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<Tick>();
        var ticks = testScheduler.CreateColdObservable(
            new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
            new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42)))
            );
                
                
        //Apply the Operator here...
        ticks.Subscribe(observer);
    
        testScheduler.Start();
    
        Assert.AreEqual(2, observer.Messages.Count);
        Assert.AreEqual(1, observer.Messages[0].Time);
        Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
        Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
        Assert.AreEqual(5000, observer.Messages[1].Time);
        Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
        Assert.AreEqual(191.42, observer.Messages[1].Value.Value.Price);
    }
    
    [TestMethod]
    public void Test2()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<Tick>();
        var ticks = testScheduler.CreateColdObservable(
            new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
            new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
            new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 180.01)))
            );
                
        //Apply the Operator here...
        ticks.Subscribe(observer);
    
        testScheduler.Start();
    
        Assert.AreEqual(2, observer.Messages.Count);
        Assert.AreEqual(1, observer.Messages[0].Time);
        Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
        Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
        Assert.AreEqual(2000, observer.Messages[1].Time);
        Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
        Assert.AreEqual(180.01, observer.Messages[1].Value.Value.Price);
    }
    
    [TestMethod]
    public void Test3()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<Tick>();
        var ticks = testScheduler.CreateColdObservable(
            new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
            new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
            new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 191.43))),
            new Recorded<Notification<Tick>>(3000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
            new Recorded<Notification<Tick>>(4000, Notification.CreateOnNext(new Tick("IBM", 191.44))),
            new Recorded<Notification<Tick>>(5000, Notification.CreateOnNext(new Tick("IBM", 191.40)))
            );
    
        //Apply the Operator here...
        ticks.Subscribe(observer);
    
        testScheduler.Start();
    
        Assert.AreEqual(2, observer.Messages.Count);
        Assert.AreEqual(1, observer.Messages[0].Time);
        Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
        Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
        Assert.AreEqual(5000, observer.Messages[1].Time);
        Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
        Assert.AreEqual(191.40, observer.Messages[1].Value.Value.Price);
    }

    I will keep working on the solution. Sounds interesting...

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

  • July 9, 2012 17:55
     

    Great test cases. Should the time for the first tick be 0 (instead of 1)? I would also change the last test case slightly. Let's say the delay for insignificant ticks is set to 5 s. Then we should get a message at 0 s and another at 6 s.

    [TestMethod]
    public void Test3()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<Tick>();
        var ticks = testScheduler.CreateColdObservable(
            new Recorded<Notification<Tick>>(0, Notification.CreateOnNext(new Tick("IBM", 191.41))),
            new Recorded<Notification<Tick>>(1000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
            new Recorded<Notification<Tick>>(2000, Notification.CreateOnNext(new Tick("IBM", 191.43))),
            new Recorded<Notification<Tick>>(3000, Notification.CreateOnNext(new Tick("IBM", 191.42))),
            new Recorded<Notification<Tick>>(4000, Notification.CreateOnNext(new Tick("IBM", 191.44))),
            new Recorded<Notification<Tick>>(5000, Notification.CreateOnNext(new Tick("IBM", 191.40)))
            );
    
        //Apply the Operator here...
        ticks.Subscribe(observer);
    
        testScheduler.Start();
    
        Assert.AreEqual(2, observer.Messages.Count);
        Assert.AreEqual(0, observer.Messages[0].Time);
        Assert.AreEqual("IBM", observer.Messages[0].Value.Value.Symbol);
        Assert.AreEqual(191.41, observer.Messages[0].Value.Value.Price);
        Assert.AreEqual(6000, observer.Messages[1].Time);
        Assert.AreEqual("IBM", observer.Messages[1].Value.Value.Symbol);
        Assert.AreEqual(191.40, observer.Messages[1].Value.Value.Price);
    }
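
    A note on units for these tests: to the best of my knowledge, TestScheduler measures virtual time in ticks (100 ns each, the same unit as TimeSpan.Ticks), so the value 1000 in the recordings is 0.1 ms rather than 1 s. If the operator under test uses a real 5 s TimeSpan, the recorded and expected times would need to be expressed in the same unit. A minimal sketch of the conversion, where TickUnits and Seconds are made-up helper names:

```csharp
using System;

public static class TickUnits
{
    // Converts seconds to ticks, the unit TimeSpan uses (hypothetical helper).
    public static long Seconds(double seconds)
    {
        return TimeSpan.FromSeconds(seconds).Ticks;
    }

    public static void Main()
    {
        Console.WriteLine(Seconds(1)); // 10000000
        Console.WriteLine(Seconds(6)); // 60000000
    }
}
```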

  • July 11, 2012 14:18
     
     Answered
    Here's something I threw together in LINQPad; I think it's in the general ballpark. Sorry Lee, Thread.Sleep was my preferred scheduler in this case!
    void Main()
    {
    	var source = new Subject<PricingNode>();
    	
    	var q = (from tick in source.GroupBy(p => p.Ticker)
    			from pair in tick.Scan(new PricePair(),   
    				(pair, current) => new PricePair(pair, current))
    			from next in pair.IsSignificantChange ? Observable.Return(pair.Current) : 
    				Observable.Timer(pair.DueTime.Value).Select(_ => pair.Current).TakeUntil(tick)
    			select pair).Select(p => p.Current).Timestamp();
    
    	q.Dump();
    	
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.511m));
    	Thread.Sleep(1000);
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.512m));
    	Thread.Sleep(1000);
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.513m));
    	Thread.Sleep(1000);
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.514m));
    	Thread.Sleep(1000);
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.515m));
    	Thread.Sleep(1000);
    	source.OnNext(new UserQuery.PricingNode("EUR/USD", 1.516m));
    				
    }
    
    public class PricePair
    {
    	public PricePair()
    	{
    	}
    	
    	public PricePair(PricePair pair, PricingNode current)
    		: this(pair.IsSignificantChange ? pair.Current : pair.Last, current)
    	{
    		if (!IsSignificantChange && pair.DueTime != null)
    		{
    			DueTime = pair.DueTime;	
    		}
    	}
    
    	public PricePair(PricingNode last, PricingNode current)
    	{
    		Last = last;
    		Current = current;
    		
    		IsSignificantChange = SignificantPriceChange();
    		if (!IsSignificantChange)
    		{
    			DueTime = new DateTimeOffset(DateTime.Now.AddSeconds(5));
    		}
    		else
    		{
    			DueTime = null;
    		}
    	}
    	
    	public PricingNode Last { get; private set; }
    	public PricingNode Current { get; private set; }
    	public DateTimeOffset? DueTime { get; private set; }
    	public bool IsSignificantChange { get; private set; }
    	
    	private bool SignificantPriceChange()
    	{
    		if (Last == null || Current == null)
    		{
    			return true;
    		}
    		
    		decimal price1 = Last.Price;
    		decimal price2 = Current.Price;
    		const decimal threshold = .001m;
    		
    		if (price1 == 0 || price2 == 0) return true;
    		if (price1 == price2) return false;
    		
    		return (Math.Abs(price1 - price2) > threshold * price1);
    	}
    }
    
    public class PricingNode
    {
    	public PricingNode(string ticker, decimal price)
    	{
    		Ticker = ticker;
    		Price = price;
    	}
    
    	public string Ticker { get; private set; }
    	public decimal Price { get; private set; }
    }
    
    

  • July 13, 2012 2:48
     
     

    This solution works. Thank you!

    For some reason, Lee's test cases using the TestScheduler don't work with this solution, in particular any test case that requires a delayed output. However, testing this solution with console output, I see that it produces correct results.

  • July 13, 2012 8:00
     

    If you pass the scheduler into Observable.Timer, it should work, i.e.:

    var q = (from tick in source.GroupBy(p => p.Ticker)
    			from pair in tick.Scan(new PricePair(),   
    				(pair, current) => new PricePair(pair, current))
    			from next in pair.IsSignificantChange ? Observable.Return(pair.Current) : 
    				Observable.Timer(pair.DueTime.Value, scheduler).Select(_ => pair.Current).TakeUntil(tick)
    			select pair).Select(p => p.Current).Timestamp();