none
Странности foreach при работе с LimitedConcurrencyLevelTaskScheduler RRS feed

  • Вопрос

  • Здравствуйте,
    Тут вот какая проблема. Есть консольное приложение для обработки данных.
    Часть важных данных, порядок следования которых критичен, обрабатывается в отдельном потоке
    через LimitedConcurrencyLevelTaskScheduler (ссылка https://msdn.microsoft.com/en-us/library/ee789351%28v=vs.100%29.aspx)

            private static readonly TaskScheduler key1ts =
                new LimitedConcurrencyLevelTaskScheduler(1);

            private static void NewData(string data)
            {
                Data d = new Data();
                d.line = data;

                if (data.StartsWith("key1"))
                {
                    Task.Factory.StartNew(
                        GetKey1Handler,
                        d,
                        CancellationToken.None,
                        TaskCreationOptions.None,
                        key1ts);
                    return;
                }
        }

    С другой стороны есть несколько алгоритмов обработки этих данных (экземпляров класса Algo)
    Управление в эти алгоритмы передается так

            private static void GetKey1Handler(object o)
            {
                Data d = (Data)o;

                tD.par1 = d.line;
                tD.par2 = d.line;

                int ii = 0;

                for (int i = 0; i < 10; i++)
                {
                    ii++;
                    foreach (Algo a in aList)
                    {
                        a.GetHandler();
                    }
                    sw.WriteLine(ii);
                    sw.Flush();
                }
            }

    Очень странно, но при некоторых неблагоприятных условиях (долгая обработка в a.GetHandler) в файл sw попадют не все значения ii.
    Я это понимаю так, что новая порция данных сбивает обработку, а сама  куда-то исчезает.
    Все мои попытки воспроизвести ситуацию на упрощенном примере ни к чему не приводят: в файле sw последовательное перечисление индекса.
    Но боевой вариант точно где-то сбоит: в логе обрыв на полуслове обработки в отдельном хэндлере, а в файле не все значения ii.
    Есть какие-то мысли почему так происходит?

                             
    • Изменено abb269 30 апреля 2016 г. 14:22
    30 апреля 2016 г. 14:08

Ответы

  • Ага, понял.

    Алгоритм действительно рабочий, хотя и непривычный. Соорудил пример - работает корректно. Конкуррентного доступа к файлу не должно быть.

    Возможны ли исключения где-то в коде? Можно их отловить так:

    Task.Factory.StartNew(GetKey1Handler, d, CancellationToken.None, TaskCreationOptions.None, key1ts)
        .ContinueWith(t => Console.WriteLine("Error: " + t.Exception), TaskContinuationOptions.OnlyOnFaulted);

    Продолжение с параметром OnlyOnFaulted сработает именно в случае выброса исключения. Вывод в консоль заменить на подходящее логирование.

    30 апреля 2016 г. 17:36

Все ответы

  • Что такое sw? Если это StreamWriter, то читайте документацию: Потокобезопасность. Немудрено, что данные теряются при многопоточном доступе. Нужно лочить каждую запись.

    -----

    Однако, сразу не разглядел: у вас задано значение 1 в качестве максимальной степени параллелизации. В боевом коде именно единица?

    Но тогда зачем всё это? Запустите одну единственную таску вызовом Task.Run и всё. Гуру асинхронщины дотнета: Тауб и Клири - шибко ругают метод Task.Factory.StartNew, - мол, слишком он мудрёный, его сложно использовать правильно.

    И вообще, зачем вам этот LimitedConcurrencyLevelTaskScheduler? При запуске одной единственной задачи он бессмысленен. Его следует передавать в циклы типа Parallel.For, Parallel.ForEach. Но и там степень параллелизации можно задавать с помощью ParallelOptions.

    -----

    Возвращаясь к проблеме. Метод NewData вызывается один раз? Или в несколько потоков? Потому что только так можно объяснить исчезновение части данных.

    30 апреля 2016 г. 14:59
  • Спасибо за ответ

    Да, sw = new StreamWriter. Но в этом обработчике, если я правильно читаю документацию о шедулере(1), только один поток.

    Вот как я понимаю ситуацию. Есть данные, разной структуры, которые надо быстро обработать. С Key1 - важная и объемная информация, обработку которой я вынес в отдельный поток. Но очень важна последовательность прихода данных - отсюда ограничение параллелизма. Грубо говоря, надо организовать 2 параллельных канала, но с соблюдением порядка прихода данных.

    Метод NewData вызывается каждый раз при приходе новых данных, вызывается в основном потоке, разделение по потокам происходит в этом методе по ключам, причем новые данные "ждут" пока обработаются старые.



    • Изменено abb269 30 апреля 2016 г. 16:57
    30 апреля 2016 г. 16:52
  • Ага, понял.

    Алгоритм действительно рабочий, хотя и непривычный. Соорудил пример - работает корректно. Конкуррентного доступа к файлу не должно быть.

    Возможны ли исключения где-то в коде? Можно их отловить так:

    Task.Factory.StartNew(GetKey1Handler, d, CancellationToken.None, TaskCreationOptions.None, key1ts)
        .ContinueWith(t => Console.WriteLine("Error: " + t.Exception), TaskContinuationOptions.OnlyOnFaulted);

    Продолжение с параметром OnlyOnFaulted сработает именно в случае выброса исключения. Вывод в консоль заменить на подходящее логирование.

    30 апреля 2016 г. 17:36
  • Petalvik, большое Вам спасибо за внимание и ответы. После праздников буду пробовать.

    Еще думаю переписать все под другую логику обработки. А в этом варианте хочу распараллелить обработку в классах Algo - то, что в я описал как a.GetHandler() в цикле foreach (Algo a in aList). Как на Ваш взгляд это сделать аккуратно?

     Можно ли просто Parallel.ForEach(aList, a => { a.GetHandler(); });
                                   



    • Изменено abb269 30 апреля 2016 г. 18:53
    30 апреля 2016 г. 18:24