Process Console Out Redirection and RX
-
2012年4月23日 20:09
Simple question given this code below, how do I turn StarndardOuput into a IObservable<string> that remains in loop until some condition?
Process p = new Process(); p.EnableRaisingEvents = true; p.Disposed += (s, e) => { }; p.StartInfo.CreateNoWindow = true; p.StartInfo.FileName = fileName; p.StartInfo.Arguments = args; p.StartInfo.RedirectStandardOutput = true; p.Start(); //p.StandardOutput is StreamReader //How do I use RX to put this into a loop and allow subscription? //Can't figure this out ---------> p.StandardOutput.ToObservable(IObservable<string> textAvailable); //I want to send OnNext when new data arrives
JP Cowboy Coders Unite!
- 已编辑 Mr. Javaman II 2012年4月23日 20:11
全部回复
-
2012年4月23日 21:34
internal class StartJob { public Subject<string> NextLine = new Subject<string>(); public void Process(string fileName, string args) { CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken token = cts.Token; Task t = Task.Factory.StartNew((Action)delegate() { Process p = new Process(); p.EnableRaisingEvents = true; p.Disposed += (s, e) => { }; p.StartInfo.CreateNoWindow = true; p.StartInfo.FileName = fileName; p.StartInfo.Arguments = args; p.StartInfo.RedirectStandardOutput = true; p.StartInfo.UseShellExecute = false; p.Start(); StreamReader myStreamReader = p.StandardOutput; while (myStreamReader != null) { var thing = myStreamReader.ReadLine(); NextLine.OnNext(thing); } p.WaitForExit(); cts.Cancel(); }, token); } }Works but....ehhhhh?JP Cowboy Coders Unite!
- 已标记为答案 Mr. Javaman II 2012年5月8日 22:42
-
2012年4月24日 2:27
Hi,
StreamReader isn't reactive; i.e., it doesn't push notifications. Instead, it requires you to pull data. Therefore, to make it reactive you must introduce concurrency yourself. The code you've posted is a viable solution.
However, the Process class exposes an event for receiving notifications when output has been written:
http://msdn.microsoft.com/en-us/library/system.diagnostics.process.outputdatareceived.aspx
You should be able to use Observable.FromEventPattern to create your observable, without introducing concurrency and without a subject.
For example: (untested)
public IObservable<string> StandardOutputObservable(this Process process) { process.EnableRaisingEvents = true; process.UseShellExecute = false; process.StartInfo.RedirectStandardOutput = true; var received = Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>( h => h, h => process.OutputDataReceived += h, h => process.OutputDataReceived -= h) .TakeUntil(Observable.FromEventPattern( h => process.Exited += h, h => process.Exited -= h)) .Select(e => e.EventArgs.Data); process.BeginOutputReadLine(); return received; /* Or if cancellation is important to you... return Observable.Create<string>(observer => { var cancel = Disposable.Create(process.CancelOutputRead); return new CompositeDisposable( cancel, received.Subscribe(observer)); }); */ }This is a good candidate for an Rxx extension. I've added a work item.
- Dave
- 已编辑 Dave Sexton 2012年4月24日 2:32 Clarification; added work item link
- 已标记为答案 Mr. Javaman II 2012年5月8日 22:42
-
2012年4月24日 18:13
Yes agreed, good candidate for RXX. What's interesting is how we forget about kicking off Shell jobs or other Processes. Having done them zillions of times in the past, they seem to obscure themselves until that one new project requires it. Then it's "where's that darn reusable class that I can never find?" I did see the event handler in the process class but didn't consider wiring it up using FromEventPattern. Geez...
Sometimes the new hat just doesn't fit so well...
JP Cowboy Coders Unite!

