How can I subscribe to elements added to a list?

    Question

  • Hello,

    I need to subscribe to a List<T> so that I am notified whenever an element is added to it. I tried the code below, but it doesn't work (the subscription completes immediately, because the list is still empty when I subscribe).

    How can I implement this with Rx?


      private void button1_Click(object sender, EventArgs e)
            {
                List<Exception> errors = new List<Exception>();
                var observer = errors.ToObservable();
                observer.Subscribe(err =>
                {
                    _txtSummary.AppendText(Environment.NewLine);
                    _txtSummary.AppendText(err.ToString());
                });
    
                for (int i = 0; i < 1000; i++)
                {
                    Thread.Sleep(100);
                    if (i % 3 == 0)
                        errors.Add(new Exception("yov"));
                }
            }


    Oğuzhan Eren

    Thursday, February 9, 2012 3:03 PM

All replies

  • I solved my problem (except for an illegal cross-thread error) as shown below, but I don't like this solution (SelectMany with Interval).

     private void button1_Click(object sender, EventArgs e)
            {
                List<Exception> errors = new List<Exception>();
                var observer = from t in Observable.Interval(TimeSpan.FromMilliseconds(100))
                               from item in errors.ToObservable()
                               select item;
                
                observer.Subscribe(err =>
                {
                    _txtSummary.AppendText(Environment.NewLine);
                    _txtSummary.AppendText(err.ToString());
                });
    
                for (int i = 0; i < 1000; i++)
                {
                    Thread.Sleep(100);
                    if (i % 3 == 0)
                        errors.Add(new Exception("yov"));
                }
            }
    How can I solve the illegal cross-thread error now?


    Oğuzhan Eren


    • Edited by oguzh4n Thursday, February 9, 2012 3:13 PM
    Thursday, February 9, 2012 3:12 PM
  • Hi Oğuzhan,

    As it seems you're aware now, List<T> isn't reactive.  The ToObservable method does not listen for new items to be added because List<T> doesn't provide any way of doing that.

    Your second attempt reads the entire list at intervals on a pooled thread.  I don't think this meets your expectations though, because it will cause duplicate errors in the observable sequence.  Regardless, the reason for the cross-thread exception is that you are attempting to use a UI control (_txtSummary) on a pooled thread, which isn't allowed.  You can solve this by adding the ObserveOnDispatcher operator before calling Subscribe.

    Instead, try using ObservableCollection<T>.  It provides a CollectionChanged event that can be converted into an observable using Observable.FromEventPattern.
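
    The difference between the two approaches can be sketched as a small console program. This is only an illustration, assuming the System.Reactive package is referenced; the names CollectionChangedSketch and Run are hypothetical, and strings stand in for the exceptions from the question. It filters for Add actions and flattens NewItems so that each added element becomes one notification.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper class, for illustration only.
public static class CollectionChangedSketch
{
    // Returns every item the subscribers saw, so the behavior is easy to inspect.
    public static List<string> Run()
    {
        var seen = new List<string>();

        // 1) List<T>.ToObservable() enumerates the list as it is at Subscribe time
        //    and then completes. The list is empty here, so nothing is emitted.
        var list = new List<string>();
        list.ToObservable().Subscribe(s => seen.Add("list:" + s));
        list.Add("a"); // added after the sequence completed: not observed

        // 2) ObservableCollection<T> raises CollectionChanged on every change,
        //    and FromEventPattern turns that event into an observable sequence.
        var errors = new ObservableCollection<string>();
        IObservable<string> added = Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                h => errors.CollectionChanged += h,
                h => errors.CollectionChanged -= h)
            .Where(ev => ev.EventArgs.Action == NotifyCollectionChangedAction.Add)
            .SelectMany(ev => ev.EventArgs.NewItems.Cast<string>());

        using (added.Subscribe(s => seen.Add("collection:" + s)))
        {
            errors.Add("b");
            errors.Add("c");
        }

        errors.Add("d"); // the subscription was disposed: not observed
        return seen;
    }
}
```

    In this sketch only "b" and "c" reach the subscriber. In a UI application the Subscribe call would still need ObserveOnDispatcher (or an equivalent) in front of it if items are added from a background thread.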

    - Dave


    http://davesexton.com/blog

    Thursday, February 9, 2012 5:39 PM
  • I implemented this but it's not working:

    Additionally, how can I convert any collection (Dictionary, ConcurrentDictionary, Stack...) to an observable collection?

    ObservableCollection<Exception> errors = new ObservableCollection<Exception>();
                var observable = Observable
                    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(h => errors.CollectionChanged += h, h => errors.CollectionChanged -= h);
    
    
                var sub = observable.SubscribeOnDispatcher().Subscribe(ev =>
                 {
                     foreach (var item in ev.EventArgs.NewItems)
                     {
                         _txtSummary.AppendText(item.ToString());
                     }
                 });
    
                for (int i = 0; i < 100; i++)
                {
                    Thread.Sleep(100);
                    errors.Add(new Exception(i.ToString()));
                }


    Oğuzhan Eren

    Friday, February 10, 2012 8:58 AM
  • Hi Oğuzhan,

    Use ObserveOnDispatcher, not SubscribeOnDispatcher.

    http://msdn.microsoft.com/en-us/library/hh242963(v=vs.103).aspx
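
    As a rough illustration of why the operator matters: ObserveOn decides where the OnNext callbacks run, while SubscribeOn only decides where the subscription itself (here, attaching the event handler) is performed. ObserveOnDispatcher needs a WPF dispatcher, so the console sketch below uses an EventLoopScheduler as a stand-in for the UI thread. That substitution, and the names ObserveOnSketch and CallbackRanOnLoopThread, are assumptions made for the example; System.Reactive is assumed to be referenced.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper class, for illustration only.
public static class ObserveOnSketch
{
    public static bool CallbackRanOnLoopThread()
    {
        // Stand-in for the UI thread: a scheduler that owns one dedicated thread.
        using var uiLike = new EventLoopScheduler();

        // Find out which thread the scheduler uses.
        int loopThreadId = -1;
        using var ready = new ManualResetEventSlim();
        uiLike.Schedule(() =>
        {
            loopThreadId = Environment.CurrentManagedThreadId;
            ready.Set();
        });
        ready.Wait();

        var source = new Subject<int>();
        int callbackThreadId = -1;
        using var done = new ManualResetEventSlim();

        // ObserveOn moves the delivery of notifications onto the scheduler's thread.
        using var subscription = source
            .ObserveOn(uiLike)
            .Subscribe(_ =>
            {
                callbackThreadId = Environment.CurrentManagedThreadId;
                done.Set();
            });

        source.OnNext(1); // raised on the calling thread
        done.Wait();

        return callbackThreadId == loopThreadId;
    }
}
```

    In a Windows Forms handler like the one in the question, a comparable option would be ObserveOn(SynchronizationContext.Current), captured on the UI thread. Note also that a loop with Thread.Sleep inside the click handler keeps the UI thread busy, so marshalled callbacks can only run once the handler returns.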

    - Dave


    http://davesexton.com/blog

    Friday, February 10, 2012 4:28 PM
  • Maybe you can consider the following direction:

    Inherit from List<T> and make it implement IObservable<T>.

        

    public class ObservableList<T> : List<T>, IObservable<T>
    {
        private ISubject<T> _subject = new ReplaySubject<T>();

        public new void Add(T item)
        {
            base.Add(item);
            _subject.OnNext(item);
        }

        public new void AddRange(IEnumerable<T> collection)
        {
            base.AddRange(collection);
            foreach (var item in collection)
            {
                _subject.OnNext(item);
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _subject.Subscribe(observer);
        }
    }

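    Note that Add and AddRange are not virtual methods, so using the object through its base class may be tricky: a call made through a List<T> reference goes to the original method and raises no notification.

    Now the following code will work: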

                ObservableList<Exception> errors = new ObservableList<Exception>();
                errors.Subscribe(err =>
                {
                    Trace.WriteLine(err.ToString());
                });
     
                for (int i = 0; i < 100; i++)
                {
                    Thread.Sleep(100);
                    if (i % 3 == 0)
                        errors.Add(new Exception("yov"));
                }
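
    One way around the non-virtual Add, sketched here as an alternative and not as part of the class above, is to derive from Collection<T>, whose InsertItem method is protected and virtual and is called by Add. The names NotifyingCollection and NotifyingCollectionSketch are hypothetical, and System.Reactive is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Reactive.Subjects;

// Hypothetical alternative to ObservableList<T>: every insertion goes through
// the virtual InsertItem hook, including calls made via IList<T>.
public class NotifyingCollection<T> : Collection<T>, IObservable<T>
{
    private readonly ReplaySubject<T> _subject = new ReplaySubject<T>();

    protected override void InsertItem(int index, T item)
    {
        base.InsertItem(index, item);
        _subject.OnNext(item);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subject.Subscribe(observer);
    }
}

public static class NotifyingCollectionSketch
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var errors = new NotifyingCollection<string>();

        errors.Add("first"); // added before subscribing; ReplaySubject replays it

        using (errors.Subscribe(s => seen.Add(s)))
        {
            // Adding through the interface still reaches InsertItem.
            IList<string> asInterface = errors;
            asInterface.Add("second");
        }

        return seen;
    }
}
```

    This sketch only reports insertions; removals and replacements would need the other virtual hooks (RemoveItem, SetItem, ClearItems) if they matter.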

    Bnaya Eshet

    Thursday, February 16, 2012 6:06 AM