Multicast Subscribe Pattern
-
Tuesday, November 20, 2012 9:18 AM
Hi,
I'm trying to come up with a pattern that creates a multicast-style observable and, at the same time, makes another (non-Rx) subscription to the underlying data source. Below is some sample code to illustrate what I'm doing. My idea was to use StartWith to return any initial state and also kick off a subscription to the underlying data source if one is not already active.
The problem is that the underlying data source can start producing values before the Rx subscription has been returned, so a number of values could be missed. I know I could probably use a ReplaySubject to ensure no values are missed, but I don't want to cache all the values.
Here is a unit test to illustrate the "problem":
    private readonly Subject<long> _longSubject = new Subject<long>();

    [TestMethod]
    public void TestStartsWithUsingSubject()
    {
        var waitForDataEvent = new CountdownEvent(1);
        var sequence = _longSubject.AsObservable()
            .Publish()
            .RefCount()
            .StartWith(GenerateStartValues());
        Console.WriteLine("Got Subscription");
        var seqSub = sequence.Subscribe(x => Console.WriteLine("Received {0}", x));
        waitForDataEvent.Wait(TimeSpan.FromSeconds(2));
        Console.WriteLine("Finsihed");
        seqSub.Dispose();
    }

    public long[] GenerateStartValues()
    {
        KickOffAsycnOneTimeOnlySubscription();
        Console.WriteLine("Generating Start Values");
        return new long[] { -1, -2, -3 };
    }

    private bool _isSubscribed;

    private void KickOffAsycnOneTimeOnlySubscription()
    {
        if (!_isSubscribed)
        {
            _isSubscribed = true;
            Task.Factory.StartNew(() =>
            {
                // some data returned from somewhere
                // Thread.Sleep(500);
                _longSubject.OnNext(100);
                _longSubject.OnNext(99);
                _longSubject.OnNext(98);
            });
        }
    }

The output is:
    Generating Start Values
    Got Subscription
    Received -1
    Received -2
    Received -3
    Finsihed
Thanks in advance for any help
All Replies
-
Tuesday, November 20, 2012 2:41 PM
Hi,
First, convert your data source into a cold observable. For example:
    var dataSource = Observable.Create<long>(
        observer =>
        {
            // Push any starting data before subscribing.
            observer.OnNext(-1);
            observer.OnNext(-2);
            observer.OnNext(-3);

            // Subscribe to your source here and convert all
            // notifications into calls to observer.OnNext(val).
            // If it fails, call observer.OnError(exception).
            // If it completes, call observer.OnCompleted().

            // Placeholder: return an IDisposable (or an Action) that
            // unsubscribes from your source.
            return Disposable.Empty;
        });

If you need to share the subscription, then apply the Publish operator:
    var shared = dataSource.Publish();

    shared.Subscribe(...);
    shared.Subscribe(...);
    shared.Subscribe(...);

    shared.Connect();
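To make the ordering concrete, here is a minimal sketch of the same idea using RefCount instead of an explicit Connect. It assumes the Rx library (System.Reactive) is referenced. LongFeed, Wrap and Run are hypothetical names invented for this illustration, with LongFeed standing in for whatever non-Rx data source you really have. The point is that the handler is attached to the source inside Observable.Create, after the starting values have been pushed, so the subscriber cannot miss anything that arrives afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for the non-Rx data source.
public sealed class LongFeed
{
    public event Action<long> Data;

    public void Push(long value)
    {
        var handler = Data;
        if (handler != null) handler(value);
    }
}

public static class Demo
{
    // Wraps the feed as a cold observable: nothing is attached to the
    // feed until an observer subscribes.
    public static IObservable<long> Wrap(LongFeed feed)
    {
        return Observable.Create<long>(observer =>
        {
            // Starting values go out first.
            observer.OnNext(-1);
            observer.OnNext(-2);
            observer.OnNext(-3);

            // Only now attach to the underlying source.
            Action<long> handler = observer.OnNext;
            feed.Data += handler;

            // Detach from the source when the subscription is disposed.
            return Disposable.Create(() => feed.Data -= handler);
        });
    }

    public static List<long> Run()
    {
        var feed = new LongFeed();
        var received = new List<long>();

        // Publish + RefCount: the first subscriber connects to the
        // source, the last one to leave disconnects from it.
        var shared = Wrap(feed).Publish().RefCount();

        feed.Push(0); // nobody is attached yet, so this value is dropped

        using (shared.Subscribe(x => received.Add(x)))
        {
            feed.Push(100);
            feed.Push(99);
            feed.Push(98);
        }

        feed.Push(97); // pushed after disposal, so it is not received
        return received;
    }
}
```

Calling Demo.Run() should return -1, -2, -3, 100, 99, 98. One caveat with this shape: a second observer that subscribes to shared while the first is still connected sees only values pushed from then on, not the starting values, because the Create delegate runs once per connection. If every subscriber needs the starting values, subscribe them all before Connect as in the snippet above.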
- Dave
- Marked As Answer by TomacN Tuesday, November 20, 2012 4:40 PM
-
Tuesday, November 20, 2012 4:41 PM
Yes of course! Thanks Dave.
-
Wednesday, November 28, 2012 4:53 PM
Subjects are baaaad, mmmmkay
;-)
Lee Campbell http://LeeCampbell.blogspot.com

