When to use Rx Subjects

  • Question

  • Hi, 

    I have a couple of use cases that I think I'll need to create a Subject<T> for. However I'm aware that generally we shouldn't be creating subjects explicitly, instead favoring the factory methods. In these cases though I'm not sure the factory methods fit.

    Scenario 1:- Workflow Status

    I have a particular workflow that can be in a number of states, let's say InitialState, LoggingIn, LoggedIn, Disconnected, Connected. I need to expose the current state to the consumer of my class. I could do this using a simple CurrentState property and an Action<state> to notify changes. However, I think it makes a good candidate for Rx in that the consumer can subscribe, get the current state and then be notified of any changes. This would seem to point to a ReplaySubject (to always give the current state). The current state is determined by a number of overridden methods which I have no control over.

    e.g. OnLogonStateReceived(State state){}, OnConnectedStateReceived(State state){}

    I could create a custom event in each of these methods and then use the FromEvent pattern, but this would seem to introduce an event for no other reason.

    Are there any alternative ways to do this?

    Scenario 2:- 

    This is quite similar to the first scenario in that I have an overridden method hook to get some messages. This time it is only one method though, e.g.

    OnSomeDataResponse(SomeData data){}

    I would like to create an Observable of these data types received so my consumer can process them.

    Any help/guidance appreciated.

    Monday, July 16, 2012 3:21 PM

All replies

  • "I have a couple of use cases that I think I'll need to create a Subject<T> for. However I'm aware that generally we shouldn't be creating subjects explicitly, instead favoring the factory methods. In these cases though I'm not sure the factory methods fit."

    Sure, you can create Subjects explicitly. You are mixing up two recommendations: the recommendation not to implement IObservable explicitly (use Observable.Create instead), and the recommendation not to use Subjects within a query pipeline. The latter makes sense: there you want the Subject to be an IObservable, which a Subject is easily converted to. Subjects are fine at the "end points" of your application. Make them private/protected and expose the Subject only through a property typed as IObservable; that way you can ensure that nobody starts feeding values into your Subject from somewhere in your pipeline. That's the recommendation, AFAIK.
    Monday, July 16, 2012 11:17 PM
  • I would agree with Richard that there is clear guidance that you should not implement the IObservable/IObserver interfaces yourself. Where it becomes fuzzy is when people (like me) recommend that you generally don't need to use Subjects. In most cases you can either:

    1. use Observable.Create(...) to produce an observable sequence
    2. transition from another paradigm using FromEventPattern/FromAsyncPattern/ToObservable etc...
    3. or, compose sequences together to create your desired output.
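
    As a rough illustration of option 1, here is a minimal sketch of Observable.Create. The Numbers helper and its parameters are made up for this example; the point is only that the sequence is defined by what happens on subscription, with no Subject involved.

    ```csharp
    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    public static class CreateSketch
    {
        // Hypothetical helper: emits `count` consecutive integers, then completes.
        public static IObservable<int> Numbers(int start, int count)
        {
            return Observable.Create<int>(observer =>
            {
                // This body runs once per subscriber, so the sequence stays lazy.
                for (int i = 0; i < count; i++)
                {
                    observer.OnNext(start + i);
                }
                observer.OnCompleted();
                return Disposable.Empty; // nothing to clean up in this sketch
            });
        }

        public static void Main()
        {
            Numbers(10, 3).Subscribe(
                x => Console.WriteLine(x),
                () => Console.WriteLine("done"));
        }
    }
    ```

    Nothing is produced until someone subscribes, and each subscriber gets its own run of the loop.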

    My favorite caveat to this guidance is precisely your example. You have suggested that you may use a ReplaySubject. I would suggest using a BehaviorSubject instead. It caches the last value and requires a default value, which guarantees that consumers will always get a value on subscription (unless it has already OnError'ed/OnCompleted). I most commonly see this on a Status property:

    public class MyModel
    {
        // Private subject: only MyModel can push new status values.
        private readonly ISubject<Status> _status = new BehaviorSubject<Status>(Status.InitialState);
        public IObservable<Status> Status
        {
            get { return _status.AsObservable(); }
        }
    }
    public abstract class Status
    {
        // InitialStatus and LoggingInStatus are subclasses of Status (not shown).
        public static readonly Status InitialState = new InitialStatus();
        public static readonly Status LoggingIn = new LoggingInStatus();
        public virtual bool IsAuthenticated { get { return false; } }
        public virtual bool CanSearch { get { return false; } }
    }
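
    To show how this might tie back to Scenario 1, here is a small self-contained sketch. It assumes a plain enum (WorkflowState) instead of the Status class above, and a Workflow class whose OnLogonStateReceived method stands in for the overridden hook; both names are invented for the example.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    // Hypothetical state type standing in for the poster's State.
    public enum WorkflowState { InitialState, LoggingIn, LoggedIn }

    public class Workflow
    {
        // Private subject: only this class can push values.
        private readonly BehaviorSubject<WorkflowState> _state =
            new BehaviorSubject<WorkflowState>(WorkflowState.InitialState);

        // Consumers only ever see a read-only sequence.
        public IObservable<WorkflowState> State
        {
            get { return _state.AsObservable(); }
        }

        // Stand-in for an overridden hook such as OnLogonStateReceived.
        public void OnLogonStateReceived(WorkflowState state)
        {
            _state.OnNext(state);
        }
    }

    public static class StateDemo
    {
        public static List<WorkflowState> Run()
        {
            var seen = new List<WorkflowState>();
            var workflow = new Workflow();
            workflow.OnLogonStateReceived(WorkflowState.LoggingIn);

            // A late subscriber first receives the latest cached state,
            // then any later changes.
            using (workflow.State.Subscribe(s => seen.Add(s)))
            {
                workflow.OnLogonStateReceived(WorkflowState.LoggedIn);
            }
            return seen;
        }

        public static void Main()
        {
            Console.WriteLine(string.Join(", ", Run())); // LoggingIn, LoggedIn
        }
    }
    ```

    The subscriber never sees InitialState here because it was replaced before the subscription; a BehaviorSubject keeps only the most recent value.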

    With regard to Scenario 2: I am not too sure what the question is. If you mean that you override a method OnSomeDataResponse(SomeData data){} in a subclass, and you want to expose the data values externally via Rx, then you could do this:

    private readonly ISubject<SomeData> _data = new Subject<SomeData>();
    public IObservable<SomeData> Data { get { return _data.AsObservable(); } }
    public override void OnSomeDataResponse(SomeData data)
    {
        _data.OnNext(data); // forward each response to subscribers
    }

    But now we are back to creating Subjects. :-(

    What would be better, if you have this much control, is to update the base class to just expose the data as an observable sequence to start with! i.e. Move the 

    public IObservable<SomeData> Data

    property down into the base class.


    Lee Campbell http://LeeCampbell.blogspot.com

    Tuesday, July 17, 2012 12:19 PM
  • Hey Richard & Lee, thanks for your feedback.

    So it seems like I'm correct to use subjects in this case. I mean there doesn't really seem to be another way and you have confirmed that.

    As for Scenario 2, I don't control the base class, so it's not possible to change it to expose an IObservable.

    Thanks for the guidance.

    Tuesday, July 17, 2012 1:12 PM
  • Richard and Lee have already provided great guidance. For completeness, a few notes right from the source:

    • If you need an addressable endpoint to fire messages into subscribed observers, the use of a Subject<T> is well justified. This is much like raising an event from inside the type defining the event. However, make sure not to expose the subject as-is to your consumers. At the very least, apply AsObservable to it.
    • If you need to share the side-effects of subscriptions by multicasting the message stream to multiple consumers, the use of subjects is also fine. However, in most cases, you don't even have to see those thanks to convenience operators like Publish, Replay, etc. All of those are really Multicast in disguise.
    • Don't try to write query operations based on subjects. It's tempting to build e.g. a merge operator by subscribing the input sequences to a common subject and returning the subject. This is a bad idea for various reasons: things get incredibly stateful, the "temperature" of the resulting sequence can be messed up, laziness can be broken, etc. Instead, resort to your best friend: Observable.Create!
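
    To make the last point concrete, here is a deliberately simplified two-source merge written with Observable.Create. MergeTwo is a made-up name for illustration only; in real code you would use the built-in Observable.Merge, and this sketch is not how the library implements it.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    public static class MergeSketch
    {
        // Hypothetical operator: merges two sequences without a Subject.
        public static IObservable<T> MergeTwo<T>(IObservable<T> first, IObservable<T> second)
        {
            return Observable.Create<T>(observer =>
            {
                // All state lives inside this lambda, so every subscriber
                // gets its own copy and nothing runs before subscription.
                var gate = new object();
                var remaining = 2;

                Action<T> onNext = x => { lock (gate) { observer.OnNext(x); } };
                Action<Exception> onError = ex => { lock (gate) { observer.OnError(ex); } };
                Action onCompleted = () =>
                {
                    lock (gate)
                    {
                        remaining--;
                        // Complete only once both sources have completed.
                        if (remaining == 0) observer.OnCompleted();
                    }
                };

                // Disposing the result unsubscribes from both sources.
                return new CompositeDisposable(
                    first.Subscribe(onNext, onError, onCompleted),
                    second.Subscribe(onNext, onError, onCompleted));
            });
        }

        public static void Main()
        {
            MergeTwo(Observable.Return(1), Observable.Return(2)).Subscribe(
                x => Console.WriteLine(x),
                () => Console.WriteLine("done"));
        }
    }
    ```

    Compared with funnelling both inputs into a shared subject and returning it, the sources are only subscribed to when a consumer subscribes, and unsubscribing tears everything down.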

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    • Proposed as answer by LeeCampbell Monday, July 23, 2012 1:06 PM
    Tuesday, July 17, 2012 11:31 PM