Rx vs Synchronous
-
Thursday, October 04, 2012 12:12 PM
Hi,
I have a question comparing Reactive Extensions (Rx) with synchronous methods. I have the following setup:
    List<byte[]> frames = new List<byte[]>();
    for (int i = 0; i < numPackages; i++)
    {
        frames.Add(CreateMeasurementPackage(numMeasProPackage, (byte)i, 0));
    }

    Stopwatch sw1 = new Stopwatch(), sw2 = new Stopwatch(), sw3 = new Stopwatch();
    syncList = new List<DataPoint>();
    asyncList = new List<DataPoint>();
    var services = ServiceFactory.CreateServices();
    int j, x = 0;

    // Rx: one observable per frame; the last completion stops the stopwatch
    sw1.Start();
    for (j = 0; j < numPackages; j++)
    {
        services.ProcessService.ProcessMeasurements(frames[j]).Subscribe(y => { }, () =>
        {
            // Compare the value returned by Increment: re-reading x races with other
            // completions, and the last of numPackages completions brings it to numPackages.
            if (System.Threading.Interlocked.Increment(ref x) == numPackages)
            {
                sw1.Stop();
                Console.WriteLine("asyncRx" + sw1.ElapsedMilliseconds);
            }
        });
    }

    // Task-based async, awaited one frame at a time
    sw3.Start();
    for (int f = 0; f < numPackages; f++)
    {
        await services.ProcessService.ProcessPointsAsync(frames[f]);
    }
    sw3.Stop();
    Console.WriteLine("async" + sw3.ElapsedMilliseconds);

    // Plain synchronous calls
    sw2.Start();
    for (int f = 0; f < numPackages; f++)
    {
        services.ProcessService.ProcessPoints(frames[f]);
    }
    sw2.Stop();
    Console.WriteLine("sync" + sw2.ElapsedMilliseconds);

When the output appears in the console window, the results are roughly:
Sync: 200 ms
AsyncRx: 18000 ms
Async: 27000 ms
The ProcessPoints methods are listed below:
    public IEnumerable<DataPoint> ProcessPoints(byte[] frame)
    {
        if (!ASCIIEncoding.ASCII.GetString(frame, 0, 5).ToLower().Equals("error"))
        {
            byte modulenumber = 0;
            int startIndex = 0, length = 0;
            //length
            length = (frame[10] << 8) | frame[11];
            //modulenumber
            modulenumber = frame[12];
            //startIndex
            for (int i = 0; i < 4; i++)
            {
                startIndex <<= 8;
                startIndex |= frame[13 + i];
            }
            byte j = 0;
            //Parallel.For(0, length*6, i =>
            for (int i = 0; i < length * 6; i += 6, j++)
            {
                if (j == 12)
                {
                    j %= 12;
                    startIndex++;
                }
                int max = 0;
                ushort position = 0;
                int z = 0;
                for (; z < 4; z++)
                {
                    max <<= 8;
                    max |= frame[17 + i + z];
                }
                for (; z < 6; z++)
                {
                    position <<= 8;
                    position |= frame[17 + i + z];
                }
                yield return new DataPoint() { Channel = j, Index = startIndex, Max = max, Position = position, Module = modulenumber };
            }
        }
        yield break;
    }/*ProcessPoints*/

    private delegate IEnumerable<DataPoint> ProcessPointsDelegate(byte[] frame);

    public async Task<IEnumerable<DataPoint>> ProcessPointsAsync(byte[] frame)
    {
        ProcessPointsDelegate del = ProcessPoints;
        return await Task.Factory.FromAsync<byte[], IEnumerable<DataPoint>>(del.BeginInvoke, del.EndInvoke, frame, null);
    }

    public IObservable<DataPoint> ProcessMeasurements(byte[] frame)
    {
        return Observable.Create<DataPoint>(x =>
            Observable.ToAsync<byte[], IEnumerable<DataPoint>>(ProcessPoints, System.Reactive.Concurrency.TaskPoolScheduler.Default)(frame)
                .SelectMany(y => y)
                .Subscribe(x));
    }/*ProcessMeasurements*/

I find it strange that async execution is SO much slower than sync execution. Can anyone explain this? Are there any improvements I could make to my code?
Sincerely,
Brecht
All Replies
-
Thursday, October 04, 2012 12:49 PM
Hi Brecht,
Please post a short but complete program. For example, where are the definitions of numPackages and CreateMeasurementPackage? Are there other pieces that you left out?
- Dave
-
Thursday, October 04, 2012 12:57 PM
Hi Dave,
I didn't think these pieces were important, since they are the same for all three methods. But here they are:
    const int numPackages = 100000, numMeasProPackage = 1000;

    static byte[] CreateMeasurementPackage(short numberOfMeasurements, byte moduleNr, int StartIndex)
    {
        byte[] frame = new byte[17 + numberOfMeasurements * 6];
        //header
        byte[] header = System.Text.Encoding.ASCII.GetBytes("MEASUREMNT");
        header.CopyTo(frame, 0);
        //measurements
        byte[] measNr = BitConverter.GetBytes(numberOfMeasurements);
        measNr = measNr.Reverse().ToArray();
        measNr.CopyTo(frame, 10);
        //moduleNr
        frame[12] = moduleNr;
        //Start index
        byte[] index = BitConverter.GetBytes(StartIndex);
        index = index.Reverse().ToArray();
        index.CopyTo(frame, 13);
        byte[] payload = new byte[numberOfMeasurements * 6];
        rand.NextBytes(payload);
        payload.CopyTo(frame, 17);
        return frame;
    }

If I change either constant, the results still lead to the same conclusion: synchronous is much faster.
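For anyone reproducing the benchmark, the frame layout implied by CreateMeasurementPackage and ProcessPoints can be sketched as a small round trip. This is an illustrative C# sketch (top-level statements, so C# 9 or later), and BuildFrame is a hypothetical helper, not part of the original code. It writes the big-endian fields with explicit shifts because BitConverter.GetBytes returns bytes in the machine's native order, so the Reverse() step in the original only produces big-endian output on little-endian hardware. It then decodes the header the same way ProcessPoints does.

```csharp
using System;
using System.Text;

// Hypothetical helper mirroring CreateMeasurementPackage. Layout:
//   [0..9]   ASCII header "MEASUREMNT"
//   [10..11] measurement count, big-endian
//   [12]     module number
//   [13..16] start index, big-endian
//   [17..]   6 bytes per measurement (4-byte max + 2-byte position)
byte[] BuildFrame(short count, byte module, int startIndex, Random rand)
{
    byte[] buffer = new byte[17 + count * 6];
    Encoding.ASCII.GetBytes("MEASUREMNT").CopyTo(buffer, 0);
    // Explicit shifts give big-endian order regardless of BitConverter.IsLittleEndian.
    buffer[10] = (byte)(count >> 8);
    buffer[11] = (byte)count;
    buffer[12] = module;
    for (int i = 0; i < 4; i++)
        buffer[13 + i] = (byte)(startIndex >> (24 - 8 * i));
    byte[] payload = new byte[count * 6];
    rand.NextBytes(payload);
    payload.CopyTo(buffer, 17);
    return buffer;
}

byte[] frame = BuildFrame(1000, 7, 123456, new Random(42));

// Decode the header fields exactly as ProcessPoints does.
int length = (frame[10] << 8) | frame[11];
byte moduleNumber = frame[12];
int start = 0;
for (int k = 0; k < 4; k++)
{
    start <<= 8;
    start |= frame[13 + k];
}

// Prints: length=1000 module=7 start=123456 bytes=6017
Console.WriteLine($"length={length} module={moduleNumber} start={start} bytes={frame.Length}");
```

With 1000 measurements per package, each frame is 17 + 6 × 1000 = 6017 bytes.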
Sincerely,
Brecht
-
Friday, October 05, 2012 7:50 AM
A few things to note here
1) You are micro-benchmarking. The tests run so quickly that other things can massively disrupt your results (e.g. a GC in the middle of a test). For example, when I run the tests on my machine, I sometimes get a TPL version with MaxDegreeOfParallelism = 1 running faster than the plain sync version, which is obviously nonsense. The tests also take anywhere from 6 ms to 11 ms, and on each run a different implementation can outperform the others. However, the parallel implementation never wins; the winner is always either Sync or MaxDegreeOfParallelism = 1.
2) (IMO) Synchronous should be faster. You are mainly manipulating memory here rather than doing raw number crunching. I would expect that having many workers access the same blocks of memory introduces contention and thus slows things down.
3) We want to avoid concurrency wherever we can. Concurrency introduces costs in complexity, allocations, context switching, etc., so it has to pay for itself, and I assume here you are looking for it to pay off as a performance improvement. However, I think what you are really testing here is how many Tasks you can load onto the .NET runtime and how many allocations you can make, rather than how fast any actual work gets done.
If you want to increase the throughput of raw CPU crunching like this, you probably want to look at the TPL. However, ProcessPoints does too little work for a modern processor to justify introducing concurrency. There is a lot of allocation going on, so you will probably spend more on allocating tasks, creating threads, and context switching between them than on doing any actual work.
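As a rough illustration of the TPL route, the sketch below processes frames in contiguous ranges using Partitioner.Create(0, n), so scheduling overhead is paid once per range rather than once per frame, and each range accumulates a local total before a single Interlocked.Add. It is a C# 9+ top-level-statements sketch; the frame sizes and the SumMax helper are hypothetical stand-ins for ProcessPoints. Whether the parallel loop actually beats the sequential one depends on how much work each frame does, which is the caveat in the paragraph above; the sketch only checks that both versions compute the same result.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical synthetic workload: many frames of 6-byte records.
const int frameCount = 2000, recordsPerFrame = 100;
var rand = new Random(1);
var frames = new byte[frameCount][];
for (int f = 0; f < frameCount; f++)
{
    frames[f] = new byte[recordsPerFrame * 6];
    rand.NextBytes(frames[f]);
}

// Per-frame work (stand-in for ProcessPoints): decode each 4-byte big-endian
// "max" field and add it to a running sum.
long SumMax(byte[] payload)
{
    long sum = 0;
    for (int i = 0; i < payload.Length; i += 6)
    {
        uint max = ((uint)payload[i] << 24) | ((uint)payload[i + 1] << 16)
                 | ((uint)payload[i + 2] << 8) | payload[i + 3];
        sum += max;
    }
    return sum;
}

// Sequential baseline.
long sequentialTotal = 0;
foreach (var frame in frames)
    sequentialTotal += SumMax(frame);

// Range partitioning: each worker receives a Tuple<int, int> [from, to) of frames.
long parallelTotal = 0;
Parallel.ForEach(Partitioner.Create(0, frameCount), range =>
{
    long local = 0;
    for (int idx = range.Item1; idx < range.Item2; idx++)
        local += SumMax(frames[idx]);
    Interlocked.Add(ref parallelTotal, local); // one synchronized add per range
});

Console.WriteLine($"sequential={sequentialTotal} parallel={parallelTotal} equal={sequentialTotal == parallelTotal}");
```

The per-range local total is the design choice that matters here: touching shared state once per frame would reintroduce the contention described in point 2.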
With regard to Rx, it is a framework for querying sequences of events. Yes, you can use Rx to start async work items and then subscribe to their results, but that is not (IMO) the primary design goal of Rx.
Where possible, stay single-threaded. Were you hoping to get a lot faster than 200 ms for processing 100,000 messages of roughly 6 KB each?
I am keen to hear what the other people (smarter than me) have to say.
Hope this helps
Lee Campbell
Lee Campbell http://LeeCampbell.blogspot.com
- Edited by LeeCampbell Friday, October 05, 2012 7:51 AM smarter->smarter than me
- Edited by LeeCampbell Friday, October 05, 2012 7:53 AM
- Proposed As Answer by Dave Sexton Friday, October 05, 2012 12:20 PM
- Marked As Answer by BrechtVsk Monday, October 08, 2012 7:00 AM
-
Friday, October 05, 2012 9:34 AM
Hi Lee,
Thanks for your reply. I think I misinterpreted the tagline of the Rx website:
The Reactive Extensions (Rx)...
...is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.
I thought it was for asynchronous programs or event-based programs, but (correct me if I'm wrong) the program you compose with Rx uses asynchronous events (you don't know when an event will arrive, and you can't block waiting for it). Thanks for clarifying this. I now see that I'm using Rx in the wrong places; call it Rx fever (cf. design-pattern fever).
I'm writing this program as part of my thesis, which builds on the work of a former student who showed that parallel execution speeds things up. So I do hope to go even faster (yes, FASTER :-); not even 1 ms is fast enough) with parallel execution, but maybe not in this particular case.
This discussion may also lead to a conclusion in my thesis that Rx should be used in specific cases, but that you don't have to Rx-ify everything in your program.
Thanks again for the answer, and my sincere apologies for the incoherent chit-chat; I'm just posting my ideas here in the hope of reactions, comments, ideas, suggestions (and spelling/grammar corrections).
Sincerely, Brecht
(PS: I'm not going to mark it as the answer yet, so that more people might take a look and comment, but I will do so in time.)
-
Friday, October 05, 2012 1:18 PM
Hi,
Lee's answer makes sense to me, though I haven't done an analysis of your code myself. For one thing, I don't think it will compile as is. You know, it's a courtesy to post a short but complete program so that it's easy for us to repro without wasting any time fixing the holes in your code. Furthermore, when asking about performance benchmarking, it's absolutely necessary to post all of the pieces involved.
> I have a question comparing Reactive Extensions with Synchronous methods
Rx is primarily about composing queries and controlling concurrency. It's less about introducing concurrency, though of course it supports that as well.
IObservable<T> can be synchronous too. So either you want to measure the synchronous throughput of a particular Rx component, such as Subject<T>, or you want to measure the parallel throughput of different schedulers, perhaps. But it doesn't make sense to compare the total throughput of many synchronous iterators vs. many observables that wrap task-based concurrency wrapping synchronous iterators.
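A related detail in the posted code: ProcessPoints is an iterator method (it uses yield return), so calling it only creates an enumerator; none of its body, including the "error" check at the top, runs until something enumerates the result. The sync loop and the awaited Task loop both discard the returned IEnumerable<DataPoint> without enumerating it, whereas the Rx version enumerates it through SelectMany, so the three timings are likely not measuring the same amount of parsing. The C# 9+ sketch below illustrates the deferred execution; ParseRecords is a hypothetical stand-in for ProcessPoints.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int produced = 0; // how many records the iterator body has actually produced

// Hypothetical stand-in for ProcessPoints: because this is an iterator,
// its body runs only when the returned sequence is enumerated.
IEnumerable<int> ParseRecords(int count)
{
    for (int i = 0; i < count; i++)
    {
        produced++;
        yield return i;
    }
}

IEnumerable<int> records = ParseRecords(1000); // like calling ProcessPoints(frame) and ignoring the result
int producedBeforeEnumeration = produced;      // 0: nothing has been parsed yet

List<int> materialized = records.ToList();     // enumerating forces the loop to run
int producedAfterEnumeration = produced;       // 1000

Console.WriteLine($"before={producedBeforeEnumeration} after={producedAfterEnumeration}");
```

To compare like with like, each variant would need to consume its results in the same way (for example with ToList() or a foreach over the sequence).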
Furthermore, consider averaging the throughput of each iteration instead of summing them. Also run several iterations prior to the actual benchmarking and throw the results away to eliminate environmental factors such as processor caching. And there's even more to consider than just this.
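The two suggestions in the paragraph above (discard warm-up runs, then average over many timed iterations) can be sketched as a small helper. This is a minimal illustration under stated assumptions, not a substitute for the methodology in the linked articles: MeasureAverageMs is a hypothetical name, the workload passed in is a placeholder, and forcing a full GC before timing is one common way to start each measurement from a similar heap state. It is written as C# 9+ top-level statements.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: runs `action` untimed `warmups` times (JIT compilation,
// caches, lazy initialization), then times `runs` iterations and returns the
// mean elapsed milliseconds per iteration.
double MeasureAverageMs(Action action, int warmups, int runs)
{
    for (int i = 0; i < warmups; i++)
        action(); // warm-up results are thrown away

    // Collect garbage left by earlier work so it is less likely to trigger a GC mid-measurement.
    GC.Collect();
    GC.WaitForPendingFinalizers();
    GC.Collect();

    var sw = Stopwatch.StartNew();
    for (int i = 0; i < runs; i++)
        action();
    sw.Stop();
    return sw.Elapsed.TotalMilliseconds / runs;
}

int calls = 0;
double averageMs = MeasureAverageMs(() =>
{
    calls++;
    Array.Sort(new[] { 5, 3, 1, 4, 2 }); // placeholder workload
}, warmups: 5, runs: 20);

Console.WriteLine($"calls={calls} average={averageMs:F4} ms");
```

Running each variant (sync, Task-based, Rx) through the same helper, in separate runs rather than overlapping ones, makes the numbers easier to compare.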
http://blogs.msdn.com/b/ricom/archive/2010/04/07/variability-in-benchmarks.aspx
http://blogs.msdn.com/b/ricom/archive/2010/04/26/a-few-words-about-micro-benchmarks.aspx
- Dave
- Marked As Answer by BrechtVsk Monday, October 08, 2012 7:00 AM
-
Monday, October 08, 2012 6:45 AM
Hi Dave,
Thanks for the reply. I took a quick look at the two links you posted, and from what I read, what I'm doing is not representative of real performance. I'll need to benchmark larger parts of my application.
Thanks, all, for helping to clarify these matters.
(PS: In future posts I'll give complete examples. Sorry for the trouble.)
Sincerely, Brecht

