Limiting concurrency based on input data

  • Wednesday, August 08, 2012 11:29 AM
     
     

    Hello everyone,

    I've been checking out TPL Dataflow and I can't seem to figure out if it's a good solution for the following problem:

    My application receives work items that are all linked to a certain user. I was planning on putting all those work items in a BufferBlock<WorkItem> and linking them to an ActionBlock<WorkItem> for processing. The problem is: I can only process on 1 thread per user of the work item, since it requires a session with a server that won't allow more than 1 at a time.

    The work processing time ranges from 1 to 60 seconds and we have enough work and users to load up about 500 threads for an hour or two. So far, I only see two options to this problem. I was hoping someone with more experience could comment on them or offer a better suggestion:

    Option 1:
    Create ActionBlocks per user at runtime and use a predicate in the .LinkTo method. Because I don't have a finite number of users, managing these ActionBlocks could prove to be quite difficult.

    Option 2:
    Create a custom BufferBlock that will hold back a work item if one for the same user is currently being processed. This approach feels like it would require a lot of synchronization between the active tasks and the block.

    If anyone has any advice or suggestions I would really appreciate it!

    Rob

All Replies

  • Wednesday, August 08, 2012 12:47 PM
     
     Answered

    I don't think your second option would work: the BufferBlock would have no way of knowing when an item has finished processing, so it couldn't tell when it's safe to send the next one for the same user.

    But something like option 1 should work fine. Although I wouldn't use LinkTo() with a predicate, because I think it means you need one more block, which would make your code more complicated than necessary. Instead, I would use a Dictionary of blocks:

    static ITargetBlock<WorkItem> CreateSplitter()
    {
    	var userBlocks = new Dictionary<User, ITargetBlock<WorkItem>>();
    	var splitterBlock = new ActionBlock<WorkItem>(
    		workItem =>
    		{
    			ITargetBlock<WorkItem> userBlock;
    			if (!userBlocks.TryGetValue(workItem.User, out userBlock))
    			{
    				userBlock = new ActionBlock<WorkItem>(wi => ProcessWorkItem(wi));
    				userBlocks.Add(workItem.User, userBlock);
    			}
    
    			userBlock.Post(workItem);
    		});
    	return splitterBlock;
    }

    This works because an ActionBlock processes one item at a time by default (MaxDegreeOfParallelism defaults to 1), while separate blocks run independently of each other.

    If you also wanted to limit the total degree of concurrency, you could use a single limited-concurrency task scheduler for all of the user blocks. That wouldn't work if the processing of each item was asynchronous, though (which, based on the times and number of threads you want to use, it probably should be). In that case, you could instead use an asynchronous semaphore.
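
    For what it's worth, here is a minimal sketch of the asynchronous-semaphore idea, assuming SemaphoreSlim.WaitAsync() (available since .NET 4.5) plays the role of the semaphore. The ThrottledSplitter class, the WorkItem type, and the process and maxTotal parameters are made up for illustration. Like the code above, it never releases the per-user blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledSplitter
{
    // Hypothetical work item type, for illustration only.
    public sealed class WorkItem
    {
        public string UserName { get; set; }
    }

    // Returns a splitter that runs at most one item per user at a time
    // and at most maxTotal items across all users.
    public static ITargetBlock<WorkItem> Create(Func<WorkItem, Task> process, int maxTotal)
    {
        var throttle = new SemaphoreSlim(maxTotal);
        var userBlocks = new Dictionary<string, ActionBlock<WorkItem>>();

        // The splitter is sequential (MaxDegreeOfParallelism defaults to 1),
        // and nothing else touches the dictionary, so no lock is needed here.
        return new ActionBlock<WorkItem>(workItem =>
        {
            ActionBlock<WorkItem> userBlock;
            if (!userBlocks.TryGetValue(workItem.UserName, out userBlock))
            {
                userBlock = new ActionBlock<WorkItem>(async wi =>
                {
                    // Wait for a free global slot without blocking a thread.
                    await throttle.WaitAsync();
                    try
                    {
                        await process(wi);
                    }
                    finally
                    {
                        throttle.Release();
                    }
                });
                userBlocks.Add(workItem.UserName, userBlock);
            }

            userBlock.Post(workItem);
        });
    }
}
```

    Calling ThrottledSplitter.Create(ProcessWorkItemAsync, 100) would then cap the whole pipeline at 100 concurrent items, where ProcessWorkItemAsync stands in for your own asynchronous processing method.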

    Also, this code has a small memory leak: the block for each user is never released. This shouldn't be a problem if the application doesn't run for too long or if you don't have many users. But if you do, you will need to add logic that removes a user's block once there are no more work items waiting for that user.

    • Edited by svick Wednesday, August 08, 2012 12:51 PM
    • Marked As Answer by lasagna Wednesday, August 08, 2012 4:49 PM
  • Wednesday, August 08, 2012 4:49 PM
     

    Thank you for your suggestions, Svick!

    I've made a tiny POC using your splitter idea and it works perfectly. Unfortunately, this application will become a service that will (hopefully) keep running for days. Since there is a huge number of distinct users (they come from a CRM database), the blocks should be cleaned up as soon as possible. This is what I'm doing right now:

                var userBlocks = new Dictionary<String, ActionBlock<WorkItem>>();
    
                var splitterBlock = new ActionBlock<WorkItem>(
                    workItem =>
                    {
                        ActionBlock<WorkItem> userBlock;
    
                        if (!userBlocks.TryGetValue(workItem.UserName, out userBlock))
                        {
                            userBlock = new ActionBlock<WorkItem>(async wi =>
                            {
                                Console.WriteLine("{1} Processing for {0}...", wi.UserName, DateTime.Now.ToLongTimeString());
                                await Task.Delay(2000);
                                if (userBlock.InputCount == 0)
                                {
                                    userBlocks.Remove(wi.UserName);
                                }
                            }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
                            userBlocks.Add(workItem.UserName, userBlock);
                        }
    
                        userBlock.Post(workItem);
                    });
    

    It seems to work perfectly but isn't there a cleaner/safer way of doing this though? I initially wanted to inherit from ActionBlock and see if I could override a task being started or something similar, but it's sealed so that's a no go. The Completion task seems like a good place, but I can't figure out when to call it exactly. Am I missing something here?

  • Wednesday, August 08, 2012 5:41 PM
     
     Answered

    I don't think using Completion is the way to go here. To use that, you need to first call Complete() when you know there won't be any more items coming to that block. And you don't know that.

    And I don't think you're missing anything, something like the code you wrote is probably the best way. Except that you need to make it thread-safe. Your code has two problems:

    1. Dictionary is not thread-safe. If one thread calls Remove() and another thread calls Add() (or two threads call Remove()) at the same time, it could get corrupted. You need to use locking or ConcurrentDictionary, which is thread-safe.
    2. The check of InputCount and the removal are not atomic. So, this sequence of events could happen:
      1. Thread A checks InputCount and sees that it's 0.
      2. Thread B retrieves the block from the dictionary and posts a new item to it.
      3. Thread A removes the block from the dictionary.
      So, now you have a block that's still executing, but isn't in the dictionary. If another work item for the same user arrives now, a new block will be created, which means the two would run in parallel, which is not what you want.
      To fix this, you should use locking.

    So, to fix the code, add locks around the sensitive parts:

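    // This fragment is the body of the splitter block's delegate.
    ActionBlock<WorkItem> userBlock;
    lock (userBlocks)
    {
        if (!userBlocks.TryGetValue(workItem.UserName, out userBlock))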
        {
            userBlock = new ActionBlock<WorkItem>(async wi =>
            {
                Console.WriteLine("{1} Processing for {0}...", wi.UserName, DateTime.Now.ToLongTimeString());
                await Task.Delay(2000);
    
                lock (userBlocks)
                {
                    if (userBlock.InputCount == 0)
                    {
                        userBlocks.Remove(wi.UserName);
                    }
                }
            }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            userBlocks.Add(workItem.UserName, userBlock);
        }
    
        userBlock.Post(workItem);
    }
    • Edited by svick Wednesday, August 08, 2012 5:42 PM
    • Marked As Answer by lasagna Thursday, August 09, 2012 12:59 PM
  • Thursday, August 09, 2012 12:59 PM
     

    Thank you for all your help Svick! If you're interested this is what I'll be using for my POC:

    public class ExclusiveConstraintActionBlock<TInput, TKey> : ITargetBlock<TInput>
    {
        private IDictionary<TKey, ActionBlock<TInput>> blocks = new Dictionary<TKey, ActionBlock<TInput>>();
    
        private ActionBlock<TInput> splitterBlock = null;
    
        public ExclusiveConstraintActionBlock(Func<TInput, Task> action, Func<TInput, TKey> distinctionSelector)
        {
            this.splitterBlock = new ActionBlock<TInput>(
                item =>
                {
                    ActionBlock<TInput> block;
                    TKey key = distinctionSelector(item);
                    lock (this.blocks)
                    {
                        if (!blocks.TryGetValue(key, out block))
                        {
                            block = new ActionBlock<TInput>(async a =>
                                {
                                    await action(a);
                                    lock (this.blocks)
                                    {
                                        if (block.InputCount == 0)
                                        {
                                            this.blocks.Remove(key);
                                        }
                                    }
                                }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
                            this.blocks.Add(key, block);
                        }
                        block.Post(item);
                    }
                });
        }
    
        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
        {
            return ((ITargetBlock<TInput>)this.splitterBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    
        public void Complete()
        {
            this.splitterBlock.Complete();
        }
    
        public Task Completion
        {
            get { return this.splitterBlock.Completion; }
        }
    
        public void Fault(Exception exception)
        {
            ((ITargetBlock<TInput>)this.splitterBlock).Fault(exception);
        }
    }

    Is there a specific reason why some of ITargetBlock<T>'s methods are hidden and why the blocks are sealed? It would be really nice to inherit from them.

  • Thursday, August 09, 2012 1:18 PM
     
     

    I believe some of the methods are hidden (they are explicit interface implementations) because they are not supposed to be used in “normal” user code. Most of the time, if you use ActionBlock, you shouldn't call OfferMessage(), with its low-level details like message headers; you would use Post() instead. Of course, that doesn't apply when you create your own block by wrapping another one: then you do need to call OfferMessage().

    I think the simplest solution to avoid the casts is to change the type of the splitterBlock field to ITargetBlock<TInput>.

    And creating your own implementation of ITargetBlock is certainly a valid option here, but I don't see any advantage it gives you compared with a static method that returns the splitterBlock (like in my first answer). Because I think a method works just as well and is simpler to write, I would personally prefer that.

    And I'm not sure why the blocks are sealed, but I guess the developers didn't see much use in allowing inheritance when there aren't any methods to override. And creating a method that returns the block you want (like I suggested) or a custom implementation that wraps some other block (like you did) are both simple workarounds.
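
    As a rough sketch, the static-method alternative could look like the following. It uses the same locking and cleanup logic as the class above, just without the wrapper type. The names ExclusiveBlock, Create and keySelector are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExclusiveBlock
{
    // Returns a block that runs action for at most one item per key at a time.
    // Items with different keys can be processed in parallel.
    public static ITargetBlock<TInput> Create<TInput, TKey>(
        Func<TInput, Task> action, Func<TInput, TKey> keySelector)
    {
        var blocks = new Dictionary<TKey, ActionBlock<TInput>>();

        return new ActionBlock<TInput>(item =>
        {
            TKey key = keySelector(item);
            lock (blocks)
            {
                ActionBlock<TInput> block;
                if (!blocks.TryGetValue(key, out block))
                {
                    block = new ActionBlock<TInput>(async x =>
                    {
                        await action(x);
                        // Checking the queue and removing the block must happen
                        // under the same lock that guards Post() below.
                        lock (blocks)
                        {
                            if (block.InputCount == 0)
                            {
                                blocks.Remove(key);
                            }
                        }
                    });
                    blocks.Add(key, block);
                }

                block.Post(item);
            }
        });
    }
}
```

    Because the return type is ITargetBlock<TInput>, callers get OfferMessage(), Complete(), Fault() and Completion from the splitter without any casts or forwarding code. As with the class version, Completion only reflects the splitter, not the per-key blocks.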

  • Saturday, August 11, 2012 6:40 PM
     
     

    A third option is to combine it with Rx.

    You can use AsObservable() to turn the ISourceBlock into an IObservable, and then use the GroupBy operator to split it into one observable per user.

    Then subscribe to the groups and attach an ActionBlock to each grouped stream by calling AsObserver() on the ActionBlock.
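
    A minimal sketch of the Rx approach described above might look like this. It assumes the System.Reactive NuGet package is referenced and uses the TPL Dataflow extension methods AsObservable() and AsObserver(). The PerUserRx class, the WorkItem type, and the process parameter are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;              // assumption: System.Reactive package is referenced
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PerUserRx
{
    // Hypothetical work item type, for illustration only.
    public sealed class WorkItem
    {
        public string UserName { get; set; }
    }

    // Routes items from source to one sequential ActionBlock per user.
    // Dispose the returned subscription to stop routing.
    public static IDisposable Attach(ISourceBlock<WorkItem> source, Func<WorkItem, Task> process)
    {
        return source.AsObservable()
            .GroupBy(wi => wi.UserName)
            .Subscribe(group =>
            {
                // One block per user; each block handles one item at a time by default.
                var userBlock = new ActionBlock<WorkItem>(process);
                group.Subscribe(userBlock.AsObserver());
            });
    }
}
```

    Note that GroupBy keeps a group (and so a block) for every user it has seen until the source completes, so this sketch grows with the number of distinct users in the same way as the dictionary version without cleanup.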


    Bnaya Eshet