Tracking added/removed keys for a 2nd subscription
-
Friday, January 18, 2013 10:03 PM
I'm finding it tricky doing this neatly - looking for some ideas I might not have thought of.
A 'batch', has a collection of 'Operations' in it. 'Operations' can be added/removed from a 'batch'. 'Operations' have progress.
I have a UI where I want a user to be able to enter a 'Batch' ID and subscribe to the list of 'Operations' for that batch from a server, as well as show progress for those operations. The first call to the server looks like this:
public IObservable<BatchContentChangeEventArgs> GetOperationsForBatch(int batchId)
Where:
    class BatchContentChangeEventArgs
    {
        // Operations added since the last publication
        public IEnumerable<string> AddedOpIds;

        // And those removed
        public IEnumerable<string> RemovedOpIds;
    }

Example results will be something like:
// The first published item always has all the current operations in its 'Added'. The client will need to subscribe to the progress of each Added item.
(Initial image) - AddedOpIds { "A", "B", "C" }, Removed {}
(Update 1) - AddedOpIds {"D"}, Removed { "C"} // The app would subscribe to progress of the new 'D', but stop listening to the progress of 'C'.
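To make the example concrete, here is a minimal sketch of how those two publications fold into the set of operations whose progress the client should be watching. It is plain C# with no Rx involved. `ActiveOperations.Apply` is a hypothetical helper name, not part of the server API, and the event args class is redeclared with empty defaults only so the snippet stands alone. It also assumes that within a single event, additions are applied before removals.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the event args above; the empty defaults are an assumption
// added here so that either list can be omitted in the example.
class BatchContentChangeEventArgs
{
    public IEnumerable<string> AddedOpIds = Enumerable.Empty<string>();
    public IEnumerable<string> RemovedOpIds = Enumerable.Empty<string>();
}

static class ActiveOperations
{
    // Hypothetical helper: applies change events in order and returns the
    // operation IDs that are still part of the batch afterwards.
    public static ISet<string> Apply(IEnumerable<BatchContentChangeEventArgs> changes)
    {
        var active = new SortedSet<string>(StringComparer.Ordinal);
        foreach (var change in changes)
        {
            // Assumption: additions are processed before removals.
            foreach (var id in change.AddedOpIds) active.Add(id);
            foreach (var id in change.RemovedOpIds) active.Remove(id);
        }
        return active;
    }
}
```

Feeding it the initial image followed by Update 1 leaves "A", "B" and "D" in the set, which are exactly the operations that should still have a live progress subscription.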
When I say 'subscribe to progress', I am talking about a new server call which might look like:
public IObservable<OpProgress> GetOperationProgress(string opId)
I'm having difficulty writing a 'nice' Rx/Linq query to do this and was wondering if there was a better way than what I have already done.
To list the requirements, I need my software to:
- For a given batch Id, subscribe to an observable stream of 'BatchContentChangeEventArgs' (for as long as the user is viewing that batch id)
- For each OpId in 'AddedOpIds', I want to subscribe to the observable stream of operation progress.
- If another BatchContentChangeEventArgs arrives with Ids I have previously subscribed to in 'RemovedOpIds', I should no longer subscribe to their progress.
- If another batch is selected, I should unsubscribe from all previous Progress requests, as well as the 'GetOperationsForBatch' query for that batch.
- I also want to be able to notify something 'outside' of the stream when an item is added to or removed from the current batch, so I can do stuff on the UI (but still keep the inner subscriptions and state neatly encapsulated).
I have managed a solution which I'll post below, but it involves using a Subject... (2 in fact), so I figure there is a better way.
    void Main()
    {
        // Unhappy that I am passing subjects in as IObservers
        var addedItemsObserver = new Subject<IEnumerable<string>>();
        var removedItemsObserver = new Subject<IEnumerable<string>>();

        var query =
            from batchId in newBatchSelected // observable constructed from UI stream (not important)
            let stream = GetOperationsForBatch(batchId) // IObservable<BatchContentChangeEventArgs>
            select from progress in GetUpdatesForAddedKeys(stream, addedItemsObserver, removedItemsObserver)
                   select progress;

        query.Switch().Subscribe(batchItemProgress =>
        {
            // Could update rows on a grid on the UI here, being certain that the row exists
        });

        addedItemsObserver.Subscribe(operationAdded =>
        {
            // Could add rows to a grid on the UI here, being certain that I won't get an update on it until I have
        });

        removedItemsObserver.Subscribe(operationRemoved =>
        {
            // Could remove rows from a grid here, knowing that I won't get any further updates on it
        });
    }

    IObservable<OpProgress> GetUpdatesForAddedKeys(
        IObservable<BatchContentChangeEventArgs> changeEventArgsStream,
        IObserver<IEnumerable<string>> addedObserver,
        IObserver<IEnumerable<string>> removedObserver)
    {
        return Observable.Create<OpProgress>(o =>
        {
            // Maintain a collection of our subscriptions to individual progress updates
            var progressSubscriptions = new Dictionary<string, IDisposable>();

            // Subscribe to updates of our ChangeEventArgs
            return changeEventArgsStream.Subscribe(updatedArgs =>
            {
                if (updatedArgs.AddedOpIds.Any())
                {
                    // Do this first so our UI can set up any new rows before we subscribe to updates on it
                    addedObserver.OnNext(updatedArgs.AddedOpIds);

                    // Subscribe to state of added items. Our UI will get these via this observable.
                    updatedArgs.AddedOpIds.ForEach(add =>
                    {
                        if (!progressSubscriptions.ContainsKey(add))
                            progressSubscriptions.Add(add, GetOperationProgress(add).Subscribe(o));
                    });
                }

                if (updatedArgs.RemovedOpIds.Any())
                {
                    // First we need to unsubscribe from the states, before telling the UI
                    // via those subjects (opposite to above)
                    updatedArgs.RemovedOpIds.ForEach(remove =>
                    {
                        IDisposable existingSub;
                        if (progressSubscriptions.TryGetValue(remove, out existingSub))
                        {
                            // Unsubscribe from our operation progress, and remove it from the tracker
                            existingSub.Dispose();
                            progressSubscriptions.Remove(remove);
                        }
                    });

                    removedObserver.OnNext(updatedArgs.RemovedOpIds);
                }
            });
        });
    }

Just looking for some other ways of thinking about the problem. I'll keep playing around in the meanwhile!
Thanks.
All Replies
-
Saturday, January 19, 2013 12:56 AM
Hi,
You could Publish the sequence and create separate observables. In practice, the order of subscriptions created on the UI thread will correspond to the order in which the UI is updated; however, it's not recommended to rely on this behavior in general.
Edit: I missed the requirement to cancel progress notifications when a batch changes, so I've just updated TakeUntil to react to batch changes as well as removed operations. I'm assuming that newBatchSelected is already hot.
    var operationChanges = newBatchSelected
        .Select(GetOperationsForBatch)
        .Switch()
        .Publish();

    var addedOperations = operationChanges.SelectMany(change => change.AddedOpIds);
    var removedOperations = operationChanges.SelectMany(change => change.RemovedOpIds);

    var operationsProgress =
        from added in addedOperations
        let whenRemoved = removedOperations.Where(id => id == added)
        from progress in GetOperationProgress(added)
            .TakeUntil(whenRemoved.Select(_ => -1).Amb(newBatchSelected))
        select progress;

    addedOperations.ObserveOnDispatcher().Subscribe(operation => ...);
    operationsProgress.ObserveOnDispatcher().Subscribe(progress => ...);
    removedOperations.ObserveOnDispatcher().Subscribe(operation => ...);

    operationChanges.Connect();
- Dave
- Edited by Dave Sexton Saturday, January 19, 2013 1:04 AM Added a missing requirement.
- Edited by Dave Sexton Saturday, January 19, 2013 1:07 AM Added a missing requirement.
- Marked As Answer by h_andr Saturday, January 19, 2013 6:44 PM
-
Saturday, January 19, 2013 1:05 AM
Hi,
I just updated my original reply to meet a requirement that I had missed.
- Dave
-
Saturday, January 19, 2013 5:52 PM
Hi Dave - thanks very much, I'll try this soon. I tried to use Publish myself, and was also aware of the TakeUntil option etc., but couldn't piece it together as neatly as you have. I didn't think to create a separate 'whenRemoved' stream for each key in order to make the 'TakeUntil' work (I was trying to use a single stream of removals, and it didn't work). The use of Amb is also clever, as I couldn't work out how, once 'published', to get the progress subscriptions to end after a new batch was selected (as they weren't really covered as part of the original 'switch'). I played around with these ideas but didn't post them as they never quite worked out.
I forgot to be explicit about the fact that Operations can be re-added back into a batch after they have been removed, but looking at the code, I expect that will still work.
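A small sketch of why re-adding should work with the per-key TakeUntil approach: each occurrence of an ID in the added stream starts a fresh progress subscription with its own removal filter. This is a simplified stand-in rather than the code from the reply. It assumes the System.Reactive package, uses Subjects in place of the server calls, drops the batch-switching part, and `ReAddSketch`, `feed` and the local `GetOperationProgress` are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ReAddSketch
{
    public static List<string> Run()
    {
        // Hypothetical stand-ins for the server streams.
        var added = new Subject<string>();
        var removed = new Subject<string>();
        var feed = new Subject<(string OpId, int Percent)>();

        // Stand-in for GetOperationProgress(opId): filters one shared hot feed.
        IObservable<string> GetOperationProgress(string opId) =>
            feed.Where(p => p.OpId == opId)
                .Select(p => p.OpId + ":" + p.Percent);

        // Every added ID gets its own progress subscription, which ends
        // when that same ID shows up in the removed stream.
        var progress = added.SelectMany(id =>
            GetOperationProgress(id).TakeUntil(removed.Where(r => r == id)));

        var log = new List<string>();
        using (progress.Subscribe(log.Add))
        {
            added.OnNext("A");
            feed.OnNext(("A", 10));  // observed
            removed.OnNext("A");
            feed.OnNext(("A", 20));  // ignored: "A" is no longer in the batch
            added.OnNext("A");       // re-added: a new inner subscription starts
            feed.OnNext(("A", 30));  // observed again
        }
        return log;
    }
}
```

Calling `ReAddSketch.Run()` returns "A:10" and "A:30": the update sent while "A" was removed is dropped, and progress resumes after the re-add. One caveat of this shape is that an ID added twice without a removal in between would get two progress subscriptions.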
Thanks - you make everything look so easy and obvious afterwards. :)
[Edit... some time later]
Works beautifully. Thank you.

