locked
Subscribe to Results of Multiple Async Operations for List<FileInfo> Items RRS feed

  • Question

  • I have a List of FileInfo objects (List<FileInfo>). For each FileInfo, I need to make a call to a repository.GetEmptyDocumentFromFilename passing the FileInfo object, some additional params, and a callback of type Action<IOperationResult<Document>>. I need to aggregate the results back to a List<Document> object. The IOperationResult object has a Result property of type Document. I've hacked and Googled quite a bit and I can't quite figure this out. Help is appreciated. Thanks!
    Friday, February 4, 2011 7:07 PM

Answers

  • OK. Your solution was mighty fine, however I took it to 11 by making it Func'y.

     

    var func = new Func<FileInfo, IObservable<Document>>(
     info => Observable.Create<Document>(
      observer => {
       _contractRepository.GetEmptyDocumentFromFilename(
        SelectedCompany.Id,
        SelectedDocumentType,
        UserCode,
        info.Name,
        r => {
                    try {
                      if (r.Error != null)
                        throw r.Error;
                      observer.OnNext(r.Result);
                      observer.OnCompleted();
                    } catch (Exception exception) {
                      observer.OnError(exception);
                    }
        }
        );
    
       return () => { };
      }
     )
    );
    
    var query = SelectedFiles.Select(func)
     .Merge()
     .Aggregate(new List<Document>(), (list, result) => {
                list.Add(result);
                return list;
               });
    query.Subscribe(documents => _uiService.ShowView(ViewNames.DocumentsListView, documents));
    

     

    There was also a slight edit. Notice the list.Add(result) in the Aggregate, you have it as result.Document. Thanks man! This rocks!

     

    P.S. I love anonymous methods and lambdas. Just sayin'.

     

    • Marked as answer by davidbitton Saturday, February 5, 2011 3:56 AM
    Friday, February 4, 2011 9:32 PM
  • Hi,

    Note that your exception handling isn't correct; it violates the recommended Rx Design Guidelines for propagating exceptions (see §6.4 Notes).  Try the following instead:

    Func<FileInfo, IObservable<Document>> func = 
    	info => Observable.Create<Document>(observer =>
    	{
    		try
    		{
    			_contractRepository.GetEmptyDocumentFromFilename(
    				SelectedCompany.Id,
    				SelectedDocumentType,
    				UserCode,
    				info.Name,
    				r =>
    				{
    					if (r.Error == null)
    					{
    						observer.OnNext(r.Result);
    						observer.OnCompleted();
    					}
    					else
    						observer.OnError(r.Error);
    				}
    			);
    		}
    		catch (Exception ex)
    		{
    			observer.OnError(ex);
    		}
    
    		return () => { };
    	});
    

    - Dave


    http://davesexton.com/blog
    • Marked as answer by davidbitton Saturday, February 5, 2011 3:57 AM
    Friday, February 4, 2011 10:34 PM

