Answered Rx vs Synchronous

  • Thursday, October 04, 2012 12:12 PM
     

    Hi,

    I have a question comparing Reactive Extensions with Synchronous methods. I have the following setup:

    List<byte[]> frames = new List<byte[]>();
    for (int i = 0; i < numPackages; i++)
    {
    	frames.Add(CreateMeasurementPackage(numMeasProPackage, (byte)i, 0));
    }
    Stopwatch sw1 = new Stopwatch(), sw2 = new Stopwatch(), sw3 = new Stopwatch();
    syncList = new List<DataPoint>();
    asyncList = new List<DataPoint>();
    var services = ServiceFactory.CreateServices();
    
    int j, x = 0;
    sw1.Start();
    for (j = 0; j < numPackages; j++)
    {
    	services.ProcessService.ProcessMeasurements(frames[j]).Subscribe(y => { }, () =>
    	{
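    		// Increment returns the new value; compare that instead of re-reading x,
    		// and stop the clock on the last completion rather than the second to last.
    		if (System.Threading.Interlocked.Increment(ref x) == numPackages)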
    		{
    			sw1.Stop();
    			Console.WriteLine("asyncRx" + sw1.ElapsedMilliseconds);
    		}
    	});
    }
    sw3.Start();
    for (int f = 0; f < numPackages; f++)
    {
    	await services.ProcessService.ProcessPointsAsync(frames[f]);
    }
    sw3.Stop();
    Console.WriteLine("async" + sw3.ElapsedMilliseconds);
    
    sw2.Start();
    for (int f = 0; f < numPackages; f++)
    {
    	services.ProcessService.ProcessPoints(frames[f]);
    }
    sw2.Stop();
    Console.WriteLine("sync" + sw2.ElapsedMilliseconds);
    

    When the output appears in the console window, the results are roughly:
    Sync: 200 ms
    AsyncRx: 18000 ms
    Async: 27000 ms

    The ProcessPoint methods are listed below:

    public IEnumerable<DataPoint> ProcessPoints(byte[] frame)
    {
    	if (!ASCIIEncoding.ASCII.GetString(frame, 0, 5).ToLower().Equals("error"))
    	{
    		byte modulenumber = 0;
    		int startIndex = 0, length = 0;
    		//length
    		length = (frame[10] << 8) | frame[11];
    		//modulenumber
    		modulenumber = frame[12];
    		//startIndex
    		for (int i = 0; i < 4; i++)
    		{
    			startIndex <<= 8;
    			startIndex |= frame[13 + i];
    		}
    		byte j = 0;
    		//Parallel.For(0, length*6, i =>
    		for (int i = 0; i < length * 6; i += 6, j++)
    		{
    			if (j == 12)
    			{
    				j %= 12;
    				startIndex++;
    			}
    
    			int max = 0;
    			ushort position = 0;
    			int z = 0;
    			for (; z < 4; z++)
    			{
    				max <<= 8;
    				max |= frame[17 + i + z];
    			}
    			for (; z < 6; z++)
    			{
    				position <<= 8;
    				position |= frame[17 + i + z];
    			}
    			yield return new DataPoint()
    			{
    				Channel = j,
    				Index = startIndex,
    				Max = max,
    				Position = position,
    				Module = modulenumber
    			};
    		}
    	}
    	yield break;
    }/*ProcessPoints*/


    private delegate IEnumerable<DataPoint> ProcessPointsDelegate(byte[] frame);
    public async Task<IEnumerable<DataPoint>> ProcessPointsAsync(byte[] frame)
    {
    	ProcessPointsDelegate del = ProcessPoints;
    	return await Task.Factory.FromAsync<byte[], IEnumerable<DataPoint>>(del.BeginInvoke, del.EndInvoke, frame, null); 
    }


    public IObservable<DataPoint> ProcessMeasurements(byte[] frame)
    {
    	return Observable.Create<DataPoint>(x =>
    		Observable.ToAsync<byte[], IEnumerable<DataPoint>>(ProcessPoints, System.Reactive.Concurrency.TaskPoolScheduler.Default)(frame) .SelectMany(y => y).Subscribe(x));
    }/*ProcessMeasurements*/

    I find it strange that async execution is SO much slower than sync execution. Can anyone explain this? Are there any improvements I could make to my code?

    Sincerely,
    Brecht

All Replies

  • Thursday, October 04, 2012 12:49 PM
     
     

    Hi Brecht,

    Please post a short but complete program.  For example, where are the definitions of numPackages and CreateMeasurementPackage?  Are there other pieces that you left out?

    - Dave


    http://davesexton.com/blog

  • Thursday, October 04, 2012 12:57 PM
     

    Hi Dave,

    I didn't think these pieces were important, since they are the same for each of the 3 methods. But here they are:

    const int numPackages = 100000, numMeasProPackage = 1000;

    static byte[] CreateMeasurementPackage(short numberOfMeasurements, byte moduleNr, int StartIndex)
    {
    	byte[] frame = new byte[17 + numberOfMeasurements * 6];
    	//header
    	byte[] header = System.Text.Encoding.ASCII.GetBytes("MEASUREMNT");
    	header.CopyTo(frame, 0);
    	//measurements
    	byte[] measNr = BitConverter.GetBytes(numberOfMeasurements);
    	measNr = measNr.Reverse().ToArray();
    	measNr.CopyTo(frame, 10);
    	//moduleNr
    	frame[12] = moduleNr;
    	//Start index
    	byte[] index = BitConverter.GetBytes(StartIndex);
    	index = index.Reverse().ToArray();
    	index.CopyTo(frame, 13);
    
    	byte[] payload = new byte[numberOfMeasurements * 6];
    	rand.NextBytes(payload);
    	payload.CopyTo(frame, 17);
    	return frame;
    }

    When I change either of the constants, the results lead to the same conclusion: synchronous is much faster.

    Sincerely,
    Brecht

  • Friday, October 05, 2012 7:50 AM
     
     Answered

    A few things to note here

    1) You are micro-benchmarking. The tests run so fast that other things (e.g. a GC mid-test) can cause a massive amount of disruption to your results. E.g. when I run the tests on my machine, sometimes I can get a TPL version with MaxDegreeOfParallelism = 1 running faster than the plain sync version. Obviously nonsense. I also get variations between 6ms and 11ms to complete the tests, and on each run a different implementation can outperform the others. However, it is never the Parallel implementation that wins, i.e. only Sync or MaxDegreeOfParallelism = 1.

    2) (IMO) Synchronous should be faster. You are mainly performing memory manipulation here and not so much raw number crunching. I would imagine that adding many things to contend with the same blocks of memory would introduce contention, and thus slow it down.

    3) We want to avoid concurrency wherever we can. Concurrency introduces costs in complexity, allocations, context switching etc., so concurrency should pay for itself. I assume here you are looking for it to pay in a perf improvement. However, I think what you are producing/testing here is how many Tasks you can load up on the .NET Virtual Machine and how many allocations you can do, instead of actually doing any work.
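
    One further thing to check against point 3, offered as an observation rather than a confirmed diagnosis: ProcessPoints is written with yield return, and a C# iterator method does not run its body until the result is enumerated. The sync loop in the question discards the returned IEnumerable<DataPoint>, so it appears to time little more than creating iterator objects, while the Rx version enumerates every point through SelectMany. A minimal sketch of that behaviour, using a hypothetical Squares iterator as a stand-in for ProcessPoints:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    // Counts how many times the iterator body actually ran.
    public static int BodyExecutions;

    // Hypothetical stand-in for ProcessPoints: any method using yield return is lazy.
    public static IEnumerable<int> Squares(int count)
    {
        for (int i = 0; i < count; i++)
        {
            BodyExecutions++;
            yield return i * i;
        }
    }

    public static void Main()
    {
        // Calling the method only creates the iterator object; no loop iteration runs.
        IEnumerable<int> lazy = Squares(1000);
        Console.WriteLine("After call: " + BodyExecutions);   // After call: 0

        // Enumerating (here via ToList) is what executes the body.
        List<int> materialized = lazy.ToList();
        Console.WriteLine("After ToList: " + BodyExecutions); // After ToList: 1000
        Console.WriteLine("Items: " + materialized.Count);    // Items: 1000
    }
}
```

    If this applies to the benchmark, forcing enumeration in every variant (for example with Count() or ToList()) would make the three timings measure the same work.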

    If you wanted to increase the throughput of this raw CPU crunching, then you probably want to look at TPL. However, ProcessPoints barely does enough work for a modern processor to justify introducing concurrency. There is a lot of allocating, so you will probably incur more cost allocating tasks, creating threads and context switching between them than actually doing any work.
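
    For reference, a rough sketch of what a TPL version could look like, under stated assumptions: SumPayload is a hypothetical stand-in for fully enumerating ProcessPoints on one frame, and the frame layout (17-byte header plus 6 bytes per measurement) is taken from CreateMeasurementPackage. It uses the thread-local overload of Parallel.For so that workers only touch shared state once, when they finish.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelFramesSketch
{
    // Hypothetical stand-in for enumerating ProcessPoints(frame) to completion:
    // sums the payload bytes after the 17-byte header so real work is done per frame.
    public static long SumPayload(byte[] frame)
    {
        long sum = 0;
        for (int i = 17; i < frame.Length; i++)
        {
            sum += frame[i];
        }
        return sum;
    }

    public static long ProcessAll(byte[][] frames, int maxDegreeOfParallelism)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        long total = 0;

        // Each worker accumulates into a private subtotal and adds it to the
        // shared total once, which keeps contention on 'total' low.
        Parallel.For(0, frames.Length, options,
            () => 0L,
            (i, state, local) => local + SumPayload(frames[i]),
            local => Interlocked.Add(ref total, local));

        return total;
    }

    public static void Main()
    {
        var rand = new Random(42);
        var frames = new byte[1000][];
        for (int i = 0; i < frames.Length; i++)
        {
            frames[i] = new byte[17 + 1000 * 6];
            rand.NextBytes(frames[i]);
        }

        long sequential = ProcessAll(frames, 1);
        long parallel = ProcessAll(frames, Environment.ProcessorCount);
        Console.WriteLine(sequential == parallel); // both runs compute the same total
    }
}
```

    Whether the parallel run is actually faster depends on how much work each frame needs; for work this light, the sequential run may well win.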

    With regard to Rx, it is a framework for querying sequences of events. Yes, you can use Rx to start async work items and then subscribe to its results, but that is not (IMO) the primary design goal of Rx.

    Where possible, stay single threaded. Were you hoping to get a lot faster than 200ms for processing 100,000 messages of roughly 6KB each (17 + 1000 * 6 = 6,017 bytes)?

    I am keen to hear what the other people (smarter than me) have to say.

    Hope this helps

    Lee Campbell


    Lee Campbell http://LeeCampbell.blogspot.com



    • Proposed As Answer by Dave Sexton Friday, October 05, 2012 12:20 PM
    • Marked As Answer by BrechtVsk Monday, October 08, 2012 7:00 AM
  • Friday, October 05, 2012 9:34 AM
     
     

    Hi Lee,

    Thanks for your reply. I think I misinterpreted the punchline of the Rx website:

    The Reactive Extensions (Rx)...

    ...is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.

    I thought it was for asynchronous programs or event-based programs, but (correct me if I'm wrong) the program you compose with Rx uses asynchronous events (you don't know when to expect an event and you can't block waiting for it). Thanks for clarifying this. I now see that I'm using Rx in the wrong places; call it Rx-fever (cf. design-patterns-fever).

    I'm making this program as part of my thesis, which is built upon the work of a former student that pointed out that parallel execution speeds up things. So I do hope to go even faster (yes FASTER :-) not even 1ms is fast enough) with parallel execution, but maybe not in this particular case.

    This discussion can also lead to a conclusion in my thesis that Rx is to be used in some specific cases, but that you don't have to Rx-ify everything in your program.

    Thanks again for the answer and my sincere apologies for the incoherent chit-chat, but I'm just posting my ideas here hoping for reactions, comments, ideas, suggestions, (spelling/grammar corrections), ...


    Sincerely, Brecht

    (PS I'm not going to mark it as an answer yet, so maybe more people will take a look and comment to this, but I will do that in time)


  • Friday, October 05, 2012 1:18 PM
     
     Answered

    Hi,

    Lee's answer makes sense to me, though I haven't done an analysis of your code myself.  For one thing, I don't think it will compile as is.  You know, it's a courtesy to post a short but complete program so that it's easy for us to repro without wasting any time fixing the holes in your code.  Furthermore, when asking about performance benchmarking, it's absolutely necessary to post all of the pieces involved.

    > I have a question comparing Reactive Extensions with Synchronous methods

    Rx is primarily about composing queries and controlling concurrency.  It's less about introducing concurrency, though of course it supports that as well.

    IObservable<T> can be synchronous too.  So either you want to measure the synchronous throughput of a particular Rx component, such as Subject<T>, or you want to measure the parallel throughput of different schedulers, perhaps.  But it doesn't make sense to compare the total throughput of many synchronous iterators vs. many observables that wrap task-based concurrency wrapping synchronous iterators.
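
    To illustrate the point that IObservable<T> can be synchronous, here is a minimal hand-rolled sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library, not Rx itself. SynchronousRange and CollectingObserver are hypothetical names invented for this example; every notification is delivered on the calling thread before Subscribe returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes 0..count-1 synchronously inside Subscribe.
public sealed class SynchronousRange : IObservable<int>
{
    private readonly int _count;

    public SynchronousRange(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < _count; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return NoopDisposable.Instance; // nothing left to cancel by the time we return
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that records what it receives.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public bool Completed;

    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}

public static class SynchronousObservableDemo
{
    public static void Main()
    {
        var observer = new CollectingObserver();
        new SynchronousRange(3).Subscribe(observer);

        // Subscribe has returned, and all notifications have already arrived.
        Console.WriteLine(observer.Completed + " " + observer.Values.Count); // True 3
    }
}
```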

    Furthermore, consider averaging the throughput of each iteration instead of summing them.  Also run several iterations prior to the actual benchmarking and throw the results away to eliminate environmental factors such as processor caching.  And there's even more to consider than just this.
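
    A minimal sketch of the warm-up and averaging advice above, assuming a hypothetical helper named AverageMilliseconds (not part of any library). It runs the action a few times untimed, forces a garbage collection so a pending GC is less likely to land mid-measurement, and then reports the mean of the timed runs.

```csharp
using System;
using System.Diagnostics;

public static class BenchmarkSketch
{
    // Runs the action untimed for warmupRuns, then returns the mean elapsed
    // milliseconds over measuredRuns timed executions.
    public static double AverageMilliseconds(Action action, int warmupRuns, int measuredRuns)
    {
        if (measuredRuns <= 0) throw new ArgumentOutOfRangeException("measuredRuns");

        for (int i = 0; i < warmupRuns; i++)
        {
            action(); // timing discarded: lets the JIT and caches settle
        }

        // Collect up front so leftover garbage from warm-up is less likely
        // to trigger a collection during the measured runs.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        double totalMs = 0;
        for (int i = 0; i < measuredRuns; i++)
        {
            Stopwatch sw = Stopwatch.StartNew();
            action();
            sw.Stop();
            totalMs += sw.Elapsed.TotalMilliseconds;
        }
        return totalMs / measuredRuns;
    }

    public static void Main()
    {
        long sink = 0;
        double mean = AverageMilliseconds(() =>
        {
            for (int i = 0; i < 1000000; i++) sink += i;
        }, 3, 10);
        Console.WriteLine("mean ms per run: " + mean);
    }
}
```

    Each variant under test (sync, async, Rx) would be passed in as its own action, and each action should wait for its work to finish before returning so that the timings are comparable.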

    http://blogs.msdn.com/b/ricom/archive/2010/04/07/variability-in-benchmarks.aspx
    http://blogs.msdn.com/b/ricom/archive/2010/04/26/a-few-words-about-micro-benchmarks.aspx

    - Dave


    http://davesexton.com/blog

    • Marked As Answer by BrechtVsk Monday, October 08, 2012 7:00 AM
  • Monday, October 08, 2012 6:45 AM
     
     

    Hi Dave,

    Thanks for the reply. I took a quick look at the 2 links you posted, and from what I read I gather that what I'm measuring is not representative of the real performance. I'll need to benchmark larger parts of my application.

    Thanks for the help all in clarifying these matters.

    (PS next posts, I'll give complete examples. Sorry for the troubles)


    Sincerely, Brecht