none
Работа с потоками, совмещение работы нескольких потоков. RRS feed

  • Вопрос

  • Как:

    1. Сделать так, чтобы делегат выполнялся не в том потоке, из которого производится его вызов, но при этом не каждый раз в разном, а в одном и том же. То есть, к примеру, поток 1 вызыввает делегат. Надо, чтобы делегат выполнялся не в потоке 1, а в новом потоке (2), и чтобы при всех последующих вызовах этого делегата он ТАКЖЕ выполнялся в потоке 2 (а не в каком-нибудь другом). То есть, чтобы при многочисленных вызовах делегата, поток 1 сразу же освобождался, а у потока 2 скапливалось что-то типа очереди, которую он последовательно обрабатывал.

    2. Раскидываю некоторую работу по потокам через:

    ThreadPool.QueueUserWorkItem((st) => handler(this, e)); 
    

    Таким образом, все обработчики запускаются в произвольных потоках из пула. Запуститься может произвольное количество потоков, в зависимости от количества вызовов. Но, после этого, мне нужно остановится, и дождаться, когда все из них завершат свою работу, потому что дальнейший код должен использовать результаты выполнения всех только что запущенных потоков. Как это реализовать?

    13 августа 2011 г. 13:15

Ответы

  • если я правильно понял, то см. continuation http://www.albahari.com/threading/part5.aspx 
    и barrier http://www.albahari.com/threading/part4.aspx#_The_Barrier_Class
    P.S.
    описание TPL - http://msdn.microsoft.com/ru-ru/library/dd460717.aspx
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 12:16
    13 августа 2011 г. 16:22
  • > это ведь наверняка стандартная проблема кодинга со стандартным решение и кодом.

    см. TPL Dataflow (TDF) http://msdn.microsoft.com/en-us/devlabs/gg585582
    описание с рисунками http://www.microsoft.com/download/en/details.aspx?id=14782

    > новые данные надо "поставить в очередь."

    один поток принимает данные, ставит в очередь.
    другой поток (или несколько) выбирает данные из очереди, определяет какой делегат соотвествует данным, т.е. в какой метод их надо передать. 
    данные и соответствующий делегат сохраняются в dictionary<идентфикатор_метода, локальная_очередь_метода>.
    третий поток отслеживает загрузку методов. как только метод свободен, то из локальной_очереди_метода извлекаются данные, создается новый поток, который вызывает метод.
    метод после завершение работы должен сказать третьему потоку о том, что он свободен.

    > А так как доступа к методам нет, мне нечем инициализировать экземпляр делегата

    почему нет? где находятся методы? как я понимаю, есть сборка (dll или exe). она подключена к проекту. или она загружается в рантайме как плагин?

    • Предложено в качестве ответа Malobukv 17 августа 2011 г. 8:35
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 11:26
    15 августа 2011 г. 18:25
  • в определении MethodImpl указано Inherited = false - значит его действие не распространяется на override.
    поэтому MethodImpl... надо указать над public override void qwqw
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 13:08
    17 августа 2011 г. 13:02
  • > при этом надо каждому запускаемому методу поставить атрибут [MethodImpl(MethodImplOptions.Synchronized)]

    метод с этим атрибутом будет выполняться только одним потоком, остальные станут в очередь. но управлять очередью, если не ошибаюсь, невозможно. если в дальнейшем понадобится ограничивать очередь или менять приоритет, то лучше написать свой менеджер потоков.

    P.S.
    можно сделать обертки для методов. для генерации кода оберток см. T4 http://msdn.microsoft.com/en-us/library/bb126445.aspx (Code Generation and T4 Text Templates)

    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 16:19
    17 августа 2011 г. 13:22

