Another discussion on Glitches and Rx.

    Question

  • Hi,

    I referred to the following discussion on Glitch prevention and Rx, however I didn't find it very conclusive: 

    https://social.msdn.microsoft.com/Forums/en-US/3eccc8b4-86c7-4656-8f51-63aaf19c3789/does-rx-prevent-glitches?forum=rx

    The vision paper on distributed reactive programming [1] defines glitches as "Temporary inconsistencies due to the update propagation/evaluation order". 

    According to the literature survey [2] on reactive programming, some reactive programming languages, such as Scala.React, FrTime, and Flapjax, support glitch avoidance in local (single-node) processing by means of topological sorting. (None of the surveyed reactive programming languages that also support distribution provide glitch avoidance in a distributed setting.)

    If we visualize our reactive application as a directed acyclic graph (where nodes are the operators and edges represent the flow of data between them), then the languages that do prevent glitches execute dependent computations (downstream operators) in topologically sorted order. Each node in the graph is assigned a height greater than the height of every node it depends on. The nodes are then processed from a priority queue, using the heights as priorities.
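
    The height-based scheme described above can be sketched roughly as follows. This is only an illustration, not how any of the surveyed languages (or Rx) is actually implemented: the Node, Propagation and GlitchFreeDemo types are hypothetical, the graph has a single source for simplicity, and PriorityQueue assumes .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical node type for illustration; not part of Rx or any surveyed language.
sealed class Node
{
    private readonly Func<long[], long> compute;
    public string Name { get; }
    public Node[] Inputs { get; }
    public int Height { get; }
    public long Value { get; set; }
    public List<Node> Dependents { get; } = new List<Node>();

    public Node(string name, Func<long[], long> compute, params Node[] inputs)
    {
        Name = name;
        this.compute = compute;
        Inputs = inputs;
        // A node sits one level above its highest input; sources have height 0.
        Height = inputs.Length == 0 ? 0 : inputs.Max(n => n.Height) + 1;
        foreach (var input in inputs) input.Dependents.Add(this);
    }

    public void Recompute() => Value = compute(Inputs.Select(n => n.Value).ToArray());
}

static class Propagation
{
    // Sets a source value, then re-evaluates every affected node once, lowest height first.
    public static List<string> Update(Node source, long value)
    {
        var trace = new List<string>();
        var queue = new PriorityQueue<Node, int>(); // smallest height is dequeued first
        var queued = new HashSet<Node>();           // each node is scheduled once per turn
        source.Value = value;
        foreach (var d in source.Dependents)
            if (queued.Add(d)) queue.Enqueue(d, d.Height);

        while (queue.Count > 0)
        {
            var node = queue.Dequeue();
            node.Recompute();
            trace.Add($"{node.Name}={node.Value}");
            foreach (var d in node.Dependents)
                if (queued.Add(d)) queue.Enqueue(d, d.Height);
        }
        return trace;
    }
}

static class GlitchFreeDemo
{
    public static List<string> Run()
    {
        var src1 = new Node("src1", _ => 0);
        var func1 = new Node("func1", v => v[0], src1);
        var func2 = new Node("func2", v => v[0], src1);
        var func3 = new Node("func3", v => v[0], src1);
        var zipped = new Node("zipped", v => v[0] + v[1], func1, func2);
        var result = new Node("result", v => v[0] + v[1], zipped, func3);
        return Propagation.Update(src1, 1);
    }

    static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

    In this sketch, result has height 3, so it is recomputed exactly once per update, after func1, func2, func3 (height 1) and zipped (height 2); the order among nodes of equal height is left unspecified.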

    The same survey paper [2] states that the semantics of Rx are not known precisely enough to say whether it avoids glitches even in local (single-node) processing. A more recent paper [3] states that "it (Rx) does neither ensure consistency guarantees nor glitch freedom", although it does not go into details or substantiate the statement further.

    If we consider the following example in .NET Rx:

    var src1 = Observable.Interval(TimeSpan.FromSeconds(5)).Publish();
    var src2 = Observable.Interval(TimeSpan.FromSeconds(1));
    var func1 = src1.Select(i => i);
    var func2 = src1.Select(i => i);
    var func3 = src1.Merge(src2).Select(i => i);
    var zipped = func1.Zip(func2, (x, y) =>
    {
        Console.WriteLine("zipping: {0}, {1}, {2}", x, y, x + y);
        return x + y;
    });
    var result = zipped.CombineLatest(func3, (x, y) =>
    {
        Console.WriteLine("combining latest:{0}, {1}, {2}", x, y, x + y);
        return x + y;
    });
    result.Subscribe(i => Console.WriteLine(i));
    src1.Connect();


    func1 and func2 depend only on src1, while func3 can be updated by an update in either src1 or src2. The result stream should be re-evaluated whenever src1 (and thereby func1, func2, and func3) or src2 (and thereby only func3) changes.

    The output of the program is as follows: 

    zipping: 0,0,0
    combining latest: 0,3,3
    3
    
    combining latest: 0,0,0
    0
    
    combining latest: 0,4,4
    4
    
    combining latest: 0,5,5
    5
    
    combining latest: 0,6,6
    6
    
    combining latest: 0,7,7
    7
    
    combining latest: 0,8,8
    8
    
    zipping: 1,1,2
    combining latest: 2,8,10
    10
    
    combining latest: 2,1,3
    3 
    
    combining latest: 2,9,11 
    11

    From what I understand, the highlighted outputs (the results 3 and 10, each printed right after a "zipping" line) represent temporary inconsistencies, or glitches: when src1 updates, the dependent func1, func2, and func3 should all be updated first, and only then should CombineLatest (which sits further downstream in the dependency graph) be evaluated.

    If the evaluation were taking place in topological order, then when src1 emits 0 the output should be:

    Func1=0, Func2=0, Func3=0 and result=0.

    However, the result (highlighted) is 3: it is evaluated with the updated values of func1 and func2 but with the previous value of func3. That is, the Zip of func1 and func2 is propagated before func3 is recomputed, triggering the evaluation of CombineLatest, which should ideally occur only after func3 has also been updated. This gets corrected in the next evaluation step, when the result becomes 0.

    This happens again when src1 emits 1: the result first sums 2 (the Zip of func1 and func2) with the previous value of func3 (8) to give 10. Then, in the next step, the correct sum of 3 (2+1) is output.

    Perhaps there is a way in Rx to ensure the proper order of evaluation (as per a topological sort of the dependency graph), but I am not aware of it. If not, I presume Rx does not offer glitch freedom the way other reactive languages such as Scala.React and Flapjax do. (The authors of their papers state that they use topological sorting to ensure the correct order of change propagation.)

    Thank you for your time and inputs! 

    References: 

    [1] Towards Distributed Reactive Programming by Guido Salvaneschi et al.

    [2] A Survey on Reactive Programming by Bainomugisha et al.

    [3] We have a DREAM: Distributed Reactive Programming with Consistency Guarantees by Alessandro Margara and Guido Salvaneschi.





    • Edited by kharesp Friday, January 16, 2015 10:20 PM
    Friday, January 16, 2015 10:11 PM

All replies

  • Hi,

    I think I've answered the question on Stack Overflow. Please let me know if you think that I missed the mark.

    http://stackoverflow.com/a/25779152/3970148

    - Dave


    http://davesexton.com/blog

    Sunday, January 18, 2015 10:29 AM
  • Hi Dave, 

    Thank you for your response. I read through your reply on stackoverflow and found it very helpful.  

    However, I feel that the concept of glitch prevention rests on whether the system provides a proper order of change/update propagation. More concretely, if a source furnishes a new update, then any affected downstream operator should be scheduled for re-evaluation only after all of its input dependencies that also update on account of the new input have finished updating.

    "There is no lossless, general operator that can be defined... is that given two observables a and b with independent notifications, any operator that attempts to decide when it receives a notification from a or b whether it must push immediately or wait for the "other" value, must rely on arbitrary timings. So this hypothetical operator must either drop values (e.g., DistinctUntilChanged or Throttle), or drop time (e.g., Zip or Buffer)". 

    I understand the above statement, but these operators exist to implement different application-level semantics. Using them, I define what constitutes my operator's "Notification Unit", as you have explained. For example, if I want my operator to re-evaluate when any one of its sources updates, I can use CombineLatest; if I want it to wait for an update on all of its input sources, I can use Zip.

    But it is up to the run-time system to ensure that when in fact my operator executes (irrespective of whether it will drop values or drop time) it does not see any stale values.   

    So, in the example above: When source src1 updates, functions func1, func2 and func3 must get updated before the downstream Zip or CombineLatest re-evaluates. However, what happens is that func1 and func2 get updated followed by Zip and then CombineLatest, resulting in a temporary glitch. Only later does func3 update, triggering CombineLatest again and finally giving us the correct result. 

    In my opinion, when src1 updates, func1, func2 and func3 must execute first, followed by Zip and then CombineLatest, so that when CombineLatest executes it does not use a stale value of func3.

    The use of CombineLatest in the example is only to capture application semantics, i.e. the result can be updated when either src1 or src2 updates. Depending on my application semantics I could very well use another operator that fulfills my need. However, I would expect that whenever CombineLatest is triggered it does not see any stale values (which should be ensured by the run-time).

    "Rx's serial contract prevents an operator from taking advantage of overlapping notifications in an attempt to achieve this goal"

    I understand the above statement as well, but I think this is not what we are looking for. From what I have read, none of the reactive languages that ensure glitch freedom support concurrent updates through the dependency graph; they all serialize their update propagation turns. In the example above, either src1 or src2 can update, and their update propagation will be serialized by the system. But the system should ensure that the change is propagated in a way that does not result in any inconsistencies.

    I hope I have conveyed my confusion/doubt in a way that you can follow. If not, please let me know and I will try to think of a simpler example or articulate my concerns better. 

    Thank you!



    Tuesday, January 20, 2015 11:42 PM
  • Hi,

    Think about the information available to an Rx operator. Other than the operator-specific parameters, and the IObserver<T> that will eventually be passed to the IObservable<T> that the operator returns, the operator has no knowledge at all about the query before it or the query after it.

    Even if Rx were to do some sleight of hand, how could it ever determine the "correct" propagation order on its own? It can't possibly know about the semantics of every operator that any user could ever write. Of course, Rx could be built with specializations for its specific operators; e.g., CombineLatest, Zip, etc., assuming that no additional information would have to be provided by the caller to indicate the "correct" ordering.

    Perhaps the problem is simply the fact that Rx makes no guarantees about the order of observations with respect to the order of subscriptions in general, though in practice they are often the same.

    https://social.msdn.microsoft.com/Forums/en-US/ac721f91-4dbc-40b8-a2b2-19f00998239f/order-of-subscriptions-order-of-observations?forum=rx

    Ultimately, if you want to control the order of observations in a hot observable, such as that returned by Publish, then you must control the order of subscriptions. That means composing the operators in the "correct" order, or in the worst case, implementing new operators with similar semantics that change the order of subscriptions to meet your needs.
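
    As a small, hedged illustration of that point, the sketch below attaches two observers to a published source and records who is notified first. The SubscriptionOrderDemo class and the A/B labels are made up for this example, and it assumes the System.Reactive package is referenced. Which observer logs first is exactly the behavior being probed, so it is not asserted here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SubscriptionOrderDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>(); // hand-driven source, so no timers are involved
        var hot = source.Publish();      // hot observable shared by both observers

        hot.Subscribe(x => log.Add($"A saw {x}")); // subscribed first
        hot.Subscribe(x => log.Add($"B saw {x}")); // subscribed second
        hot.Connect();

        source.OnNext(1); // a single notification fans out to both observers
        return log;
    }

    static void Main() => Run().ForEach(Console.WriteLine);
}
```

    If the order of observations follows the order of subscriptions, as it often does in practice, A is logged before B; swapping the two Subscribe calls would then swap the log.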

    So I guess the notion that Rx is "glitchy" presumes that Rx could or should make the ordering of observations an explicit part of its contract, whether in regard to the ordering of subscriptions (though as Bart noted in the link above, there's a race condition when concurrency is involved) or in regard to some kind of out-of-band user configuration.

    Frankly, it seems like asking for glitch-free queries is asking Rx to do too much. Rx provides general operators over IObservable<T>, which offers absolutely no information about the query context in which operators are used. If you care about the order of observations, then you must either reify your notifications into a notification unit, or precisely control the order of subscriptions yourself.

    Does that make sense?

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Wednesday, January 21, 2015 12:55 AM
    Wednesday, January 21, 2015 12:54 AM
  • Note that the first observation in your example isn't technically a glitch as you've described; it's simply caused by the behavior of the CombineLatest method, which doesn't generate any values until it has observed at least one value from both inputs.

    If you want to prime CombineLatest with a default value on the slower input, then do something like zipped.StartWith(0).CombineLatest(func3, ...).
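
    A minimal sketch of the difference, using hand-driven subjects in place of the timer-based sources (the StartWithDemo class and the subject names are stand-ins, and System.Reactive is assumed to be referenced):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class StartWithDemo
{
    public static (List<long> Unprimed, List<long> Primed) Run()
    {
        var zipped = new Subject<long>(); // stand-in for the slower input
        var func3 = new Subject<long>();  // stand-in for the faster input
        var unprimed = new List<long>();
        var primed = new List<long>();

        // Plain CombineLatest: silent until both inputs have produced a value.
        zipped.CombineLatest(func3, (x, y) => x + y)
              .Subscribe(x => unprimed.Add(x));

        // Primed variant: the slower input starts out with a default of 0.
        zipped.StartWith(0L)
              .CombineLatest(func3, (x, y) => x + y)
              .Subscribe(x => primed.Add(x));

        func3.OnNext(3); // only the faster input has fired so far
        return (unprimed, primed);
    }

    static void Main()
    {
        var (unprimed, primed) = Run();
        Console.WriteLine($"unprimed count: {unprimed.Count}; primed: {string.Join(", ", primed)}");
    }
}
```

    Here the unprimed query stays silent after the single func3 notification, while the primed one should already have combined it with the default value.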

    In your example, src2 generates a value each second and has reached a value of 3 (values are 0-based, so between 4 and 5 seconds have elapsed) by the time that src1 generates its first value, at 5 seconds. There's a race condition at the 5-second mark, and it just so happens that zipped pushed its value first; thus the first output is the first zipped notification combined with the 4th (value=3) notification from src2: combining latest: 0,3,3

    I'm not sure this says anything about "glitchy-ness" in Rx. It seems more like a misunderstanding about the behavior of CombineLatest in particular (e.g., you could always create your own operator that implicitly uses StartWith) and the introduction of an uncontrolled race condition.

    Instead of relying on timers and race conditions, consider coding an example that uses virtual time; e.g., TestScheduler.
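
    One possible way to do that, sketched under the assumption that the Microsoft.Reactive.Testing package is referenced alongside System.Reactive (the VirtualTimeDemo class name is made up), is to pass a TestScheduler to both Interval calls and advance the virtual clock by hand:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

static class VirtualTimeDemo
{
    public static List<long> Run()
    {
        var scheduler = new TestScheduler();
        var outputs = new List<long>();

        // Same query as in the question, but both timers run on virtual time.
        var src1 = Observable.Interval(TimeSpan.FromSeconds(5), scheduler).Publish();
        var src2 = Observable.Interval(TimeSpan.FromSeconds(1), scheduler);
        var func1 = src1.Select(i => i);
        var func2 = src1.Select(i => i);
        var func3 = src1.Merge(src2).Select(i => i);
        var zipped = func1.Zip(func2, (x, y) => x + y);
        var result = zipped.CombineLatest(func3, (x, y) => x + y);

        result.Subscribe(x => outputs.Add(x));
        src1.Connect();

        // Run everything scheduled up to 5.5 virtual seconds; no real time passes.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5.5).Ticks);
        return outputs;
    }

    static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

    Both timers are due at the 5-second mark, so the virtual-time scheduler decides which runs first. The aim is a sequence that is repeatable from run to run, so that whatever ordering shows up can be studied rather than left to chance.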

    The notification that follows (combining latest: 0,0,0) is caused by the fact that you're effectively subscribing to the Merge query after you've subscribed to the zipped query. As I stated in my previous reply, the order of subscriptions matters in practice with respect to hot observables. It's generally FIFO.

    So you're subscribing to CombineLatest, which presumably subscribes to its left argument first (zipped) and then its right argument (func3). The Publish operator has an internal Subject that pushes its notifications in that order. Therefore, the first notification from src1 goes to zipped first, which causes CombineLatest to push its first notification as described above. Then the first notification from src1 also goes to func3, which causes CombineLatest to generate another value, this time with the inputs (0,0); i.e., the latest (and thus far only) value from zipped and the first value from src1 directly.
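
    The sequence described above can be checked without timers by driving the sources by hand. The sketch below is only an approximation of the original example: subjects replace Interval and Publish, FifoGlitchDemo is a made-up name, and System.Reactive is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class FifoGlitchDemo
{
    public static List<string> Run()
    {
        var src1 = new Subject<long>(); // stands in for the published 5-second timer
        var src2 = new Subject<long>(); // stands in for the 1-second timer
        var log = new List<string>();

        var func1 = src1.Select(i => i);
        var func2 = src1.Select(i => i);
        var func3 = src1.Merge(src2).Select(i => i);
        var zipped = func1.Zip(func2, (x, y) => x + y);

        // Left argument is zipped, right argument is func3, as in the question.
        zipped.CombineLatest(func3, (x, y) => $"{x}+{y}={x + y}")
              .Subscribe(s => log.Add(s));

        src2.OnNext(3); // func3 has reached 3 before src1 fires
        src1.OnNext(0); // a single src1 notification fans out to all three functions
        return log;
    }

    static void Main() => Run().ForEach(Console.WriteLine);
}
```

    If observers are notified in subscription order, the log should read 0+3=3 followed by 0+0=0, mirroring the first two results in the question; either way, the last entry is the consistent 0+0=0.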

    So it seems the order of subscriptions is ultimately the cause of the "glitch". You subscribed in the "wrong" order and expected Rx to correct it for you. I think ultimately it's just up to you to ensure that you subscribe in the "correct" order.

    Perhaps a simple way to accomplish that in your example is to reverse the order of the arguments to CombineLatest, though I haven't tried it myself.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Wednesday, January 21, 2015 1:43 AM
    Wednesday, January 21, 2015 1:36 AM
  • I think you can also eliminate the Zip operator from your example. It seems not to have any effect on the output. Perhaps change it to the Do operator if you want to continue logging the order of observations.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Wednesday, January 21, 2015 1:50 AM
    Wednesday, January 21, 2015 1:49 AM