Why is the OnError callback never called when throwing from the given subscriber?

    Question

  • Please observe the following unit test:

    using System;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    
    namespace UnitTests
    {
        [TestClass]
        public class TestRx
        {
            public const int UNIT_TEST_TIMEOUT = 5000;
    
            private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
            {
                return Observable.Create<int>(async (obs, cancellationToken) =>
                {
                    for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                    {
                        int value = i;
                        obs.OnNext(await Task.Factory.StartNew(() =>
                        {
                            Thread.Sleep(msWait);
                            return value;
                        }));
                    }
                });
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void Subscribe()
            {
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable().Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                }, e => Assert.Fail(), () =>
                {
                    Assert.AreEqual(100, i);
                    tcs.TrySetResult(null);
                });
    
                tcs.Task.Wait();
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void SubscribeCancel()
            {
                var tcs = new TaskCompletionSource<object>();
                var cts = new CancellationTokenSource();
                int i = 0;
                GetObservable().Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        cts.Cancel();
                    }
                }, e =>
                {
                    Assert.IsTrue(i < 100);
                    tcs.TrySetResult(null);
                }, () =>
                {
                    Assert.IsTrue(i < 100);
                    tcs.TrySetResult(null);
                }, cts.Token);
    
                tcs.Task.Wait();
            }
    
            [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
            public void SubscribeThrow()
            {
                var tcs = new TaskCompletionSource<object>();
                int i = 0;
                GetObservable().Subscribe(n =>
                {
                    Assert.AreEqual(i, n);
                    ++i;
                    if (i == 5)
                    {
                        throw new Exception("xo-xo");
                    }
                }, e =>
                {
                    Assert.AreEqual("xo-xo", e.Message);
                    tcs.TrySetResult(null);
                }, Assert.Fail);
    
                tcs.Task.Wait();
            }
        }
    }

    The unit tests SubscribeCancel and SubscribeThrow time out because the OnError callback is never called, so the wait on the task never ends.

    What is wrong?

    P.S.

    https://rx.codeplex.com/workitem/74

    Monday, May 26, 2014 1:49 PM

All replies

    • Edited by Dave Sexton Tuesday, May 27, 2014 12:00 AM Added Rx Design Guidelines link
    Monday, May 26, 2014 11:39 PM
  • No problem. Please, use the following method instead of the original one:

    private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
    {
        return Observable.Create<int>(async (obs, cancellationToken) =>
        {
            try
            {
                for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                {
                    int value = i;
                    obs.OnNext(await Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(msWait);
                        return value;
                    }));
                }
                obs.OnCompleted();
            }
            catch (Exception exc)
            {
                obs.OnError(exc);
            }
        });
    }

    Is it now compliant with §6.5, "Subscribe implementations should not throw"?

    However, the result is still the same - SubscribeCancel and SubscribeThrow time out, while Subscribe passes.



    Tuesday, May 27, 2014 2:22 AM
  • This is still not correct. You have just made your observable sequence contain error handling (for things it shouldn't know about).

    The erroneous lines of code are:

    if (i == 5)
    {
        throw new Exception("xo-xo");
    }

    Maybe it would help to think of it a different way. Imagine you were reading from a database, parsed some data incorrectly, and your code threw an exception. Would you ask your DBA to fix this by putting error handling in their stored procedure to capture your error? No, the DB has given you its data and then washed its hands of you.

    Rx is the same. It is a one-way pipe. We can observe events coming from the source. If we throw in our OnNext handler, then we have failed. As another analogy, consider an observable sequence of tweets from Twitter. If you hit an error while reading a tweet, should Twitter handle it? No. It has broadcast the event and it is up to you to deal with it.
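
    To make the one-way pipe concrete, here is a minimal sketch rather than a definitive statement of Rx behaviour. It assumes the System.Reactive package is referenced and uses a synchronous Observable.Range source instead of the async source from the question; the class and method names are hypothetical. In the Rx.NET releases I am aware of, an exception thrown by the OnNext handler is not routed to the OnError callback of the same subscription; with a synchronous source it travels back up the call stack to the code that called Subscribe.

```csharp
using System;
using System.Reactive.Linq;

public static class ThrowingHandlerSketch
{
    // Hypothetical helper: reports where an exception thrown by OnNext ends up.
    public static (bool OnErrorCalled, string CaughtMessage) Run()
    {
        bool onErrorCalled = false;
        string caughtMessage = null;

        try
        {
            // Assumption: Observable.Range pushes its values synchronously during
            // Subscribe when called like this, so the handler runs on the caller's stack.
            Observable.Range(0, 10).Subscribe(
                n =>
                {
                    if (n == 4)
                    {
                        throw new Exception("xo-xo");
                    }
                },
                e => onErrorCalled = true, // meant for errors raised by the source
                () => { });
        }
        catch (Exception e)
        {
            // The subscriber's own exception surfaces here, at the Subscribe call site.
            caughtMessage = e.Message;
        }

        return (onErrorCalled, caughtMessage);
    }
}
```

    With the async source from the question there is no caller further up the stack to catch the exception in this way, which is consistent with the test simply timing out.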

    Lastly, once you introduce concurrency to Rx (with SubscribeOn/ObserveOn or with async/tasks), it is quite unreasonable to expect a code block on another thread, which has most likely moved on to different work, to somehow catch an error that your OnNext handler has produced.

    The correct place for your try/catch is in your OnNext handler. I think that is very clear from the guideline

     §6.5 - "Subscribe implementations should not throw" ?

    which your code is not compliant with.
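
    As a rough illustration of that advice (a sketch under assumptions, not the only way to do it), the handler below guards its own body, so nothing escapes into the Rx pipeline and the sequence still runs to completion. It assumes the System.Reactive package is referenced; the Observable.Range source, the class name and the log list are hypothetical stand-ins for the test in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class GuardedHandlerSketch
{
    // Hypothetical helper: subscribes with an OnNext handler that catches its own errors.
    public static List<string> Run()
    {
        var log = new List<string>();
        var done = new ManualResetEventSlim();
        int i = 0;

        Observable.Range(0, 10).Subscribe(
            n =>
            {
                try
                {
                    ++i;
                    if (i == 5)
                    {
                        throw new Exception("xo-xo");
                    }
                }
                catch (Exception e)
                {
                    // The handler deals with its own failure; the source never sees it.
                    log.Add("handled: " + e.Message);
                }
            },
            e =>
            {
                // Only reached if the source itself reports an error.
                log.Add("source error: " + e.Message);
                done.Set();
            },
            () =>
            {
                log.Add("completed");
                done.Set();
            });

        // Wait for a terminal notification, with a timeout so the sketch cannot hang.
        done.Wait(TimeSpan.FromSeconds(5));
        return log;
    }
}
```

    Applied to SubscribeThrow, this would mean catching inside the handler and signalling the TaskCompletionSource from there (for example with TrySetException), rather than waiting for an OnError call.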

    HTH


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Wednesday, June 04, 2014 7:00 AM Spelling
    Wednesday, June 04, 2014 6:58 AM