Completing cyclic graphs

    Question

  • This has been partially addressed in another thread on complex graphs, but I want to bring up the topic of figuring out how to complete a cyclic graph.

    I've written a simple simulator program that pretends to download a web site by traversing the links using a TransformBlock (for a page) and a TransformManyBlock (to produce the links). 

    The difficulty was finding a way to know when all the pages had been downloaded so that I could signal the blocks to complete (I can't know ahead of time how many pages are in a site). However, I couldn't use the Complete method within the block itself and had to resort to a cancellation token. It works, but it has a serious code smell. Is there an easier or better way?

     

    const int MAX_PAGES = 10;
    const int NUMBER_LINKS = 5;

    static void Main(string[] args)
    {
        ConcurrentDictionary<int, Page> pagesHarvested = new ConcurrentDictionary<int, Page>();
        ConcurrentDictionary<int, Page> pagesLinked = new ConcurrentDictionary<int, Page>();
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        TransformBlock<int, Page> downloaderTB = new TransformBlock<int, Page>(i =>
        {
            if (!pagesHarvested.ContainsKey(i))
            {
                // Download the page
                Page p = DownloadPage(i);
                pagesHarvested.GetOrAdd(i, p);
                // Check to see if we're done. Compare as sets: the order of
                // ConcurrentDictionary.Keys is not guaranteed.
                if (new HashSet<int>(pagesHarvested.Keys).SetEquals(pagesLinked.Keys))
                {
                    cancellationTokenSource.Cancel();
                }
                return p;
            }
            else
            {
                Console.WriteLine("Already found page " + i);
                return null;
            }
        },
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token
        });

        TransformManyBlock<Page, int> linkParserTMB = new TransformManyBlock<Page, int>(page =>
        {
            List<int> links;
            if (page == null)
            {
                // A page that was already harvested contributes no links
                links = new List<int>();
            }
            else
            {
                links = ParseLinks(page);
                foreach (var l in links)
                {
                    pagesLinked.GetOrAdd(l, new Page() { Id = l });
                }
            }
            return links;
        },
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token
        });

        // Wire the cycle: downloader -> parser -> downloader
        downloaderTB.LinkTo(linkParserTMB);
        downloaderTB.Completion.ContinueWith(delegate { linkParserTMB.Complete(); });
        linkParserTMB.LinkTo(downloaderTB);
        try
        {
            downloaderTB.Post(1);
            downloaderTB.Completion.Wait(cancellationTokenSource.Token);
        }
        catch (OperationCanceledException)
        {
            try
            {
                // we're done - set the completion flag
                downloaderTB.Complete();
                downloaderTB.Completion.Wait();
            }
            catch (Exception)
            {
                // pee-yew
            }
        }

        Console.WriteLine("\n\n\n\nTotal pages downloaded: " + pagesHarvested.Count);
        var pages = pagesHarvested.OrderBy(p => p.Key);
        foreach (var p in pages)
            Console.WriteLine("Page: " + p.Value.Id);
        Console.ReadLine();
    }

    public static Page DownloadPage(int url)
    {
        // pretend to download the page
        Thread.Sleep(100);
        Page p = new Page() { Id = url };
        Console.WriteLine("Downloaded page " + url);
        return p;
    }

    public static List<int> ParseLinks(Page page)
    {
        List<int> links = new List<int>();
        // make up some links for the page
        Random rand = new Random(DateTime.Now.Millisecond);
        for (int i = 0; i < NUMBER_LINKS; i++)
        {
            int link = rand.Next(MAX_PAGES) + 1;
            links.Add(link);
            Console.WriteLine("Adding link " + link);
        }

        return links;
    }
    


    September 26, 2011, 19:11

Answers

  • Hi. The key factor in your scenario is that you have a closed system, i.e. messages keep piling up among the blocks and never go away.

     

    You can link completion among blocks, and no block will complete more than once. However, one block will not complete: it still holds output messages, and no target linked to it will accept them, because all of the targets have already completed.
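
The stuck state can be reproduced without a cycle. What follows is a minimal sketch, not code from the thread: a single TransformBlock is left with one buffered output message and no target. The names StuckOutputDemo and CompletesBeforeDrain and both timeouts are invented for illustration; the Dataflow calls are from the released System.Threading.Tasks.Dataflow library.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class StuckOutputDemo
{
    // Returns true if the block completed while its output was still buffered.
    // completesAfterDrain reports whether it completed once the output was drained.
    public static bool CompletesBeforeDrain(out bool completesAfterDrain)
    {
        var block = new TransformBlock<int, int>(x => x + 1);

        block.Post(1);      // one message in, so one output message gets buffered
        block.Complete();   // no further input will be accepted

        // The output has nowhere to go, so Completion stays pending.
        // 500 ms is an arbitrary timeout chosen for this sketch.
        bool before = block.Completion.Wait(500);

        // Draining the output buffer lets the block finish.
        block.LinkTo(DataflowBlock.NullTarget<int>());
        completesAfterDrain = block.Completion.Wait(5000);
        return before;
    }

    public static void Main()
    {
        bool after;
        bool before = CompletesBeforeDrain(out after);
        Console.WriteLine("Completed before drain: {0}", before);
        Console.WriteLine("Completed after drain:  {0}", after);
    }
}
```

In a cycle the same thing happens to whichever block is still holding output when its only target has stopped accepting.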

     

    To make this work, there are two options:

    1)   When the time to complete comes, do not produce an output message. You do this by throwing OperationCanceledException from the transformation callback. This is the preferred option: it keeps your network free of “garbage” messages.

    2)   If messages are already in the system, you have to manually drain any output messages from the block that would get stuck with them.
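
Option 1 can be seen in isolation with a block that drops some of its inputs. This is a small sketch under assumptions of its own: the names DropMessageDemo and RunFilter are hypothetical, and the link uses DataflowLinkOptions.PropagateCompletion from the released library rather than a ContinueWith continuation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class DropMessageDemo
{
    // Posts 1..6 and returns what comes out when odd values are dropped.
    public static List<int> RunFilter()
    {
        var results = new List<int>();

        var evensOnly = new TransformBlock<int, int>(x =>
        {
            // Throwing OperationCanceledException drops this one message;
            // the block does not fault and keeps processing later messages.
            if (x % 2 != 0) throw new OperationCanceledException();
            return x;
        });

        // Default parallelism is 1, so the list is touched by one thread at a time.
        var collector = new ActionBlock<int>(x => results.Add(x));
        evensOnly.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 6; i++) evensOnly.Post(i);
        evensOnly.Complete();
        collector.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunFilter()));
    }
}
```

Because the odd inputs never become output messages, nothing is left behind in the block's output buffer when it is asked to complete.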

     

    See this example:

     

    TransformBlock<int, int> ping = null;
    TransformBlock<int, int> pong = null;

    ping = new TransformBlock<int, int>(value =>
    {
        Console.WriteLine("PING value={0}", value);
        if (value == 3)
        {
            Console.WriteLine("ping trigger completion");
            ping.Complete();

            // Option 1:
            // Signal a cooperative cancellation.
            throw new OperationCanceledException();
        }
        return value;
    });

    pong = new TransformBlock<int, int>(value =>
    {
        Console.WriteLine("PONG value={0}", value);
        return value + 1;
    });

    ping.LinkTo(pong);
    pong.LinkTo(ping);

    ping.Completion.ContinueWith(_ignored =>
    {
        Console.WriteLine("PING completed");
        pong.Complete();

        // Option 2:
        // Help pong get rid of its output messages.
        // Since ping is no longer accepting, we have to drain them manually.
        pong.LinkTo(new ActionBlock<int>(value =>
        {
            Console.WriteLine("PONG: Drained value={0}", value);
        }));
    });

    pong.Completion.ContinueWith(_ignored =>
    {
        Console.WriteLine("PONG completed");
    });

    ping.Post(0);

    Task.WaitAll(ping.Completion, pong.Completion);
    Thread.Sleep(2000);

     

     

    In your scenario you can complete downloaderTB and throw OperationCanceledException where you currently cancel the cancellation token.
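
As a rough illustration of that advice, here is a sketch of a crawler-shaped cycle that shuts down with Complete plus OperationCanceledException instead of a cancellation token. It makes several assumptions that are not in the original code: pages are plain ints, FakeLinks is a hypothetical fixed link graph standing in for ParseLinks, duplicates are dropped by throwing rather than forwarded as null, and the "are we done" test is an in-flight counter rather than the comparison of the two dictionaries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CrawlSketch
{
    const int MaxPages = 10;

    // Hypothetical stand-in for ParseLinks: a fixed link graph so runs are repeatable.
    static List<int> FakeLinks(int page)
    {
        return new List<int> { page % MaxPages + 1, (page * 3) % MaxPages + 1 };
    }

    // Crawls from startPage and returns the number of distinct pages harvested.
    public static int Crawl(int startPage)
    {
        var harvested = new ConcurrentDictionary<int, bool>();
        int inFlight = 1; // assumption of this sketch: ids not yet fully handled (the seed)

        TransformBlock<int, int> downloader = null;
        downloader = new TransformBlock<int, int>(id =>
        {
            if (harvested.TryAdd(id, true))
                return id; // "downloaded"; its links are still to be parsed

            // Duplicate: this id is finished. If it was the last one in flight,
            // stop accepting input. Either way, emit no output message (option 1).
            if (Interlocked.Decrement(ref inFlight) == 0)
                downloader.Complete();
            throw new OperationCanceledException();
        });

        var parser = new TransformManyBlock<int, int>(id =>
        {
            List<int> links = FakeLinks(id);
            // Count the new links before retiring the page itself,
            // so the counter cannot reach zero while work remains.
            Interlocked.Add(ref inFlight, links.Count);
            if (Interlocked.Decrement(ref inFlight) == 0)
                downloader.Complete(); // only possible when a page has no links
            return links;
        });

        downloader.LinkTo(parser);
        parser.LinkTo(downloader);
        downloader.Completion.ContinueWith(_ => parser.Complete());

        downloader.Post(startPage);
        Task.WaitAll(downloader.Completion, parser.Completion);
        return harvested.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Total pages downloaded: " + Crawl(1));
    }
}
```

With this particular link graph every page from 1 to 10 is reachable from page 1, so Crawl(1) should report 10 pages. The counter only reaches zero when no message is left anywhere in the cycle, so neither block is left holding output and no manual draining is needed.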

     

    Zlatko Michailov

    Software Development Engineer, Parallel Computing Platform

    Microsoft Corp.


    This posting is provided "AS IS" with no warranties, and confers no rights.
    September 27, 2011, 17:33

All replies

  • Edit: Asked a related question, but answered it myself.

    • Edited by DanBailiff, September 26, 2011, 20:24
    September 26, 2011, 20:08