locked
How to force Subscribe to hook without delay? RRS feed

  • Question

  • Hi I am facing a problem with Subscribe, that I dont know if it is my fault, a by design behavior or a defect. I am trying to hook ObservableCollection added and removed events, so I wrote the following extension code.

    public static IObservable<T> GetAddedItems<T>(this ObservableCollection<T> collection)
            {
                var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                    h => new NotifyCollectionChangedEventHandler(h), 
                    handler => collection.CollectionChanged += handler,
                    handler => collection.CollectionChanged -= handler);
    
                var selection = notificationObservable
                    .SelectMany(arg =>
                        {
                            if (arg.EventArgs.Action == NotifyCollectionChangedAction.Add)
                            {
                                return from newItem in arg.EventArgs.NewItems.Cast<T> ()
                                       select newItem;
                            }
                            return new T [0];
                        });
    
                return selection;
            }
    
            public static IObservable<T> GetRemovedItems<T>(this ObservableCollection<T> collection)
            {
                var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                    h => new NotifyCollectionChangedEventHandler(h),
                    handler => collection.CollectionChanged += handler,
                    handler => collection.CollectionChanged -= handler);
    
                var selection = notificationObservable
                    .SelectMany(arg =>
                    {
                        if (arg.EventArgs.Action == NotifyCollectionChangedAction.Remove || arg.EventArgs.Action == NotifyCollectionChangedAction.Replace)
                        {
                            return from newItem in arg.EventArgs.OldItems.Cast<T>()
                                   select newItem;
                        }
                        return new T[0];
                    });
    
                return selection;
            }
    Then I wrote the following test to make sure it worked:

            [TestMethod]
            public void ObservableCollectionObserverExtensions()
            {
                var collection = new ObservableCollection<int>();
    
                var addedItemsObserver = collection.GetAddedItems();
                var removedItemsObserver = collection.GetRemovedItems();
    
                int added = 0;
                int removed = 0;
      
                addedItemsObserver.Subscribe(
                    item =>
                    {
                        added++;
                        Assert.AreNotEqual(0, item);
                    });
                removedItemsObserver.Subscribe(
                    item =>
                    {
                        removed++;
                        Assert.AreNotEqual(0, item);
                    });
    
                collection.Add(5);
                collection.Remove(0);
    
                addedItemsObserver.Take(1);
                removedItemsObserver.Take(1);
    
                Assert.AreEqual(1, added);
                Assert.AreEqual(1, removed);
    
                collection.Add(10);
                collection.Add(20);
    
                Assert.AreEqual(3, added);
                Assert.AreEqual(1, removed);
    
                collection[1] = 3;
            }
    As you can see the Subscribe got executed before inserting the collection, but there is a delay between the return of the Subscribe and the Collection.Add method, and the hook miss the first add. Any idea?

    BTW if someone know how to properly test reactive behaviors, I am all ears :D

    Greetings
    Federico
    Tuesday, January 12, 2010 5:11 PM

