How to wrap SqlDataReader with IObservable properly?

    Question

  • I would like to explore the possibility of using IObservable<T> as a wrapper around a SqlDataReader. Until now we have used the reader through the blocking synchronous API, so that the entire result is never materialized in memory.

    Now we want to try and use asynchronous API in conjunction with the .NET Reactive Extensions.

    However, this code will have to coexist with synchronous code, as adopting the asynchronous approach is a gradual process.

    We already know that this mix of synchronous and asynchronous code does not work in ASP.NET: there the entire request execution path must be asynchronous throughout. An excellent article on the subject is http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html

    But I am talking about a plain WCF service. We already mix asynchronous and synchronous code there; however, this is the first time we wish to introduce Rx, and we are running into trouble.

    I have created simple unit tests to demonstrate the issues. My hope is that someone will be able to explain to me what is going on. Please find the entire source code below (using Moq):
    using System;
    using System.Collections.Generic;
    using System.Data.Common;
    using System.Diagnostics;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    using Moq;
    
    namespace UnitTests
    {
        public static class Extensions
        {
            public static Task<List<T>> ToListAsync<T>(this IObservable<T> observable)
            {
                var res = new List<T>();
                var tcs = new TaskCompletionSource<List<T>>();
                observable.Subscribe(res.Add, e => tcs.TrySetException(e), () => tcs.TrySetResult(res));
                return tcs.Task;
            }
        }
    
        [TestClass]
        public class TestRx
        {
            public const int UNIT_TEST_TIMEOUT = 5000;
    
            private static DbDataReader CreateDataReader(int count = 100, int msWait = 10)
            {
                var curItemIndex = -1;
    
                var mockDataReader = new Mock<DbDataReader>();
                mockDataReader.Setup(o => o.ReadAsync(It.IsAny<CancellationToken>())).Returns<CancellationToken>(ct => Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(msWait);
                    if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
                    {
                        ++curItemIndex;
                        return true;
                    }
                    Trace.WriteLine(curItemIndex);
                    return false;
                }));
                mockDataReader.Setup(o => o[0]).Returns<int>(_ => curItemIndex);
                mockDataReader.CallBase = true;
                mockDataReader.Setup(o => o.Close()).Verifiable();
                return mockDataReader.Object;
            }
    
            private static IObservable<int> GetObservable(DbDataReader reader)
            {
                return Observable.Create<int>(async (obs, cancellationToken) =>
                {
                    using (reader)
                    {
                        while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
                        {
                            obs.OnNext((int)reader[0]);
                        }
                    }
                });
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToListAsyncResult()
            {
                var reader = CreateDataReader();
                var numbers = GetObservable(reader).ToListAsync().Result;
                CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableToList()
            {
                var reader = CreateDataReader();
                var numbers = GetObservable(reader).ToEnumerable().ToList();
                CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEach()
            {
                var reader = CreateDataReader();
                int i = 0;
                foreach (var n in GetObservable(reader).ToEnumerable())
                {
                    Assert.AreEqual(i, n);
                    ++i;
                }
                Assert.AreEqual(100, i);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEachBreak()
            {
                var reader = CreateDataReader();
                int i = 0;
                foreach (var n in GetObservable(reader).ToEnumerable())
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        break;
                    }
                }
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEachThrow()
            {
                var reader = CreateDataReader();
                int i = 0;
                try
                {
                    foreach (var n in GetObservable(reader).ToEnumerable())
                    {
                        Assert.AreEqual(i, n);
                        ++i;
                        if (i == 5)
                        {
                            throw new Exception("xo-xo");
                        }
                    }
                    Assert.Fail();
                }
                catch (Exception exc)
                {
                    Assert.AreEqual("xo-xo", exc.Message);
                    Mock.Get(reader).Verify(o => o.Close());
                } 
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void Subscribe()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                }, () =>
                {
                    Assert.AreEqual(100, i);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                });
    
                tcs.Task.Wait();
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void SubscribeCancel()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                var cts = new CancellationTokenSource();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        cts.Cancel();
                    }
                }, e =>
                {
                    Assert.IsTrue(i < 100);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetException(e);
                }, () =>
                {
                    Assert.IsTrue(i < 100);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                }, cts.Token);
    
                tcs.Task.Wait();
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void SubscribeThrow()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        throw new Exception("xo-xo");
                    }
                }, e =>
                {
                    Assert.AreEqual("xo-xo", e.Message);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                });
    
                tcs.Task.Wait();
            }
        }
    }
    These unit tests capture all the possible uses of an API returning an `IObservable<T>` wrapping a data reader:

     - Devs might want to materialize it completely (using my `ToListAsync` extension method or `.ToEnumerable().ToList()`).
     - Devs might want to iterate over it using the `ToEnumerable` extension method. I know it is stupid, since a data reader is already an enumerable. It blocks if the consumption is fast and it materializes the data in an internal queue if the consumption is slow, but this scenario is feasible and legitimate nonetheless.
     - Finally devs might use the observable directly by subscribing to it, but they would have to wait for the end (blocking the thread) at some point, since most of the code around is still synchronous.
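
    The three consumption styles listed above can be sketched against a stand-in source. This is only an illustration under assumptions: `Observable.Range` replaces the reader-backed observable, and the class and method names are hypothetical. `ToList`, `ToTask` and `ToEnumerable` are the standard Rx operators.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class ConsumptionStyles
{
    // Stand-in for the reader-backed observable: emits 0..count-1.
    public static IObservable<int> Source(int count)
    {
        return Observable.Range(0, count);
    }

    // Style 1: materialize everything, then block on the resulting task.
    public static IList<int> Materialize(int count)
    {
        return Source(count).ToList().ToTask().Result;
    }

    // Style 2: pull the items through a blocking enumerable.
    public static int SumViaEnumerable(int count)
    {
        var sum = 0;
        foreach (var n in Source(count).ToEnumerable())
        {
            sum += n;
        }
        return sum;
    }

    // Style 3: subscribe directly and block until the sequence terminates.
    public static int CountViaSubscribe(int count)
    {
        var seen = 0;
        using (var done = new ManualResetEventSlim())
        using (Source(count).Subscribe(_ => seen++, _ => done.Set(), () => done.Set()))
        {
            done.Wait();
        }
        return seen;
    }
}
```

    In all three styles the calling thread ends up blocked at some point, which is the constraint described above.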

    An essential requirement is that the data reader be promptly disposed of once the reading is over - regardless of the way the observable is consumed.

    Of all the unit tests, four fail:

     - `SubscribeCancel` and `SubscribeThrow` time out
     - `ToEnumerableForEachBreak` and `ToEnumerableForEachThrow` fail the validation of the data reader disposal.

    The data reader disposal validation failure is a matter of timing: when `foreach` is left (either through an exception or a break), the respective `IEnumerator` is immediately disposed, which ultimately cancels the cancellation token used by the implementation of the observable. However, that implementation runs on another thread, and by the time it notices the cancellation the unit test is already over. In a real application the reader would be disposed of properly and rather promptly, but the disposal is not synchronized with the end of the iteration. I am wondering whether it is possible to make the disposal of the aforementioned `IEnumerator` instance wait until the respective `IObservable` implementation has noticed the cancellation and the reader has been disposed of.

    Any help is greatly appreciated.


    Monday, May 26, 2014 2:09 AM

All replies

  • Hi Mark,

    This is a big question, and I think it is open to lots of opinions. I will share mine, and you can take what you want from it :-)

    First, thanks for actually posting a working copy of some unit tests. It makes it a lot easier to help.

    Quick bullet point answers

    1) The ToListAsync method looks just like the standard ToList().ToTask() operators. Is there a reason to create your own?

    2) Unit testing with Rx shouldn't need any Thread.Sleep() calls. If you expect to introduce concurrency, then I strongly recommend using Schedulers (IScheduler implementations). You can substitute a TestScheduler instance for them in tests and control the virtual time. This leads to much more predictable unit tests that are incredibly fast.

    3) In your SubscribeCancel() unit test, you Cancel the CancellationTokenSource that you pass to the Subscribe method. This means you have disposed of your subscription, so you won't get an OnCompleted or OnError call. Your TaskCompletionSource therefore never gets set, and you block on tcs.Task.Wait() forever. To me, pretty much all of this can be replaced by a .Take(5) operator.

    4) The SubscribeThrow test is just a misunderstanding of Rx. I see you have made another post about it, and a matching bug report. I doubt this will get "fixed", as I don't believe it to be broken. OnNext handlers should not throw, just like standard event handlers should not throw.

    5) To reiterate the point about concurrency in unit tests, ToEnumerableForEachBreak() is a victim of this. Eliminate the concurrency in your tests and you will be fine.
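
    As a rough sketch of the virtual-time idea from point 2: the example below assumes the Microsoft.Reactive.Testing package is referenced, and the class and method names are made up for illustration. It is not taken from the tests in this thread; it only shows a time-based sequence being driven without any real waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical helper, for illustration only.
public static class VirtualTimeSketch
{
    // Subscribes to a sequence that emits once per virtual second and
    // returns the values observed after advancing the virtual clock.
    public static List<long> Run(TimeSpan advanceBy)
    {
        var scheduler = new TestScheduler();
        var seen = new List<long>();

        using (Observable.Interval(TimeSpan.FromSeconds(1), scheduler).Subscribe(seen.Add))
        {
            // No real waiting happens: the scheduler only moves its own clock
            // and runs whatever work is due up to that point.
            scheduler.AdvanceBy(advanceBy.Ticks);
        }
        return seen;
    }
}
```

    Calling `VirtualTimeSketch.Run(TimeSpan.FromSeconds(3.5))` should return almost instantly, since the three and a half seconds exist only on the scheduler's virtual clock.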

    I think the headline question, "How to wrap SqlDataReader in IObservable<T>", would need quite a different answer.

    I hope this start helps

    Lee Campbell


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Tuesday, June 03, 2014 9:09 PM Spelling correction
    Tuesday, June 03, 2014 9:08 PM
  • An update to remove the concurrency from your tests could be to favour the use of TaskCompletionSource over Task.Factory. I see you already use it in some places, so you are familiar with it.

    I can get the final set of tests passing by replacing the data reader iterator function

        ct => Task.Factory.StartNew(() =>
        {
            Thread.Sleep(msWait);
            if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
            {
                ++curItemIndex;
                return true;
            }
            Trace.WriteLine(curItemIndex);
            return false;
        })
     

    with this code

        ct =>
        {
            var tcs = new TaskCompletionSource<bool>();
            if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
            {
                ++curItemIndex;
                tcs.SetResult(true);
            }
            else
            {
                Trace.WriteLine(curItemIndex);
                tcs.SetResult(false);
            }
    
            return tcs.Task;
        };

    Full code changes below:

            // ToTask below requires: using System.Reactive.Threading.Tasks;
            public const int UNIT_TEST_TIMEOUT = 500;
    
            private static DbDataReader CreateDataReader(int count = 100)
            {
                var curItemIndex = -1;
    
                var mockDataReader = new Mock<DbDataReader>();
                mockDataReader.Setup(o => o.ReadAsync(It.IsAny<CancellationToken>()))
                              .Returns<CancellationToken>(ct =>
                                {
                                    var tcs = new TaskCompletionSource<bool>();
                                    if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
                                    {
                                        ++curItemIndex;
                                        tcs.SetResult(true);
                                    }
                                    else
                                    {
                                        Trace.WriteLine(curItemIndex);
                                        tcs.SetResult(false);
                                    }
    
                                    return tcs.Task;
                                });
                mockDataReader.Setup(o => o[0]).Returns<int>(_ => curItemIndex);
                mockDataReader.CallBase = true;
                mockDataReader.Setup(o => o.Close()).Verifiable();
                return mockDataReader.Object;
            }
    
            private static IObservable<int> GetObservable(DbDataReader reader)
            {
                return Observable.Create<int>(async (obs, cancellationToken) =>
                {
                    using (reader)
                    {
                        while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
                        {
                            obs.OnNext((int)reader[0]);
                        }
                    }
                });
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToListAsyncResult()
            {
                var reader = CreateDataReader();
                //var numbers = GetObservable(reader).ToListAsync().Result;
                var numbers = GetObservable(reader).ToArray().ToTask().Result;
                CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableToList()
            {
                var reader = CreateDataReader();
                var numbers = GetObservable(reader).ToEnumerable().ToList();
                CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEach()
            {
                var reader = CreateDataReader();
                var actual = GetObservable(reader).ToEnumerable().ToArray();
                CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToArray(), actual);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEachBreak()
            {
                var reader = CreateDataReader();
                var actual = GetObservable(reader).ToEnumerable().Take(5).ToArray();
                CollectionAssert.AreEqual(Enumerable.Range(0, 5).ToArray(), actual);
                Mock.Get(reader).Verify(o => o.Close());
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void ToEnumerableForEachThrow()
            {
                var reader = CreateDataReader();
                int i = 0;
                try
                {
                    foreach (var n in GetObservable(reader).ToEnumerable())
                    {
                        Assert.AreEqual(i, n);
                        ++i;
                        if (i == 5)
                        {
                            throw new Exception("xo-xo");
                        }
                    }
                    Assert.Fail();
                }
                catch (Exception exc)
                {
                    Assert.AreEqual("xo-xo", exc.Message);
                    Mock.Get(reader).Verify(o => o.Close());
                }
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void Subscribe()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                }, () =>
                {
                    Assert.AreEqual(100, i);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                });
    
                tcs.Task.Wait();
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void SubscribeCancel()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                var cts = new CancellationTokenSource();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        cts.Cancel();
                        tcs.TrySetResult(null);
                    }
                }, e =>
                {
                    Assert.IsTrue(i < 100);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetException(e);
                }, () =>
                {
                    Assert.IsTrue(i < 100);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                }, cts.Token);
    
                tcs.Task.Wait();
            }
    
    
            //"This is a bogus test. It is a misunderstanding of Rx."
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT), Ignore]
            public void SubscribeThrow()
            {
                var reader = CreateDataReader();
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable(reader).Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        throw new Exception("xo-xo");
                    }
                }, e =>
                {
                    Assert.AreEqual("xo-xo", e.Message);
                    Mock.Get(reader).Verify(o => o.Close());
                    tcs.TrySetResult(null);
                });
    
                tcs.Task.Wait();
            }

    HTH

    Lee Campbell


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Tuesday, June 03, 2014 9:52 PM Added full code rewrite
    Tuesday, June 03, 2014 9:42 PM
  • Hello Lee.

    Thank you for taking your time to have a look at my question.

    I would like to make a few comments.

    First, your version of the DbDataReader.ReadAsync mock has different semantics than mine. Yours always returns an already completed Task, whereas mine always returns a Task in progress that takes 10 ms to run to completion. In my opinion, if one wants to preserve the asynchronous nature of the mocked method, then the mock should return a Task in progress. The simplest way I know of is to call Task.Factory.StartNew and sleep a bit inside.

    Next, let us examine SubscribeCancel. I am trying to emulate a situation where the OnNext event handler wishes to abort the reader. Suppose the reader is expected to produce 1000 records from the database, but for some reason the consumer of the records decides that it no longer wishes to read any more data. The point is that it wishes to instruct the reader to abort the reading. The suggestion to complete the task right after cancelling the token implies that the respective TaskCompletionSource will have to be propagated to all the relevant parties along with the CancellationTokenSource. That makes it a cumbersome pattern. There must be a better way.
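
    To make the distinction concrete, here is a minimal sketch of the two kinds of fake. The names are hypothetical, and the second method uses Task.Delay rather than the Task.Factory.StartNew plus Thread.Sleep approach from the original tests; that substitution is an assumption of this sketch, chosen because it keeps the task pending without blocking a thread.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for a mocked ReadAsync, for illustration only.
public static class ReadAsyncFakes
{
    // Returns a task that is already completed when the caller receives it,
    // so an awaiting caller simply continues synchronously.
    public static Task<bool> AlreadyCompleted(bool result)
    {
        var tcs = new TaskCompletionSource<bool>();
        tcs.SetResult(result);
        return tcs.Task;
    }

    // Returns a task that is still pending when the caller receives it
    // (for msWait > 0 and a token that is not yet cancelled).
    public static async Task<bool> StillInProgress(bool result, int msWait, CancellationToken ct)
    {
        await Task.Delay(msWait, ct);
        return result;
    }
}
```

    Checking `IsCompleted` on each returned task immediately after the call shows the difference: it is true for the first fake and, with a positive delay, false for the second.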

    The same goes for SubscribeThrow. Code may throw and I have to deal with it. When an event handler throws an exception the thread does not deadlock. In my example SubscribeThrow deadlocks.

    The title of the post is the way it is because my ultimate goal is to expose a DB data reader as IObservable<SomeDTO>, because this allows me to expose the objects associated with the reader asynchronously and lazily, letting the caller decide how (s)he wishes to materialize them.

    The unit tests were supposed to try out different use cases given such an observable. An important constraint is that the server code is mostly synchronous. Introducing asynchronous IO is a gradual process, so asynchronous code will have to coexist with synchronous code, meaning we are likely to wait at one point or another (in order to proceed synchronously).

    Wednesday, June 04, 2014 3:19 AM
  • Re SubscribeCancel, it is sort of OK to dispose of a subscription in an OnNext handler, but the standard way would be to use Rx operators like Take/TakeUntil/Timeout etc. Either way, the reason your version of the test "deadlocks" is that you have explicitly coded it to never finish. I am not sure how else to say this, but you dispose of the subscription before it completes. You put your unit test synchronization constructs in your OnError/OnCompleted handlers. These will never get called. So why would this test ever work? This is not a deadlock; this is a case of a task that blocks and is never unblocked.

    Again, SubscribeThrow is not a deadlock. It is a combination of a misunderstanding of Rx and a blocked task that is never unblocked.
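
    A small sketch of that behaviour, under assumptions: it uses a synchronous `Observable.Range` source instead of the async reader-backed observable, and the names are hypothetical. In the Rx.NET versions I am aware of, an exception thrown by the OnNext handler is not routed to the same subscriber's OnError handler; with a synchronous source it surfaces at the Subscribe call instead.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class ThrowingHandlerSketch
{
    // Reports where the exception thrown by the OnNext handler ended up.
    public static string Run()
    {
        var onErrorCalled = false;
        try
        {
            Observable.Range(0, 10).Subscribe(
                n => { if (n == 4) throw new InvalidOperationException("xo-xo"); },
                e => onErrorCalled = true);
        }
        catch (InvalidOperationException)
        {
            return onErrorCalled
                ? "caught by caller, OnError also called"
                : "caught by caller, OnError not called";
        }
        return onErrorCalled ? "OnError called" : "nothing observed";
    }
}
```

    With an asynchronous source there is no caller on the stack to catch the exception, which is consistent with the OnError handler in SubscribeThrow never being reached.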

    My question to you for both of these examples, is why do you think that these tests would actually run, i.e. what is the line of code that should signal the TaskCompletionSource so that the tcs.Task.Wait() would be unblocked?

    Once we get that sorted, then I think we can move on to solving the real problem, but currently there are some misunderstandings that need to be overcome.


    Lee Campbell http://LeeCampbell.blogspot.com

    Wednesday, June 04, 2014 6:47 AM
  • It makes sense now that my unit test code, which is supposed to emulate real usage of the data reader observable in a synchronous environment, is wrong.

    How would you write SubscribeCancel, then?

    The semantics I am looking for is this:

    1. Someone gets the observable and starts consuming the items.
    2. At some point that someone decides that the reader should abort, no need to read anything more from the DB.
    3. At the end the code is invoked by some synchronous code at the upper level, which will have to wait for the completion of the data consumption.

    Ideally, if our server were asynchronous all the way through, no code would wait - the task would make its way through to the WCF infrastructure, but right now it is synchronous.

    Wednesday, June 04, 2014 1:30 PM
  • This does become a difficult balancing act.

    One of the key elements here is how do you decide to abort/cancel the reading from the DB?

    The other is: why the pseudo-async followed by blocking? Surely you are better off just going IEnumerable<T> all the way. In my experience, you need to be either all-in on async or not bother. Probably not what you want to hear, however.

    To answer the first question, I will take a guess and assume there are four reasons you might want to abort:

    1. You only want a fixed number of items
    2. You only want items until a condition is met
    3. Too much time has elapsed
    4. The user has cancelled the action.

    All of these ultimately just boil down to different ways of calling Dispose on the underlying subscription. You can do so with each of these operators:

    1. Take
    2. TakeUntil
    3. Timeout or TakeUntil(Observable.Timer())
    4. Just dispose of the subscription, or as you are favouring all TPL fluff, cancel your token.
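
    A rough sketch of options 1, 2 and 4 follows; Timeout is left out for brevity. The sources here are stand-ins (`Observable.Range` and `Subject<T>`), not the reader-backed observable, and all class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class EarlyTerminationSketch
{
    // Option 1: Take completes after `count` values and disposes the
    // upstream subscription. Observable.Range emits synchronously here,
    // so the list is filled by the time Subscribe returns.
    public static List<int> TakeFirst(int count)
    {
        var seen = new List<int>();
        Observable.Range(0, 100).Take(count).Subscribe(seen.Add);
        return seen;
    }

    // Option 2: TakeUntil ends the sequence as soon as the other
    // observable (the stop signal) produces a value.
    public static List<int> TakeUntilSignal()
    {
        var source = new Subject<int>();
        var stop = new Subject<Unit>();
        var seen = new List<int>();
        source.TakeUntil(stop).Subscribe(seen.Add);
        source.OnNext(1);
        source.OnNext(2);
        stop.OnNext(Unit.Default);
        source.OnNext(3); // arrives after the stop signal, so it is not delivered
        return seen;
    }

    // Option 4: dispose of the subscription directly.
    public static List<int> DisposeExplicitly()
    {
        var source = new Subject<int>();
        var seen = new List<int>();
        var subscription = source.Subscribe(seen.Add);
        source.OnNext(1);
        source.OnNext(2);
        subscription.Dispose();
        source.OnNext(3); // arrives after disposal, so it is not delivered
        return seen;
    }
}
```

    Only the first two variants produce an OnCompleted notification for the subscriber; after an explicit Dispose no further notifications of any kind arrive.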

    How do you now block until one of these or an OnError/OnCompleted happens? Well, that to me really does strongly suggest "Don't use Rx", but if we did want to go down this path we could perhaps look at the Observable.Using operator.

    This works, but... it is not nice:

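    // Requires: using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq;
    var reader = CreateDataReader();
    var tcs = new TaskCompletionSource<Unit>();
    var i = 0;
    
    // Using disposes its resource when the sequence terminates or the
    // subscription is disposed; here that disposal signals the TaskCompletionSource.
    Observable.Using(
        () => Disposable.Create(() => tcs.SetResult(Unit.Default)),
        _ => GetObservable(reader).Take(5)
    ).Subscribe(n => i++);
    
    // Block the synchronous caller until the resource is disposed (or one second passes).
    tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
    
    Assert.AreEqual(5, i);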

    You can find out more on Using at IntroToRx.com

    http://introtorx.com/Content/v1.0.10621.0/03_LifetimeManagement.html

    and

    http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Using


    Lee Campbell http://LeeCampbell.blogspot.com

    Wednesday, June 04, 2014 2:43 PM
  • Thank you for this wealth of information. I will take my time to examine it closely.

    I understand your sentiment about mixing asynchronous and synchronous code. But what path would you suggest to educate people and move them towards asynchronous ways? We are not talking about a from-scratch project. The application already exists and is constantly evolving. I see no option other than the gradual adoption of the asynchronous approach, until in the end it becomes the preferred way of doing things.


    About using Rx. The only reason for using it is that I do not know how to read a collection of objects from the DB both asynchronously and without materializing it all at once. IObservable<T> seems to be a perfect fit. If there is another way - I am eager to learn about it.

    About TPL. I use Task because it is easy to translate an Observable to a Task and Wait on the latter. Remember, I must wait at the end, at least as long as our server is not fully asynchronous, and it never will be if we do not start in the first place. By all means, if you see a better way to block waiting for the Rx work to be done - please, share.

    Much appreciated.

    Friday, June 06, 2014 1:41 PM