How do I use TPL Dataflow to process Continuation tokens in Storage.Client 2.0?

  • Question

  • I would like to process my table data as quickly as it's read in. That is, when I read a table with 20,000 entries and the client gives me a continuation token for every 100 entries, I want to start working on the first batch right away.

    I think the right way for me to start is with the TPL. While exploring the TPL I came across the sample below. How should I modify this sample (if appropriate) to process data as soon as it comes in, specifically with the 2.0 storage client?

    Furthermore, if the TPL is not appropriate for this, what is a better option? Perhaps TPL Dataflow?

    For reference, I will be calling this function recursively in production: each table entry refers to another table/partition combination. In effect I'm using Azure Table storage to chain data together, and I want to minimize the latency created by all the I/O requests.

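    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Http;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows;
    
    namespace ProcessTasksAsTheyFinish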
    {
        public partial class MainWindow : Window
        {
            // Declare a System.Threading.CancellationTokenSource.
            CancellationTokenSource cts;
    
            public MainWindow()
            {
                InitializeComponent();
            }
    
            private async void startButton_Click(object sender, RoutedEventArgs e)
            {
                resultsTextBox.Clear();
    
                // Instantiate the CancellationTokenSource.
                cts = new CancellationTokenSource();
    
                try
                {
                    // ***Set up the CancellationTokenSource to cancel after 25 seconds.
                    cts.CancelAfter(25000);
    
                    await AccessTheWebAsync(cts.Token);
                    resultsTextBox.Text += "\r\nDownloads complete.";
                }
                catch (OperationCanceledException)
                {
                    resultsTextBox.Text += "\r\nDownloads canceled.\r\n";
                }
                catch (Exception)
                {
                    resultsTextBox.Text += "\r\nDownloads failed.\r\n";
                }
    
                cts = null;
            }
    
    
            private void cancelButton_Click(object sender, RoutedEventArgs e)
            {
                if (cts != null)
                {
                    cts.Cancel();
                }
            }
    
    
            async Task AccessTheWebAsync(CancellationToken ct)
            {
                HttpClient client = new HttpClient();
    
                // Make a list of web addresses.
                List<string> urlList = SetUpURLList();
    
                // ***Create a query that, when executed, returns a collection of tasks.
                IEnumerable<Task<int>> downloadTasksQuery =
                    from url in urlList select ProcessURL(url, client, ct);
    
                // ***Use ToList to execute the query and start the tasks. 
                List<Task<int>> downloadTasks = downloadTasksQuery.ToList();
    
                // ***Add a loop to process the tasks one at a time until none remain.
                while (downloadTasks.Count > 0)
                {
                    // Identify the first task that completes.
                    Task<int> firstFinishedTask = await Task.WhenAny(downloadTasks);
    
                    // ***Remove the selected task from the list so that you don't
                    // process it more than once.
                    downloadTasks.Remove(firstFinishedTask);
    
                    // Await the completed task.
                    int length = await firstFinishedTask;
                    resultsTextBox.Text += String.Format("\r\nLength of the download:  {0}", length);
                }
            }
    
    
            private List<string> SetUpURLList()
            {
                List<string> urls = new List<string> 
                { 
                    "http://msdn.microsoft.com",
                    "http://msdn.microsoft.com/library/windows/apps/br211380.aspx",
                    "http://msdn.microsoft.com/en-us/library/hh290136.aspx",
                    "http://msdn.microsoft.com/en-us/library/dd470362.aspx",
                    "http://msdn.microsoft.com/en-us/library/aa578028.aspx",
                    "http://msdn.microsoft.com/en-us/library/ms404677.aspx",
                    "http://msdn.microsoft.com/en-us/library/ff730837.aspx"
                };
                return urls;
            }
    
    
            async Task<int> ProcessURL(string url, HttpClient client, CancellationToken ct)
            {
                // GetAsync returns a Task<HttpResponseMessage>; dispose the response when done.
                using (HttpResponseMessage response = await client.GetAsync(url, ct))
                {
                    // Retrieve the website contents from the HttpResponseMessage.
                    byte[] urlContents = await response.Content.ReadAsByteArrayAsync();
    
                    // Thread.Sleep(3000);
                    // await Task.Delay(1000, ct);
    
                    return urlContents.Length;
                }
            }
        }
    }
    
    // Sample Output:
    
    // Length of the download:  226093
    // Length of the download:  412588
    // Length of the download:  175490
    // Length of the download:  204890
    // Length of the download:  158855
    // Length of the download:  145790
    // Length of the download:  44908
    // Downloads complete.
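A minimal sketch of the producer/consumer shape the question describes, using TPL Dataflow (`System.Threading.Tasks.Dataflow`, a NuGet package on older frameworks). The reader follows continuation tokens one page at a time and hands each page to an `ActionBlock`, so work on the first 100 entries can start while the next page is still being fetched. `Segment<T>`, `SegmentPipeline`, and the `fetchSegmentAsync` delegate are hypothetical: in real code the delegate would wrap the storage client's segmented table query (for example `CloudTable.ExecuteQuerySegmented` with a `TableContinuationToken`; check the exact method and its async variant for your client version). The demo replaces storage with an in-memory table of 20,000 entries served 100 at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for one page of query results plus the token for the next page.
public sealed class Segment<T>
{
    public Segment(IReadOnlyList<T> results, string continuationToken)
    {
        Results = results;
        ContinuationToken = continuationToken;
    }

    public IReadOnlyList<T> Results { get; }
    public string ContinuationToken { get; } // null when there are no more pages
}

public static class SegmentPipeline
{
    // Fetches pages one after another and posts each to an ActionBlock, so
    // processing of one page overlaps with fetching the next.
    public static async Task RunAsync<T>(
        Func<string, CancellationToken, Task<Segment<T>>> fetchSegmentAsync,
        Func<IReadOnlyList<T>, Task> processBatchAsync,
        int maxParallelBatches,
        CancellationToken ct)
    {
        var worker = new ActionBlock<IReadOnlyList<T>>(
            processBatchAsync,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxParallelBatches,
                BoundedCapacity = maxParallelBatches * 2, // back-pressure on the reader
                CancellationToken = ct
            });

        try
        {
            string token = null;
            do
            {
                Segment<T> segment = await fetchSegmentAsync(token, ct);

                // SendAsync waits while the block is full; false means it stopped accepting items.
                if (!await worker.SendAsync(segment.Results, ct))
                    break;

                token = segment.ContinuationToken;
            } while (token != null);
        }
        finally
        {
            worker.Complete(); // no more pages will be sent
        }

        await worker.Completion; // rethrows any exception from processBatchAsync
    }
}

public static class Program
{
    public static async Task Main()
    {
        const int total = 20000, pageSize = 100;

        // Hypothetical in-memory "table": the token is simply the next start index.
        Func<string, CancellationToken, Task<Segment<int>>> fetch = async (token, ct) =>
        {
            int start = token == null ? 0 : int.Parse(token);
            await Task.Delay(1, ct); // simulate a storage round trip
            List<int> page = Enumerable.Range(start, Math.Min(pageSize, total - start)).ToList();
            int next = start + page.Count;
            return new Segment<int>(page, next < total ? next.ToString() : null);
        };

        long processed = 0;
        await SegmentPipeline.RunAsync<int>(
            fetch,
            batch =>
            {
                // Real per-batch work would go here.
                Interlocked.Add(ref processed, batch.Count);
                return Task.CompletedTask;
            },
            maxParallelBatches: 4,
            ct: CancellationToken.None);

        Console.WriteLine(processed); // prints 20000
    }
}
```

`BoundedCapacity` keeps the reader from racing far ahead of the workers; it can be dropped if buffering every page in memory is acceptable.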


    • Edited by ChrisLaMont Monday, November 12, 2012 5:36 PM
    Monday, November 5, 2012 6:36 PM

All replies

  • Hi

    Please refer to this thread:

    http://stackoverflow.com/questions/10695367/speeding-up-azure-storage-api-with-net4-and-net-4-5-parallel-extensions

    I agree with Panagiotis Kanavos.

    The best option is to partition your data so that you can send parallel requests to different partitions. You will have to use a partitioning scheme that does not create "hot" partitions.  While partitioning is important for all Azure applications, the use of asynchronous operations means that a lot more IO requests are executed for each web request.
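The advice above amounts to a fan-out: one query per partition, issued concurrently. A sketch under the assumption that each partition's query is wrapped in a hypothetical `queryPartitionAsync` delegate (the demo fakes it with `Task.Delay`). A `SemaphoreSlim` caps the number of requests in flight and `Task.WhenAll` gathers the results; partition keys are assumed to be distinct.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionFanOut
{
    // Issues one query per partition key, with at most maxConcurrency in flight,
    // and returns the rows grouped by partition key (keys must be distinct).
    public static async Task<Dictionary<string, IReadOnlyList<T>>> QueryAllAsync<T>(
        IEnumerable<string> partitionKeys,
        Func<string, CancellationToken, Task<IReadOnlyList<T>>> queryPartitionAsync,
        int maxConcurrency,
        CancellationToken ct)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);

        var tasks = partitionKeys.Select(async key =>
        {
            await throttle.WaitAsync(ct);
            try
            {
                IReadOnlyList<T> rows = await queryPartitionAsync(key, ct);
                return (Key: key, Rows: rows);
            }
            finally
            {
                throttle.Release();
            }
        }).ToList();

        var results = await Task.WhenAll(tasks);
        return results.ToDictionary(r => r.Key, r => r.Rows);
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Hypothetical partitions; each fake query returns three rows.
        var keys = new[] { "p0", "p1", "p2", "p3", "p4" };
        Func<string, CancellationToken, Task<IReadOnlyList<string>>> query = async (key, ct) =>
        {
            await Task.Delay(10, ct); // simulate a round trip
            return new[] { key + "-a", key + "-b", key + "-c" };
        };

        var byPartition = await PartitionFanOut.QueryAllAsync(keys, query, maxConcurrency: 2, ct: CancellationToken.None);
        foreach (var pair in byPartition.OrderBy(p => p.Key))
            Console.WriteLine($"{pair.Key}: {string.Join(", ", pair.Value)}"); // e.g. "p0: p0-a, p0-b, p0-c"
    }
}
```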


    • Edited by KMoon Monday, November 12, 2012 9:46 AM
    Monday, November 12, 2012 9:44 AM
  • @KMoon Assume partitioning has already been done and there are many entities in the partition. My goal is to read the last 4 entries in the list. I can't change the sort order, so I need to get to the last page quickly.

    Also assume that this partition will never become "hot", since the concurrency is mostly limited to the active query.
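For a single query within one partition, pages cannot be fetched in parallel, because each request needs the continuation token returned by the previous one. What can overlap is the network round trip and the local work. A sketch, assuming a hypothetical `fetchSegmentAsync` delegate that returns one page plus the next token (the demo serves 20,000 in-memory entries, 100 per page): it starts the request for the next page before scanning the current one, and keeps only the last `n` entities in a `Queue<T>`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical page of results plus the token for the next page (null when done).
public sealed class Segment<T>
{
    public Segment(IReadOnlyList<T> results, string continuationToken)
    {
        Results = results;
        ContinuationToken = continuationToken;
    }

    public IReadOnlyList<T> Results { get; }
    public string ContinuationToken { get; }
}

public static class LastEntries
{
    // Walks every page of a single query, keeping only the last n entities.
    // The request for the next page is started before the current page is scanned.
    public static async Task<List<T>> TakeLastAsync<T>(
        Func<string, CancellationToken, Task<Segment<T>>> fetchSegmentAsync,
        int n,
        CancellationToken ct)
    {
        var window = new Queue<T>(n + 1);
        Task<Segment<T>> pending = fetchSegmentAsync(null, ct);

        while (pending != null)
        {
            Segment<T> segment = await pending;

            // Kick off the next round trip (if any) before doing local work.
            pending = segment.ContinuationToken != null
                ? fetchSegmentAsync(segment.ContinuationToken, ct)
                : null;

            foreach (T item in segment.Results)
            {
                window.Enqueue(item);
                if (window.Count > n)
                    window.Dequeue();
            }
        }

        return window.ToList();
    }
}

public static class Program
{
    public static async Task Main()
    {
        const int total = 20000, pageSize = 100;

        // Hypothetical in-memory partition: the token is the next start index.
        Func<string, CancellationToken, Task<Segment<int>>> fetch = async (token, ct) =>
        {
            int start = token == null ? 0 : int.Parse(token);
            await Task.Delay(1, ct); // simulate a storage round trip
            List<int> page = Enumerable.Range(start, Math.Min(pageSize, total - start)).ToList();
            int next = start + page.Count;
            return new Segment<int>(page, next < total ? next.ToString() : null);
        };

        List<int> last = await LastEntries.TakeLastAsync(fetch, 4, CancellationToken.None);
        Console.WriteLine(string.Join(", ", last)); // prints 19996, 19997, 19998, 19999
    }
}
```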

    I'm still interested in using TPL Dataflow to access the data.  Can anyone offer sample code that works with the latest storage client?
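A sketch of the recursive case from the original question, where each entity points at another table/partition combination. It uses a single TPL Dataflow `ActionBlock` that posts newly discovered pointers back to itself, plus a counter of outstanding work so the block completes once nothing is left. `Pointer` and `readChildrenAsync` are hypothetical stand-ins for the real entities and the storage query (which would itself follow continuation tokens); this does not call any storage client API. A `ConcurrentDictionary` skips pointers already seen, in case the chain contains cycles. `record` assumes C# 9 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical pointer from one entity to another table/partition combination.
public sealed record Pointer(string Table, string Partition);

public static class ChainWalker
{
    // Reads every pointer reachable from root, up to maxParallelReads at a time,
    // and returns how many table/partition combinations were read.
    public static async Task<int> WalkAsync(
        Pointer root,
        Func<Pointer, CancellationToken, Task<IReadOnlyList<Pointer>>> readChildrenAsync,
        int maxParallelReads,
        CancellationToken ct)
    {
        var seen = new ConcurrentDictionary<Pointer, byte>();
        int outstanding = 1; // posted but not yet finished
        int visited = 0;

        ActionBlock<Pointer> block = null;
        block = new ActionBlock<Pointer>(async pointer =>
        {
            try
            {
                IReadOnlyList<Pointer> children = await readChildrenAsync(pointer, ct);
                Interlocked.Increment(ref visited);
                foreach (Pointer child in children)
                {
                    if (!seen.TryAdd(child, 0))
                        continue; // already queued once (guards against cycles)
                    Interlocked.Increment(ref outstanding);
                    block.Post(child);
                }
            }
            finally
            {
                // When the last outstanding pointer finishes, nothing else can arrive.
                if (Interlocked.Decrement(ref outstanding) == 0)
                    block.Complete();
            }
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelReads,
            CancellationToken = ct
        });

        seen.TryAdd(root, 0);
        block.Post(root);
        await block.Completion; // rethrows the first failure, if any
        return visited;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Hypothetical chain: every partition below depth 3 points at two more partitions.
        Func<Pointer, CancellationToken, Task<IReadOnlyList<Pointer>>> read = async (p, ct) =>
        {
            await Task.Delay(5, ct); // simulate one storage round trip
            int depth = p.Partition.Length - 1; // "r" = 0, "r0" = 1, "r01" = 2, ...
            if (depth >= 3)
                return Array.Empty<Pointer>();
            string table = "t" + (depth + 1);
            return new[] { new Pointer(table, p.Partition + "0"), new Pointer(table, p.Partition + "1") };
        };

        int count = await ChainWalker.WalkAsync(new Pointer("t0", "r"), read, maxParallelReads: 8, ct: CancellationToken.None);
        Console.WriteLine(count); // prints 15 (1 + 2 + 4 + 8)
    }
}
```

Raising `maxParallelReads` lets more chain links be read concurrently; each link's latency is still paid in sequence along a single path.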

    Monday, November 12, 2012 5:35 PM
  • Hi

    Currently I can't find any sample code for that.

    I suggest you submit your idea to this website.

    http://www.mygreatwindowsazureidea.com/forums/103009-windows-azure-code-samples-voting

    The dev team will review your idea there and write sample code if they think it's useful.

    Thanks 

    • Marked as answer by Dino He Tuesday, November 20, 2012 3:17 AM
    Wednesday, November 14, 2012 7:55 AM