Create observable from seperate Action<t>


  • Hi,

    I'm trying to create an observable from a pubnub subscription.

    With pubnub you can subscribe to a real time stream and you have to supply three callbacks: One for new data that is received, one for if an error occurred and one for if the connectionstate changes. I have the following code (writelines are just to make it compile):

                var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx");
                Action<dynamic> userCallback = o => Console.Out.WriteLine("Data received: {0}", o);
                Action<dynamic> connectCallback = o => Console.Out.WriteLine("Connection changed: {0}", o);
                Action<dynamic> errorCallback = o => Console.Out.WriteLine("Error: {0}", o);
                pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", userCallback, connectCallback, errorCallback);   
                IObservable<dynamic> obs = .................

    Now I want to create an observable that ties the userCallback to onNext and errorCallback to onError. Can someone give me an indication how to do this. I now have done it with a custom class that implements IObservable<T> but I don’t believe that this is best practice?!

    Kind regards,


    Wednesday, December 18, 2013 8:11 PM

All replies

  • This is a fairly common pattern that Rx solves particularly well.

    You want to wrap your service subscription in an Observable.Create, which can then turn the various callbacks into Observable notifications:

    var stream = Observable.Create<dynamic>(
    	obs => {
    		var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx");
    			o => {
    				Console.Out.WriteLine("Data received: {0}", o);
    			o => Console.Out.WriteLine("Connection changed: {0}", o), 
    			ex => {
    				Console.Out.WriteLine("Error: {0}", o);
    		return () => pubnub.Unsubscribe(...);

    This also means you can also explicitly unsubscribe from the underlying service when your stream completes (I'm assuming this will be required).

    Friday, January 31, 2014 9:46 AM