none
Recommended alternative to Console.ReadLine() in Rx. RRS feed

  • Question

  • hi,

    What is the recommended alternative to using Console.ReadLine(), to wait for an asynchronous Rx processing pipeline to finish its execution - one which does not introduce overhead as to affect query processing performance i.e. throughput. (& preferably does not rely on onCompleted to be called compulsorily.)

    var finished=false;
    source_stream
                .ProcessingOperator()
                .Subscribe(data=>{},ex => finished=true,()=>finished=true);
    while(!finished)
    {
       Thread.Sleep(x);
    }


    In case of  Thread.Sleep(x), if the sleep interval(x) is less then query performance(throughput) is adversely affected. In this case we need to have an estimate for how long the query is expected to run and then to sleep for an interval slightly greater than expected execution time. 

    I find the query-performance(throughput) in case of using readLine() or Thread.Sleep, to be consistently better than using AutoResetEvent. Why is that? 

    AutoResetEvent signal=new AutoResetEvent(false);
    source_stream
                .ProcessingOperator()
                .Subscribe(data=>{},ex=>signal.set(),()=>signal.set());
    
    signal.WaitOne();
    

    thank you. 


    Monday, September 15, 2014 4:15 AM

Answers

  • Hi,

    Modern unit test frameworks support asynchronous tests.  For example, MSTest allows you to define test methods that return Task.  If you've created your own test runner program, then I'd suggest doing the same.

    Then, if you don't care about notifications, simply call ToTask instead of Subscribe.  If you also care about notifications, you can always apply Do.

    Once you have Task, then if you must block you can always call Wait.

    - Dave


    http://davesexton.com/blog

    • Marked as answer by kharesp Tuesday, September 23, 2014 8:04 PM
    Tuesday, September 16, 2014 7:14 AM

All replies

  • Generally the application is written to not be a blocking application. Maybe you want to use some other data structure like a Queue/BlockingCollection instead of an Observable Sequence?

    Alternatively, if the Observable sequence is the correct tool, then maybe you want use a continuation pattern?

    If you could provide some more information on what you are trying to achieve, mybe I could provide some more help.

    Lee

    See BlockingCollection<T> - http://msdn.microsoft.com/en-us/library/dd997371(v=vs.110).aspx


    Lee Campbell http://LeeCampbell.blogspot.com

    Monday, September 15, 2014 4:46 PM
  • Hi Lee,

    Thank you for your response. I am processing sensor data coming in as a stream, so using an Observable Sequence is the correct construct.  I will be running this processing pipeline multiple times, each time parameterized with a different aspect I want to test - ex. scheduler being used to understand the difference in behavior and performance.

    So I want to automate the process using a script and want to do away with the manual step of waiting for the processing to complete, then pressing enter. I would like to have an alternative way of waiting for the processing to complete, but one that has a minimal affect on query processing performance itself. 

    Can you give some suggestion in this regard? If the continuation pattern is applicable, can you direct me to the resource. 

    thanks. 

    Monday, September 15, 2014 5:10 PM
  • Hi,

    Modern unit test frameworks support asynchronous tests.  For example, MSTest allows you to define test methods that return Task.  If you've created your own test runner program, then I'd suggest doing the same.

    Then, if you don't care about notifications, simply call ToTask instead of Subscribe.  If you also care about notifications, you can always apply Do.

    Once you have Task, then if you must block you can always call Wait.

    - Dave


    http://davesexton.com/blog

    • Marked as answer by kharesp Tuesday, September 23, 2014 8:04 PM
    Tuesday, September 16, 2014 7:14 AM
  • using System;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    
    namespace RxConsoleApplication
    {
        class Program
        {
            static void Main()
            {
                var connectable = Observable.Interval(TimeSpan.FromSeconds(1))
                    .Take(10)
                    .Publish();
    
                using (connectable.Subscribe(Console.WriteLine))
                {
                    var task = connectable.ToTask();
    
                    connectable.Connect();
    
                    Console.WriteLine("Waiting");
                    connectable.Wait();
                }
            }
        }
    }
    


    Thursday, October 16, 2014 8:46 PM