none
синхронное выполнение асинхронных потоков RRS feed

  • Вопрос

  • Здравствуйте,

    Есть задача получения максимального быстродействия на с# при обработке данных. Данные представляют собой строки вида

    ключ-параметры

    Обрабатывать данные одного ключа нужно синхронно.

    Собственно, вопросов два
    1. Как организовать синхронную обработку внутри асинхронной
    2. Как все сделать с максимальным быстродействием
    Заранее спасибо


    • Изменено abb269 29 сентября 2013 г. 7:35
    29 сентября 2013 г. 6:58

Ответы

  • Могу предложить такой вариант:

        class Program
        {
            private static readonly TaskScheduler importantTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
            private static readonly TaskScheduler otherTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
    
            static void Main(string[] args)
            {
                var items = new[]
                            {
                                new KeyValuePair<String, Object>("1", "Value 1.1"),
                                new KeyValuePair<String, Object>("2", "Value 2.1"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.1"),
                                new KeyValuePair<String, Object>("1", "Value 1.2"),
                                new KeyValuePair<String, Object>("2", "Value 2.2"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.2"),
                                new KeyValuePair<String, Object>("1", "Value 1.3"),
                                new KeyValuePair<String, Object>("2", "Value 2.3"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.3")
                            };
    
                foreach (var keyValuePair in items)
                {
                    Process(keyValuePair);
                }
    
                Console.ReadKey();
            }
    
            private static void Process(KeyValuePair<String, Object> pair)
            {
                if (pair.Key.Equals("ImportantKey"))
                {
                    Task.Factory.StartNew(ImportantHandler, pair.Value, CancellationToken.None, TaskCreationOptions.None,
                        importantTaskScheduler);
                }
                else
                {
                    Task.Factory.StartNew(OtherHandler, pair.Value, CancellationToken.None, TaskCreationOptions.None,
                        otherTaskScheduler);
                }
            }
    
            private static void ImportantHandler(Object value)
            {
                // Обработка важных ключей
                // Типа долго
                Thread.Sleep(5000);
                Console.WriteLine(value);
            }
    
            private static void OtherHandler(Object value)
            {
                // Обработка остальных ключей
                Console.WriteLine(value);
            }
        }
    

    Что здесь происходит?

    Для важных ключей создается свой диспетчер задач, а для не важных соответственно свой (при этом задачи в каждом диспетчере будут выполняться последовательно, благодаря классу LimitedConcurrencyLevelTaskScheduler с параметром 1).

    Далее метод Process принимает на обработку очередную порцию данных, если это важный ключ, запускает выполнения метода ImportantHandler с указанным диспетчером задач. Если ключ не важный, то запускается OtherHandler.

    В выводе программы, можно заметить (специально поставил Sleep), что обработка важных ключей выполняется последовательно.

    Данный вариант можно расширить и для каждого ключа создавать свой диспетчер, с помощью которого и запускать обработку этих ключей.

    • Помечено в качестве ответа abb269 1 октября 2013 г. 5:49
    30 сентября 2013 г. 14:46

Все ответы

  • Как организовать синхронную обработку внутри асинхронной

    Если я правильно понял Ваш вопрос, то это можно сделать с помощью Parallel.ForEach.

    Если же этот вариант не подходит, то опишите свою задачу более конкретно (приведите к примеру код, который вызывает у Вас трудности).

    29 сентября 2013 г. 18:22
  • Добрый день.

    Если вы используете Task-и и async методы, то можете посмотреть пример здесь. Там как раз асинхронный метод вызывается как синхронный.

    30 сентября 2013 г. 6:42
    Отвечающий
  • Спасибо за ответ.

    Давайте спрошу более конкретно

    Пусть есть такие данные:
    1. key1, object1
    2. key2, object2
    3. key1, object3
    4. key1, object4
    5. key1, object5

    Пытаюсь обработать их так

    switch (key)
    {
    key1:
      Thread t1 = new Thread(Handler1);
      t1.Start(object);
      break;
    key2:
      Thread t2 = new Thread(Handler2);
       t2.Start(object);
       break;
    }
    
    


    Если ключевые поля разные (строки 1 и 2), то все быстро (параллельно) обрабатывается. Но для строк данных 3, 4 и 5 Handler1 может работать одновременно и придется потом долго разбираться с очередностью прихода object3, 4, 5.
    Может есть какие-то более правильные схемы?

    30 сентября 2013 г. 11:41
  • Если у вас есть одинаковые ключи, то Вы можете сначала их сгруппировать по ключу, а затем выполнить обработку каждой группы, к примеру в том же самом Parallel.ForEach:

            static void Main(string[] args)
            {
                var items = new[]
                            {
                                new KeyValuePair<string, string>("1", "Value 1.1"),
                                new KeyValuePair<string, string>("2", "Value 2.1"),
                                new KeyValuePair<string, string>("3", "Value 3.1"),
                                new KeyValuePair<string, string>("1", "Value 1.2"),
                                new KeyValuePair<string, string>("2", "Value 2.2"),
                                new KeyValuePair<string, string>("3", "Value 3.2"),
                                new KeyValuePair<string, string>("1", "Value 1.3"),
                                new KeyValuePair<string, string>("2", "Value 2.3"),
                                new KeyValuePair<string, string>("3", "Value 3.3")
                            };
    
                Parallel.ForEach(items.GroupBy(pair => pair.Key), pairs =>
                                                                  {
                                                                      Console.WriteLine("Group {0}", pairs.Key);
                                                                      foreach (var keyValuePair in pairs)
                                                                      {
                                                                          Console.WriteLine("Group {0}; Item {1}",
                                                                              keyValuePair.Key, keyValuePair.Value);
                                                                      }
                                                                  });
    
                Console.ReadKey();
            }

    Если Вы выполните этот код несколько раз, то можете заметить, что последовательность обработки групп может быть разная, однако элементы в группе всегда будут обрабатываться последовательно.

    30 сентября 2013 г. 12:31
  • Большое спасибо, Кирилл, за оперативный ответ. Но есть нюанс - это онлайн:

    Данные приходят "порциями", каждый квант данных содержит информацию только по одному ключу. Мне просто надо параллельно обрабатывать приходящие данные разных ключей при том, что обработка "внутри ключа" идет последовательно.

    30 сентября 2013 г. 12:41
  • И сколько таких ключей может быть (в смысле сколько разных)?
    30 сентября 2013 г. 13:30
  • Максимум 15, может 20.

    Но в принципе все можно свести и к двум - один важный, другой - не очень. Мне на первых порах хотелось бы выделить некий важный ключ в отдельный поток/задачу, чтобы его обработку не задерживали приходящие другие (менее значимые) данные...

    30 сентября 2013 г. 13:40
  • А насколько объемен алгоритм обработки ?
    Может так получиться, если алгоритм невелик, что никакого параллелизма и не надо.
    Если порции данных приходят извне, это значит,
    что скорость поступления невелика и времени для обработки более чем достаточно.
    У меня как раз такой вариант - более 700 ключей и достаточно плотный трафик - до 1000 сообщений в секунду,
    да и алгоритм не хил.
    Тем не менее все катит без сучка и задоринки.
    Другое дело, что нужно организовать отдельные потоки для -
    а) приема и погружения сообщений в специальную очередь,
    б) обработки этих сообщений по соответствующему алгоритму,
    в) сохранения этих сообщений в файл, если это нужно.

    Поманипулируйте, чтобы сравнить время обработки со скоростью поступления сообщений.

    30 сентября 2013 г. 14:07
  • Могу предложить такой вариант:

        class Program
        {
            private static readonly TaskScheduler importantTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
            private static readonly TaskScheduler otherTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
    
            static void Main(string[] args)
            {
                var items = new[]
                            {
                                new KeyValuePair<String, Object>("1", "Value 1.1"),
                                new KeyValuePair<String, Object>("2", "Value 2.1"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.1"),
                                new KeyValuePair<String, Object>("1", "Value 1.2"),
                                new KeyValuePair<String, Object>("2", "Value 2.2"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.2"),
                                new KeyValuePair<String, Object>("1", "Value 1.3"),
                                new KeyValuePair<String, Object>("2", "Value 2.3"),
                                new KeyValuePair<String, Object>("ImportantKey", "Value 3.3")
                            };
    
                foreach (var keyValuePair in items)
                {
                    Process(keyValuePair);
                }
    
                Console.ReadKey();
            }
    
            private static void Process(KeyValuePair<String, Object> pair)
            {
                if (pair.Key.Equals("ImportantKey"))
                {
                    Task.Factory.StartNew(ImportantHandler, pair.Value, CancellationToken.None, TaskCreationOptions.None,
                        importantTaskScheduler);
                }
                else
                {
                    Task.Factory.StartNew(OtherHandler, pair.Value, CancellationToken.None, TaskCreationOptions.None,
                        otherTaskScheduler);
                }
            }
    
            private static void ImportantHandler(Object value)
            {
                // Обработка важных ключей
                // Типа долго
                Thread.Sleep(5000);
                Console.WriteLine(value);
            }
    
            private static void OtherHandler(Object value)
            {
                // Обработка остальных ключей
                Console.WriteLine(value);
            }
        }
    

    Что здесь происходит?

    Для важных ключей создается свой диспетчер задач, а для не важных соответственно свой (при этом задачи в каждом диспетчере будут выполняться последовательно, благодаря классу LimitedConcurrencyLevelTaskScheduler с параметром 1).

    Далее метод Process принимает на обработку очередную порцию данных, если это важный ключ, запускает выполнения метода ImportantHandler с указанным диспетчером задач. Если ключ не важный, то запускается OtherHandler.

    В выводе программы, можно заметить (специально поставил Sleep), что обработка важных ключей выполняется последовательно.

    Данный вариант можно расширить и для каждого ключа создавать свой диспетчер, с помощью которого и запускать обработку этих ключей.

    • Помечено в качестве ответа abb269 1 октября 2013 г. 5:49
    30 сентября 2013 г. 14:46
  • Большое спасибо, Кирилл. Я посматривал в сторону диспетчеров, теперь попробую реализовать. Это наверно правильнее, чем танцы с бубном вокруг Wait или Join.

    Спасибо еще раз

    1 октября 2013 г. 5:49
  • По крайней мере этот подход рекомендуем MS. Удачи!
    1 октября 2013 г. 6:32
  • Кирилл, уточните, пожалуйста!

    Доступен ли в VS 2012 Express класс LimitedConcurrencyLevelTaskScheduler ?
    Почему-то у меня не катит.

    2 октября 2013 г. 3:52
  • Я так понимаю, что его нужно "организовать" самому, В ссылке Кирилла LimitedConcurrencyLevelTaskScheduler он есть.

    2 октября 2013 г. 6:02
  • Да, можно просто скопировать как класс. А вообще этот и другие диспетчеры (и много чего интересного) есть в Samples for Parallel Programming with the .NET Framework в проекте ParallelExtensionsExtras.
    2 октября 2013 г. 6:20
  • Спасибо! Разобрался.
    2 октября 2013 г. 13:33