Multi Threading while uploading files to blob storage in C#

  • Question

  • Hi guys,

    I have an issue here. My current code uses a thread pool to upload files to blob storage through PutBlob. I have now changed the code to use PutBlockBlob. How should I use the thread pool with PutBlockBlob?

    I tried and not all the files were uploaded correctly, some were 0bytes. My guess is that the BlockIDList got overwritten or was somehow lost in transit.

    Can anyone help me out here? Is there a way to use the thread pool with PutBlockBlob, or can I make the BlockIDList unique to each file while the threads are running?

     

    Thanks!

    Sunday, May 29, 2011 7:31 AM
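One way to keep a BlockIDList per file is to build it inside the method that uploads that file, so that concurrent uploads never share a list. The sketch below assumes .NET 4.0's Task Parallel Library (tasks run on the thread pool) and, for simplicity, the file contents already in memory. `PerFileBlockUploader`, `putBlock` and `putBlockList` are hypothetical names; the two delegates stand in for `CloudBlockBlob.PutBlock` and `CloudBlockBlob.PutBlockList`.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Sketch: upload one file's blocks in parallel on the thread pool, then commit.
// putBlock / putBlockList are hypothetical stand-ins for CloudBlockBlob.PutBlock
// and CloudBlockBlob.PutBlockList.
public static class PerFileBlockUploader
{
    public static List<string> Upload(
        byte[] fileBytes,
        int blockSize,
        Action<string, Stream> putBlock,
        Action<IList<string>> putBlockList)
    {
        int blockCount = (int)((fileBytes.LongLength + blockSize - 1) / blockSize);

        // Local list: every call (one per file) owns its own block IDs, so
        // concurrent uploads of other files cannot overwrite them.
        var blockIds = new List<string>(blockCount);
        for (int i = 0; i < blockCount; i++)
        {
            // 16 GUID bytes always encode to 24 base64 characters, so all IDs
            // for this blob have the same length.
            blockIds.Add(Convert.ToBase64String(Guid.NewGuid().ToByteArray()));
        }

        var tasks = new Task[blockCount];
        for (int i = 0; i < blockCount; i++)
        {
            int index = i; // copy the loop variable for the closure
            tasks[index] = Task.Factory.StartNew(() =>
            {
                int offset = index * blockSize;
                int length = Math.Min(blockSize, fileBytes.Length - offset);
                using (var data = new MemoryStream(fileBytes, offset, length, false))
                {
                    putBlock(blockIds[index], data);
                }
            });
        }

        // Wait for every block; throws AggregateException if any upload failed,
        // so a partly uploaded file is never committed.
        Task.WaitAll(tasks);

        // Commit in file order; only now does the blob get its content.
        putBlockList(blockIds);
        return blockIds;
    }
}
```

Calling `Upload` once per file (for example from a loop over the file list, or from several threads) gives each file an independent ID list, and the commit happens only after that file's blocks have all succeeded.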

Answers

  • Hi kennetham,

    > I tried and not all the files were uploaded correctly, some were 0bytes.

    I can reproduce the same issue with the following code sample. Does the issue occur with both the storage emulator and a real storage account? In my case, it only happens when targeting the storage emulator.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using Microsoft.WindowsAzure;               // CloudStorageAccount (Storage Client 1.x)
    using Microsoft.WindowsAzure.StorageClient; // CloudBlobClient, CloudBlobContainer, CloudBlockBlob

    // Repro: upload every block of several files on its own thread with PutBlock,
    // then commit each file with PutBlockList.
    class MultipleThreadPutBlockBlob
    {
        readonly int blockSize = 40 * 1024; // 40 KB
        readonly string storageAccountConnectionString = "UseDevelopmentStorage=true";
        //readonly string storageAccountConnectionString = "DefaultEndpointsProtocol=https;AccountName=[account name];AccountKey=[account key]";
        readonly string blockContainer = "files";

        public void Run()
        {
            List<string> fileList = new List<string>()
            {
                @"D:\SHAWebService.zip",
                @"D:\RuneApp.zip"
            };

            // Create the container once instead of on every block upload.
            GetContainer().CreateIfNotExist();

            // One list of blocks per file, keyed by file path.
            Dictionary<string, List<Block>> blockList = GetBlockList(fileList);

            Upload(blockList);        // start one thread per block
            CheckUploaded(blockList); // wait until every block is uploaded
            Submit(blockList);        // commit each file's block list
        }

        private void Submit(Dictionary<string, List<Block>> blockList)
        {
            foreach (string filePath in blockList.Keys)
            {
                // Block IDs in file order; this order defines the blob's content.
                List<string> blockIDs = (from block in blockList[filePath]
                                         select block.BlockID).ToList();
                string fileName = Path.GetFileName(filePath);
                GetBlockBlob(fileName).PutBlockList(blockIDs);
            }
            Console.WriteLine("Upload completed.");
        }

        private void CheckUploaded(Dictionary<string, List<Block>> blockList)
        {
            while (true)
            {
                Thread.Sleep(1000);
                Console.WriteLine("Pending upload ({0}):", DateTime.Now);
                bool allUploaded = true;
                foreach (string filePath in blockList.Keys)
                {
                    int pendingUploadCount = blockList[filePath].Count(p => !p.IsUploaded);

                    Console.WriteLine("File: {0}, Block count: {1}", filePath, pendingUploadCount);

                    if (pendingUploadCount > 0)
                    {
                        allUploaded = false;
                    }
                }
                Console.WriteLine();
                if (allUploaded)
                {
                    return;
                }
            }
        }

        void Upload(Dictionary<string, List<Block>> blockList)
        {
            foreach (string filePath in blockList.Keys)
            {
                foreach (Block block in blockList[filePath])
                {
                    new Thread(UploadBlock).Start(block);
                    //UploadBlock(block);
                }
            }
        }

        void UploadBlock(object blockObject)
        {
            Block block = (Block)blockObject;
            string fileName = Path.GetFileName(block.filePath);

            using (Stream data = GetBlockData(block))
            {
                GetBlockBlob(fileName).PutBlock(block.BlockID, data, null); // null: no Content-MD5
            }

            block.IsUploaded = true;
        }

        Stream GetBlockData(Block block)
        {
            // Dispose the file handle as soon as the block is copied into memory.
            using (FileStream fileStream = File.OpenRead(block.filePath))
            {
                fileStream.Seek(block.Offset, SeekOrigin.Begin);

                byte[] bufferBytes = new byte[blockSize];
                int readCount = 0;
                // Read may return fewer bytes than requested: loop until the
                // buffer is full or the end of the file is reached.
                while (readCount < blockSize)
                {
                    int read = fileStream.Read(bufferBytes, readCount, blockSize - readCount);
                    if (read == 0) break;
                    readCount += read;
                }

                return new MemoryStream(bufferBytes, 0, readCount);
            }
        }

        CloudBlobContainer GetContainer()
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountConnectionString);
            CloudBlobClient blobStorage = storageAccount.CreateCloudBlobClient();
            return blobStorage.GetContainerReference(blockContainer);
        }

        CloudBlockBlob GetBlockBlob(string fileName)
        {
            return GetContainer().GetBlockBlobReference(fileName);
        }

        Dictionary<string, List<Block>> GetBlockList(List<string> fileList)
        {
            Dictionary<string, List<Block>> blockList = new Dictionary<string, List<Block>>();

            foreach (string filePath in fileList)
            {
                List<Block> blocks = new List<Block>();

                // Ceiling division: no trailing empty block when the file length
                // is an exact multiple of blockSize.
                long fileLength = new FileInfo(filePath).Length;
                int blockCount = (int)((fileLength + blockSize - 1) / blockSize);
                for (int i = 0; i < blockCount; i++)
                {
                    blocks.Add(new Block()
                    {
                        filePath = filePath,
                        // Base64 of a 36-character GUID string: unique, and the same
                        // length for every block, as the Blob service requires.
                        BlockID = Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())),
                        Offset = (long)blockSize * i
                    });
                }

                blockList.Add(filePath, blocks);
            }

            return blockList;
        }

        class Block
        {
            public string filePath { get; set; }
            public string BlockID { get; set; }
            public long Offset { get; set; }
            // volatile: set by the upload threads, polled by CheckUploaded.
            public volatile bool IsUploaded;
        }
    }
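The block count for a file is a ceiling division. With 40 KB (40,960-byte) blocks, an 81,920-byte file needs exactly 2 blocks, whereas `length / blockSize + 1` gives 3, the last one empty. A minimal sketch of planning blocks this way, using zero-padded block indexes as IDs instead of GUIDs (the Blob service requires all block IDs of one blob to have the same length). `BlockPlanner` is a hypothetical helper, not part of the storage client library.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: plans (blockId, offset) pairs for one file.
public static class BlockPlanner
{
    public static int CountBlocks(long fileLength, int blockSize)
    {
        // Ceiling division: no trailing empty block for exact multiples,
        // and zero blocks for an empty file.
        return (int)((fileLength + blockSize - 1) / blockSize);
    }

    public static List<KeyValuePair<string, long>> Plan(long fileLength, int blockSize)
    {
        int count = CountBlocks(fileLength, blockSize);
        var blocks = new List<KeyValuePair<string, long>>(count);
        for (int i = 0; i < count; i++)
        {
            // "000000", "000001", ...: unique within the blob and always six
            // characters (enough for the 50,000-block limit of a block blob),
            // so every base64-encoded ID has the same length.
            string id = Convert.ToBase64String(Encoding.UTF8.GetBytes(i.ToString("D6")));
            blocks.Add(new KeyValuePair<string, long>(id, (long)i * blockSize));
        }
        return blocks;
    }
}
```

Index-based IDs also make the commit order obvious: the block list is simply the IDs sorted by index.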

    I will ask a colleague to check whether this is a storage emulator issue.

    Thanks,


    Wengchao Zeng
    Please mark the replies as answers if they help or unmark if not.
    If you have any feedback about my replies, please contact msdnmg@microsoft.com.
    Microsoft One Code Framework
    • Marked as answer by kennetham Tuesday, May 31, 2011 2:58 AM
    Monday, May 30, 2011 9:21 AM
  • To answer Wenchao's question about the storage emulator bug: yes, there is a bug in the Storage Emulator when multiple threads upload blocks to a new blob.

    The workaround for now is to create the blob first, for example with blob.UploadText("a");, and then continue with your normal blob upload code. This will be fixed in a future SDK release.
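The workaround only changes the order of operations: create the blob, upload the blocks in parallel, then commit the block list, which replaces the placeholder content. A minimal sketch of that ordering; `EmulatorWorkaroundUpload` and its three delegates are hypothetical, standing in for `CloudBlob.UploadText`, `CloudBlockBlob.PutBlock` and `CloudBlockBlob.PutBlockList` on one blob reference.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Sketch of the emulator workaround's ordering. The three delegates are
// hypothetical stand-ins for CloudBlob.UploadText, CloudBlockBlob.PutBlock and
// CloudBlockBlob.PutBlockList on one blob reference.
public static class EmulatorWorkaroundUpload
{
    public static void Upload(
        IList<KeyValuePair<string, byte[]>> blocks, // (blockId, data) in file order
        Action<string> uploadText,
        Action<string, Stream> putBlock,
        Action<IList<string>> putBlockList)
    {
        // 1. Create the blob first. This must happen before any PutBlock:
        //    a Put Blob on the same blob discards its uncommitted blocks.
        uploadText("a");

        // 2. Upload the blocks in parallel.
        Parallel.ForEach(blocks, block =>
        {
            using (var data = new MemoryStream(block.Value, false))
            {
                putBlock(block.Key, data);
            }
        });

        // 3. Commit; the committed blocks replace the "a" placeholder.
        putBlockList(blocks.Select(b => b.Key).ToList());
    }
}
```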

     

    To answer kennetham's question about how to upload files to blob storage using multiple threads, check out

    http://blogs.msdn.com/b/kwill/archive/2011/05/30/asynchronous-parallel-block-blob-transfers-with-progress-change-notification.aspx
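The linked post covers parallel transfers with progress change notification. As a rough illustration of the progress part only (this is not the post's code), parallel block uploads can be capped with `ParallelOptions.MaxDegreeOfParallelism` and report progress through an `Interlocked` counter instead of polling per-block flags. `ProgressUploader`, `uploadBlock` and `onProgress` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch: upload blocks with at most maxParallel in flight and report the
// running byte total. uploadBlock and onProgress are hypothetical callbacks.
public static class ProgressUploader
{
    public static long Upload(
        IList<int> blockSizes,
        int maxParallel,
        Action<int> uploadBlock,
        Action<long> onProgress)
    {
        long uploaded = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        Parallel.For(0, blockSizes.Count, options, i =>
        {
            uploadBlock(i); // upload block i

            // Atomically add this block's size; the result is the running total.
            long total = Interlocked.Add(ref uploaded, blockSizes[i]);

            // May be called concurrently and out of order from several threads.
            onProgress(total);
        });

        return Interlocked.Read(ref uploaded);
    }
}
```

Because callbacks can arrive out of order, a progress bar fed by `onProgress` should keep the largest value seen rather than the latest one.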


    bill boyce
    Tuesday, May 31, 2011 9:58 PM

All replies

  • Hi kennetham,

    > Can anyone help me out here?

    Could you share some code so that we can investigate? A sample project would be best.

    Thanks,


    Wengchao Zeng
    Monday, May 30, 2011 9:05 AM
  • I think that works, thanks! However, I will look into opening multiple HTTP connections to upload the blocks instead of threading, because the threaded version still uploads over a single connection, which adds network overhead.
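Threads and connections are not alternatives here: each PutBlock is its own HTTP request, and on the .NET Framework the number of simultaneous connections to one host is capped by `ServicePointManager.DefaultConnectionLimit`, which defaults to 2 in client applications. Assuming the storage client sends its requests through `HttpWebRequest` (as the .NET Framework storage clients of this era did), raising the limit before the first request lets parallel block uploads use more connections. A minimal sketch; `ConnectionTuning` is a hypothetical helper and the value passed to it is up to you.

```csharp
using System.Net;

// Hypothetical helper: call once at startup, before any storage request,
// because ServicePoint objects pick up these defaults when first created.
public static class ConnectionTuning
{
    public static void Apply(int maxConnectionsPerHost)
    {
        // Allow this many concurrent HTTP connections to the same host
        // (e.g. the storage account's blob endpoint).
        ServicePointManager.DefaultConnectionLimit = maxConnectionsPerHost;

        // Frequently disabled for blob uploads: skip the "Expect: 100-continue"
        // round trip and Nagle's coalescing of small TCP packets.
        ServicePointManager.Expect100Continue = false;
        ServicePointManager.UseNagleAlgorithm = false;
    }
}
```

For example, `ConnectionTuning.Apply(16);` at the top of `Main` lets up to 16 block uploads to the same endpoint proceed on separate connections.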

     

    Thanks!

    Tuesday, May 31, 2011 2:58 AM
  • I am updating this old question because I still get a lot of blog hits from Bill's post. Please note that there is a new version of my blog post that uses the *FromStream methods in Azure Storage Client Library 2.0. The new code performs better, is more reliable, and still provides all of the progress reporting.

    Asynchronous Parallel Block Blob Transfers with Progress Change Notification 2.0

    Wednesday, March 6, 2013 9:37 PM