Rx design questions

  • Question

  • I'm working on the WP7 platform and want to create a file download manager (with resumable downloads), using Rx to push out notifications. Before Rx I would have done this with events: a download request would be queued and processed asynchronously, with events raised as and when required.

    Now I am thinking of something like the following. Is this a good or a bad idea?

    public sealed class DownloadStatus
    {
    }
    
    public sealed class FileDownloader : IObservable<DownloadStatus>, IDisposable
    {
        private readonly Subject<DownloadStatus> subject;
        private readonly List<IObserver<DownloadStatus>> observers;
    
        public FileDownloader()
        {
          this.subject = new Subject<DownloadStatus>();
          this.observers = new List<IObserver<DownloadStatus>>();
        }
    
        public void Dispose()
        {
          subject.Dispose();
        }
    
        public void QueueFile(string url, string fullPath)
        {
          // Do work asynchronously and notify observers...
    
          var status = new DownloadStatus();
          this.observers.ForEach(o => o.OnNext(status));
        }
    
        public IDisposable Subscribe(IObserver<DownloadStatus> observer)
        {
          this.observers.Add(observer);
    
          return subject;
        }
    }
    




    Trying to remember what I learned yesterday
    Tuesday, July 26, 2011 6:46 PM

All replies

  • Hi,

    Maybe it would be better not to implement IObservable yourself - I see a few problems with your implementation, but regardless it's not necessary anyway.

    Consider having QueueFile return an IObservable<DownloadStatus> instead.  You probably won't need to use a Subject either.

    You might want to use a Queue though if you need explicit control over concurrency; otherwise, you can simply begin your async downloads as they are requested and the I/O Completion Ports (assuming they're supported on WP7) will act as your queue.

    - Dave


    http://davesexton.com/blog
    Tuesday, July 26, 2011 10:43 PM
  • Hi Ollie,

    Your `Subject<DownloadStatus>` isn't being used anywhere in your code other than as the return value of the `Subscribe` method. This means that every subscriber gets a reference to your single subject, so the first subscriber to dispose of its subscription will dispose the subject for all subscribers.

    As I said, it's not being used right now, but I assume you intend to use it so watch out.

    Oh, and I agree with Dave. It's usually a sign that you're doing something wrong if you implement your own `IObserver` or `IObservable` classes.
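
    To make the disposal point concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The `Report` method and the `Message` property are hypothetical names added for illustration. It only shows how `Subscribe` can hand each caller its own subscription handle by delegating to the subject; it still implements `IObservable`, which the replies advise against.

```csharp
using System;
using System.Reactive.Subjects;

public sealed class DownloadStatus
{
    // Hypothetical property, for illustration only.
    public string Message { get; set; }
}

public sealed class FileDownloader : IObservable<DownloadStatus>, IDisposable
{
    private readonly Subject<DownloadStatus> subject = new Subject<DownloadStatus>();

    // Hypothetical helper: publish through the subject instead of a hand-kept observer list.
    public void Report(DownloadStatus status)
    {
        subject.OnNext(status);
    }

    // Each caller receives its own handle; disposing it detaches only that observer.
    public IDisposable Subscribe(IObserver<DownloadStatus> observer)
    {
        return subject.Subscribe(observer);
    }

    // Only the owner of the downloader ends the sequence for everyone.
    public void Dispose()
    {
        subject.OnCompleted();
        subject.Dispose();
    }
}
```

    With this shape, one subscriber disposing its handle leaves the other subscribers attached.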

    Cheers.


    James C-S
    Wednesday, July 27, 2011 4:53 AM
  • I agree with what you are saying about implementing your own 'IObserver' & 'IObservable'. Also agree about this being the wrong use of the Subject.

    Dave - I don't believe completion ports are available on WP7.

    The way I was thinking of using this was as follows:

     this.downloader = new DownloadManager();
     this.downloader.Subscribe(result => Debug.WriteLine("1 - " + result.Status));
     this.downloader.Start();
    
     this.downloader.Queue("http://localhost/file1.mp3", "\\files\\file1.mp3");
     this.downloader.Queue("http://localhost/file2.mp3", "\\files\\file2.mp3");
     this.downloader.Queue("http://localhost/file3.mp3", "\\files\\file3.mp3");
    

    Basically, what I wanted was the ability to subscribe once and receive all status changes via a single subscriber.

    Using events I would do something similar:

     this.downloader = new DownloadManager();
     this.downloader.StatusChanged += OnStatusChanged;
     this.downloader.Start();
    
     this.downloader.Queue("http://localhost/file1.mp3", "\\files\\file1.mp3");
     this.downloader.Queue("http://localhost/file2.mp3", "\\files\\file2.mp3");
     this.downloader.Queue("http://localhost/file3.mp3", "\\files\\file3.mp3");
    


     

     


    Trying to remember what I learned yesterday
    Wednesday, July 27, 2011 9:17 AM
  • Hi,

    Even if IOCP isn't available you can still start your async operations and let the thread pool act as your queue.  Alternatively, if you actually require the ability to control concurrency, then you could create an IObservable<IObservable<T>> and use Merge(maxConcurrent).

    > The way I was thinking of using this was as follows: [snip]

    My suggestion implies the following usage:

    var d = new DownloadManager();
    
    IObservable<DownloadStatus> download1 = d.Enqueue("http://localhost/file1.mp3", "\\files\\file1.mp3");
    IObservable<DownloadStatus> download2 = d.Enqueue("http://localhost/file2.mp3", "\\files\\file2.mp3");
    IObservable<DownloadStatus> download3 = d.Enqueue("http://localhost/file3.mp3", "\\files\\file3.mp3");
    
    new[] { download1, download2, download3 }
        .Merge()
        // Debug.WriteLine is a conditional method, so wrap it in a lambda rather than passing it as a method group.
        .Subscribe(status => Debug.WriteLine(status));
    

    This offers more control and flexibility because each download has its own observable.  And as I've shown you can easily get the same merged behavior if you need a single observable sequence simply by using Merge.

    However, if you really need to encapsulate the merge behavior within the manager class, for example to control maximum concurrency, then your original solution was on the right track.  Perhaps try something like this: (Untested)

    public sealed class FileDownloader : IObservable<DownloadStatus>
    {
    	private readonly Subject<IObservable<DownloadStatus>> subject = new Subject<IObservable<DownloadStatus>>();
    	private readonly IObservable<DownloadStatus> queue;
    
    	public FileDownloader(int maxConcurrent)
    	{
    		// Prime is provided by Rxx; alternatively, you could use RefCount although the semantics aren't exactly the same.
    		queue = subject.Merge(maxConcurrent).Publish().Prime();
    		queue.Subscribe();
    	}
    
    	public void Enqueue(Uri url, string fullPath)
    	{
    		// ObservableWebClient is provided by Rxx.
    		// Note that WP7 prevents concurrent downloads using WebClient (I think) so you may have to use ObservableWebRequest instead.
    		var download = Observable.Defer(() => 
    			ObservableWebClient.DownloadFileWithProgress(url, fullPath).Select(either => new DownloadStatus(...)));
    
    		subject.OnNext(download);
    	}
    
    	public IDisposable Subscribe(IObserver<DownloadStatus> observer)
    	{
    		return queue.Subscribe(observer);
    	}
    }
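
    For readers without Rxx, the same queueing idea can be sketched with core Rx operators only. This is a rough, hedged variant rather than a drop-in replacement: `DownloadQueue<T>` and `Statuses` are hypothetical names, the System.Reactive package is assumed, and `Publish` plus an immediate `Connect` stands in for `Prime`, so the semantics are not identical.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical name; a sketch of the queue idea using only core Rx operators.
public sealed class DownloadQueue<T> : IDisposable
{
    private readonly Subject<IObservable<T>> requests = new Subject<IObservable<T>>();
    private readonly IConnectableObservable<T> merged;
    private readonly IDisposable connection;

    public DownloadQueue(int maxConcurrent)
    {
        // Merge subscribes to at most maxConcurrent inner sequences at a time
        // and holds the rest until a running one completes.
        merged = requests.Merge(maxConcurrent).Publish();

        // Connect immediately so queued downloads run even with no subscribers.
        connection = merged.Connect();
    }

    // Single stream carrying the notifications of every queued download.
    public IObservable<T> Statuses
    {
        get { return merged.AsObservable(); }
    }

    // The download should be cold (for example wrapped in Observable.Defer)
    // so that it only starts when Merge subscribes to it.
    public void Enqueue(IObservable<T> download)
    {
        requests.OnNext(download);
    }

    public void Dispose()
    {
        connection.Dispose();
        requests.Dispose();
    }
}
```

    Two caveats with this sketch: observers that subscribe to `Statuses` late miss earlier notifications, and an error in any one download terminates the merged stream for all of them.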
    

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Wednesday, July 27, 2011 10:11 AM Forgot to call Subscribe on queue
    Wednesday, July 27, 2011 10:08 AM
  • Thanks Dave,

    Concurrency on the phone is going to be 1.

    The only issue left is modifying the list of downloads (adding, removing and re-prioritising items). I believe your second solution above is more applicable for this, unless there is a way to remove a subscription after a Merge.

     

    ta

     

    Ollie


    Trying to remember what I learned yesterday
    Wednesday, July 27, 2011 10:40 AM
  • I agree with Dave. His first solution is the way to go.

    Here's why, IMHO.

    Rx is a functional programming paradigm, not an object-oriented one. It's about letting the operators manage the state for you. Whenever you find yourself managing state (i.e. implementing `IObservable` and `IObserver`), you're doing things wrong.

    When helping the guys at work learn Rx I've always said to them "stay in the monad until the last possible moment". Changing from observables to enumerables, or observables to objects is leaving the monad. Once you do that all of the protection that the monad gives you is gone. The monad is the `IObservable<>` (or `IEnumerable<>`, or `Task<>` or `Nullable<>`, etc.). Stay within any one monad and don't leave it. Even doing work inside the observer code is leaving the monad. The observer should therefore be as simple code as possible.

    Dave's solution, which returns `IObservable<>` from the `Enqueue` method, is spot on. There's nothing stopping this solution from also supporting a `maxConcurrent` limit on downloads: it would just not start excess observables until others had completed.

    Think in terms of using existing extension methods or writing your own extension methods well before you consider writing your own classes.
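
    As a small illustration of preferring extension methods over new classes, here is a hedged sketch, assuming the System.Reactive package. The `Url` and `PercentComplete` properties and the `CompletedUrls` method are hypothetical, since the thread never defines what `DownloadStatus` contains.

```csharp
using System;
using System.Reactive.Linq;

public sealed class DownloadStatus
{
    // Hypothetical properties, for illustration only.
    public string Url { get; set; }
    public int PercentComplete { get; set; }
}

public static class DownloadStatusExtensions
{
    // Emits the URL of each download the first time it reports 100%.
    // The filtering state lives inside the operators, not in a custom class.
    public static IObservable<string> CompletedUrls(this IObservable<DownloadStatus> source)
    {
        return source
            .Where(s => s.PercentComplete >= 100)
            .Select(s => s.Url)
            .Distinct();
    }
}
```

    A caller could then write something like `statuses.CompletedUrls().Subscribe(url => Debug.WriteLine(url))`, keeping the observer itself trivial.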

    Cheers.


    James C-S
    Wednesday, July 27, 2011 10:48 AM
  • Thanks for the info. There are other concerns the download manager has to deal with which aren't exposed; what I wanted was a way to expose 'status' changes. I am not looking for a completely functional approach to the problem.

     


    Trying to remember what I learned yesterday
    Wednesday, July 27, 2011 3:31 PM