Process Console Output Redirection and Rx

    Question

  • Simple question: given the code below, how do I turn StandardOutput into an IObservable<string> that keeps looping until some condition is met?

        Process p = new Process();
        p.EnableRaisingEvents = true;
        p.Disposed += (s, e) => { };
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.FileName = fileName;
        p.StartInfo.Arguments = args;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        // p.StandardOutput is a StreamReader.
        // How do I use Rx to put this into a loop and allow subscription?
        // Can't figure this out ---------> p.StandardOutput.ToObservable(IObservable<string> textAvailable);
        // I want to send OnNext when new data arrives.

    JP Cowboy Coders Unite!



    Monday, April 23, 2012 8:09 PM

Answers

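  •     using System;
        using System.Diagnostics;
        using System.IO;
        using System.Reactive.Subjects;
        using System.Threading;
        using System.Threading.Tasks;

        internal class StartJob
        {
            // Pushes each line the child process writes to standard output.
            public Subject<string> NextLine = new Subject<string>();
            public void Process(string fileName, string args)
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                CancellationToken token = cts.Token;
                // Read on a background task because StreamReader.ReadLine blocks.
                Task t = Task.Factory.StartNew((Action)delegate()
                {
                    Process p = new Process();
                    p.EnableRaisingEvents = true;
                    p.Disposed += (s, e) => { };
                    p.StartInfo.CreateNoWindow = true;
                    p.StartInfo.FileName = fileName;
                    p.StartInfo.Arguments = args;
                    p.StartInfo.RedirectStandardOutput = true;
                    p.StartInfo.UseShellExecute = false;
                    p.Start();
                    StreamReader myStreamReader = p.StandardOutput;
                    string line;
                    // ReadLine returns null once the process closes its output stream.
                    while ((line = myStreamReader.ReadLine()) != null)
                    {
                        NextLine.OnNext(line);
                    }
                    p.WaitForExit();
                    // Tell subscribers that no more lines will arrive.
                    NextLine.OnCompleted();
                    cts.Cancel();
                }, token);
            }
        }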
    Works but....ehhhhh?

    JP Cowboy Coders Unite!

    Monday, April 23, 2012 9:34 PM
  • Hi,

    StreamReader isn't reactive; i.e., it doesn't push notifications.  Instead, it requires you to pull data.  Therefore, to make it reactive you must introduce concurrency yourself.  The code you've posted is a viable solution.
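
    The pull-to-push wrapping described above can be sketched roughly as follows. This is only an illustration, assuming Rx 2.0 or later (the System.Reactive package) and .NET 4.5 for ReadLineAsync. The ReadLines extension name is hypothetical and is not part of Rx. It pushes lines until the reader reports end of stream (null) or the subscription is disposed.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class TextReaderExtensions
{
    // Hypothetical helper (not an Rx API): exposes a pull-based TextReader
    // as a push-based sequence of lines.
    public static IObservable<string> ReadLines(this TextReader reader)
    {
        return Observable.Create<string>(async (observer, cancellationToken) =>
        {
            string line;
            // Stop when the subscriber unsubscribes or the stream ends (null).
            while (!cancellationToken.IsCancellationRequested
                   && (line = await reader.ReadLineAsync()) != null)
            {
                observer.OnNext(line);
            }
            observer.OnCompleted();
        });
    }
}

public static class Program
{
    public static void Main()
    {
        // A StringReader stands in for p.StandardOutput in this sketch.
        using (var reader = new StringReader("first\nsecond\nthird"))
        {
            reader.ReadLines().Subscribe(
                line => Console.WriteLine("line: " + line),
                () => Console.WriteLine("done"));
        }
    }
}
```

    With a real process, p.StandardOutput could be passed in place of the StringReader, since StreamReader derives from TextReader.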

    However, the Process class exposes an event for receiving notifications when output has been written:

    http://msdn.microsoft.com/en-us/library/system.diagnostics.process.outputdatareceived.aspx

    You should be able to use Observable.FromEventPattern to create your observable, without introducing concurrency and without a subject.

    For example: (untested)

    // Extension methods must be static and declared in a static class.
    public static IObservable<string> StandardOutputObservable(this Process process)
    {
        // The caller sets StartInfo.FileName and StartInfo.Arguments beforehand.
        process.EnableRaisingEvents = true;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.RedirectStandardOutput = true;

        // No conversion delegate is needed when the handler type is given explicitly.
        var received = Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>(
            h => process.OutputDataReceived += h,
            h => process.OutputDataReceived -= h)
            .TakeUntil(Observable.FromEventPattern(
                h => process.Exited += h,
                h => process.Exited -= h))
            .Select(e => e.EventArgs.Data);

        // BeginOutputReadLine throws unless the process has already been started.
        process.Start();
        process.BeginOutputReadLine();

        // Lines raised before an observer subscribes are not replayed.
        return received;

        /* Or if cancellation is important to you...

        return Observable.Create<string>(observer =>
            {
                var cancel = Disposable.Create(process.CancelOutputRead);

                return new CompositeDisposable(
                    cancel,
                    received.Subscribe(observer));
            });
         */
    }
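
    One possible variation, sketched under assumptions rather than taken from Rx or Rxx: start the process only when an observer subscribes, so that lines written early are not missed, and treat a null Data value (which Process raises when the redirected stream closes) as completion. RunAndObserveOutput is a hypothetical name, and the dotnet --version command is only a placeholder. The sketch assumes the System.Reactive package.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ProcessObservable
{
    // Hypothetical helper (not part of Rx or Rxx): each subscription starts
    // its own child process and receives that process's output lines.
    public static IObservable<string> RunAndObserveOutput(string fileName, string args)
    {
        return Observable.Create<string>(observer =>
        {
            var process = new Process
            {
                StartInfo = new ProcessStartInfo(fileName, args)
                {
                    CreateNoWindow = true,
                    UseShellExecute = false,
                    RedirectStandardOutput = true
                }
            };

            DataReceivedEventHandler onData = (s, e) =>
            {
                // A null Data value signals that the output stream was closed.
                if (e.Data == null) observer.OnCompleted();
                else observer.OnNext(e.Data);
            };
            process.OutputDataReceived += onData;

            try
            {
                process.Start();
                process.BeginOutputReadLine();
            }
            catch (Exception ex)
            {
                // For example, the executable could not be found.
                process.OutputDataReceived -= onData;
                process.Dispose();
                observer.OnError(ex);
                return Disposable.Empty;
            }

            // Unsubscribing detaches the handler and releases the Process object;
            // in this sketch it does not kill the child process.
            return Disposable.Create(() =>
            {
                process.OutputDataReceived -= onData;
                process.Dispose();
            });
        });
    }
}

public static class Program
{
    public static void Main()
    {
        // Placeholder command; substitute any console program.
        ProcessObservable.RunAndObserveOutput("dotnet", "--version")
            .ForEachAsync(line => Console.WriteLine(line))
            .Wait();
    }
}
```

    Because the process is created inside Observable.Create, the sequence is cold: subscribing twice runs the command twice. Publish or a similar operator could be used to share a single run between observers.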

    This is a good candidate for an Rxx extension.  I've added a work item.

    - Dave


    http://davesexton.com/blog

    Tuesday, April 24, 2012 2:27 AM

All replies

  • Yes, agreed: a good candidate for Rxx.  What's interesting is how we forget about kicking off shell jobs or other processes.  Having done it zillions of times in the past, the technique still seems to hide until that one new project requires it.  Then it's "where's that darn reusable class that I can never find?"  I did see the event handler in the Process class but didn't consider wiring it up using FromEventPattern.  Geez...

    Sometimes the new hat just doesn't fit so well...


    JP Cowboy Coders Unite!

    Tuesday, April 24, 2012 6:13 PM