All replies

  • Hi,

    Edit: Ah, nevermind... just noticed the "callback" in your description.

    - Dave


    http://davesexton.com/blog
    Friday, February 4, 2011 8:13 PM
  • One approach that could would would be to use a Subject.

    var subject = new Subject<Document>();

    so on Repo.GetEmptyDoc(fileinfo, (ioResultDoc) => subject.OnNext(ioResultDoc.Document));

    Then just subscribe to the subject.

     

     


    --Scott W.
    http://weblogs.asp.net/sweinstein
    Friday, February 4, 2011 8:14 PM
  • Scott,

      How I iterate the collection the List<FileInfo> ? Does this make sense?

     

    var documents = new List<Document>();
    var subject = new Subject<Document>();
    
    foreach (var info in SelectedFiles) {
     _contractRepository.GetEmptyDocumentFromFilename(
      SelectedCompany.Id,
      SelectedDocumentType,
      UserCode,
      info.Name,
      r => subject.OnNext(r.Result));
    }
    
    subject.Subscribe(
     d => documents.Add(d),
     e => UpdateStatus("Upload failed.", e),
     () => _uiService.ShowView(ViewNames.DocumentsListView, documents)
    );
    

     

     

    Friday, February 4, 2011 8:21 PM
  • Hi,

    Try something like this:

    void GetEmptyDocuments()
    {
    	List<FileInfo> files = GetFiles();
    
    	var query = files.Select(GetEmptyDocumentFromFilename)
    		.Merge()
    		.Aggregate(new List<Document>(), (list, result) =>
    			{
    				list.Add(result.Document);
    				return list;
    			});
    
    	query.Subscribe(documents => UseDocuments(documents));
    }
    
    
    public IObservable<Document> GetEmptyDocumentFromFilename(FileInfo file)
    {
    	return Observable.Create<Document>(observer =>
    	{
    		try
    		{
    			repository.GetEmptyDocumentFromFilename(
    				file, 
    				result =>
    				{
    					observer.OnNext(result.Document);
    					observer.OnCompleted();
    				}
    			);
    		}
    		catch (Exception ex)
    		{
    			observer.OnError(ex);
    		}
    
    		return () => { };
    	});
    }

    http://davesexton.com/blog
    • Edited by Dave Sexton Friday, February 4, 2011 8:27 PM Changed method to use Create instead of CreateWithDisposable
    • Proposed as answer by fixedpoint Saturday, February 5, 2011 12:28 AM
    Friday, February 4, 2011 8:22 PM
  • yup. Though Dave's approach is cleaner in that it doesn't require the awkwardness of the Subject.


    --Scott W.
    http://weblogs.asp.net/sweinstein
    Friday, February 4, 2011 8:52 PM
  • OK. Your solution was mighty fine, however I took it to 11 by making it Func'y.

     

    var func = new Func<FileInfo, IObservable<Document>>(
     info => Observable.Create<Document>(
      observer => {
       _contractRepository.GetEmptyDocumentFromFilename(
        SelectedCompany.Id,
        SelectedDocumentType,
        UserCode,
        info.Name,
        r => {
                    try {
                      if (r.Error != null)
                        throw r.Error;
                      observer.OnNext(r.Result);
                      observer.OnCompleted();
                    } catch (Exception exception) {
                      observer.OnError(exception);
                    }
        }
        );
    
       return () => { };
      }
     )
    );
    
    var query = SelectedFiles.Select(func)
     .Merge()
     .Aggregate(new List<Document>(), (list, result) => {
                list.Add(result);
                return list;
               });
    query.Subscribe(documents => _uiService.ShowView(ViewNames.DocumentsListView, documents));
    

     

    There was also a slight edit. Notice the list.Add(result) in the Aggregate, you have it as result.Document. Thanks man! This rocks!

     

    P.S. I love anonymous methods and lambdas. Just sayin'.

     

    • Marked as answer by davidbitton Saturday, February 5, 2011 3:56 AM
    Friday, February 4, 2011 9:32 PM
  • Hi,

    Note that your exception handling isn't correct; it violates the recommended Rx Design Guidelines for propagating exceptions (see §6.4 Notes).  Try the following instead:

    Func<FileInfo, IObservable<Document>> func = 
    	info => Observable.Create<Document>(observer =>
    	{
    		try
    		{
    			_contractRepository.GetEmptyDocumentFromFilename(
    				SelectedCompany.Id,
    				SelectedDocumentType,
    				UserCode,
    				info.Name,
    				r =>
    				{
    					if (r.Error == null)
    					{
    						observer.OnNext(r.Result);
    						observer.OnCompleted();
    					}
    					else
    						observer.OnError(r.Error);
    				}
    			);
    		}
    		catch (Exception ex)
    		{
    			observer.OnError(ex);
    		}
    
    		return () => { };
    	});
    

    - Dave


    http://davesexton.com/blog
    • Marked as answer by davidbitton Saturday, February 5, 2011 3:57 AM
    Friday, February 4, 2011 10:34 PM
  • Dave,

      Now a month later, I have some additional thoughts on this. The problem with OnError is it kills the loop entirely. What if I want each collection item to be observed atomically? My idea is to not use the OnError, but in fact return the collection item itself in OnNext if it failed processing. My subscription could then resubmit in a later operation. Opinion?

    Thursday, March 10, 2011 8:49 PM
  • Dave(x2),

    You may want to start with the end in mind and decide how you want to consumer this. Eg if it is ok for one of the items in the collection to throw but the rest of the items in the collection to continue then you may want to look at some of the TPL exception concepts.

    Perhaps you could merge the exceptions into an Aggregate Exception that is thrown once the last item is yielded, or perhaps you may want to put the exception in to the IOperartionResult<T> (so there is either a document or an exception in the result).

    Either way I think you want to possibly change your last query to something like:

    var query = SelectedFiles.Select(func)
     //.Merge() This will kill the whole thing on any error.
     .Aggregate(new List<Document>(), (list, resultObs) => {
          resultObs.Subscribe(
                //Put either results or exception into the OperationResult
                result=>list.Add(new OperationResult(result)),
                ex=>list.Add(new OperationResult(ex)));
          return list;
          });
    query.Subscribe(documents => _uiService.ShowView(ViewNames.DocumentsListView, documents));
     
    

    Specifically you want to kill the Merge as it will OnError if any of the inner observables OnError.

    I Hope that helps.

     

    Lee

     


    Lee Campbell http://LeeCampbell.blogspot.com
    Saturday, March 26, 2011 11:33 AM
  • Hi Dave,

    An alternative to Lee's solution is to simply not call OnError.  For example, in the code that you've marked as the answer: 

    if (r.Error == null)
    {
    	observer.OnNext(r.Result);
    	observer.OnCompleted();
    }
    else
    	observer.OnError(r.Error);
    

    it seems you're already catching errors and assigning them to r.Error, thus your selector can be changed to the following:

    observer.OnNext(r.Result);
    observer.OnCompleted();
    

    Then, if Merge or any of the other operators throw it should be considered fatal, because the error occurred outside of the boundaries of what was being explicitly caught and assigned to r.Error.

    - Dave


    http://davesexton.com/blog
    Saturday, March 26, 2011 6:35 PM
  • Dave,

      I posted a follow-on to this a few weeks back, 

    Use Completed Results from IObservable<byte[]> to Feed Second IObservable<string>

     and i'm following you're last suggestion. Take a look. Thanks!

     

    Lee,

      Thanks for the suggestion. I scrubbed your weblog prior to coming up with the solution in the aforementioned thread. Thanks!


    Monday, March 28, 2011 2:08 PM