Proposed Answer Publish() doesn't work

  • Sunday, May 08, 2011 4:57 PM
     
     
    The version of Publish() that takes just one argument (homogeneously-typed subject) doesn't implement it's supposed semantics. I've looked into the code (with some help from ILSpy), and it turns out that this version of Publish simply subscribes the subject to the source RIGHT AWAY, and then returns subject.AsObservable().
    This has a nasty effect of draining any replayed values (if the source has replay semantics) through the subject at the moment of calling Publish, as opposed to the moment of subscribing to it's result, as it's supposed to be.
    In contrast, the other overload of Publish works just as expected, only subscribing to the source once a subscription comes in from downstream.
    To work around this, I had to replace all calls to "x.Publish(y)" with "x.Publish( y, z => z )". This takes care of the problem, but it cost me quite some time to find out what was wrong.
    Am I missing something? Or is it going to be fixed?

All Replies

  • Sunday, May 08, 2011 5:15 PM
     
     
    Ok, on second thought, turns out I was wrong. The other overload of Publish() doesn't work as well. Only it doesn't work in a different way: it actually does as many Multicast()-s as there are subscriptions. Which pretty much defeats the whole idea of Multicast().
  • Monday, May 09, 2011 7:11 AM
     
     

    It would help if you posted some code demonstrating the problem.


    James Miles http://enumeratethis.com
  • Monday, May 09, 2011 7:15 AM
     
     

    Are you using Rx v1.0.10425?


    James Miles http://enumeratethis.com
  • Friday, May 20, 2011 6:30 PM
    Owner
     
     Proposed Answer
    Please send us a repro and we'll have a look. Based on your description we can't repro this issue in the current version of Rx. Thanks!
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
  • Friday, July 01, 2011 7:25 PM
     
      Has Code

    Sorry for the late reply: been kinda busy.

    Here's your repro code:

        static void Main()
        {
          var part1 = Observable.Range( 0, 3 ).Select( x => "abc"[x].ToString() );
          var part2 = Observable.Interval( TimeSpan.FromMilliseconds( 100 ) ).Take( 3 ).Select( x => x.ToString() );
          var source = part1.Concat( part2 );
          var published = source.Publish( new Subject<string>() );
    
          Console.WriteLine( "Published:   " + string.Join( " ", published.ToEnumerable() ) );
          Console.WriteLine( "Non-published: " + string.Join( " ", source.ToEnumerable() ) );
    
          Console.ReadLine();
        }
    
    


    The output of this program is:

          Published:       0 1 2

          Non-published: a b c 0 1 2

     

    That "abc" part gets drained at the moment of calling Publish(), because Publish() creates a subscription right away instead of deferring it until a subscription from downstream arrives. The version of Rx I'm using is 1.0.2838.0, which I have downloaded the day before I posted the question. I see there is a newer version available now. I will try it out.

    Just for reference, here is a replacement for Publish() I've been able to come up with. It works as expected.

        public static IObservable<T> Publish<T>( this IObservable<T> source, ISubject<T> subject )
        {
          Contract.Requires( source != null );
          Contract.Requires( subject != null );
          Contract.Ensures( Contract.Result<IObservable<T>>() != null );
    
          var multi = source.Multicast( subject );
          RefCountDisposable disp = null;
          return Observable.CreateWithDisposable<T>( or =>
            {
              var subscription = multi.Subscribe( or );
    
              IDisposable res = null;
              CompositeDisposable comp = null;
              lock ( multi )
              {
                if ( disp == null )
                {
                  comp = new CompositeDisposable( Disposable.Create( () => { lock ( multi ) disp = null; } ) );
                  res = disp = new RefCountDisposable( comp );
                }
                else res = disp.GetDisposable();
              }
    
              if ( comp != null ) comp.Add( multi.Connect() );
              return new CompositeDisposable( subscription, res );
            } )
            .NeverNull();
        }
    
    


  • Saturday, July 02, 2011 7:30 AM
     
     

    Yes, that bug did exist in 2838. I think you'll find that its fine now.


    James Miles http://enumeratethis.com