StreamInsight not performing well when the number of joins grows

    Question

  • Hi,

    So the POC worked and it is emitting all the data as expected with 2 counterparties.

    I had to stress test this with 10,000 counterparties to see how this would work.

    This meant my reference data for risk exposures would contain 10,000 records. Each counterparty would have a risk value for 212 future time points (212 time points because this date range covers the longest possible trade), because StreamInsight doesn't support any embedded type that can represent the risk vector (an array of time points and the risk value at each time point).

    Reference data for risk exposures became 10,000 x 212 = 2,120,000 objects.

    Allowed credit limits for these counterparties would again be 10,000 x approximately 10 time bands covering these 212 risk exposure time points = 100,000 objects.

    Whenever there is an incoming trade that needs to be checked for limit breaches, the number of joins quickly becomes

    1 (trade) x 2,120,000 x 100,000 => and this is without considering the other aggregation schemes that we have: every counterparty might have a limit against a particular product type, which means there would be a risk exposure reported for that counterparty for that product type against 212 time points, and also a limit corresponding to that Counterparty >> Product group.

    When I mocked up this volume, StreamInsight was not able to generate any data. I mocked this up for you in LinqPad here.

    The moment I set the counterpartyCount to 100 and above, it wouldn't respond.

    Is this a limitation, or is there anything I could improve?

    Skydrive Link:

    https://skydrive.live.com/redir?resid=B0E65D93CE548317!112&authkey=!ALUXhhUNCqYMZYc

    Tuesday, November 12, 2013 4:05 AM

All replies

  • First, you can't analyze performance at all with LinqPad, especially when you have multiple Dump() statements. Each Dump() statement will (in your model) create a new query tree, with multiple instances of the input adapters. So ... you're enqueuing your 10,000 reference events 3 times in your example. You can run into the same situation when running in production if you have multiple output adapters ... you'd need to use DQC (Dynamic Query Composition) to re-use the input from the input adapters without creating new instances of the input adapters. You can see if this is happening to you by adding some logging into the Adapter Factory to see how many times they are being instantiated.
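
    As a rough sketch of that kind of factory logging (the class and property names here are placeholders — wire the same counter into whatever adapter factory and config class you actually have; the adapter construction itself is elided):

    public class GeneratorFactory : ITypedInputAdapterFactory<GeneratorConfig>
    {
        // Counts how many times the engine asks this factory for a new adapter instance.
        private static int _instancesCreated;

        public InputAdapterBase Create<TPayload>(GeneratorConfig configInfo, EventShape eventShape)
        {
            int count = System.Threading.Interlocked.Increment(ref _instancesCreated);
            System.Diagnostics.Trace.WriteLine(
                string.Format("GeneratorFactory.Create #{0}: {1} ({2})", count, configInfo.Name, eventShape));

            // ... build and return the adapter exactly as before ...
            throw new NotImplementedException("replace with the existing adapter construction");
        }

        public void Dispose() { }
    }

    If the counter comes back higher than you expect, you're paying to enqueue multiple copies of your reference data.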

    Now, to get an idea of what's going on in production, I'd look at some performance counters, starting with, in your case, the input adapter queue. You'll also have an events/second counter. Take a look at thread contention as well ... you can run into this if you are creating additional threads in, say, a UDO or UDF, either explicitly or by using AsParallel(). What happens in these cases is that the threads for processing the queries wind up contending with the threads in your UDx and none of them get the time they need to actually operate. On the flip side, depending on the scenario, you might actually wind up doing better with AsParallel() ... it will depend on where your bottleneck is. You might also want to tweak your CTI frequency. Finally, the edition of StreamInsight that you are using will also make a difference. With the number of reference events that you have, you will probably need to look at Premium rather than Standard. And, of course, hardware is always a question. Speaking of which, you never did mention what your CPU utilization was while you were running this. Oh ... and when you are stress testing, use a Release build and don't attach a debugger. The VS debugger will fall all over itself trying to track all of the threads that get created at high volume. That is never pretty.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Wednesday, November 13, 2013 1:40 AM
    Moderator
  • I am using the Evaluation edition of StreamInsight, which I believe is equivalent to the Premium edition.

    I was running this on Windows Server 2008 R2 with 8 cores, and the CPU usage hits 100% when I run the query.

    I changed the query from

    var joinQuery = from exposure in alteredExposureStream
                    from limit in alteredLimitStream
                    from trade in tradeStream
                    let AdjustedRiskValue = exposure.RiskValue + trade.AddOnExposure
                    where exposure.PortfolioName == trade.PortfolioName
                        && limit.PortfolioName == exposure.PortfolioName
                        && (exposure.RiskTimePoint >= limit.LimitStartDate && exposure.RiskTimePoint < limit.LimitEndDate)
                        && AdjustedRiskValue > limit.LimitThreshold
                    select new
                    {
                        Trade = trade,
                        Limit = limit,
                        Exposure = exposure,
                        AdjustedRiskValue = AdjustedRiskValue
                    };

    to

    var riskLimitStream = from exposure in alteredExposureStream
                          from limit in alteredLimitStream
                          where limit.PortfolioName == exposure.PortfolioName
                              && (exposure.RiskTimePoint >= limit.LimitStartDate && exposure.RiskTimePoint < limit.LimitEndDate)
                          select new
                          {
                              Risk = exposure,
                              Limit = limit
                          };

    var joinQuery = from riskLimit in riskLimitStream
                    from trade in tradeStream
                    let AdjustedRiskValue = riskLimit.Risk.RiskValue + trade.AddOnExposure
                    where riskLimit.Risk.PortfolioName == trade.PortfolioName
                        && AdjustedRiskValue > riskLimit.Limit.LimitThreshold
                    select new
                    {
                        Trade = trade,
                        Limit = riskLimit.Limit,
                        Exposure = riskLimit.Risk,
                        AdjustedRiskValue = AdjustedRiskValue
                    };

    it started responding. What is the difference between the two that makes the latter query perform better than the former?

    I got rid of all the Dump statements except the last one, which outputs the results of the final join that I am interested in. The whole program is below. I understand that LinqPad isn't the right way to benchmark performance, but it allowed me to quickly mock things up. There are no user-defined functions or user-defined aggregates in my program.

    Right now, for 10,000 counterparties it takes about ~13 seconds:

    Portfolio Exposures Count:80000
    Limits Count  :30000
    Trades Count  :8

    Total join count
    2020130816

    Complete program: do you reckon there is anything wrong?

    // Increasing this decreases the performance
    int counterpartyCount =10000;
    //Total of 212 timepoints for which exposure is calculated; this is a reduced set.
    DateTime[] dates = new DateTime[] {new DateTime(2013,11,08),new DateTime(2013,11,10),new DateTime(2013,12,21),new DateTime(2014,11,07),new DateTime(2016,11,07),new DateTime(2017,11,07),new DateTime(2018,11,07),new DateTime(2019,11,07)};
    DateTime[] limitDates = new DateTime[] {            DateTime.Now.Date,
                                                            DateTime.Now.Date.AddYears(1),  
                                                            DateTime.Now.Date.AddYears(2),
                                                           	DateTime.Now.Date.AddYears(6),
                                                           
                                                          };
    DateTime eventValidityDate=DateTime.Now;
    Random rand =new Random();
    void Main()
    {
    
    	var portfolioExposures=Getportfolios();
    	
    	var exposurecount=portfolioExposures.Count();
    	Console.WriteLine("Portfolio Exposures Count:"+exposurecount);
    	
    	
    	var limits=GetLimitsWithRange();
    	
    	var limitscount=limits.Count();
    	Console.WriteLine("Limits Count  :"+ limitscount);
    	// Portfolio Interval Stream
    	var portfolioExposuresStream = portfolioExposures.ToIntervalStream(Application, t=> IntervalEvent.CreateInsert(t.EventValidityDate, t.EventValidityDate.AddTicks(1), t), AdvanceTimeSettings.IncreasingStartTime);
    	
    	// Limit Interval Stream
    	var limitStream = limits.ToIntervalStream(Application, t=> IntervalEvent.CreateInsert(t.EventValidityDate, t.EventValidityDate.AddTicks(1), t), AdvanceTimeSettings.IncreasingStartTime);
    	
       
    	
    	//Alter the Portfolio EventLifeTimes
    	var alteredExposureStream = from y in portfolioExposuresStream.AlterEventLifetime(
    											y2 => y2.StartTime.Date, 
    											y2 => TimeSpan.FromDays(1))
    									select y;
    									
       // Dump Exposure Stream
    //   (from y in alteredExposureStream.ToIntervalEnumerable()
    //		where y.EventKind == EventKind.Insert
    //		orderby y.Payload.PortfolioName , y.Payload.RiskTimePoint 
    //		select y).Dump("Exposure Stream");
    	
       //Alter the limit EventLifeTimes
    	var alteredLimitStream = from y in limitStream.AlterEventLifetime(
    											y2 => y2.StartTime.Date, 
    											y2 => TimeSpan.FromDays(1))
    									select y;
    									
        // Dump Limit Stream
    //   (from y in alteredLimitStream.ToIntervalEnumerable()
    //		where y.EventKind == EventKind.Insert
    //		orderby y.Payload.PortfolioName , y.Payload.LimitStartDate,y.Payload.LimitEndDate 
    //		select y).Dump("Altered Limit Stream");
    	
    	var trades = new []
    							{
    								new {PortfolioName="Counterparty1",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty21",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty41",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty65",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty13",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty61",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty13",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now},
    								new {PortfolioName="Counterparty11",PortfolioType="Counterparty",AddOnExposure=rand.Next(10000),TradeTime=DateTime.Now}
    							};
    							
    	var tradeCount= trades.Count();
    	
    	Console.WriteLine("Trades Count  :"+ tradeCount);			
    	
    	// Cast to long: exposurecount * limitscount already exceeds Int32.MaxValue, so a plain
    	// int multiplication silently wraps (which is why 2,020,130,816 is dumped instead of 19,200,000,000).
    	var totalJoinCount = (long)exposurecount * limitscount * tradeCount;
    	
    	totalJoinCount.Dump("Total join count");
    							
    	// Convert trade to PointStream
    	var tradeStream = trades.ToPointStream(Application,t=>PointEvent.CreateInsert(t.TradeTime,t),AdvanceTimeSettings.IncreasingStartTime);
    	
    //Join trade, exposure and Limit stream and find if there is a limit breach because of this trade 
    //This limit stream has a different limit range (limits for the next 1 year, 2 years, etc.) instead of lining up limits
    //with exposure dates
    //var joinQuery = from exposure in alteredExposureStream
    //						from limit in alteredLimitStream
    //							from trade in tradeStream
    //							 let AdjustedRiskValue= exposure.RiskValue + trade.AddOnExposure
    //								where exposure.PortfolioName==trade.PortfolioName && limit.PortfolioName==exposure.PortfolioName
    //									&& (exposure.RiskTimePoint>= limit.LimitStartDate && exposure.RiskTimePoint < limit.LimitEndDate) 
    //									&& AdjustedRiskValue > limit.LimitThreshold
    //										select new {
    //														Trade=trade,
    //														Limit=limit,
    //														Exposure=exposure,
    //														AdjustedRiskValue=AdjustedRiskValue
    //													};
    													
    var riskLimitStream = from exposure in alteredExposureStream
    						from limit in alteredLimitStream
    							where  limit.PortfolioName==exposure.PortfolioName && (exposure.RiskTimePoint>= limit.LimitStartDate && exposure.RiskTimePoint < limit.LimitEndDate) 
    									select new 
    										{
    											Risk = exposure,
    											Limit=limit
    										};
    										
    var joinQuery = from riskLimit in riskLimitStream
    							from trade in tradeStream
    							 let AdjustedRiskValue= riskLimit.Risk.RiskValue + trade.AddOnExposure
    								where riskLimit.Risk.PortfolioName==trade.PortfolioName 
    									&& AdjustedRiskValue >  riskLimit.Limit.LimitThreshold
    										select new {
    														Trade=trade,
    														Limit= riskLimit.Limit,
    														Exposure= riskLimit.Risk,
    														AdjustedRiskValue=AdjustedRiskValue
    													};
    
    													
        var stopWatch= new System.Diagnostics.Stopwatch();
    	stopWatch.Start();
    	
    	//Output the Join	
    	(from y in joinQuery.ToIntervalEnumerable()
    		where y.EventKind == EventKind.Insert
    		orderby y.Payload.Exposure.PortfolioName , y.Payload.Exposure.RiskTimePoint 
    		select y).Dump("Join Query");
    	
    	stopWatch.Stop();
    	
    	stopWatch.ElapsedMilliseconds.Dump("Join query in milli seconds");
    }
    
    #region Helper Functions
    //Only counterparty exposures are reported; ideally in our world PortfolioTypes would be {Counterparty, Product, AssetGroup}
    PortfolioExposure[] Getportfolios()
    {
    	
    	var exposures =new List<PortfolioExposure>();
    	for (int i=1;i<=counterpartyCount;i++)
    	{
    		var pfos= dates.Select (exposureDate=>
    								new PortfolioExposure{
    												PortfolioName="Counterparty"+i,
    												PortfolioType="Counterparty",
    												RiskValue=rand.Next(1000000),
    												RiskTimePoint=exposureDate,
    												EventValidityDate=eventValidityDate
    												});
    		exposures.AddRange(pfos);
    	}
    		
    	return exposures.ToArray();
    }
    
    
    //Limits in the real world are assigned for 1 year, 2 years, 5 years, etc. A trade can mature on any day, and this maturity date would fall into
    //one of the limit ranges. This object represents the more realistic real-world case and should be more efficient, as it reduces the number of
    //joins
    LimitWithRange[] GetLimitsWithRange()
    {
    	var limits =new List<LimitWithRange>();
    	
    	for (int i=1;i<=counterpartyCount;i++)
    	{
    		for (int j=0;j< limitDates.Length-1;j++)
    		{
    			limits.Add(new LimitWithRange{
    												PortfolioName="Counterparty"+i,
    												PortfolioType="Counterparty",
    												LimitThreshold=rand.Next(1000000),
    												LimitStartDate=j==0? limitDates[j]: limitDates[j].AddDays(1),
    												LimitEndDate= limitDates[j+1],
    												EventValidityDate=eventValidityDate
    												});
    		}
    		
    	}
    	
    	return limits.ToArray();
    }
    
    class PortfolioExposure
    {
    	public string PortfolioName{get;set;}
    	public string PortfolioType{get;set;}
    	public double RiskValue{get;set;}
    	public DateTime RiskTimePoint{get;set;}
    	public DateTime EventValidityDate{get;set;}
    }
    
    
    class LimitWithRange
    {
    	public string PortfolioName{get;set;}
    	public string PortfolioType{get;set;}
    	public double LimitThreshold{get;set;}
    	public DateTime LimitStartDate{get;set;}
    	public DateTime LimitEndDate{get;set;}
    	public DateTime EventValidityDate{get;set;}
    }
    #endregion


    Friday, November 15, 2013 4:40 AM
  • I believe the trick here is to reduce the number of objects in the exposure stream and the limit stream so that the number of joins goes down.

    10,000 portfolios meaning 10,000 exposure objects is not a big deal, but because there are 212 exposure time points for each object (on average, even if there are only 100 exposure points) and because there is no embedded-object support in StreamInsight, 10,000 objects quickly become 10,000 x 100 = 1,000,000 exposure reference objects.

    Likewise for limits: with 12 time buckets for these 10,000 portfolios, there would be 10,000 x 12 = 120,000 limit objects.

    So every time a trade comes in, it has to be joined with the exposure stream and the limit stream, which leads to

    1,000,000 x 120,000 = 120,000,000,000

    I had put in logging to see if the input adapters were getting instantiated twice, but I don't see them getting instantiated more than once, as I have just one output adapter at the moment.

    I ran binaries built in Release mode on a Windows server with 6 cores, reduced the exposure time points from 212 to 8, and ran the program.

    With the volume below, it took close to a minute for the trade output to appear. Do you reckon this is the expected latency for this volume, or am I still doing something wrong?

    2013-11-15 16:40:08,977[17] INFO  Fetching data....
    2013-11-15 16:40:08,980[17] INFO  Starting task - Generating Mock Limits
    2013-11-15 16:40:09,272[17] INFO  Limits count 175483
    2013-11-15 16:40:09,273[17] INFO  Completed task - Generating Mock Limits - Duration: 292(ms)
    2013-11-15 16:40:23,970[13] INFO  --------------------------------
    2013-11-15 16:40:23,971[13] INFO  Fetching data....
    2013-11-15 16:40:23,979[13] INFO  Starting task - Generating Mock Risk Exposures
    2013-11-15 16:40:24,143[13] INFO  RiskExposures count 127624
    2013-11-15 16:40:24,810[13] INFO  Completed task - Generating Mock Risk Exposures - Duration: 831(ms)

    ----------------------------------------------------------------------------------------------------------------------------------

    2013-11-15 16:40:33,817[16] INFO  Generated Data INSERT - 11/15/2013 05:40:33 +00:00 Event:
            Portfolio:Counterparty18, AddOnExposure:72288.2419437395, PortfolioType:Counterparty,
    2013-11-15 16:40:33,818[16] INFO  Generated Data INSERT - 11/15/2013 05:40:33 +00:00 Event:
            Portfolio:Counterparty18/ANZ BANKING GROUP LTD, AddOnExposure:72288.2419437395, PortfolioType:ANZEntity,
    2013-11-15 16:40:33,819[16] INFO  Generated Data INSERT - 11/15/2013 05:40:33 +00:00 Event:
            Portfolio:Counterparty18/Bond Forwards, AddOnExposure:72288.2419437395, PortfolioType:Product,
    2013-11-15 16:40:33,820[16] INFO  Generated Data INSERT - 11/15/2013 05:40:33 +00:00 Event:
            Portfolio:Counterparty18/Treasury, AddOnExposure:72288.2419437395, PortfolioType:RazorAssetGroup,
    2013-11-15 16:41:03,880[17] INFO  --------------------------------

    2013-11-15 16:41:06,645[18] INFO  OUTPUT: Point at 05:40:33.817:
        AdjustedRiskValue = 10161487.0831113
        Exposure = 10089198.8411676
        Limit = 8748401.38607203
        TimePoint = 02/24/2014 00:00:00
        Trade.AddOnExposure = 72288.2419437395
        Trade.Portfolio = Counterparty18/Treasury
        Trade.PortfolioType = RazorAssetGroup
    2013-11-15 16:41:06,647[18] INFO  OUTPUT: Point at 05:40:33.817:
        AdjustedRiskValue = 8606818.98183651
        Exposure = 8534530.73989278
        Limit = 156184.799888218
        TimePoint = 01/25/2073 00:00:00
        Trade.AddOnExposure = 72288.2419437395
        Trade.Portfolio = Counterparty18/Bond Forwards
        Trade.PortfolioType = Product
    2013-11-15 16:41:06,648[18] INFO  OUTPUT: Point at 05:40:33.817:
        AdjustedRiskValue = 5143112.82156981
        Exposure = 5070824.57962607
        Limit = 133218.767187789
        TimePoint = 02/24/2014 00:00:00
        Trade.AddOnExposure = 72288.2419437395
        Trade.Portfolio = Counterparty18/ANZ BANKING GROUP LTD
        Trade.PortfolioType = ANZEntity
    2013-11-15 16:41:06,651[18] INFO  OUTPUT: CTI at 11/15/2013 05:40:33 +00:00
    2013-11-15 16:41:07,826[18] INFO  OUTPUT: Point at 05:41:03.882:
        AdjustedRiskValue = 8120921.17907126
        Exposure = 8097378.87930098
        Limit = 277398.989384491
        TimePoint = 01/25/2073 00:00:00
        Trade.AddOnExposure = 23542.2997702827
        Trade.Portfolio = Counterparty226
        Trade.PortfolioType = Counterparty
    2013-11-15 16:41:07,833[18] INFO  OUTPUT: Point at 05:41:03.882:
        AdjustedRiskValue = 11946519.4627564
        Exposure = 11922977.1629862
        Limit = 1556981.37841387
        TimePoint = 02/24/2014 00:00:00
        Trade.AddOnExposure = 23542.2997702827
        Trade.Portfolio = Counterparty226
        Trade.PortfolioType = Counterparty
    2013-11-15 16:41:07,841[18] INFO  OUTPUT: Point at 05:41:03.882:
        AdjustedRiskValue = 5043834.07040115
        Exposure = 5020291.77063087
        Limit = 1436402.51533861
        TimePoint = 01/25/2073 00:00:00
        Trade.AddOnExposure = 23542.2997702827
        Trade.Portfolio = Counterparty226/Bond Repos
        Trade.PortfolioType = Product
    2013-11-15 16:41:07,850[18] INFO  OUTPUT: CTI at 11/15/2013 05:41:03 +00:00

    Friday, November 15, 2013 6:16 AM
  • A couple of comments/questions:

    First, it typically takes a minute or so for things to get started up, especially with the amount of reference data you have. That needs to be filled into the stream and that will take some amount of time. Also, on startup, StreamInsight needs to parse and create the internal query structures.

    Second, is this how you are planning on running in production? Or are you going to be hooking into a live inbound stream? Here's what I've seen ... when replaying events from a database, you can dump in a lot of events that cover a long period of time very, very quickly. Depending on how your CTIs are generated, this can kick off a lot of evaluations. If you have IncreasingStartTime as your AdvanceTimeGenerationSettings and you immediately dump in 15,000 events, all with different start times that span an hour of application time, you'll be kicking off 15,000 evaluations in the span of a second or two. That will spike your CPU. Adjusting your AdvanceTimeGenerationSettings will help with some of that, but it also impacts your latency.

    And yes, reducing the number of joins and join evaluations *will* help. StreamInsight is pretty efficient with CPU usage and CPUs today are pretty powerful, but it's still limited.

    One thing that jumps out at me is that it looks like your input adapters are being started in series, rather than in parallel. When starting them, are you creating a new thread to create the new events?

    I'll try to look at this in more detail later and let you know if I have additional comments.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Friday, November 15, 2013 12:19 PM
    Moderator
  • First, it typically takes a minute or so for things to get started up, especially with the amount of reference data you have. That needs to be filled into the stream and that will take some amount of time. Also, on startup, StreamInsight needs to parse and create the internal query structures.

    1 minute for the joins to produce output after all the bootstrapping is done.

    Second, is this how you are planning on running in production? Or are you going to be hooking into a live inbound stream? Here's what I've seen ... when replaying events from a database, you can dump in a lot of events that cover a long period of time very, very quickly. Depending on how your CTIs are generated, this can kick off a lot of evaluations. If you have IncreasingStartTime as your AdvanceTimeGenerationSettings and you immediately dump in 15,000 events, all with different start times that span an hour of application time, you'll be kicking off 15,000 evaluations in the span of a second or two. That will spike your CPU. Adjusting your AdvanceTimeGenerationSettings will help with some of that, but it also impacts your latency.

    Typically our reference data comes in overnight as the result of a computation process that runs on a cluster. It produces the exposures for all 50k portfolios. Each portfolio has exposures at, on average, 100 time points. All this reference data is valid until the next day, so all events would have the same time. Some portfolio exposures might change during the day, which should invalidate the old portfolio exposure that was loaded overnight. A similar thing happens with the limits reference data, except that it only changes overnight and does not change during the day. Once the reference data is loaded, any pre-deal check by a trader on a trade maps to one of the portfolios and one of the limits. It's about finding the right portfolio and the right limit and checking whether the limit has been exceeded because of this trade.

    And yes, reducing the number of joins and join evaluations *will* help. StreamInsight is pretty efficient with CPU usage and CPUs today are pretty powerful, but it's still limited.

    Two options here to reduce the joins:

    1. Possibly put this application on a cluster, with each node catering to only a few portfolios; the PDC check request goes to the cluster, and only the node holding reference data pertaining to that trade emits results, while the other nodes do not.
    2. StreamInsight providing user-defined type support for payload members instead of just the primitive types. This would mean my 50k portfolio objects would not have to become 50k x 100 (exposure time points).

    One thing that jumps out at me is that it looks like your input adapters are being started in series, rather than in parallel. When starting them, are you creating a new thread to create the new events?

    Each input adapter instance has a timer; each starts after a certain delay and fires at a certain frequency. The reason I did this is to simulate the real-world scenario. Trades can come in at any time. Exposures and limits can come in at any time. The first few trades that arrive when there is no reference data would not produce any output. Once both the exposure reference stream and the limits reference stream are loaded, the joins would start emitting output.

    My question to you would be

    1. In the post above, if you try to run the program in LinqPad using join 1 with > 1,000 counterparties, it takes forever to return data. However, join 2 gives the data back in a few seconds. Why does join 1 perform worse than join 2?
    2. How many joins can StreamInsight handle with latency only in the order of milliseconds?


    • Edited by vijay_g Saturday, November 16, 2013 2:59 AM
    Saturday, November 16, 2013 2:58 AM
  • You say "1 min after bootstrapping is done" ... what do you include in "bootstrapping"? How long after your reference data is enqueued does the first event appear?

    Second, it's not the reference data being enqueued in a rush that I was asking about, it's the actual data that is moving the timeline forward. It's not just the raw "events/second" that you need to consider but the CTIs per second that you are generating. If you are enqueuing data that was recorded over, say, a full day in a matter of seconds, you'll be issuing a TON of CTIs in a very short period of time. The CTIs drive the scheduling of the queries and are based on application, not clock, time. So ... if you have 12 hours' worth of events at 50 events/second, you have about 2.1 million events. If you open a data reader and just dump them as fast as you can, you could maybe enqueue them in, say, 1 minute. If you have your AdvanceTimeGenerationSettings configured to issue a CTI every second of application time, that'll be about 43,000 CTIs generated in a minute ... that's 43K scheduled query executions where all of your joins are running, or 720 CTIs/second. That'd be 720x faster than you'd actually run in production with the same event load, and it will back things up. If you are using IncreasingStartTime, you could well wind up with even more CTIs in that timeframe. Make sense?
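
    For example, instead of AdvanceTimeSettings.IncreasingStartTime (which, as I recall, issues a CTI along with every event), you can batch the CTIs. A rough sketch against the LinqPad sample you posted — the one-second frequency and delay are illustrative values to show the knob, not a recommendation:

    // Issue one CTI per second of application time, lagging one second behind the
    // newest event start time, rather than one CTI per event.
    var throttledCtis = new AdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)),
        null,
        AdvanceTimePolicy.Adjust);

    var portfolioExposuresStream = portfolioExposures.ToIntervalStream(
        Application,
        t => IntervalEvent.CreateInsert(t.EventValidityDate, t.EventValidityDate.AddTicks(1), t),
        throttledCtis);

    Fewer CTIs means fewer scheduled evaluations of those big joins, at the cost of some added latency.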

    Now ... as for LinqPad, every time you do Dump(), it creates a new process tree. That means it creates your observables for each process. So ... you wind up enqueuing all of your reference data each time you call Dump(). I suspect that's what you are facing. And this is also why LinqPad is good for modeling queries but NOT at all useful for testing performance. And how many joins? I've done quite a few joins and still had very good performance, but I've not done joins on the order of magnitude of possible joins that you're doing. What is your expected event rate in production, by the way? And how are you planning on handling your CTIs?


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Sunday, November 17, 2013 7:48 PM
    Moderator

  • 1. Bootstrapping involves starting the input adapters for trade, limit and exposure. Each has its own delay (trade first, limits next and exposure later):

    var tradeInputCfg = new GeneratorConfig()
    {
        EventInterval = TimeSpan.Parse("00:00:30"),
        EventIntervalVariance = 250,
        Delay = TimeSpan.Parse("00:00:10"),
        Name = "Trade"
    };

    var referenceCfg = new GeneratorConfig()
    {
        Directory = ConfigurationManager.AppSettings["ResultsPath"],
        EventInterval = TimeSpan.Parse("00:05:00"),
        EventIntervalVariance = 0,
        Delay = TimeSpan.Parse("00:00:30"),
        Name = "Exposure"
    };

    var creditLimitCfg = new GeneratorConfig()
    {
        Directory = ConfigurationManager.AppSettings["ResultsPath"],
        EventInterval = TimeSpan.Parse("00:05:00"),
        EventIntervalVariance = 0,
        Delay = TimeSpan.Parse("00:00:15"),
        Name = "Limits"
    };


    2. The limit and exposure adapters don't insert any CTIs; they import CTIs from the trade stream:

    var tradeStream = CepStream<PdcTrade>.Create(cepApp,
        "tradeStream", typeof(GeneratorFactory),
        tradeInputCfg, EventShape.Point);

    var timeImportSettings = new AdvanceTimeSettings(null,
        new AdvanceTimeImportSettings("tradeStream"),
        AdvanceTimePolicy.Adjust);

    var referenceStream = CepStream<RiskValueAtPoint>.Create(
        "exposureReferenceStream", typeof(GeneratorFactory),
        referenceCfg, EventShape.Interval, timeImportSettings);

    var creditLimitStream = CepStream<GranularCreditLimit>.Create(
        "creditLimitStream", typeof(GeneratorFactory),
        creditLimitCfg, EventShape.Interval, timeImportSettings);

    3. Once all streams have loaded their data, any trade that comes in later (fired every 30 seconds) inserts a CTI, which is imported into the reference streams; that is when I count the time taken for the join to produce a result. With the number of reference events in the hundreds of thousands, the output takes quite some time to appear.

    4. I couldn't reproduce all this in LinqPad, hence in the reference data streams I used AdvanceTimeSettings.IncreasingStartTime to insert CTIs.

    What is your expected event rate in production, by the way? And how are you planning on handling your CTIs?

    We will have about 80,000 exposure values during the boot-up process; each has a risk vector of 100 time points on average. I convert them to a flat list, which makes 8 million records for the exposure stream. Limits would be about 800k during boot-up. There could be at most 10 trades per second throughout the day (that frequency would not always be there; it's only when a user does a pre-deal check).

    We will have about 10k updates, on average, to the exposure stream across the day and none to the limit stream during the day.

    Every pre-deal check (maybe 10 requests a second or 1 request a second; it really depends on the traders' usage of the system) has to be checked against the reference data in the system to find its relevant bucket. While a pre-deal check is running, there could be updates to the reference stream. Pre-deal check requests arriving after an exposure update should use the newer reference data. The system has to support concurrent pre-deal checks and updates to the reference streams. I was imagining that the temporal nature of the StreamInsight server would handle this nicely without any locking: currently running pre-deal checks would use the older reference data, and whenever there is new reference data, subsequent pre-deal checks would use the newer data, thanks to ClipEventDuration and the import of CTIs from the trade stream into the reference streams.
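
    To illustrate what I mean (a rough sketch only, reusing the stream and property names from my LinqPad sample; the production wiring is more involved), the idea is to stretch each reference event "forever" and clip it when a newer event for the same key arrives:

    // Each exposure stays current until a replacement for the same
    // portfolio/timepoint shows up in the same stream.
    var currentExposures = alteredExposureStream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(alteredExposureStream,
            (oldExposure, newExposure) =>
                oldExposure.PortfolioName == newExposure.PortfolioName &&
                oldExposure.RiskTimePoint == newExposure.RiskTimePoint);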

    However, at any point in time we would have 80k exposure objects, each with values across 100 time points (8 million events queued at the engine), and 80k limit objects, each with values across 10 time points (800k queued at the engine). These are the numbers that each trade has to be joined against.




    • Edited by vijay_g Monday, November 18, 2013 9:09 AM
    Monday, November 18, 2013 8:47 AM