Puzzled by subscribe behavior in recursive method

  • Question

  • First let me say that I'm trying to learn the Rx framework, so what follows may look like a big mess; I'm open to better ideas about how to structure this with an Rx mindset.

    Here's the problem:

    In a property setter that runs on the UI thread (data-bound in WPF), I have the following code:

                Observable.Start(() => {
                    var tlf = ActiveServer.Folders.TopLevelFolders;
                    WalkFolders(MainWindow.FolderTreeView.Items, tlf);
                    },
                    Scheduler.ThreadPool);

    This starts the WalkFolders method on a free thread pool thread.  The WalkFolders method is recursive and defined as follows:

            void WalkFolders(ItemCollection items, ICollection<IFolder> folders) {
                var itemRef = new Dictionary<IFolder,ItemCollection>();
                object abc;
                using (var flds = folders.OrderBy(f => f.Name).ToObservable().Subscribe(
                    folder => {
                        abc = MainWindow.Dispatcher.Invoke(new Action(() => {
                            Debug.WriteLine(folder.Name);
                            var tvi = new TreeViewItem();
                            tvi.Header = folder.Name;
                            tvi.Tag = folder;
                            items.Add(tvi);
                            itemRef.Add(folder, tvi.Items);
                            }));
                        },
                    ex => Debug.Fail(ex.Message),
                    () => {
                        foreach (var f in itemRef.Keys) {
                            WalkFolders(itemRef[f], f.ChildFolders());
                            }
                        }
                    )) {}
                }

    The first time through, everything is peachy and the top-level folder names show up in the tree view. When the first level of recursion is entered (to populate the children of the top-level folders), breakpoints show that the method is re-entered, but when I step through the using statement nothing is executed (it moves immediately to the empty {} body of the using statement). Neither the Debug.WriteLine nor the Debug.Fail executes. The UI remains responsive and I don't seem to have a runaway thread.

    I do end up with a *.vshost.exe of this program hanging around even after I close VS, so it would appear that something is hung. I imagine the problem is simple, but I just don't see it. Process Explorer tells me this is an orphan process (non-existent parent). This may be a red herring, because when I run the debug build directly (outside of VS) it runs and closes without leaving an orphan.

    Thanks

    Wednesday, February 8, 2012 11:42 PM

Answers

  • Hi,

    I didn't try running your example but this is what I suspect is happening:

    ToObservable uses Scheduler.CurrentThread by default.  This scheduler inserts a trampoline on the current thread, which is basically just a work queue that makes Subscribe asynchronous on a single thread.  In your code, the recursion starts when the OnCompleted notification is received by the first observer.  At this point the first subscription still hasn't completed executing on the current thread, so when Subscribe is called again it's simply enqueued.  It then returns immediately and the using statement disposes of the second subscription immediately, thus cancelling it before it has even started.
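    To make the trampoline behavior concrete, here is a toy, single-threaded imitation of it in plain C# (no Rx). ToyTrampoline, ToySubscription and Demo are hypothetical names, and this is not how Rx's CurrentThread scheduler is actually implemented; it only reproduces the ordering described above. Work scheduled while the queue is already being drained is deferred, so the nested Subscribe returns before its work has run, and disposing it right away (which is what the empty using block does) cancels it.

```csharp
using System;
using System.Collections.Generic;

// Toy, single-threaded imitation of a trampoline (NOT Rx's real code):
// work scheduled while the queue is already being drained is deferred.
public static class ToyTrampoline
{
    private static readonly Queue<Action> Pending = new Queue<Action>();
    private static bool _draining;

    public static void Schedule(Action work)
    {
        Pending.Enqueue(work);
        if (_draining) return;            // nested call: just queue it and return

        _draining = true;
        try
        {
            while (Pending.Count > 0)
                Pending.Dequeue()();      // run queued work in FIFO order
        }
        finally
        {
            _draining = false;
        }
    }
}

// Hypothetical subscription handle: Dispose just marks it as cancelled.
public sealed class ToySubscription : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

public static class Demo
{
    // Hypothetical stand-in for items.ToObservable().Subscribe(onNext, onCompleted).
    public static IDisposable Subscribe(IEnumerable<string> items, Action<string> onNext, Action onCompleted)
    {
        var subscription = new ToySubscription();
        ToyTrampoline.Schedule(() =>
        {
            foreach (var item in items)
            {
                if (subscription.IsDisposed) return;   // cancelled: emit nothing more
                onNext(item);
            }
            if (!subscription.IsDisposed) onCompleted();
        });
        return subscription;
    }

    // Mirrors WalkFolders: the second level is subscribed from OnCompleted.
    public static List<string> Run(bool disposeInnerImmediately)
    {
        var seen = new List<string>();
        var outer = Subscribe(new[] { "A", "B" }, seen.Add, () =>
        {
            var inner = Subscribe(new[] { "A.1", "B.1" }, seen.Add, () => seen.Add("inner done"));
            if (disposeInnerImmediately)
                inner.Dispose();                       // what the empty using block does
        });
        outer.Dispose();
        return seen;
    }
}
```

    Demo.Run(true) yields only "A" and "B", which matches the empty second level in the tree. Demo.Run(false) lets the queued inner work run once the outer work has finished, giving "A", "B", "A.1", "B.1", "inner done".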

    I assume that you're using ToObservable because you thought that it would force concurrency, but by default it doesn't.  You'd have to pass in a scheduler; e.g., ToObservable(Scheduler.ThreadPool).  But that's not going to work in your code either because making Subscribe asynchronous would cause it to return immediately and then the using statement would cancel it immediately; it's the same problem that you're experiencing with your second iteration now.

    Rx provides extension methods for scheduling recursive operations, but you're probably better off just making normal recursive calls on a single thread-pool thread, unless you've got a really deep tree.

    Actually, if ChildFolders isn't an expensive operation, then I'd probably just do the entire operation on the UI thread; i.e., don't use Observable.Start or IObservable<T> at all.  Marshaling back to the UI thread for every folder may be more costly than just running the entire operation on a single thread as fast as possible.

    But if you really must use Observable.Start, then here's how you could implement WalkFolders so that it runs on a single pooled thread with normal recursion: (Untested)

    void WalkFolders(ItemCollection items, ICollection<IFolder> folders)
    {
    	foreach (var folder in folders.OrderBy(f => f.Name))
    	{
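    		// Dispatcher.Invoke is synchronous, so item is set before it is read below,
    		// but the compiler can't prove that, so it needs an initial value.
    		TreeViewItem item = null;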
    
    		MainWindow.Dispatcher.Invoke(new Action(() =>
    		{
    			Debug.WriteLine(folder.Name);
    
    			item = new TreeViewItem()
    			{
    				Header = folder.Name, 
    				Tag = folder
    			};
    
    			items.Add(item);
    		}));
    
    		WalkFolders(item.Items, folder.ChildFolders());
    	}
    }
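    For comparison, the same plain recursion can be exercised without WPF or a dispatcher. The sketch below swaps in hypothetical FakeFolder and FakeNode types for IFolder and TreeViewItem; it only shows the traversal order (depth-first, siblings sorted by name), not the marshaling to the UI thread.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for IFolder: a name plus child folders held in memory.
public sealed class FakeFolder
{
    public string Name { get; }
    private readonly List<FakeFolder> _children;

    public FakeFolder(string name, params FakeFolder[] children)
    {
        Name = name;
        _children = new List<FakeFolder>(children);
    }

    public ICollection<FakeFolder> ChildFolders() => _children;
}

// Hypothetical stand-in for TreeViewItem: a header and child items.
public sealed class FakeNode
{
    public string Header { get; }
    public List<FakeNode> Items { get; } = new List<FakeNode>();
    public FakeNode(string header) { Header = header; }
}

public static class FolderWalker
{
    // Same shape as WalkFolders above, minus the dispatcher: siblings are sorted
    // by name and each folder's subtree is built before moving to the next sibling.
    public static void WalkFolders(List<FakeNode> items, ICollection<FakeFolder> folders)
    {
        foreach (var folder in folders.OrderBy(f => f.Name))
        {
            var node = new FakeNode(folder.Name);
            items.Add(node);
            WalkFolders(node.Items, folder.ChildFolders());
        }
    }
}
```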

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Thursday, February 9, 2012 2:10 AM Fixed some code issues
    • Marked as answer by Static Shock Thursday, February 9, 2012 5:36 PM
    Thursday, February 9, 2012 2:07 AM

All replies

  • Hi,

    I just realized that my use of item.Items outside of the UI thread may cause WPF to throw an exception.  If that's true, then it's another good reason to just do the entire operation on the UI thread.

    - Dave


    http://davesexton.com/blog

    Thursday, February 9, 2012 2:14 AM
  • Thank you for your reply; I believe that is the correct diagnosis. I was not aware that the OnCompleted call was run in an asynchronous manner. I revised the code as follows:

            void WalkFolders(ItemCollection items, ICollection<IFolder> folders) {
                ItemCollection itms = null;
                foreach (var folder in folders.OrderBy(f => f.Name)) {
                    MainWindow.Dispatcher.Invoke(new Action(() => {
                        Debug.WriteLine(folder.Name);
                        TreeViewItem item;
                        item = new TreeViewItem() {
                            Header = folder.Name,
                            Tag = folder
                            };
                        itms = item.Items;
                        items.Add(item);
                        }));
    
                    WalkFolders(itms, folder.ChildFolders());
                    }
                }

    The implementation sans Rx is much cleaner. The ChildFolders call causes a remote query, and the tree is usually small but can be quite large. While this app will be useful, I'm mainly trying to learn Rx by doing.

    Thanks!

    Thursday, February 9, 2012 6:02 PM
  • Hi,

    Rx is about composing reactive queries and events.  If ChildFolders performs a query over a network, then perhaps it would be best for it to return IObservable<IFolder> and do the work asynchronously.  The cost of making several network requests may actually make it worthwhile to use pooled threads and marshal the results back to the UI thread for each iteration.

    For example, ChildFolders could look something like this:

    interface IFolder
    {
    	...
    	IObservable<IFolder> ChildFolders();
    }
    
    class Folder : IFolder
    {
    	...
    	
    	public IObservable<IFolder> ChildFolders()
    	{
    		// Rxx extension: http://rxx.codeplex.com/SourceControl/changeset/view/65047#973546
    		return from result in ObservableWebClient.DownloadString(url)
    			from folder in ParseFolders(result).OrderBy(f => f.Name)
    			select folder;
    	}
    }

    Then you can define WalkFolders as follows: (Untested) 

    IDisposable StartWalkFolders()
    {
    	var subscriptions = new CompositeDisposable();
    
    	WalkFolders(
    		MainWindow.FolderTreeView.Items, 
    		ActiveServer.Folders.TopLevelFolders.ToObservable(), 
    		subscriptions);
    
    	return subscriptions;
    }
    
    void WalkFolders(ItemCollection items, IObservable<IFolder> folders, CompositeDisposable subscriptions)
    {
    	subscriptions.Add(
    		folders.ObserveOnDispatcher().Subscribe(
    			folder =>
    			{
    				Debug.WriteLine(folder.Name);
    
    				var item = new TreeViewItem()
    				{
    					Header = folder.Name, 
    					Tag = folder
    				};
    
    				items.Add(item);
    
    				WalkFolders(item.Items, folder.ChildFolders(), subscriptions);
    			}, 
    			ex => items.Add(new TreeViewItem()
    			{
    				Header = ex.Message, 
    				Tag = ex, 
    				Foreground = Brushes.Red
    			})));
    }

    - Dave


    http://davesexton.com/blog

    Thursday, February 9, 2012 7:20 PM