Scenario that uses "SubscribeOn"


  • Hi,

    Could you please provide scenarios where we would need to use "SubscribeOn"?

    It seems that "SubscribeOn" is used only to specify the thread that should build up the subscription mechanism between the Observable and the Observer, but I am wondering why that matters for us as Rx consumers.



    Saturday, October 22, 2011 1:39 PM


  • Hi Riana,

    Use of SubscribeOn is certainly rare (personally, I can't recall ever using it).  In most cases an observable's Subscribe behavior is exactly what you need, or at least the operator that generates the observable has an IScheduler parameter so that it can be controlled (see §5.4 in the Rx Design Guidelines).

    However, there are times when an observable behaves incorrectly, either because it wasn't designed properly (and you don't have the code, so you can't fix it) or because it's shared by multiple APIs in your code and its Subscribe behavior is correct for most of those cases, but not all.  You could solve this problem by creating multiple observable sequences with the correct Subscribe behaviors, but sometimes it's easier to apply the DRY principle by using the same core observable and adding SubscribeOn where it's needed.

    Here are two cases where SubscribeOn is useful:

    • Making Subscribe asynchronous.

      Use it to fix a long-running observable that isn't asynchronous when calling Subscribe.  (Although, this would seem like a bad design to begin with.  If it's your code, fix it at the source.)
    • Requiring thread affinity on Subscribe, but not for observations.

      Use it when your code is executed on an arbitrary thread (e.g., thread pool) as a side effect of calling Subscribe, yet it must have thread affinity independent of observations.  For example, you have an observable that calls a user-supplied callback function when Subscribe is called, and your particular callback must run on the UI thread because it requires access to a control, while you still want observations to be scheduled normally.

      According to §5.4 in the Rx Design Guidelines, all concurrency-introducing operators should have an overload with an IScheduler parameter;  however, presumably an operator can use that parameter to schedule observations and not subscriptions, if justified by the operator's semantics, although I don't know of any native Rx operators with this behavior.  Typically a scheduler is used to schedule the call to Subscribe and all observations, but you may have custom operators or factory methods that only use the scheduler to schedule observations, calling a user-supplied callback function on the thread on which Subscribe is called.

      For example, consider the following contrived method on a custom class:
      IObservable<int> Foo(Func<int> getNumber, IScheduler scheduler)
      {
      	return Observable.Defer(() =>
      	{
      		var n = getNumber();
      		return Observable.Range(0, Math.Max(0, n), scheduler);
      	});
      }

      The semantics of Foo are clear.  Return a cold observable that for each call to Subscribe will call getNumber and generate a range of values on the specified scheduler.

      Now consider calling Foo like this:
      var xs = Foo(() => int.Parse(maxTextBox.Text), Scheduler.ThreadPool);

      There's no problem if you call xs.Subscribe() on the UI thread, but on any other thread an exception will be thrown because the call to getNumber always occurs on the thread on which Subscribe is called.  (Also note that the specified scheduler is only used to schedule the generated sequence, not the call to getNumber.)

      If xs is being passed to a UI-agnostic method in your business layer, then perhaps it's best to add SubscribeOn first to ensure that the business layer can call Subscribe on any thread that it chooses.
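
As a rough sketch of that last point, here is one way the fix might look. It assumes a current System.Reactive package and is not tied to any particular UI framework: a dedicated EventLoopScheduler thread stands in for the UI thread, a thread-checking getNumber stands in for reading maxTextBox.Text, and TaskPoolScheduler.Default replaces Scheduler.ThreadPool. The names SubscribeOnSketch and Run are hypothetical.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SubscribeOnSketch
{
    // Same shape as Foo above: getNumber runs on the thread that calls Subscribe.
    static IObservable<int> Foo(Func<int> getNumber, IScheduler scheduler)
    {
        return Observable.Defer(() =>
        {
            var n = getNumber();
            return Observable.Range(0, Math.Max(0, n), scheduler);
        });
    }

    public static (bool FailedWithout, int[] ValuesWith) Run()
    {
        // Assumption: one dedicated background thread plays the role of the UI thread.
        // It is left undisposed for brevity.
        var ui = new EventLoopScheduler(start => new Thread(start) { IsBackground = true });
        var uiThreadId = Observable.Start(() => Thread.CurrentThread.ManagedThreadId, ui).Wait();

        // Stand-in for int.Parse(maxTextBox.Text): throws when called off the "UI" thread.
        Func<int> getNumber = () =>
        {
            if (Thread.CurrentThread.ManagedThreadId != uiThreadId)
                throw new InvalidOperationException("getNumber must run on the UI thread.");
            return 3;
        };

        var xs = Foo(getNumber, TaskPoolScheduler.Default);

        // Subscribing directly runs getNumber on the calling thread, so it fails.
        bool failedWithout;
        try { xs.ToArray().Wait(); failedWithout = false; }
        catch (InvalidOperationException) { failedWithout = true; }

        // SubscribeOn moves only the Subscribe call (and therefore getNumber) to the
        // stand-in UI thread; Range still generates its values on the task pool.
        var valuesWith = xs.SubscribeOn(ui).ToArray().Wait();

        return (failedWithout, valuesWith);
    }
}
```

In a real WPF application the stand-in scheduler would likely be replaced by something like SubscribeOnDispatcher() or SubscribeOn(SynchronizationContext.Current) captured on the UI thread, depending on which Rx platform package is referenced. Either way, the business layer can then call Subscribe from whatever thread it likes.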

    - Dave
    • Edited by Dave Sexton Saturday, October 22, 2011 9:36 PM Spelling error and formatting
    • Marked as answer by Riana Rambonimanana Tuesday, October 25, 2011 8:36 AM
    Saturday, October 22, 2011 9:31 PM