RX Scan not threadsafe?

Question
-
If I use an Rx Scan and call OnNext into it from one thread (see the following example in F#, purely for conciseness of code):
open System
open System.Reactive
open System.Reactive.Linq
open System.Reactive.Subjects

let subject = new Subject<_>()
let accumulator = subject.Scan([], fun acc value -> acc @ [value])
let replay = accumulator.Replay 1
let connect = replay.Connect()

for i in 0..100 do subject.OnNext i
// This works as expected

// Expect 100
let result = replay.Subscribe(fun result -> printf "%d" (result |> List.length))
Console.ReadLine() |> ignore
In the subscribe I expect to see 100 - and I do.
If I call OnNext from multiple threads:
open System
open System.Reactive
open System.Reactive.Linq
open System.Reactive.Subjects

let subject = new Subject<_>()
let accumulator = subject.Scan([], fun acc value -> acc @ [value])
let replay = accumulator.Replay 1
let connect = replay.Connect()

let xxxxx =
    [| for i in 0..99 -> async { subject.OnNext i } |]
    |> Async.Parallel
    |> Async.RunSynchronously

// Expect 100
let result = replay.Subscribe(fun result -> printf "%d" (result |> List.length))
Console.ReadLine() |> ignore
I don't get 100. The accumulator doesn't work when posted into from multiple threads. Is this a bug?
Monday, September 19, 2011 8:19 AM
All replies
-
My colleague has verified that this works as (I) expected with the beta (pre-release) version.
Monday, September 19, 2011 2:43 PM
-
Hi,
It's not a bug. Subject<T> is a so-called "fast subject". It doesn't ensure that producers are well-behaved, so you must not arbitrarily call OnNext concurrently.
Alternatively, you can use the Synchronize method to change your subject into one that complies with the Rx serializability rule.
See §4.2 in the Rx Design Guidelines for more information about this rule.
For example (sorry I don't know F# :D):
let subject = Subject.Synchronize(new Subject<_>())
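Applied to the multi-threaded snippet from the question, that one-line change might look like the sketch below. It assumes `Subject.Synchronize` accepts the subject as written in the line above and has not been compiled against a specific Rx build. The list length should be 100, but the order of the elements depends on how the producers interleave.

```fsharp
open System
open System.Reactive.Linq
open System.Reactive.Subjects

// Wrap the subject so that concurrent OnNext calls are serialized
// before they reach Scan (the suggestion from this reply).
let subject = Subject.Synchronize(new Subject<int>())

let accumulator = subject.Scan([], fun acc value -> acc @ [value])
let replay = accumulator.Replay 1
let connect = replay.Connect()

// Same 100 concurrent producers as in the question.
[| for i in 0..99 -> async { subject.OnNext i } |]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// Expect 100
let result = replay.Subscribe(fun result -> printf "%d" (result |> List.length))
Console.ReadLine() |> ignore
```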
- Dave
http://davesexton.com/blog
- Edited by Dave Sexton Monday, September 19, 2011 6:49 PM Formatting again
- Proposed as answer by James Miles Friday, September 23, 2011 5:50 AM
Monday, September 19, 2011 6:48 PM
Thanks Dave. I'll give that a whirl and see how it goes. Do you happen to know if this was a conscious decision to change from the previous beta?
Tuesday, September 20, 2011 7:29 AM
-
Hi,
The change is mentioned in the release notes for the first "stable" release, which is one before the first "official" stable release.
- Dave
http://davesexton.com/blog
- Edited by Dave Sexton Tuesday, September 20, 2011 7:36 AM Updated link
- Proposed as answer by James Miles Friday, September 23, 2011 5:50 AM
- Marked as answer by Bart De Smet [MSFT] Sunday, November 13, 2011 1:46 AM
Tuesday, September 20, 2011 7:35 AM
sorry I don't know F# :D
Friday, October 14, 2011 6:56 PM