Throttling a block until the next one is available.
-
Thursday, September 27, 2012 5:31 PM
Hi everyone,
I'm trying to put together an application that uses TCP connections to transmit very short messages to a large number of clients.
I wanted to create a connection inside a block and pool it for re-use. The problem I'm having now is that the connections sometimes time out.
So far I've traced it back to the block that creates or gets a connection from the pool: its result gets placed in the output buffer without knowing whether the processing block has any threads available. This is a simplified version of what I'm trying to achieve:
BufferBlock<String> buffer = new BufferBlock<String>();

TransformBlock<String, String> transform = new TransformBlock<String, String>((s) =>
{
    // Grab connection
    return String.Format("{0} started at {1}", s, DateTime.Now.ToLongTimeString());
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

ActionBlock<String> action = new ActionBlock<String>((s) =>
{
    Console.WriteLine("{0} processed at {1}, Transform OutputBuffer={2}", s, DateTime.Now.ToLongTimeString(), transform.OutputCount);
    Thread.Sleep(2000);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

buffer.LinkTo(transform);
transform.LinkTo(action);

buffer.Post("test1");
buffer.Post("test2");
buffer.Post("test3");

Console.WriteLine("Press any key to exit.");
Console.ReadKey();
The output is as follows when I choose a maximum concurrency of 2:
test1 started at 6:49:55 PM processed at 6:49:55 PM, Transform OutputBuffer=1
test2 started at 6:49:55 PM processed at 6:49:55 PM, Transform OutputBuffer=1
test3 started at 6:49:55 PM processed at 6:49:57 PM, Transform OutputBuffer=0
In reality, the delay can be much greater and my connection will have timed out by then. By setting the BoundedCapacity to the same number as the maximum concurrency, I've managed to move the third record from the ActionBlock's input buffer to the TransformBlock's output buffer, but the result stays the same. Is there any way to get the TransformBlock to perform its function only once the ActionBlock is free?
Thanks for your help,
Rob
All Replies
-
Friday, September 28, 2012 1:58 PM
Hello, Rob -
I think the general idea of a dataflow mesh is to push data through as quickly as possible. There are a few special exceptions made to enable scenarios like non-greedy joins and throttling, but it just isn't designed to "pushback" a "pause command".
In your case, you're moving not just data but also some kind of time-sensitive resource through the mesh. So my first question would be: is there any way to move the Connection Acquisition to where you're going to use it? That is, merge the TransformBlock logic into the ActionBlock logic.
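A minimal sketch of what merging the two steps could look like. It assumes a hypothetical `FakeConnection` type standing in for the pooled TCP connection; `MergedBlockDemo` and its members are illustrative names, not anything from Rob's real application:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a pooled TCP connection (assumption, not a real API).
public sealed class FakeConnection : IDisposable
{
    public void Send(string message) { /* real code would write to the socket here */ }
    public void Dispose() { /* real code would hand the connection back to the pool */ }
}

public static class MergedBlockDemo
{
    // Pushes the messages through a single ActionBlock and returns what was sent.
    public static string[] Run(params string[] messages)
    {
        var sent = new ConcurrentQueue<string>();

        var action = new ActionBlock<string>(s =>
        {
            // The connection is acquired only once a worker is actually running,
            // so it never sits idle in another block's output buffer.
            using (var connection = new FakeConnection())
            {
                connection.Send(s);
                sent.Enqueue(s);
                Thread.Sleep(100); // simulated transmission time
            }
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

        var buffer = new BufferBlock<string>();
        buffer.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var message in messages)
        {
            buffer.Post(message);
        }

        buffer.Complete();
        action.Completion.Wait();
        return sent.ToArray();
    }
}
```

Calling `MergedBlockDemo.Run("test1", "test2", "test3")` sends all three messages; because two workers run in parallel, the order of the returned array is not guaranteed.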
-Steve
Programming blog: http://nitoprograms.blogspot.com/
Including my TCP/IP .NET Sockets FAQ
and How to Implement IDisposable and Finalizers: 3 Easy Rules
Microsoft Certified Professional Developer
How to get to Heaven according to the Bible
- Proposed As Answer by Stephen Toub - MSFT (Microsoft Employee, Owner) Saturday, September 29, 2012 2:19 AM
- Marked As Answer by lasagna Saturday, September 29, 2012 7:52 PM
-
Friday, September 28, 2012 3:17 PM
Hi Steve,
My real application will contain multiple buffer/TransformBlock combinations (one per client machine) linked to that single ActionBlock. There are concurrency and speed limits that we have to adhere to when transmitting the data records. If I put the connection logic inside the ActionBlock, it may pick up a data record that I'm not currently allowed to send while another client is ready to receive.
I suppose I could check for readiness inside the ActionBlock and put a record that is "not ready" back into the buffer, creating a loop. It feels like a bad solution performance-wise, though: potentially hundreds of records would be flushed through the ActionBlock this way.
By the way, I really liked your IDisposable article. I'm definitely sending the link to a few colleagues!
Kind regards,
Rob
-
Saturday, September 29, 2012 9:27 AM
If you want to throttle connections like this, the object that you send doesn't have to be an open connection. It could be just a token that you use for the throttling, and the connection will actually be opened in the action block, just before it's needed.
- Marked As Answer by lasagna Saturday, September 29, 2012 7:52 PM
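A rough sketch of the token idea, under the assumption that each client gets its own bounded TransformBlock and all of them feed one shared ActionBlock. `SendToken`, `TokenDemo`, and the client ids are hypothetical names invented for illustration; the place where a real connection would be opened is only marked with a comment:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical token: names the client and payload but holds no open socket.
public sealed class SendToken
{
    public SendToken(string clientId, string payload)
    {
        ClientId = clientId;
        Payload = payload;
    }

    public string ClientId { get; private set; }
    public string Payload { get; private set; }
}

public static class TokenDemo
{
    public static string[] Run(string[] clientIds, int messagesPerClient)
    {
        var sent = new ConcurrentQueue<string>();

        // Shared sender. A real implementation would open (or borrow) the
        // connection for token.ClientId here, just before sending.
        var action = new ActionBlock<SendToken>(token =>
        {
            sent.Enqueue(token.ClientId + ":" + token.Payload);
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

        // One throttling block per client; it only produces cheap tokens,
        // so nothing time-sensitive waits in its output buffer.
        var transforms = clientIds.Select(id =>
        {
            var transform = new TransformBlock<string, SendToken>(
                payload => new SendToken(id, payload),
                new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
            transform.LinkTo(action);
            return transform;
        }).ToArray();

        foreach (var transform in transforms)
        {
            for (int i = 0; i < messagesPerClient; i++)
            {
                // SendAsync completes once the bounded block has room for the item.
                transform.SendAsync("msg" + i).Wait();
            }
            transform.Complete();
        }

        // With several sources, the shared target is completed manually
        // after every source has drained.
        Task.WhenAll(transforms.Select(t => t.Completion)).Wait();
        action.Complete();
        action.Completion.Wait();
        return sent.ToArray();
    }
}
```

Completion is not propagated through the links here because the first client to finish would otherwise shut down the shared ActionBlock for everyone else.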
-
Tuesday, October 02, 2012 3:58 PM
Hey Guys,
I've done some work trying to get the connections going in the ActionBlock itself and decided to investigate another route. So I created the block I needed: a transform block that simply puts the item back into its input buffer if the message wasn't accepted by the target.
I used Zlatko Michailov's guide to implementing custom TPL dataflow blocks as a start. So far I haven't worked on multiple targets or unlinking but I still wanted to check if any of you guys have thoughts or advice on it. Here's the code:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class LazyTransformBlockLink : IDisposable
{
    public void Dispose()
    {
        throw new NotImplementedException();
    }
}

public class LazyTransformBlock<TInput, TOutput> : ISourceBlock<TOutput>, ITargetBlock<TInput>
{
    private ITargetBlock<TOutput> target = null;
    private readonly Func<TInput, TOutput> Transform = null;
    private readonly Object pushLock = new Object();
    private Task pushTask = null;
    private readonly ConcurrentQueue<Tuple<Int64, TInput>> buffer = new ConcurrentQueue<Tuple<Int64, TInput>>();

    public LazyTransformBlock(Func<TInput, TOutput> transformation)
    {
        this.Transform = transformation;
    }

    // Starts a push task unless one is already running or no target is linked yet.
    private void StartPushing()
    {
        lock (this.pushLock)
        {
            if ((this.pushTask == null || this.pushTask.IsCompleted) && target != null)
            {
                this.pushTask = Task.Factory.StartNew(() => this.Push());
            }
        }
    }

    // Transforms and offers buffered items; a declined item goes back into the buffer.
    private void Push()
    {
        Tuple<Int64, TInput> pair;
        while (this.buffer.TryDequeue(out pair))
        {
            TOutput transformed = this.Transform(pair.Item2);
            if (target.OfferMessage(new DataflowMessageHeader(pair.Item1), transformed, this, false) != DataflowMessageStatus.Accepted)
            {
                this.buffer.Enqueue(pair);
                return;
            }
        }
    }

    #region ISourceBlock

    public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
    {
        Tuple<Int64, TInput> pair;
        if (messageConsumed = this.buffer.TryDequeue(out pair))
        {
            this.StartPushing();
            return this.Transform(pair.Item2);
        }
        else
        {
            return default(TOutput);
        }
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
        this.target = target;
        return new LazyTransformBlockLink();
    }

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        throw new NotImplementedException();
    }

    public void Complete()
    {
        throw new NotImplementedException();
    }

    public Task Completion
    {
        get { throw new NotImplementedException(); }
    }

    public void Fault(Exception exception)
    {
        throw new NotImplementedException();
    }

    #endregion

    #region ITargetBlock

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
    {
        if (messageValue == null)
        {
            if (consumeToAccept == false)
            {
                throw new ArgumentNullException("messageValue");
            }
            else
            {
                Boolean consumed = false;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed)
                {
                    throw new InvalidOperationException("Failed to consume");
                }
            }
        }

        this.buffer.Enqueue(new Tuple<Int64, TInput>(messageHeader.Id, messageValue));
        this.StartPushing();
        return DataflowMessageStatus.Accepted;
    }

    #endregion
}

Thanks for your help!
Rob