Все ответы

  • если я правильно понял, то см. continuation http://www.albahari.com/threading/part5.aspx 
    и barrier http://www.albahari.com/threading/part4.aspx#_The_Barrier_Class
    P.S.
    описание TPL - http://msdn.microsoft.com/ru-ru/library/dd460717.aspx
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 12:16
    13 августа 2011 г. 16:22
  • У меня вот какя проблема:  есть событие (приход данных с сервера), которое может подниматься с некоторыми аргументами. Для каждого аргумента существует свой метод обработки, который вызывается только для этого аргумента. Одновременно и многопоточно можно выполнять обработчики для событий с разными аргументами. Но, возможна ситуация, что до окончания работы обработчика события с некоторым аргументом, будет поднято события еще раз и с этим же аргументом, и получится, что будет еще раз вызван уже выполняющийся метод. Мне надо, чтобы в этом случае второе поднятие как-бы вставало в очередь, до того, как завершится обработка предыдущего поднятия.

    То есть, мне неважно в каком потоке будет запущен метод, но мне нужна гарантия, что этот метод не будет повторно запущен в другом потоке, во время выполнения в текущем. Видимо, надо как-то привязать метод и поток, в котором он выполняется. Я не знаю наперед, когда будет вызвано следующее событие и с каким аргументом.

    То есть, ContinueWith позволяет присоединить к задаче, но я не могу написать:

    Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
    Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
    

    К примеру, пришли данные с некоторым аргументом - запустил task1.

    Далее, Пришли новые данные - я ничего не знаю про прошлые данные и про поток, в котором они обрабатываются. Одновременно task1 и task2 у меня не существуют. Если я просто сделаю новую задачу,  она может оказаться с тем же аргументом, что и прошлая, и произойдет конфликт, так как будет вызов того же метода (соответствующего текущему аргументу), но в новом потоке.

    У меня пока это как-то не укладывается в голове. Мне что, надо делать какой-то свой менеджер потоков, который хранит имя потока, в котором идет выполнение метода с некоторым аргументом?

    14 августа 2011 г. 18:15
  • > возможна ситуация, что до окончания работы обработчика события с некоторым аргументом, будет поднято события еще раз и с этим же аргументом, и получится, что будет еще раз вызван уже выполняющийся метод
    > мне нужна гарантия, что этот метод не будет повторно запущен в другом потоке, во время выполнения в текущем

    тело метода можно поместить в "блок": Monitor.TryEnter и Monitor.Exit

    15 августа 2011 г. 5:21
  • > ... задачу, она может оказаться с тем же аргументом, что и прошлая, и произойдет конфликт, так как будет вызов того же метода (соответствующего текущему аргументу), но в новом потоке.

    на основе пришедших данных можно вычислять hashcode.
    hashcode использовать как ключ в словаре ConcurrentDictionary, а значением будет ссылка на Task
    после завершение задачи: удалить соотвествующую запись из словаря.

    если во время выполнения задачи придут новые данные, то их можно поместить в очередь ConcurrentQueue.

    15 августа 2011 г. 5:31
  • Спасибо. Но это ведь наверняка стандартная проблема кодинга со стандартным решение и кодом. С сервера приходят данные, есть несколько обработчиков, которые надо многопоточно запускать в зависимости от данных. При этом, могут прийти новые данные для одного и того-же обработчика еще до того, как он закончил работу с прошлыми данными. В этом случае, эти новые данные надо "поставить в очередь." Если бы не последнее условие, то можно было бы просто запускать новые потоки каждый раз. Но последнее условие все усложняет.
    Какое решение вижу я (но не могу пока реализовать): допустим, есть четыре метода-обработчика. Я пробую создать четыре экземпляра Task, по одному для каждого метода, чтобы любой метод запускался на выполнение только через свой экземпляр Task.
    Тогда, для запуска соответствующего метода (в зависимости от данных, пришедших с сервера), можно будет просто сделать:
    var task = Task.Factory.StartNew(() => имя задачи для конкретного метода.ContinueWith(делегат, указывающий на этот конкретный метод)
    

    В итоге, будет идти абсолютно безопасная многопоточная обработка. Если задачи для конкретного метода нет, то задача создается и метод выполняется в произвольном потоке. Если задача есть, то есть уже запланирована или выполняется в каком-то потоке, то произойдет прикрепление к окончанию ее выполнения через ContinueWith, и она последовательно выполнится с новыми данными в том-же потоке, в котором выполнялась с прошлыми данными.
    Но я не понимаю, как заранее создать задачу, не имея доступа к методам обработчикам? Обработчики вызываются через делегат с сигнатурой
     
    public delegate void UpdateDelegate(parametr1, parametr1, parametr1);
    
    А так как доступа к методам нет, мне нечем инициализировать экземпляр делегата, и задача заранее не создается.
     Task task = new Task(new UpdateDelegate(????));
    
    Или, может есть более правильный вариант решения описанной в начале сообщения проблемы? Как иначе связать метод и поток, в котором он выполняется?


    15 августа 2011 г. 13:52
  • > это ведь наверняка стандартная проблема кодинга со стандартным решение и кодом.

    см. TPL Dataflow (TDF) http://msdn.microsoft.com/en-us/devlabs/gg585582
    описание с рисунками http://www.microsoft.com/download/en/details.aspx?id=14782

    > новые данные надо "поставить в очередь."

    один поток принимает данные, ставит в очередь.
    другой поток (или несколько) выбирает данные из очереди, определяет какой делегат соотвествует данным, т.е. в какой метод их надо передать. 
    данные и соответствующий делегат сохраняются в dictionary<идентфикатор_метода, локальная_очередь_метода>.
    третий поток отслеживает загрузку методов. как только метод свободен, то из локальной_очереди_метода извлекаются данные, создается новый поток, который вызывает метод.
    метод после завершение работы должен сказать третьему потоку о том, что он свободен.

    > А так как доступа к методам нет, мне нечем инициализировать экземпляр делегата

    почему нет? где находятся методы? как я понимаю, есть сборка (dll или exe). она подключена к проекту. или она загружается в рантайме как плагин?

    • Предложено в качестве ответа Malobukv 17 августа 2011 г. 8:35
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 11:26
    15 августа 2011 г. 18:25
  • Я вначале не совсем корректно описал задачу по  п.1 - мне надо не обеспечивать выполнение в том же потоке, а гарантировать НЕзапуск этого же метода в другом потоке до окончания выполнения в текущем.
    Я просто пока не оконачательно  разобрался, тему не помечал, а то потом не будет ответов.
    По TPL Dataflow по русски ничего не удалось ни нагуглить, ни в книгах найти.
    Методы у меня в моем-же коде, но они экземплярные, а доступа к этим экземплярам у меня нет (чтобы не нарушать архитектуру приложения).
     
    Ну да, первый поток последовательно публикует события прихода данных, то есть фактически создает очередь. Работу второго потока выполняет первый поток, так как там все очень быстро, но можно и в новом потоке запускать, но думаю, расходы на создание потоков будут слишком велики, по сравнению с выполняемой работой.
    Вроде бы есть решение проблемы. Внутри кода, разбирающего пришедшие события и определяющего нужный для вызова метод, вызов метода делать так, как я уже писал:
    ThreadPool.QueueUserWorkItem((st) =>...)
    

    Но при этом надо каждому запускаемому методу поставить атрибут
    [MethodImpl(MethodImplOptions.Synchronized)]
    

    и пока метод выполняется одним потоком, выполнение этого метода с новыми данными другой поток начать не сможет, и будет стоять в очереди. При этом, будут использоваться преимущества многопоточности, то есть если придут данные, для обработки которых потребуется метод другого экземпляра, который в данный момент свободен (метод), то он запустится в другом потоке. То есть, при таких требованиях к выполнению, "менеджер" методов-потоков (третий поток) не нужен.
    17 августа 2011 г. 12:08
  • Возник такой вопрос:

    Если есть метод:

        [MethodImpl(MethodImplOptions.Synchronized)]
        public abstract void qwqw
    

    который потом

    public override void qwqw
    

    То при выполнении этого переопределенного метода, атрибут, объявленный для базового абстрактного метода, будет действовать, или его надо снова объявить для переопределенного метода?

    17 августа 2011 г. 12:50
  • в определении MethodImpl указано Inherited = false - значит его действие не распространяется на override.
    поэтому MethodImpl... надо указать над public override void qwqw
    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 13:08
    17 августа 2011 г. 13:02
  • > при этом надо каждому запускаемому методу поставить атрибут [MethodImpl(MethodImplOptions.Synchronized)]

    метод с этим атрибутом будет выполняться только одним потоком, остальные станут в очередь. но управлять очередью, если не ошибаюсь, невозможно. если в дальнейшем понадобится ограничивать очередь или менять приоритет, то лучше написать свой менеджер потоков.

    P.S.
    можно сделать обертки для методов. для генерации кода оберток см. T4 http://msdn.microsoft.com/en-us/library/bb126445.aspx (Code Generation and T4 Text Templates)

    • Помечено в качестве ответа Qwester33 17 августа 2011 г. 16:19
    17 августа 2011 г. 13:22