none
Rx - Observable and Webclient to get csv

    Frage

  • Hi

    I have a function in my lightswitch application that downloads a csv file from a site which i want to re-write using Rx framework and provide provide possibility to call it synchronously.

    Provide below are the code snippets for old and new function. The new function however doesn't work, the call to ParseCSV never happens. I would like to know why and if exists a better solution, feel free to provide.

    Old Code:

    private void ObservableCollection<Data> collection;
    public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
    {
    	collection = targetCollection;
    	if (!string.IsNullOrEmpty(url))
    	{
    		WebClient wc = new WebClient();
    		wc.OpenReadCompleted += new OpenReadCompletedEventHandler(OpenReadCompleted_ParseCSV);
    		wc.OpenReadAsync(new Uri(url));
    	}
    	return collection;
    }
    
    private void OpenReadCompleted_ParseCSV(object sender, OpenReadCompletedEventArgs e)
    {
    	if (e.Error != null) return;
    
    	var webClient = sender as WebClient;
    	if (webClient == null) return;
    
    	try
    	{
    		using (StreamReader reader = new StreamReader(e.Result))
    		{
    			string contents = reader.ReadToEnd();
    			...
    		}
    	}
    	catch (Exception ex)
    	{
    		System.Diagnostics.Debug.WriteLine("Error parsing CSV!\n" + ex.Message);
    	}
    }

    New Code (with Rx):

    private void ObservableCollection<Data> collection;
    public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
    {
    	collection = targetCollection;
    	if (!string.IsNullOrEmpty(url))
    	{
    		var result = Observable.FromEventPattern<OpenReadCompletedEventHandler, OpenReadCompletedEventArgs>
    					 (
    						ev => webClient.OpenReadCompleted += ev,
    						ev => webClient.OpenReadCompleted -= ev
    					 )
    					 .Select(o => o.EventArgs.Result)
    					 .FirstOrDefault()
    					 .ParseCSV();
    
    		// Call the Async method
    		webClient.OpenReadAsync(new Uri(url));
    	}
    	return collection;
    }
        
    private void ParseCSV(this Stream stream)
    {
    	try
    	{
    		using (StreamReader reader = new StreamReader(e.Result))
    		{
    			string contents = reader.ReadToEnd();
    			...
    		}
    	}
    	catch (Exception ex)
    	{
    		System.Diagnostics.Debug.WriteLine("Unable to get history data!\n" + ex.Message);
    	}
    }

    Donnerstag, 16. Februar 2012 09:55

Alle Antworten

  • Hi,

    FirstOrDefault is not a reactive operation.  It blocks the current thread until the first value is observed.  You should use Subscribe instead.

    Alternatively, Rxx provides several related extensions that could make this much easier.  Here's the related lab:

    http://rxx.codeplex.com/SourceControl/changeset/view/65171#1055791

    Here's another example.  This one hosts CSV text locally and uses Rxx parsers in the client code.

    Note: The client appends a new line to the input before parsing (e.g., (reader.ReadToEnd() + Environment.NewLine)) because of a glaring oversight in the parsers implementation: it's missing an "End of Sequence" operator, which I plan on adding for the next release.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Net;
    using System.Reactive.Linq;
    using System.Text;
    using Rxx.Parsers.Linq;
    
    namespace Rxx.Labs.Reactive
    {
    	public sealed class WebClientCsvParserLab : BaseConsoleLab
    	{
    		protected override void Main()
    		{
    			var csv = @"
    item 1, item 2, item 3
    item 4, item 5, item 6
    
    item 7, item 8, item 9
    item 10";
    
    			var url = new Uri("http://localhost:1111");
    
    			var server =
    				from request in ObservableHttpListener.Start(url.Host, url.Port)
    				let bytes = Encoding.UTF8.GetBytes(csv)
    				from sent in Observable.Using(
    					() => request.Response,
    					response => Observable.Using(
    						() => response.OutputStream,
    						stream => stream.WriteObservable(bytes, 0, bytes.Length)))
    				select sent;
    
    			var client = DownloadLinesCsv(url)
    				.Select((line, index) => new
    				{
    					Line = line,
    					LineNumber = index + 1
    				});
    
    			using (server.Subscribe(_ => TraceLine("Server sent response."), TraceError))
    			using (client.Subscribe(
    					result =>
    					{
    						Trace("Client parsed line {0}: ", result.LineNumber);
    
    						foreach (var item in result.Line)
    						{
    							Trace(item + ',');
    						}
    
    						TraceLine();
    					},
    					TraceError,
    					() => TraceLine("Client Completed")))
    			{
    				WaitForKey();
    			}
    		}
    
    		public static IObservable<IList<string>> DownloadLinesCsv(Uri url)
    		{
    			return
    				from stream in ObservableWebClient.OpenRead(url)
    				from line in
    					Observable.Using(
    						() => new StreamReader(stream),
    						reader => (reader.ReadToEnd() + Environment.NewLine)
    							.ParseString(parser =>
    								from next in parser
    								let newLine = parser.Word(Environment.NewLine)
    								let separator = parser.Word(",")
    								let item = from value in next.NoneOrMoreNonGreedy().Join()
    													 from _ in separator.Or(newLine.NonGreedy())
    													 select value
    								let line = item.NoneOrMoreNonGreedy().ToList()
    								select from items in line
    											 from _ in newLine
    											 where items.Count > 0
    											 select items)
    							.ToObservable())
    				select line;
    		}
    	}
    }

    - Dave


    http://davesexton.com/blog

    Donnerstag, 16. Februar 2012 13:42
  • Hi,

    RE: it's missing an "End of Sequence" operator

    I just realized that:

    next.None()

    would work too, but I'm about to add a new operator anyway that is optimized for detecting the end of the sequence.

    - Dave


    http://davesexton.com/blog

    Donnerstag, 16. Februar 2012 14:50
  • Hi Dave

    Thank you for the response. Could you tell me how to use your code in a synchronous manner? I have this lightswitch application that i want to get updated at start.

    Dienstag, 21. Februar 2012 13:10
  • Hi,

    Why do you want to call it synchronously?  Rx provides an asynchronous programming model.

    - Dave


    http://davesexton.com/blog

    Dienstag, 21. Februar 2012 13:38
  • Like mentioned in the above post, i have this LS application that needs to be updated at start meaning get the csv file from the website, parse it, do calculations based on it, update the data and present it (only then). I want to use the webclient class for this and unfortunately in an LS or SL app i can only use async methods.
    Dienstag, 21. Februar 2012 14:17
  • Hi, 

    Just to be clear, you're going to be calling it asynchronously elsewhere in your application, correct?

    If not, then you probably shouldn't use Rx in the first place.  There are ways to wait for any kind of async method, if that's what you really need.

    Otherwise, it's still unclear to me as to why you must call it synchronously at all.  Your requirements seems like a perfect fit for an asynchronous operation: App starts > Kick-off async request for CSV > show UI (without data; some controls disabled).... Concurrently: receive response > parse > calculate > marshal to the UI thread > present.

    Admittedly, though, I'm not familiar with LightSwitch so perhaps you simply can't update the UI asynchronously?

    If you're sure that you actually want to block the current thread to wait for an Rx query to complete, then simply call ForEach instead of Subscribe.

    - Dave


    http://davesexton.com/blog

    Dienstag, 21. Februar 2012 14:32