Moving observable subscription from UI to domain tier - Where/How to observe/subscribe?
-
יום רביעי 25 יולי 2012 10:10
Hi
I have a griddataview (gridRaces below) sitting on a winform which gets populated - successfully - from an observable via the following
Data.Subscriptions(). TodaysRaces(). SubscribeOn(SynchronizationContext.Current). Subscribe(races => { gridRaces.DataSource = new BindingList<RaceOverview>(races.ToList()); });At the moment it is all working but I am doing too much in the UI tier. The RaceOverview is really a DTO. Really, I am wanting to retrieve and transform this DTO into a domain object (RaceSummary) in the domain tier rather than handling it all in the UI tier. (The underlying data is coming from a webservice).
Ideally I'd like to expose the domain objects both synchronously and as an observable.
a) What is the best way to force the observable to complete/block so that the data can be retrieved from the domain tier synchronously? (I am currently using OnCompleted - though when running from the UI this never seems to get hit - though it does from within MSTest - so I'm guessing I should be observing or subscribing on a specific thread?)
b) What are the threading issues when moving the data retrieval into the domain tier. I understand I'll need to SubscribeOn to the current sync context when referencing the subscription to the domain tier in the UI tier (as in the code sample above), but I'm not sure of any other ObserveOn/SubscribeOn issues which may be required in the domain tier in this scenario.
c) Further I wondered if there would be different implications wrt threading and when trying to access the data synchronously.
Sorry if this is a bit general. Please ask for clarification if there are things which aren't intelligible from what I've written.
Thx in advance
S
PS Here is when I am up to wrt implementating the stuff in the domain tier
public IObservable<IEnumerable<RaceOverview>> TodaysRacesDTO() { return (Observable.Using<IEnumerable<RaceOverview>, WSGateway>( () => NewConnection(), (c) => c.TodaysRacesSubscription())); <=== I have a test which proves this works } public IEnumerable<RaceSummary> TodaysRacesSync() { var completed = false; var races = Enumerable.Empty<RaceSummary>(); var rss = TodaysRacesDTO().Subscribe( rs => { races = from r in rs select new RaceSummary(r); }, () => { completed = true; }); <==== this is never getting hit when run from UI but does from MSTEST while (!completed) { } return races; }
public IObservable<IEnumerable<RaceSummary>> TodaysRacesSummary()
{
return TodaysRacesDTO().SelectMany(rs =>
{
var races = (from r in rs
select new RaceSummary(r));
return races;
});; best attempt but doesn't work ...unsure how to transform this to the required return type
; error Cannot implicitly convert type 'System.IObservable<RaceSummary>' to 'System.IObservable<System.Collections.Generic.IEnumerable<RaceSummary>>'.
;An explicit conversion exists (are you missing a cast?)
}
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:10 spelling
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:11 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:12 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:13 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:14 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:28 More details
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:35 More details
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:37 More details
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:38 More details
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:40 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:40 Spelling
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 10:42 Change heading
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 11:51 more details
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 11:56 Clarification
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 12:15 More info
כל התגובות
-
יום רביעי 25 יולי 2012 12:26
Simon,
First of all I'd say that if you need to block on an async call, that's a code smell that you should probably change to be non-blocking.
You should also perhaps consider returning Task<T> from your domain tier rather than IObservable<T>, simply because it communicates that you're only expecting a single value rather than a stream of values. See the notes and example in Lee's book in this regard.
If, you do specifically want to return an IObservable<T>, your service class will still only need the one method:
public IObservable<IEnumerable<RaceSummary>> TodaysRacesDTO() { return Observable.Using<IEnumerable<RaceOverview>, WSGateway>( () => NewConnection(), (c) => c.TodaysRacesSubscription()) .SubscribeOn(Scheduler.TaskPool) .Take(1) .Select(r => r.Select(x => new RaceSummary(r))); }
This will map the result of the WS call to an IEnumerable<RaceSummary>, which is returned on the Observable stream. I've added a Take(1) underlying call so it guarantees completion after one result, regardless of what TodaysRacesSubscription() returns.
For a blocking subscription to an Observable, you don't need your TodaysRacesSync method. Just call .First()/.Single()/.Last() on your TodaysRacesSummary() method. It's by no means ideal, and will result in deadlocks if a value never arrives, but it will certainly block until that observable receives a value. Once again, see Lee's book.
Make sure you consider carefully where you use ObserveOn and SubscribeOn; frivolous use leads to excessive context switching and performance will degrade as a result, despite the opposite intention. In the UI tier, you should only need to use ObserveOn, and only if/when you want to schedule back onto the Dispatcher. Don't use it in the domain tier - you should call it at the last possible moment. Use SubscribeOn once in your domain tier, when making the call to the underlying method - and only if the method isn't already run async.
Hope this helps.
Marcus.
- הוצע כתשובה על-ידי Marcus Whitworth יום חמישי 26 יולי 2012 08:44
-
יום רביעי 25 יולי 2012 15:13
This is very helpful. Thx Marcus
Can I just pick your brain a bit more about the Task idea as I can't quite understand the interaction between the Task and the IObservable wrt GC?
So I've copied your code thus
private IObservable<IEnumerable<RaceSummary>> TodaysRacesSummary() { return Observable.Using<IEnumerable<RaceOverview>, WSGateway>( () => NewConnection(), (c) => c.TodaysRacesSubscription()) .SubscribeOn(Scheduler.TaskPool) .Take(1) .Select(rs => rs.Select(r => new RaceSummary(r))); }I find that if I execute this code from the UI, passing the IEnumerable<RaceSummary> to a usercontrol,
ucRaces.Populate(Data.Subscriptions().TodaysRacesSummary().First();
I can watch Fiddler make 3 calls to the WS, one to logon one to get the data and one to logoff which should happen as the observable is disposed.
I have wrapped up the observable with a Task
public Task<IEnumerable<RaceSummary>> TodaysRacesSummaryTask() { return TodaysRacesSummary().ToTask<IEnumerable<RaceSummary>>(); }If I then execute this
var racesTask = Data.Subscriptions().TodaysRacesSummaryTask(); ucRaces.Populate(racesTask.Result); racesTask.Dispose();I see fiddler only executing 2 calls and the logout call is never made - which I was anticipating when disposing the Task. It looks like the Task doesn't dispose of the Observable and when I try and run it a second time, there are locks in place.
I can only assume this isn't quite what you had in mind?
Thx again
S
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 15:27 More questions
-
יום רביעי 25 יולי 2012 15:39
I agree with Marcus (I have to, he is bigger than me and sits next to me).
It seems like you are forcing a pattern here (maybe not). Anyway, if you have existing code that is synchronous like
private IEnumerable<string> GetStuff() { //... }You can easily shove this into Rx land by just wrapping an Observable.Start around it
Observable.Start(()=>GetStuff()) .ObserveOnDispatcher() .Subscribe(stuff=> { foreach(item in stuff) { vm.ObsCollection.Add(item); } });Now you have your synchronous version, but it is easy to make Async with Rx.
Happy days.
PS If you are thinking of Task & Rx together, I would go the other way around to what you have. Instead of trying to make Rx look like TPL, take a Task and whack .ToObservable() on it. Much easier.
Lee Campbell http://LeeCampbell.blogspot.com
-
יום רביעי 25 יולי 2012 16:56
Lee ... thx for your reply.
I don't know if you could elaborate when you say "It seems like you may be forcing a pattern ..." - I may well be!
Fundamentally the data is coming in asynchronously from a web service - I'm using rx effectively to poll the ws thereby generating a stream of data which having passed thru and been transformed in the domain/service tier, then updates the UI.
I think where I may be going wrong is trying to get this data directly into a usercontrol than via a viewmodel or at least using some data binding. Consequently I'm having to block things to get it working - which I think was the smell that Marcus originally highlighted ... but the penny has just dropped for me (unless it hasn't and I'm still not seeing it right!)
Appreciate any thoughts/advice!
Thx very much.
S
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 17:07 missing word!
-
יום רביעי 25 יולי 2012 18:47
Ok, so can we assume that in your call to the underlying service you're polling using Observable.Interval or similar? In which case we are dealing with a continuing IObservable<IEnumerable<T>> stream, rather than just a stream which returns a single value (which is what we were assuming).
I don't understand why you need to block the call to update your viewmodel. You should be able to just use something like the following:
Data.Subscriptions() .TodaysRacesSummary() .ObserveOnDispatcher() .Subscribe(x => ucRaces.Populate(x));
This will then populate your vm asynchronously, on the Dispatcher, whenever the WS returns a value and pushes it onto your IObservable stream.
- הוצע כתשובה על-ידי Marcus Whitworth יום חמישי 26 יולי 2012 08:44
-
יום רביעי 25 יולי 2012 18:55
D'oh! I really don't need to block the call at all!
Wrt to Observer.Interval ... yes ... sort of. In fact Lee suggested a slightly different way here
Excellent.
Thx very much both you and Lee.
S
- נערך על-ידי Simon Woods יום רביעי 25 יולי 2012 18:58 More info