Answers

  • Testing Rx is fun and I will be explaining how to do this in the future.  For now, some good strategies to use are to use the Now scheduler so that concurrency is limited.  I found a bug in the test code (collection.Remove(0) should be collection.Remove(5)) and if you change the SelectMany to use the overload that returns an Observable then everything is great (because you can specify to use the Scheduler.Now to do the conversion).  We will include a new overload of SelectMany that a scheduler in a future release.  Also, your Take(1) calls didn't do anything at all.

     public static class someextentions
        {
    public static IObservable<T> GetAddedItems<T>(this ObservableCollection<T> collection)
            {
                var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                    h => new NotifyCollectionChangedEventHandler(h), 
                    handler => collection.CollectionChanged += handler,
                    handler => collection.CollectionChanged -= handler);
    
                var selection = notificationObservable
                    .SelectMany(arg =>
                        {
                            if (arg.EventArgs.Action == NotifyCollectionChangedAction.Add)
                            {
                                return (from newItem in arg.EventArgs.NewItems.Cast<T> ()
                                        select newItem).ToObservable(Scheduler.Now);
                            }
                            return Observable.Empty<T>();
                        });
    
                return selection;
            }
    
    public static IObservable<T> GetRemovedItems<T>(this ObservableCollection<T> collection)
    {
        var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => new NotifyCollectionChangedEventHandler(h),
            handler => collection.CollectionChanged += handler,
            handler => collection.CollectionChanged -= handler);
    
        var selection = notificationObservable
            .SelectMany(arg =>
            {
                if (arg.EventArgs.Action == NotifyCollectionChangedAction.Remove || arg.EventArgs.Action == NotifyCollectionChangedAction.Replace)
                {
                    return (from newItem in arg.EventArgs.OldItems.Cast<T>()
                            select newItem).ToObservable(Scheduler.Now);
                }
                return Observable.Empty<T>();
            });
    
        return selection;
    }
        }
    
        class Program
        {
            static void Main(string[] args)
            {
                var collection = new ObservableCollection<int>();
    
                var addedItemsObserver = collection.GetAddedItems();
                var removedItemsObserver = collection.GetRemovedItems();
    
                int added = 0;
                int removed = 0;
      
                addedItemsObserver.Subscribe(
                    item =>
                    {
                        added++;
                        Assert.AreNotEqual(0, item);
                    });
                removedItemsObserver.Subscribe(
                    item =>
                    {
                        removed++;
                        Assert.AreNotEqual(0, item);
                    });
    
                collection.CollectionChanged += (_, __) => { };
    
                collection.Add(5);
                collection.Remove(5);
    
                addedItemsObserver.Take(1);
                removedItemsObserver.Take(1);
    
                Assert.AreEqual(1, added);
                Assert.AreEqual(1, removed);
    
                collection.Add(10);
                collection.Add(20);
    
                Assert.AreEqual(3, added);
                Assert.AreEqual(1, removed);
    
                collection[1] = 3;
            }
    Tuesday, January 19, 2010 4:34 AM

All replies

  • I reported a similar problem: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/8a009bec-b021-4229-834e-21ddb47d2a15

    You could verify that it is the same issue by trying my quick fix I posted there (of course changed for your event type) instead using FromEvent(). If it fixes the delay your code will probably run without delay with the next update of Rx.

    Andreas
    Tuesday, January 12, 2010 6:11 PM
  • The thread almost hitted the bulls eye :)

    I reimplemented the hook to use a version based on your quick-fix:

            private class ObservableCollectionSubject<T> : Subject<IEvent<NotifyCollectionChangedEventArgs>>
            {
                private readonly ObservableCollection<T> mk_Args;
                public ObservableCollectionSubject(ObservableCollection<T> ak_Args)
                {
                    mk_Args = ak_Args;
                    mk_Args.CollectionChanged += OnCollectionChanged;
                }
    
                void OnCollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
                {                
                    OnNext(Event.Create(sender, e));
                }
    
                protected override void Dispose(bool disposing)
                {
                    mk_Args.CollectionChanged -= OnCollectionChanged;
                    base.Dispose(disposing);
                }
            }
    I do not miss the call anymore when doing a Take(1) {Which is good by the way}. However in this particular case, the test code was supposed to be sort of Single Threaded... But the trampoline prevent the reentrancy in the CreateEvent call, generating a situation where the execution path gets screwed up because if implied parallelism (want to control the execution to be continue in the same thread in this case). Tryied using the Now.Schedule but didnt work either. Any idea on how to fix that?

    The test that fails is the following:

            [TestMethod]
            public void ObservableCollectionObserverExtensions()
            {
                var collection = new ObservableCollection<int>();
    
                var addedItemsObserver = collection.GetAddedItems();
                var removedItemsObserver = collection.GetRemovedItems();
    
                int added = 0;
                int removed = 0;
      
                addedItemsObserver.Subscribe(
                    item =>
                    {
                        added++;
                    });
                removedItemsObserver.Subscribe(
                    item =>
                    {
                        removed++;
                    });
    
                collection.Add(5);
                collection.Remove(0);
    
                Assert.AreEqual(1, added);
                Assert.AreEqual(1, removed);
    
                collection.Add(10);
                collection.Add(20);
    
                Assert.AreEqual(3, added);
                Assert.AreEqual(1, removed);
    
                collection[1] = 3;
            }
    Thanks in advance,
    Federico
    Tuesday, January 12, 2010 7:29 PM
  • Testing Rx is fun and I will be explaining how to do this in the future.  For now, some good strategies to use are to use the Now scheduler so that concurrency is limited.  I found a bug in the test code (collection.Remove(0) should be collection.Remove(5)) and if you change the SelectMany to use the overload that returns an Observable then everything is great (because you can specify to use the Scheduler.Now to do the conversion).  We will include a new overload of SelectMany that a scheduler in a future release.  Also, your Take(1) calls didn't do anything at all.

     public static class someextentions
        {
    public static IObservable<T> GetAddedItems<T>(this ObservableCollection<T> collection)
            {
                var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                    h => new NotifyCollectionChangedEventHandler(h), 
                    handler => collection.CollectionChanged += handler,
                    handler => collection.CollectionChanged -= handler);
    
                var selection = notificationObservable
                    .SelectMany(arg =>
                        {
                            if (arg.EventArgs.Action == NotifyCollectionChangedAction.Add)
                            {
                                return (from newItem in arg.EventArgs.NewItems.Cast<T> ()
                                        select newItem).ToObservable(Scheduler.Now);
                            }
                            return Observable.Empty<T>();
                        });
    
                return selection;
            }
    
    public static IObservable<T> GetRemovedItems<T>(this ObservableCollection<T> collection)
    {
        var notificationObservable = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => new NotifyCollectionChangedEventHandler(h),
            handler => collection.CollectionChanged += handler,
            handler => collection.CollectionChanged -= handler);
    
        var selection = notificationObservable
            .SelectMany(arg =>
            {
                if (arg.EventArgs.Action == NotifyCollectionChangedAction.Remove || arg.EventArgs.Action == NotifyCollectionChangedAction.Replace)
                {
                    return (from newItem in arg.EventArgs.OldItems.Cast<T>()
                            select newItem).ToObservable(Scheduler.Now);
                }
                return Observable.Empty<T>();
            });
    
        return selection;
    }
        }
    
        class Program
        {
            static void Main(string[] args)
            {
                var collection = new ObservableCollection<int>();
    
                var addedItemsObserver = collection.GetAddedItems();
                var removedItemsObserver = collection.GetRemovedItems();
    
                int added = 0;
                int removed = 0;
      
                addedItemsObserver.Subscribe(
                    item =>
                    {
                        added++;
                        Assert.AreNotEqual(0, item);
                    });
                removedItemsObserver.Subscribe(
                    item =>
                    {
                        removed++;
                        Assert.AreNotEqual(0, item);
                    });
    
                collection.CollectionChanged += (_, __) => { };
    
                collection.Add(5);
                collection.Remove(5);
    
                addedItemsObserver.Take(1);
                removedItemsObserver.Take(1);
    
                Assert.AreEqual(1, added);
                Assert.AreEqual(1, removed);
    
                collection.Add(10);
                collection.Add(20);
    
                Assert.AreEqual(3, added);
                Assert.AreEqual(1, removed);
    
                collection[1] = 3;
            }
    Tuesday, January 19, 2010 4:34 AM