locked
RX Scan not threadsafe? RRS feed

  • Question

  • If I use an RX Scan and OnNext into it from 1 thread (see the following example in F#, purely for concideness of code)

    open System
    open System.Reactive
    open System.Reactive.Linq
    open System.Reactive.Subjects
    
    let subject = new Subject<_>()
    
    let accumulator = subject.Scan([], fun acc value-> acc @ [value])
    let replay = accumulator.Replay 1
    let connect = replay.Connect()
    
    for i in 0..100 do subject.OnNext i // This works as expected
    
    //Expect 100
    let result = replay.Subscribe(fun result -> printf "%d" (result|>List.length))
    
    Console.ReadLine()|>ignore

    In the subscribe I expect to see 100 - and I do.

     

    If I use Onnext from multiple threads - 

    open System
    open System.Reactive
    open System.Reactive.Linq
    open System.Reactive.Subjects
    
    let subject = new Subject<_>()
    
    let accumulator = subject.Scan([], fun acc value-> acc @ [value])
    let replay = accumulator.Replay 1
    let connect = replay.Connect()
    
    let xxxxx  = [|for i in 0..99 -> async {subject.OnNext i}|]
                  |>Async.Parallel
                  |>Async.RunSynchronously
    
    //Expect 100
    let result = replay.Subscribe(fun result -> printf "%d" (result|>List.length))
    
    Console.ReadLine()|>ignore


    I dont get 100. The accumulator doesnt work when posted into from multiple threads. 

     

    is this a bug? 

    Monday, September 19, 2011 8:19 AM

Answers

All replies