Are Subjects Actors?

Answers

  • You can consider IObservable<out S> as output channels and IObserver<in T> as input channels; so yes, subjects of type ISubject<T,S> are very much like (asynchronous) actors, transforming "messages" of type T to "messages" of type S. 

    So when people ask "Why does E.Meijer Not Like Subjects" they are really asking "Why does E.Meijer not like side-effecting functions".

    That said, it would be nice to convert some examples to Rx.
    • Marked as answer by Ryan Riley (MVP) Tuesday, December 1, 2009 3:10 PM
    Tuesday, December 1, 2009 3:00 PM
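
The channel analogy in the answer above can be made concrete with a small sketch. It uses only the `IObserver<T>` and `IObservable<T>` interfaces from the base class library; `TransformingSubject` and `CollectingObserver` are hypothetical names invented for illustration, not Rx types, and the sketch is neither thread-safe nor asynchronous. Note that `OnNext` returns `void`, so everything it accomplishes is a side effect, which is the point made in the answer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): receives TIn "messages" on its
// IObserver side and emits TOut "messages" on its IObservable side.
public sealed class TransformingSubject<TIn, TOut> : IObserver<TIn>, IObservable<TOut>
{
    private readonly Func<TIn, TOut> _transform;
    private readonly List<IObserver<TOut>> _observers = new List<IObserver<TOut>>();

    public TransformingSubject(Func<TIn, TOut> transform)
    {
        _transform = transform;
    }

    // Input channel: transform the message and forward it to every listener.
    public void OnNext(TIn value)
    {
        TOut result = _transform(value);
        foreach (var observer in _observers.ToArray())
            observer.OnNext(result);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
    }

    // Output channel: register a listener; disposing the result unregisters it.
    public IDisposable Subscribe(IObserver<TOut> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _onDispose;
        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

// Hypothetical observer that records what it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

For example, a `TransformingSubject<string, int>` built with `s => s.Length` and subscribed to by a `CollectingObserver<int>` would deliver the length of each string pushed into `OnNext`.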

All replies

  • Does using ISubject<T,S> remove the side-effects present in an ISubject<T>? I don't like side-effecting functions either, thanks to your excellent videos on C9. I also like this approach better than the F# MailboxProcessor Matthew used. It seems cleaner. I'll try to work on converting some of his samples to ISubject<T,S>.
    Tuesday, December 1, 2009 3:10 PM
  • Does using ISubject<T,S> remove the side-effects present in an ISubject<T>? 

    Nope. 
    Tuesday, December 1, 2009 4:57 PM
  • Well, here's my first stab at the Ping Pong example with Rx. Is this close to a correct implementation? Also, when might I be able to try this in F#? Is the complete Rx in F# 1.9.7?

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace RxPingPong
    {
        class Program
        {
            static void Main(string[] args)
            {
                var ping = new Ping();
                var pong = new Pong();
    
                ping.Subscribe(pong);
                pong.Subscribe(ping);
    
                Console.ReadKey();
            }
        }
    
        class Ping : ISubject<Pong, Ping>
        {
            #region Implementation of IObserver<Pong>
    
            /// <summary>
            /// Notifies the observer of a new value in the sequence.
            /// </summary>
            public void OnNext(Pong value)
            {
                Console.WriteLine("Ping received Pong.");
            }
    
            /// <summary>
            /// Notifies the observer that an exception has occurred.
            /// </summary>
            public void OnError(Exception exception)
            {
                Console.WriteLine("Ping experienced an exception and had to quit playing.");
            }
    
            /// <summary>
            /// Notifies the observer of the end of the sequence.
            /// </summary>
            public void OnCompleted()
            {
                Console.WriteLine("Ping finished.");
            }
    
            #endregion
    
            #region Implementation of IObservable<Ping>
    
            /// <summary>
            /// Subscribes an observer to the observable sequence.
            /// </summary>
            public IDisposable Subscribe(IObserver<Ping> observer)
            {
                return Observable
                    .CreateWithDisposable<Ping>(o =>
                        Observable.Interval(TimeSpan.FromSeconds(2))
                            .Select(value => this)
                            .Subscribe(o))
                    .Subscribe(observer);
            }
    
            #endregion
    
            #region Implementation of IDisposable
    
            /// <summary>
            /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
            /// </summary>
            /// <filterpriority>2</filterpriority>
            public void Dispose()
            {
                // Nothing to dispose, of which I know.
            }
    
            #endregion
        }
    
        class Pong : ISubject<Ping, Pong>
        {
            #region Implementation of IObserver<Ping>
    
            /// <summary>
            /// Notifies the observer of a new value in the sequence.
            /// </summary>
            public void OnNext(Ping value)
            {
                Console.WriteLine("Pong received Ping.");
            }
    
            /// <summary>
            /// Notifies the observer that an exception has occurred.
            /// </summary>
            public void OnError(Exception exception)
            {
                Console.WriteLine("Pong experienced an exception and had to quit playing.");
            }
    
            /// <summary>
            /// Notifies the observer of the end of the sequence.
            /// </summary>
            public void OnCompleted()
            {
                Console.WriteLine("Pong finished.");
            }
    
            #endregion
    
            #region Implementation of IObservable<Pong>
    
            /// <summary>
            /// Subscribes an observer to the observable sequence.
            /// </summary>
            public IDisposable Subscribe(IObserver<Pong> observer)
            {
                return Observable
                    .CreateWithDisposable<Pong>(o =>
                        Observable.Interval(TimeSpan.FromSeconds(1))
                            .Select(value => this)
                            .Subscribe(o))
                    .Subscribe(observer);
            }
    
            #endregion
    
            #region Implementation of IDisposable
    
            /// <summary>
            /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
            /// </summary>
            /// <filterpriority>2</filterpriority>
            public void Dispose()
            {
                // Nothing to dispose, of which I know.
            }
    
            #endregion
        }
    }
    

    Thursday, December 3, 2009 7:04 PM
  • I'm really having a hard time with this thing. I went back to Matthew Podwysocki's article and realized I was doing this all wrong, but now I can't work out what Pong should subscribe to. I should probably use one of the concrete Subject implementations, but I really want to understand how these guys work. Here's my latest attempt, with comments marking the areas where I'm having problems:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace RxPingPong
    {
        /// Simple Ping Pong Actor model using Rx
        /// 
        /// You'll need to install the Reactive Extensions (Rx) for this to work.
        /// You can get the installer from 
        /// 
        class Program
        {
            static void Main(string[] args)
            {
                var pong = new Pong();
                var ping = new Ping(10);
    
                var pongSubscription = ping.Subscribe(pong);
                var pingSubscription = pong.Subscribe(ping);
    
                Console.ReadKey();
    
                pingSubscription.Dispose();
                pongSubscription.Dispose();
            }
        }
    
        class Ping : ISubject<int>
        {
            private readonly int _iterations;
    
            /// <summary>
            /// Initializes a new instance of the <see cref="Ping"/> class.
            /// </summary>
            /// <param name="iterations">The iterations.</param>
            public Ping(int iterations)
            {
                _iterations = iterations;
            }
    
            #region Implementation of IObserver<int>
    
            /// <summary>
            /// Notifies the observer of a new value in the sequence.
            /// </summary>
            public void OnNext(int value)
            {
                Console.WriteLine("Ping received Pong.");
            }
    
            /// <summary>
            /// Notifies the observer that an exception has occurred.
            /// </summary>
            public void OnError(Exception exception)
            {
                Console.WriteLine("Ping experienced an exception and had to quit playing.");
            }
    
            /// <summary>
            /// Notifies the observer of the end of the sequence.
            /// </summary>
            public void OnCompleted()
            {
                Console.WriteLine("Ping finished.");
            }
    
            #endregion
    
            #region Implementation of IObservable<int>
    
            /// <summary>
            /// Subscribes an observer to the observable sequence.
            /// </summary>
            public IDisposable Subscribe(IObserver<int> observer)
            {
                return Enumerable
                    .Range(1, _iterations)
                    .Reverse()
                    .ToObservable()
                    .Asynchronous()
                    .Subscribe(observer);
            }
    
            #endregion
    
            #region Implementation of IDisposable
    
            /// <summary>
            /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
            /// </summary>
            /// <filterpriority>2</filterpriority>
            public void Dispose()
            {
                OnCompleted();
            }
    
            #endregion
        }
    
        class Pong : ISubject<int>
        {
            #region Implementation of IObserver<int>
    
            /// <summary>
            /// Notifies the observer of a new value in the sequence.
            /// </summary>
            public void OnNext(int value)
            {
                Console.WriteLine("Pong received Ping.");
                // Need to publish a response.
            }
    
            /// <summary>
            /// Notifies the observer that an exception has occurred.
            /// </summary>
            public void OnError(Exception exception)
            {
                Console.WriteLine("Pong experienced an exception and had to quit playing.");
            }
    
            /// <summary>
            /// Notifies the observer of the end of the sequence.
            /// </summary>
            public void OnCompleted()
            {
                Console.WriteLine("Pong finished.");
            }
    
            #endregion
    
            #region Implementation of IObservable<int>
    
            /// <summary>
            /// Subscribes an observer to the observable sequence.
            /// </summary>
            public IDisposable Subscribe(IObserver<int> observer)
            {
                return Observable.Return(1).Asynchronous().Subscribe(observer);
            }
    
            #endregion
    
            #region Implementation of IDisposable
    
            /// <summary>
            /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
            /// </summary>
            /// <filterpriority>2</filterpriority>
            public void Dispose()
            {
                OnCompleted();
            }
    
            #endregion
        }
    }
    

    • Edited by Ryan Riley (MVP) Friday, December 4, 2009 2:59 PM. Reason: The code didn't paste in correctly.
    Friday, December 4, 2009 2:58 PM
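
One possible way to fill in the "Need to publish a response" comment in the post above is for each actor to keep its own list of observers and push to them from inside `OnNext`. The following is a minimal sketch under that assumption, not the approach from Matthew Podwysocki's article and not Rx's own `Subject` implementation. `Actor`, `PingActor`, `PongActor`, `Publish`, `Complete` and `Start` are hypothetical names, the message is the number of remaining rallies, and calls are synchronous and single-threaded, whereas a real actor would queue its messages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical base class (not an Rx type): an actor is both an observer
// (its inbox) and an observable (its outbox).
abstract class Actor : IObserver<int>, IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();
    protected readonly Action<string> Log;

    protected Actor(Action<string> log) { Log = log; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    // Send a message to everyone listening to this actor.
    protected void Publish(int value)
    {
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    // Tell everyone listening that this actor is done.
    protected void Complete()
    {
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
    }

    public abstract void OnNext(int value);
    public abstract void OnCompleted();
    public void OnError(Exception error) { Log("Actor failed: " + error.Message); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

sealed class PingActor : Actor
{
    private readonly int _iterations;

    public PingActor(int iterations, Action<string> log) : base(log)
    {
        _iterations = iterations;
    }

    // Serve the first ball.
    public void Start() { Publish(_iterations); }

    public override void OnNext(int remaining)
    {
        Log("Ping received Pong.");
        if (remaining > 1) Publish(remaining - 1); // keep the rally going
        else Complete();                           // last rally: stop the game
    }

    public override void OnCompleted() { Log("Ping finished."); }
}

sealed class PongActor : Actor
{
    public PongActor(Action<string> log) : base(log) { }

    public override void OnNext(int remaining)
    {
        Log("Pong received Ping.");
        Publish(remaining); // the response the original code was missing
    }

    public override void OnCompleted()
    {
        Log("Pong finished.");
        Complete(); // pass completion back to Ping
    }
}
```

To run it, construct both actors with `Console.WriteLine` as the log, call `ping.Subscribe(pong)` and `pong.Subscribe(ping)` as in the original `Main`, then call `ping.Start()`. Because every call is synchronous, the whole rally runs on the calling thread before `Start` returns.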