IOCP

  • Question

  • Recently I took a deeper look at the async behavior of Stream, because I had the feeling that it doesn't perform as expected.

    For example, I expected FileStream.WriteAsync to use IOCP for writing,
    in order to avoid ThreadPool starvation while the disk controller is doing its work.

    I wrote a small test which seems to indicate that this is not the case (whereas when I tested the same thing with WCF, it performed as expected).

    The test is very simple:

    private async static Task WriteFileThreadReport()
    {
        using (var f = File.OpenWrite(FILE_NAME))
        {
            byte[] buffer = Encoding.UTF8.GetBytes(new string('X', 100000000));
            var sw = Stopwatch.StartNew();
            Task t = f.WriteAsync(buffer, 0, buffer.Length);
            Thread trd = new Thread(() =>
            {
                var last = Tuple.Create(0, 0);
                while (!t.IsCompleted)
                {
                    var cur = WritePoolUsage();
                    if (!cur.Equals(last))
                    {
                        Console.WriteLine("While writing: Worker = {0}, IOCP = {1}", cur.Item1, cur.Item2);
                        last = cur;
                    }
                }
            });
            trd.Start();
            await t;
            sw.Stop();
            Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
            WritePoolUsage("Cont write");
        }
    }

    private static void WritePoolUsage(string title)
    {
        int wrkTrd, ioTrd;
        ThreadPool.GetAvailableThreads(out wrkTrd, out ioTrd);
        Console.WriteLine("{0}: Worker = {1}, IO = {2}", title, _maxWrkTrd - wrkTrd, _maxIoTrd - ioTrd);
    }

    private static Tuple<int, int> WritePoolUsage()
    {
        int wrkTrd, ioTrd;
        ThreadPool.GetAvailableThreads(out wrkTrd, out ioTrd);
        return Tuple.Create(_maxWrkTrd - wrkTrd, _maxIoTrd - ioTrd);
    }

    Do I miss anything?


    Bnaya Eshet

    Sunday, February 10, 2013 12:37 PM

All replies

  • Maybe it uses a different technique here: overlapped IO, using the WriteFile function and the OVERLAPPED structure from the Windows API, together with a manual-reset event.

    For deeper investigations, try debugging .NET source code: http://msdn.microsoft.com/en-us/library/cc667410(v=vs.110).aspx.

    Sunday, February 10, 2013 4:19 PM
  • "Do I miss anything?"

    Yes, you're not opening the file for async IO. You have to create the stream by using one of the FileStream constructors that allows you to specify async IO. This one for example: http://msdn.microsoft.com/EN-US/library/ms143396.aspx (with FileOptions.Asynchronous).
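
    A minimal sketch of how this can be checked, assuming Windows and the desktop .NET Framework: FileStream.IsAsync reports whether the stream was opened for async (overlapped) IO. The class name, the helper names and the 4096-byte buffer size are illustrative choices, not part of the original test.

```csharp
using System;
using System.IO;

static class IsAsyncCheck
{
    // Opens the file the way the original test does (File.OpenWrite)
    // and reports whether the stream was opened for async IO.
    public static bool OpenWriteIsAsync(string path)
    {
        using (FileStream fs = File.OpenWrite(path))
            return fs.IsAsync;
    }

    // Opens the file with FileOptions.Asynchronous and reports the same flag.
    public static bool AsyncOptionIsAsync(string path)
    {
        using (var fs = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write,
                                       FileShare.None, 4096, FileOptions.Asynchronous))
            return fs.IsAsync;
    }

    static void Main()
    {
        string path = Path.GetTempFileName(); // hypothetical scratch file
        try
        {
            Console.WriteLine("File.OpenWrite:           IsAsync = {0}", OpenWriteIsAsync(path));
            Console.WriteLine("FileOptions.Asynchronous: IsAsync = {0}", AsyncOptionIsAsync(path));
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

    If the first line reports False, WriteAsync on that stream is, as far as I know, not issued as overlapped IO; the write is instead performed synchronously on a thread pool thread.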

    • Proposed as answer by Blackwood Sunday, February 10, 2013 5:57 PM
    • Marked as answer by Bnaya Eshet Sunday, February 10, 2013 6:02 PM
    Sunday, February 10, 2013 5:49 PM
    Moderator
  • I found the answer: you must open the FileStream for async IO:

    new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, bufferSize, FileOptions.Asynchronous);

    or

    new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, bufferSize, true);

    but it will still invoke the callback on a worker thread, because the FileStream infrastructure
    queues the callback to a thread pool worker thread when the IOCP operation completes:

    internal sealed class FileStreamAsyncResult : IAsyncResult
    {
        …
        internal void CallUserCallback()
        {
            if (this._userCallback != null)
            {
                this._completedSynchronously = false;
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    ((FileStreamAsyncResult)state).CallUserCallbackWorker();
                }, this);
                return;
            }
            this._isComplete = true;
            Thread.MemoryBarrier();
            if (this._waitHandle != null)
            {
                this._waitHandle.Set();
            }
        }
    }


    Bnaya Eshet



    • Edited by Bnaya Eshet Sunday, February 10, 2013 5:53 PM
    • Marked as answer by Bnaya Eshet Sunday, February 10, 2013 5:54 PM
    Sunday, February 10, 2013 5:49 PM
  • That was a funny coincidence, I've seen "duplicate" answers before but usually right after a question is posted, not hours later... :)

    "but it will still call the callback on the worker thread because the FileStream infrastructure
    is opening new worker thread when the IOCP complete:"

    Yes but that happens after the async operation completes so it's not like a thread pool thread is kept busy during IO.
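
    One way to observe this, sketched under the assumption of a console application (no SynchronizationContext): record whether the task had already completed when WriteAsync returned, and whether the code after the await runs on a thread pool thread. The names and the 1 MB payload are illustrative. The result can vary from run to run: if the write completes synchronously, the code after the await simply continues on the calling thread.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class ContinuationThreadCheck
{
    // Item1: was the task already complete when WriteAsync returned?
    // Item2: did the code after the await run on a thread pool thread?
    public static async Task<Tuple<bool, bool>> WriteAndInspectAsync(string path, byte[] data)
    {
        using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write,
                                       FileShare.None, 4096, FileOptions.Asynchronous))
        {
            Task write = fs.WriteAsync(data, 0, data.Length);
            bool completedOnReturn = write.IsCompleted;
            await write;
            return Tuple.Create(completedOnReturn, Thread.CurrentThread.IsThreadPoolThread);
        }
    }

    static void Main()
    {
        string path = Path.GetTempFileName(); // hypothetical scratch file
        try
        {
            byte[] data = new byte[1024 * 1024]; // 1 MB, arbitrary size
            Tuple<bool, bool> r = WriteAndInspectAsync(path, data).Result;
            Console.WriteLine("Completed when WriteAsync returned: {0}", r.Item1);
            Console.WriteLine("Continuation on thread pool thread: {0}", r.Item2);
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

    Either way, no thread is parked waiting for the disk: the thread pool thread only shows up once the operation has finished and the continuation needs somewhere to run.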

    Sunday, February 10, 2013 6:32 PM
    Moderator
  • That's right, but it confused me at first glance.

    Bnaya Eshet

    Sunday, February 10, 2013 9:18 PM
  • Actually, after reading the following post

    I improved my code snippet further, but if you try to run it
    you will find that even though the read uses IOCP, it blocks until the read completes, and
    the write does not use IOCP at all (the code snippet was tested on 2 separate machines, one with an SSD and the other without).

    I sampled a stopwatch before and after the async call (which should return immediately, but it took the entire reading time).

    I'm adding my current code snippet; please try to run it and tell me whether you get different results
    or can think of any way to improve it.

    I would also appreciate it if you could share another snippet that does the job.

        class Program
        {
            private const string FOLDER_NAME = "FILES";
            private const int STR_SIZE = 500000000;
            private static readonly byte[] BUFFER = Encoding.UTF8.GetBytes(new string('X', STR_SIZE));
    
            private static int _maxWrkTrd, _maxIoTrd;
    
            /// <summary>
            /// Mains the specified args.
            /// </summary>
            /// <param name="args">The args.</param>
            static void Main(string[] args)
            {
                BUFFER[0] = (byte)'#';
                BUFFER[STR_SIZE - 1] = (byte)'%';
    
                ThreadPool.GetMaxThreads(out _maxWrkTrd, out _maxIoTrd);
                WritePoolUsage("Start");
                //WritePoolUsage("Press any key to continue");
                //Console.ReadKey();
    
                Console.WriteLine("--------- FILE Read--------------");
                RedFile().Wait();
                Console.WriteLine();
    
                Console.WriteLine("--------- FILE Write--------------");
                WriteFile().Wait();
                Console.WriteLine();
    
                Console.WriteLine("------- FILE APM Write------------");
                WriteFileAPM().Wait();
                Console.WriteLine();
    
                //Console.WriteLine("--------- Wcf --------------");
                //WriteWcfFile().Wait();
                //Console.WriteLine();
    
                //Console.WriteLine("--------- Web Api --------------");
                //WriteWebApiFile().Wait();
                //Console.WriteLine();
    
                //Console.WriteLine("--------- EF --------------");
                //SaveDataAsync().Wait();
                //Console.WriteLine();
    
                Console.WriteLine("Complete");
                Console.ReadKey();
            }
    
            #region RedFile
    
            private async static Task RedFile()
            {
                using (var f = OpenFileRead())
                {
                    var cts = new CancellationTokenSource();
                    WatchingThread(cts.Token);
                    var sw = Stopwatch.StartNew();
                    Console.WriteLine("@ Before {0}", sw.ElapsedMilliseconds);
                    byte[] buffer = new byte[STR_SIZE];
                    Task<int> t = f.ReadAsync(buffer, 0, buffer.Length);
                    Console.WriteLine("@After {0}", sw.ElapsedMilliseconds);           
                    await t;
                    sw.Stop();
                    cts.Cancel();
                    if (t.Result != STR_SIZE || buffer[0] != (byte)'#' || buffer[STR_SIZE - 1 ] != (byte)'%')
                        Console.WriteLine("Corrupt data");
                    Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                    WritePoolUsage("Cont read");
                }
            }
    
            #endregion // RedFile
    
            #region WriteFile
    
            private async static Task WriteFile()
            {
                using (var f = OpenFileWrite())
                {
                    var cts = new CancellationTokenSource();
                    WatchingThread(cts.Token);
                    var sw = Stopwatch.StartNew();
                    Console.WriteLine("@ Before {0}", sw.ElapsedMilliseconds);
                    Task t = f.WriteAsync(BUFFER, 0, STR_SIZE, CancellationToken.None);
                    Console.WriteLine("@After {0}", sw.ElapsedMilliseconds);
                    await t;
                    sw.Stop();
                    cts.Cancel();
                    Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                    WritePoolUsage("Cont write");
                }
            }
    
            #endregion // WriteFile
    
            #region WriteFileContinueWith
    
            private static Task WriteFileContinueWith()
            {
                var f = OpenFileWrite();
                byte[] buffer = Encoding.UTF8.GetBytes(new string('X', 100000000));
                var sw = Stopwatch.StartNew();
                return f.WriteAsync(buffer, 0, buffer.Length).ContinueWith(t =>
                    {
                        sw.Stop();
                        Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                        WritePoolUsage("Cont write");
                        f.Dispose();
                    });
            }
    
            #endregion // WriteFileContinueWith
    
            #region WriteFileAPM
    
            private async static Task WriteFileAPM()
            {
                using (var f = OpenFileWrite())
                {
                    var cts = new CancellationTokenSource();
                    WatchingThread(cts.Token);
                    var sw = Stopwatch.StartNew();
                    Console.WriteLine("@ Before {0}", sw.ElapsedMilliseconds);
                    Task t = Task.Factory.FromAsync(f.BeginWrite, f.EndWrite, BUFFER, 0, STR_SIZE, null);
                    Console.WriteLine("@After {0}", sw.ElapsedMilliseconds);
                    await t;
                    sw.Stop();
                    cts.Cancel();
                    Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                    WritePoolUsage("Cont write");
    
                }
            }
    
            #endregion // WriteFileAPM
    
            #region WriteWcfFile
    
            private async static Task WriteWcfFile()
            {
                using (var pxy = new Wcf.AsyncClient())
                {
                    var sw = Stopwatch.StartNew();
                    string result = await pxy.GetDataAsync();
                    sw.Stop();
                    Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                    WritePoolUsage("Cont write");
                    Console.WriteLine("------------------------");
                    Console.WriteLine(result);
                    Console.WriteLine("------------------------");
                }
            }
    
            #endregion // WriteWcfFile
    
            #region WriteWebApiFile
    
            private async static Task WriteWebApiFile()
            {
                string uri = "http://localhost:8000/api/async";
                //string uri = "http://localhost:1395/api/async";
    
                var client = new HttpClient();
                var sw = Stopwatch.StartNew();
                byte[] buffer= await client.GetByteArrayAsync(uri);
                sw.Stop();
                Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                WritePoolUsage("Cont write");
                Console.WriteLine("------------------------");
                string result = Encoding.UTF8.GetString(buffer);
                Console.WriteLine(result);
                Console.WriteLine("------------------------");
            }
    
            #endregion // WriteWebApiFile
    
            #region SaveDataAsync
    
            private static async Task SaveDataAsync()
            {
                using (var context = new CodeFirstContext())
                {
                    var cts = new CancellationTokenSource();
                    for (int i = 0; i < 1000; i++)
                    {
                        var dto = new JustAPoco { Name = "X " + i };
                        context.Poco.Add(dto);
                    }
                    var sw = Stopwatch.StartNew();
                    WatchingThread(cts.Token);
                    Console.WriteLine("@ Before {0}", sw.ElapsedMilliseconds);
                    Task t = context.SaveChangesAsync();
                    Console.WriteLine("@After {0}", sw.ElapsedMilliseconds);
                    await t;
                    sw.Stop();
                    cts.Cancel();
                    Console.WriteLine("Duration: {0}", sw.ElapsedMilliseconds);
                }
                WritePoolUsage("Cont write");
            }
    
            #endregion // SaveDataAsync
    
            #region WatchingThread
    
            private static void WatchingThread(CancellationToken token)
            {
                Thread trd = new Thread((state) =>
                {
                    var cancellation = (CancellationToken)state;
                    Tuple<int, int> last = null;
    
                    int i = 0;
                    while (!cancellation.IsCancellationRequested && i < 7)
                    {
                        var cur = WritePoolUsage();
                        if (!object.Equals(cur, last))
                        {
                            Console.WriteLine("\t*** Worker = {0}, IOCP = {1}  ***", cur.Item1, cur.Item2);
                            last = cur;
                            i++;
                        }
                    }
                });
                trd.Start(token);
            }
    
            #endregion // WatchingThread
    
            #region OpenFileWrite
    
            private static FileStream OpenFileWrite()
            {
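                // Path.GetTempFileName() returns a rooted path, which makes Path.Combine ignore FOLDER_NAME;
                // use a random relative name so the file lands under FOLDER_NAME.
                string fileName = Path.GetRandomFileName();
                var path = Path.Combine(FOLDER_NAME, fileName);
    
                // Make sure the target folder exists (no-op if it already does).
                Directory.CreateDirectory(FOLDER_NAME);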
                byte[] buffer = new byte[STR_SIZE];
                File.WriteAllBytes(path, buffer);
    
                return new FileStream(path, FileMode.Open, FileAccess.Write, FileShare.None, STR_SIZE, FileOptions.Asynchronous | FileOptions.SequentialScan | FileOptions.WriteThrough);
                //return new FileStream(path, FileMode.Open, FileAccess.Write, FileShare.None, STR_SIZE, true);
            }
    
            #endregion // OpenFileWrite
    
            #region OpenFileRead
    
            private static FileStream OpenFileRead()
            {
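                // Path.GetTempFileName() returns a rooted path, which makes Path.Combine ignore FOLDER_NAME;
                // use a random relative name so the file lands under FOLDER_NAME.
                string fileName = Path.GetRandomFileName();
                var path = Path.Combine(FOLDER_NAME, fileName);
    
                // Make sure the target folder exists (no-op if it already does).
                Directory.CreateDirectory(FOLDER_NAME);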
    
                File.WriteAllBytes(path, BUFFER);
                //return new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None, STR_SIZE, FileOptions.Asynchronous | FileOptions.SequentialScan | FileOptions.WriteThrough);
                return new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None, STR_SIZE, true);
            }
    
            #endregion // OpenFileRead
    
            #region WritePoolUsage
    
            private static void WritePoolUsage(string title)
            {
                int wrkTrd, ioTrd;
                ThreadPool.GetAvailableThreads(out wrkTrd, out ioTrd);
                Console.WriteLine("{0}: Worker = {1}, IO = {2}", title, _maxWrkTrd - wrkTrd, _maxIoTrd - ioTrd);
            }
    
            private static Tuple<int, int> WritePoolUsage()
            {
                int wrkTrd, ioTrd;
                ThreadPool.GetAvailableThreads(out wrkTrd, out ioTrd);
                return Tuple.Create( _maxWrkTrd - wrkTrd, _maxIoTrd - ioTrd);
            }
    
            #endregion // WritePoolUsage
        }
    


    Bnaya Eshet

    Wednesday, February 13, 2013 10:08 PM
  • "I was making my code snippet even better, but if you will try to run it
    you find that eventhough read is using ICOP it is blocking until the read's completion and
    writing does not using IOCP at all"

    Try getting rid of the FileStream's buffer, set it to something like 4096. Using STR_SIZE will likely cause additional copying of the data and anyway you really don't want such a large buffer.
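
    A rough sketch of that suggestion, for comparison with the STR_SIZE buffer: open the stream with a 4096-byte buffer and FileOptions.Asynchronous, then time how long the WriteAsync call itself takes to return versus how long the whole write takes. The class and method names and the 64 MB payload are illustrative, and the timings will depend on the machine and the file cache.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

static class SmallBufferWrite
{
    // Item1: milliseconds until the WriteAsync call returned.
    // Item2: milliseconds until the write task completed.
    public static async Task<Tuple<long, long>> MeasureAsync(string path, byte[] data, int bufferSize)
    {
        using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write,
                                       FileShare.None, bufferSize, FileOptions.Asynchronous))
        {
            var sw = Stopwatch.StartNew();
            Task write = fs.WriteAsync(data, 0, data.Length);
            long returnedAfter = sw.ElapsedMilliseconds;
            await write;
            return Tuple.Create(returnedAfter, sw.ElapsedMilliseconds);
        }
    }

    static void Main()
    {
        string path = Path.GetTempFileName(); // hypothetical scratch file
        try
        {
            byte[] data = new byte[64 * 1024 * 1024]; // 64 MB, arbitrary size
            Tuple<long, long> r = MeasureAsync(path, data, 4096).Result;
            Console.WriteLine("WriteAsync returned after {0} ms, completed after {1} ms", r.Item1, r.Item2);
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

    Calling MeasureAsync again with data.Length as the buffer size gives a side-by-side comparison of the two configurations.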

    Wednesday, February 13, 2013 10:46 PM
    Moderator
  • Using a small buffer does help in the write scenario, but it doesn't help for the read.

    Thanks

    Bnaya Eshet


    • Edited by Bnaya Eshet Thursday, February 14, 2013 4:40 PM
    Thursday, February 14, 2013 4:35 PM
  • Hmm, I see. But that's the best you can get in this situation.

    Async IO via IOCP works best with unbuffered IO, but you can't do unbuffered IO with .NET's FileStream, and even if you could, unbuffered IO is not suitable for everything. With buffered IO the system needs to copy data from the file cache to the user-provided buffer; this requires CPU time and obviously cannot be done asynchronously, at least not in the way you expect.

    Thursday, February 14, 2013 9:29 PM
    Moderator