Recreating time for a big data reactive query set

  • Monday, March 04, 2013 3:06 AM
     
     

    I need to be able to replay data through my system as if it were actually being published at the time intervals indicated in the data, with millisecond precision. In other words, I would like to schedule the OnNext calls on my observable at the same relative intervals as shown in the data. For example, suppose I have the following data in a CSV file and an Observable<Quote> stream created from the file:

    10:42:01.239 MSFT 24.99
    10:42:01.242 MSFT 24.99
    10:42:01.256 MSFT 24.99
    10:42:01.257 MSFT 24.99

    When the observable starts, I would like to receive the 2nd item 3 ms after the 1st, the 3rd item 14 ms after the 2nd, and so on. If this were just one stream it would be fairly simple to write a loop that sleeps between items, but I have 5000 such feed files and millions of data points with overlapping times. What is the most efficient way to schedule the replay through the 50 different reactive queries I have in my system?
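A minimal sketch of parsing one line of the sample data above into a timestamped quote, assuming the fields are separated by spaces or commas and every timestamp carries exactly three millisecond digits (the real file layout may differ). `ParsedQuote` and `QuoteParser` are hypothetical names, not part of the thread; the record type needs C# 9 or later.

```csharp
using System;
using System.Globalization;

// Hypothetical record type holding the three fields shown in the sample data.
sealed record ParsedQuote(TimeSpan Timestamp, string Symbol, decimal Price);

static class QuoteParser
{
    // Assumption: fields are separated by spaces, commas or tabs.
    static readonly char[] Separators = { ' ', ',', '\t' };

    public static ParsedQuote Parse(string line)
    {
        var fields = line.Split(Separators, StringSplitOptions.RemoveEmptyEntries);
        if (fields.Length != 3)
            throw new FormatException("Expected 'time symbol price': " + line);

        // Time of day with millisecond precision, e.g. "10:42:01.239".
        var timestamp = TimeSpan.ParseExact(
            fields[0], @"hh\:mm\:ss\.fff", CultureInfo.InvariantCulture);

        // Invariant culture so "24.99" parses the same on every machine.
        var price = decimal.Parse(
            fields[2], NumberStyles.Number, CultureInfo.InvariantCulture);

        return new ParsedQuote(timestamp, fields[1], price);
    }
}
```

Subtracting the timestamps of consecutive parsed lines gives the relative delays described in the question; for the first two sample lines that is 3 ms.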

All Replies

  • Monday, March 04, 2013 8:06 AM

    Hi,

    Parse the files and merge them into a single sequence of quotes.  Then sort the sequence by timestamps and use Scan from Ix to calculate the differences between timestamps of consecutive items.  Finally, pass the deltas as input to Observable.Generate.
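If each feed file is already in timestamp order (an assumption; the thread does not say so), the merge and the sort can be done in a single lazy pass with a k-way merge instead of loading every quote and calling OrderBy. The sketch below uses the .NET 6+ `PriorityQueue<TElement, TPriority>`; `SortedMerge.MergeSorted` is a hypothetical helper, not an Rx or Ix API.

```csharp
using System;
using System.Collections.Generic;

static class SortedMerge
{
    // Lazily merges sequences that are each already sorted by key into one
    // sorted sequence. Only the current head item of each source is held in
    // memory. Precondition (not checked): every source is individually sorted.
    public static IEnumerable<T> MergeSorted<T, TKey>(
        IEnumerable<IEnumerable<T>> sources, Func<T, TKey> keySelector)
    {
        var enumerators = new List<IEnumerator<T>>();
        var heads = new PriorityQueue<IEnumerator<T>, TKey>();
        try
        {
            // Prime the queue with the first item of every non-empty source.
            foreach (var source in sources)
            {
                var en = source.GetEnumerator();
                enumerators.Add(en);
                if (en.MoveNext())
                    heads.Enqueue(en, keySelector(en.Current));
            }

            // Yield the smallest head, then refill from the source it came from.
            while (heads.TryDequeue(out var current, out _))
            {
                yield return current.Current;
                if (current.MoveNext())
                    heads.Enqueue(current, keySelector(current.Current));
            }
        }
        finally
        {
            // Close every file-backed enumerator, even if iteration stops early.
            foreach (var en in enumerators)
                en.Dispose();
        }
    }
}
```

The merged sequence could take the place of `parsedQuotes.OrderBy(q => q.Timestamp)` before the Scan step, e.g. `SortedMerge.MergeSorted(files.Select(ReadQuotes), q => q.Timestamp)`, where `ReadQuotes` is a hypothetical per-file parser. Memory then grows with the number of files (one pending quote each) rather than with the total number of quotes. Quotes with identical timestamps from different files come out in no guaranteed order.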

    I'm not sure what kind of precision you can get using Generate.  You'll have to run performance tests to see if it meets your goals.  Just keep in mind that to get accurate results your tests should be with optimizations enabled, without a debugger attached and preferably with minimal observer side-effects; e.g., you could add the quotes into a list with timestamps and analyze them later.  (I.e., don't use the TimeInterval operator and write directly to the console, as I've done below for the sake of simplicity.)

    For example:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    
    namespace RxLabs.Reactive
    {
    	public sealed class GenerateWithDelayFromDataLab
    	{
    		static void Main()
    		{
    			var parsedQuotes = new List<Quote>()
    			{
    				new Quote(TimeSpan.Parse("10:42:01.239"), "MSFT", 21.99m),
    				new Quote(TimeSpan.Parse("10:42:04.257"), "MSFT", 24.99m),
    				new Quote(TimeSpan.Parse("10:42:02.242"), "MSFT", 22.99m),
    				new Quote(TimeSpan.Parse("10:42:03.256"), "MSFT", 23.99m)
    			};
    
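    			// Sort by time, then pair each quote with the gap since the previous one.
    			// Scan over IEnumerable<T> comes from Ix (the System.Interactive package).
    			var deltas = parsedQuotes
    				.OrderBy(q => q.Timestamp)
    				.Scan(
    					new
    					{
    						Quote = (Quote) null,
    						Delta = TimeSpan.Zero
    					},
    					(previous, current) => new
    					{
    						Quote = current,
    						// The first quote has no predecessor, so it is emitted immediately.
    						Delta = previous.Quote == null
    									? TimeSpan.Zero
    									: current.Timestamp - previous.Quote.Timestamp
    					});
    
    			// The enumerator is the Generate state: MoveNext is the loop condition,
    			// Current.Quote is the value, and Current.Delta is how long Generate
    			// waits before emitting that value.
    			var quotes = Observable.Generate(
    				deltas.GetEnumerator(),
    				e => e.MoveNext(),
    				e => e,
    				e => e.Current.Quote,
    				e => e.Current.Delta);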
    
    			using (quotes.TimeInterval().Subscribe(
    				t => Console.WriteLine("{0}\t{1}", t.Value, t.Interval),
    				() => Console.WriteLine("Completed")))
    			{
    				Console.ReadLine();
    			}
    		}
    
    		class Quote
    		{
    			public TimeSpan Timestamp { get; private set; }
    			public string Symbol { get; private set; }
    			public decimal Price { get; private set; }
    
    			public Quote(TimeSpan timestamp, string symbol, decimal price)
    			{
    				Timestamp = timestamp;
    				Symbol = symbol;
    				Price = price;
    			}
    
    			public override string ToString()
    			{
    				return Symbol + " " + Price.ToString("C") + " @" + Timestamp;
    			}
    		}
    	}
    }
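To follow the advice above about testing with minimal observer side effects, a recorder like the one below could be subscribed instead of TimeInterval and Console.WriteLine, e.g. `quotes.Subscribe(_ => recorder.Record())`. It is a hypothetical helper (not part of Rx): during the run it only writes the elapsed Stopwatch time into a preallocated array, and `MaxDrift` compares those arrivals with the quotes' own timestamps after the replay has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper: captures arrival times cheaply during a replay so that
// timing accuracy can be analysed afterwards instead of printed live.
sealed class ArrivalRecorder
{
    readonly TimeSpan[] arrivals;                 // preallocated, no per-item allocation
    readonly Stopwatch clock = Stopwatch.StartNew();

    public int Count { get; private set; }

    // Capacity must be at least the number of quotes that will be replayed.
    public ArrivalRecorder(int capacity) => arrivals = new TimeSpan[capacity];

    // Intended to be called from OnNext; it only stores the elapsed time.
    public void Record() => arrivals[Count++] = clock.Elapsed;

    public IReadOnlyList<TimeSpan> Arrivals => new ArraySegment<TimeSpan>(arrivals, 0, Count);

    // Worst absolute difference between where each item should have arrived
    // (relative to the first item) and where it actually arrived.
    public static TimeSpan MaxDrift(IReadOnlyList<TimeSpan> expected, IReadOnlyList<TimeSpan> actual)
    {
        if (expected.Count != actual.Count)
            throw new ArgumentException("Sequences differ in length.");

        var worst = TimeSpan.Zero;
        for (int i = 0; i < expected.Count; i++)
        {
            var drift = (actual[i] - actual[0]) - (expected[i] - expected[0]);
            if (drift.Duration() > worst)
                worst = drift.Duration();
        }
        return worst;
    }
}
```

Here `expected` would be the sorted quote timestamps and `actual` the recorder's `Arrivals`. Measuring against offsets from the first item, rather than against consecutive gaps, also shows whether small per-item delays accumulate over a long replay.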

    - Dave


    http://davesexton.com/blog

  • Tuesday, March 05, 2013 4:56 AM
     
     

    Dave

    Thank you, that gives me a good starting point. The first challenge is creating the List<Quote>: there is too much data to load it all into memory and sort it, so I need to stream it in and sort it in chunks, perhaps at 5-second intervals across all 5000 symbols.
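Sorting in 5-second chunks, as suggested above, can be approximated with a sliding reorder buffer. This sketch assumes no quote is read more than a fixed window (for example 5 seconds) behind the newest timestamp already seen; if that assumption is violated, the late quote is emitted out of order rather than rejected. `ReorderBuffer.Reorder` is a hypothetical helper and needs .NET 6+ for `PriorityQueue`.

```csharp
using System;
using System.Collections.Generic;

static class ReorderBuffer
{
    // Emits items in timestamp order from a stream that is only roughly sorted.
    // Assumption (not checked): no item is more than maxLateness older than the
    // newest timestamp read before it. Memory is bounded by the number of items
    // inside that window, not by the total size of the data.
    public static IEnumerable<T> Reorder<T>(
        IEnumerable<T> source, Func<T, TimeSpan> timestamp, TimeSpan maxLateness)
    {
        var pending = new PriorityQueue<T, TimeSpan>();
        var newest = TimeSpan.MinValue;

        foreach (var item in source)
        {
            var ts = timestamp(item);
            if (ts > newest)
                newest = ts;
            pending.Enqueue(item, ts);

            // Any future item is at least (newest - maxLateness), so everything
            // at or below that bound can be released in order now.
            while (pending.TryPeek(out var head, out var headTs) &&
                   headTs <= newest - maxLateness)
            {
                pending.Dequeue();
                yield return head;
            }
        }

        // End of input: flush whatever is left, still in order.
        while (pending.TryDequeue(out var rest, out _))
            yield return rest;
    }
}
```

Unlike sorting fixed, independent chunks, a sliding window does not mis-order quotes that straddle a chunk boundary. If the individual files are already sorted, merging them directly needs no window at all.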

    I'll try it out; hopefully it works.