Homework: Find the Call/CC operator

    Question

  • I asked about Call with Current Continuation, or call/cc, on C9 Live today, and Erik mentioned that it was in the box with Rx. My guess, without looking very thoroughly, is that it's the Yield() operator. I'll take a closer look, but that seems the most likely option. :) And no, I'm not one of Erik's former students, I'm a current one, via Channel 9 Lectures. Thanks, Erik! And a big thanks to Wes and the rest of the Rx team, too. This looks fantastic! I can't wait to dive in!
    • Changed type Ryan RileyMVP Wednesday, November 18, 2009 1:56 AM It's sort of a question, but it's much more a response to Erik's challenge on @ch9live today.
    • Edited by Ryan RileyMVP Tuesday, November 24, 2009 4:59 AM Changed 'Learning' to 'Lectures'
    • Changed type Ryan RileyMVP Tuesday, January 19, 2010 1:50 PM This has really become a question. :)
    Wednesday, November 18, 2009 1:56 AM

Answers

  • Observable.Create and Observable.CreateWithDisposable.

    Guess #5. Good thing this is a bounded problem. :)

           -Steve
    Programming blog: http://nitoprograms.blogspot.com/
      Including my TCP/IP .NET Sockets FAQ

    Microsoft Certified Professional Developer
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:51 PM
    Wednesday, November 25, 2009 4:55 PM
  • OnCompleted() is exit, or the "current continuation." Well, technically, I suppose you need the 1-2 combination of OnNext() followed by OnCompleted(); otherwise you won't return anything. So the IObserver<T> is the current continuation, though that concept differs from its counterpart in languages such as Haskell.

    The 1-2 OnNext-OnCompleted calls can be condensed into Observable.Return<T>. Most likely that is why it is called Return. So it would look something like:


    var main =
        Observable.Start(() => Console.WriteLine("What's your name?"))
        .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
        .SelectMany(name => Observable.CreateWithDisposable<string>(exit =>
            {
                validateName(name, exit);
                return Observable
                    .Return("Nice to meet you, " + name + "!")
                    .Subscribe(exit);
            }))
        .Subscribe(response => Console.WriteLine(response));

    I also agree that we will likely not get a 1-to-1 mapping of C# to something like Haskell simply because the syntax may be different, or because first-class functions are something that C# doesn't have.
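
    A minimal sketch of the 1-2 combination described above, assuming the current System.Reactive NuGet package rather than the 2009 preview build (where the IDisposable-returning overload was still called CreateWithDisposable). ReturnSketch, ReturnByHand, and Collect are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ReturnSketch
{
    // Hand-written equivalent of Observable.Return(value):
    // push one value, then signal completion.
    public static IObservable<T> ReturnByHand<T>(T value) =>
        Observable.Create<T>(observer =>
        {
            observer.OnNext(value);   // step 1: deliver the result
            observer.OnCompleted();   // step 2: "exit"
            return Disposable.Empty;  // nothing to clean up
        });

    // Blocks until the sequence completes and returns everything it produced.
    public static List<T> Collect<T>(IObservable<T> source) =>
        source.ToEnumerable().ToList();
}
```

    Collecting ReturnByHand(7) and Observable.Return(7) should give the same single-element list, which is the sense in which the two calls condense into Return.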
    • Edited by McAravey Thursday, January 14, 2010 7:04 PM Can't spell today.
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:54 PM
    Thursday, January 14, 2010 7:03 PM
  • Try giving the ValidateName function this type instead:

    IObservable<Unit> ValidateName(string name, IObserver<string> exit);

    And try changing WhatsYourName just a bit to simplify it...

    So how about something like this:

    static void Main(string[] args)
    {
        do
        {
            var result = from what in Observable.Start(() => Console.WriteLine("What is your name?"))
                         from read in Observable.Start(() => Console.ReadLine())
                         from name in WhatIsYourName(read)
                         from write in Observable.Start(() => Console.WriteLine(name))
                         select write;
    
            result.Subscribe();
        } while (Console.ReadKey().Key != ConsoleKey.Escape);
    }
    
    private static IObservable<string> WhatIsYourName(string name)
    {
        return Observable.CreateWithDisposable<string>(exit =>
             {
                 var result = from unit in ValidateName(name, exit)
                              from message in Observable.Return("Welcome, " + name + "!")
                              select message;
                 return result.Subscribe(exit);
             });
    }
    
    private static IObservable<Unit> ValidateName(string name, IObserver<string> exit)
    {
        return Observable.CreateWithDisposable<Unit>(observer =>
            {
                if (string.IsNullOrEmpty(name))
                {
                    return Observable.Return("You forgot to tell me your name.").Subscribe(exit);
                }
    
                return Observable.Return(new Unit()).Subscribe(observer);
            });
    }
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 7:43 PM
    Tuesday, January 19, 2010 6:53 PM
  • Standard ML of New Jersey
    val callcc : ('a cont -> 'a) -> 'a
    callcc f : Apply f to the "current continuation". If f invokes this continuation with argument x, it is as if (callcc f) had returned x as a result.

    That is the case with Create. If f in Create invokes Observable.Return(x).Subscribe(observer), it is as if Create had returned Observable.Return(x).
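
    The equivalence claimed above can be checked with a small sketch. It assumes the current System.Reactive NuGet package, where Observable.Create accepts a delegate that returns IDisposable; CallCcSketch, ViaCreate, and Collect are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

static class CallCcSketch
{
    // Plays the role of (callcc f): the observer passed to the delegate
    // stands in for the "current continuation" of whoever subscribes.
    public static IObservable<int> ViaCreate(int x) =>
        Observable.Create<int>(observer =>
            // f invokes the continuation with x.
            Observable.Return(x).Subscribe(observer));

    // Blocks until the sequence completes and returns everything it produced.
    public static List<int> Collect(IObservable<int> source) =>
        source.ToEnumerable().ToList();
}
```

    Under these assumptions, Collect(ViaCreate(42)) and Collect(Observable.Return(42)) produce the same list, so a subscriber cannot tell the two apart.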

    • Marked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:56 PM
    Monday, February 01, 2010 7:03 AM

All replies

  • Nope. That is not the one ;-)
    Wednesday, November 18, 2009 5:29 PM
    Owner
  • I'm not familiar with call/cc, and the last time I used a functional language was... er... more than a decade ago. :)

    But here's my guess: the Aggregate overload that takes TAccumulate.

           -Steve
    Wednesday, November 18, 2009 6:06 PM
  • Well, that was just an initial guess. After looking through the signatures and names, my second guess is Finally() or Then() (two since I'm really not certain :). My doubt here is that it seems like it still uses CPS, which you mentioned is no longer necessary. I don't see what I was really expecting in the form of what may be found in a language like Ruby:

    # imperative in Ruby, blocking
    a
    b
    c
    
    # CPS in Ruby, not blocking
    call a do
      call b do
        call c
      end
    end
    
    #callcc in Ruby, not blocking
    callcc a
    callcc b
    callcc c

    Obviously, this will hang off of a LINQ expression, so it seems Finally() or Then() is the way to go. How can I dig into the docs better to get more info on each of the operators? I'm still looking. I won't give up easily! If only I had more time to concentrate ... dang work! :-P
    Thursday, November 19, 2009 3:13 PM
  • I think our bosses should let us have the week off to play with Rx. ;)

    I'm also looking forward to more complete docs, especially conceptual docs (what's a Notification? what is meant by Publish? what is a Subject? why are PropertyGetter/PropertySetter necessary? what is Unit?).

    In the meantime, I've found that (for some operators) it can be helpful to understand the IEnumerable equivalent before trying to understand the IObservable. This is probably just because IEnumerable is very familiar, so it's not difficult to grasp new operators over a known set. IObservable, on the other hand, is still somewhat foreign, so we're trying to understand new operators in the context of an unfamiliar set. Besides, System.Interactive looks much better in Reflector than System.Reactive. ;)

    As an example, a month or so ago, I was struggling with the whole "Let" idea in the drag/drop example on Erik Meijer's slides (my notes here). I understood the concept somewhat, but it had been forever since I'd done functional, and I wasn't able to wrap my head entirely around how to tell when it's necessary. Then I found an excellent blog post here that implemented (and thoroughly explained) Let for IEnumerable (which Rx also has). Once I understood Let for IEnumerables, I found the concept translated well to IObservables, and I was finally enlightened. :)

           -Steve


    Thursday, November 19, 2009 3:58 PM
  •  > it can be helpful to understand the IEnumerable equivalent before trying to understand the IObservable.

    Most definitely. And often an easy way to write a little test to find out what an operator does is to use ToObservable and ToEnumerable.
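
    A sketch of that round trip, assuming the current System.Reactive NuGet package. Scan is used only as an example operator; OperatorProbe and RunningSums are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

static class OperatorProbe
{
    // Feed a known IEnumerable through an observable operator and pull the
    // results back out as a list, so the operator's effect is easy to inspect.
    public static List<int> RunningSums(IEnumerable<int> input) =>
        input.ToObservable()               // IEnumerable -> IObservable
             .Scan((sum, x) => sum + x)    // operator under test
             .ToEnumerable()               // IObservable -> IEnumerable (blocks until done)
             .ToList();
}
```

    For the input 1, 2, 3 this yields 1, 3, 6, which shows that Scan emits every intermediate accumulation rather than only the final one.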
    Thursday, November 19, 2009 4:19 PM
    Owner
  • var xs = new[] { 5, 10, 15, 20, 25, 30 }.ToObservable();

    var ys = new[] { 31, 32 }.ToObservable()
        .Concat(Observable.Throw<int>(new ArithmeticException()))
        .Concat(new[] { 33, 34 }.ToObservable());

    var zs = xs.Select(x => x * 2); //.Concat(Observable.Throw<int>(new ArithmeticException()));

    var obs = new int[] { }.ToObservable()
        .OnErrorResumeNext(xs)
        .OnErrorResumeNext(ys)
        .OnErrorResumeNext(zs);

    obs.Subscribe(t => this.textBox2.Text += t.ToString() + " ");

    This sends 5 10 15 20 25 30 31 32 10 20 30 40 50 60 to textBox2.Text. If I Concat another Observable.Throw<int>(new Exception()) to the end of zs, then the exception will be thrown, but if the exception is thrown in the middle it just continues. I am guessing that this means you can do a query over an IObservable<Func>> and maybe that leads to call/cc?
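
    The same experiment without the text box, as a sketch that assumes the current System.Reactive NuGet package (behavior in the 2009 preview may have differed in details). ResumeSketch and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

static class ResumeSketch
{
    public static List<int> Run()
    {
        var xs = new[] { 5, 10, 15, 20, 25, 30 }.ToObservable();

        // The Throw in the middle ends ys, so 33 and 34 are never produced.
        var ys = new[] { 31, 32 }.ToObservable()
            .Concat(Observable.Throw<int>(new ArithmeticException()))
            .Concat(new[] { 33, 34 }.ToObservable());

        var zs = xs.Select(x => x * 2);

        // OnErrorResumeNext moves on to the next source whether the previous
        // one completed normally or failed.
        return Observable.Empty<int>()
            .OnErrorResumeNext(xs)
            .OnErrorResumeNext(ys)
            .OnErrorResumeNext(zs)
            .ToEnumerable()
            .ToList();
    }
}
```

    Run() returns 5 10 15 20 25 30 31 32 10 20 30 40 50 60, matching the output reported above: the error inside ys is swallowed and the sequence continues with zs.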
    Thursday, November 19, 2009 5:48 PM
  • OnErrorResumeNext is pretty fun, but it isn't CallCC.  Keep looking ;)

    Note that the process of discovering CallCC is decidable and has bounded time.  So don't lose hope.
    Thursday, November 19, 2009 5:57 PM
  • Publish?

          -Steve
    Thursday, November 19, 2009 6:34 PM
  • It has to be Let, because it takes as a parameter, a Func<T, TResult>, and it has to take a function as a parameter, if I understand correctly?
    Saturday, November 21, 2009 12:24 AM
  • It is neither Publish nor Let.  Keep looking...
    Saturday, November 21, 2009 5:49 AM
  • LOL, this is getting embarrassing. 
    Saturday, November 21, 2009 6:01 AM
  • Given the implementation of Amb provided by links in the Amb thread, I must assume that it's either Materialize or Dematerialize. I finally realized that I'm looking for a definition of a continuation that follows CPS and that I don't actually know what the signature of CallCC really is, which makes it difficult to identify. :)

    If it's not one of those, then my guess is Defer. I would actually pick this one first but for the definition of Amb using CallCC.

    It's Replay. A re-read of the Amb article seemed to make it pretty clear. ReplaySubject is the "call site." (And if I have goofed by striking out the above, I'll kick myself. :)

    • Edited by Ryan RileyMVP Saturday, November 21, 2009 5:48 PM Expanded my answer.
    Saturday, November 21, 2009 5:27 PM
  • I now think it's Defer. It's re-invoked on subscription, and it would carry its continuation state implicitly inside the lambda.

         -Steve
    Saturday, November 21, 2009 11:31 PM
  • You are closer in one of the other threads where you replied ......
    Sunday, November 22, 2009 4:12 AM
    Owner
  • It's not FromAsyncPattern, is it?
    Sunday, November 22, 2009 5:03 AM
  • Is it just Notification? I tried looking at it in Reflector, and it made my head hurt. :) The Interactive version had an AmbHelper that I thought would be the answer, but I don't see a similar type within Reactive except the compiler generated <Amb>b_db<TSource>, whatever that is. The lack of consistency in the helper leads me to think that it has to be Notification.

    Edit: If I need to pick a method, I'll pick the Accept method on Notification. Here's hoping...!
    • Edited by Ryan RileyMVP Sunday, November 22, 2009 11:10 PM Added to the response.
    Sunday, November 22, 2009 10:44 PM
  • Notification can't be right, since it's what makes up the Materialize and Dematerialize methods, which was wrong above. Digging around some more, I noticed Amb returns the result of Combine. Combine is internal but looks very much like AmbHelper and is used in System.Reactive. Also, reading (and trying to understand) call-with-current-continuation from Scheme led me to the idea that call/cc is really a selector of sorts for determining control flow. Combine seems to do just that. CombineLatest is the public method, so it must be the publicly exposed call/cc.
    Monday, November 23, 2009 1:48 AM
  • OK, I'm going to go with Scan, then (with TAccumulate).

    This is my fourth guess. :)

          -Steve
    Monday, November 23, 2009 3:12 AM
  • The delay in the response and my recent experiments seem to indicate that I am still wrong. I think that NextValue looks promising. However, I am not sure of the differences among Latest, MostRecent, and Next and their respective Value methods. Is an Rx video on these methods on the menu?
    Tuesday, November 24, 2009 11:26 PM
  • It is not Scan.  It is not Notification.  Keep looking...
    Tuesday, November 24, 2009 11:34 PM
  • Record.

    Wednesday, November 25, 2009 3:49 PM
  • Observable.Create and Observable.CreateWithDisposable.

    Guess #5. Good thing this is a bounded problem. :)

           -Steve
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:51 PM
    Wednesday, November 25, 2009 4:55 PM
  • I'm thinking it's the Subscribe call, since you can pass in functions that will be called later when the async operation is complete.
    Wednesday, November 25, 2009 6:22 PM
  • I had thought about Subscribe before also, but that seems like CPS to me.
    Wednesday, November 25, 2009 9:27 PM
  • What about Latest or LatestValue?
    Wednesday, November 25, 2009 9:30 PM
  • I'm going with Richard Hein's "Record" suggestion above.

    A continuation in Rx is invoked at the point that OnNext() is called. Therefore we're looking for something that would return something that we can call OnNext on, which is what Record allows us to do.

    Any thoughts on this reasoning?

    Wednesday, November 25, 2009 10:29 PM
  • Bingo! Now in order to earn lunch with the Rx team you'll have to explain why it is Create :->
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:51 PM
    • Unmarked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:57 PM
    Wednesday, November 25, 2009 11:20 PM
    Owner
  • The fact is that it was a guess. Bear in mind that I do *not* have a good understanding of call/cc, I do have a four-month-old (corollary: I do not get enough sleep), and my brain hurts. For these reasons, what follows may not make any sense at all. ;)

    I guessed Create mainly because it has the right signature. It returns an IObservable and takes a subscription delegate. The subscription delegate is logically an IObservable implementation, so the function signature is logically equivalent to a function taking a delegate (IObservable implementation) and returning an IObservable (interface).

    The subscription delegate is invoked each time an IObserver subscribes, and that's like calling the continuation. I think.

           -Steve


    Thursday, November 26, 2009 5:17 AM
  •  > I guessed Create mainly because it has the right signature. It returns an IObservable and takes a subscription delegate. The subscription delegate is logically an IObservable implementation, so the function signature is logically equivalent to a function taking a delegate (IObservable implementation) and returning an IObservable (interface).

     > The subscription delegate is invoked each time an IObserver subscribes, and that's like calling the continuation. I think.

    I watched the Expert to Expert video on Rx again, and at about 53:50 Brian and Erik start talking about the "essence of computation" ... in C# it's S1 ; S2 where ; acts as the bind operator.  S1 can stop or terminate (OnDone), throw an exception (OnError) or produce a value by "side-effecting the global state" (OnNext).  IObserver<T> is a continuation monad, and "can model any sequential control flow", including ;.  I'm still trying to absorb all the ideas, obviously, but those are some pieces of the puzzle.

    So Create is taking a continuation monad as a parameter.  Call/CC would have to as well.  I didn't get that IObserver<T> was the continuation monad, by itself, until I reviewed the video.  Why shouldn't it be called IContinuationMonad then, if that's what it really is?  ;)  Half-joking about that.


    Thursday, November 26, 2009 12:34 PM
  • I hope you guys will put a video on how Create implements Call/CC because right now trying to figure this out is a mentally blocking operation for me.  :)
    Thursday, November 26, 2009 4:24 PM
  • I completely ignored Create b/c of its name. :( So far, I can't see how Create does anything different than .ToObservable in most scenarios. However, it fulfills Call/CC in its callback Action when it is disposed. Yet I can't get it to work in my test scenario. What am I doing wrong?

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace RxSamples
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Imperative
                Console.WriteLine("Starting imperative sample...\n\n");
    
                Console.WriteLine("Really delete?");
                var response = Console.ReadLine() ?? "NO";
                if (response.ToUpperInvariant() == "YES")
                {
                    Console.WriteLine("Deleted!");
                }
                else
                {
                    Console.WriteLine("Deleted action aborted.");
                }
    
                // CPS
                Console.WriteLine("\n\nStarting CPS sample...\n\n");
                var cpsDisposable =
                    new YesNoMessage("Really delete?").Render()
                        .Subscribe(
                        result =>
                            {
                                if (result)
                                    new InfoMessage("Deleted!").Render().Subscribe().Dispose();
                                else
                                    new InfoMessage("Deleted action aborted.").Render().Subscribe().Dispose();
                            });
    
                Console.ReadKey();
                cpsDisposable.Dispose();
    
                // Call/CC - Doesn't compile or work. I know I'm doing this wrong, but I don't know exactly what.
                Console.WriteLine("\n\nStarting Call/CC sample...\n\n");
                if (Observable.Create<bool>(o => new YesNoMessage("Really delete?").Render().Subscribe()))
                    Observable.Create<Unit>(o => new InfoMessage("Deleted!").Render().Subscribe());
                else
                    Observable.Create<Unit>(o => new InfoMessage("Deleted action aborted.").Render().Subscribe());
    
                Console.ReadKey();
            }
        }
    
        class InfoMessage
        {
            protected readonly string _message;
    
            public InfoMessage(string message)
            {
                _message = message;
            }
    
            public virtual IObservable<bool> Render()
            {
                Console.WriteLine(_message);
                return UserResponses().ToObservable()
                    .Select(response => true);
            }
    
            protected IEnumerable<string> UserResponses()
            {
                while(true)
                {
                    yield return Console.ReadLine();
                }
            }
        }
    
        class YesNoMessage : InfoMessage
        {
            public YesNoMessage(string message) : base(message)
            {
            }
    
            public override IObservable<bool> Render()
            {
                Console.WriteLine(_message);
                return UserResponses().ToObservable()
                    .Select(response => response.ToUpperInvariant() == "YES" ? true : false);
            }
        }
    }
    

    Is my problem that I'm trying to return something from Subscribe or the Observable? Is there some other way to return a value? I have Create() in the Call/CC version, but those are really CreateWithDisposable. I couldn't get anything to return an Action. Everything reported an error of trying to return a lambda. If I can't return something out of Create (besides the observable), how is that not still CPS?
    • Edited by Ryan RileyMVP Thursday, November 26, 2009 6:36 PM Added an additional question.
    Thursday, November 26, 2009 6:33 PM
  •  > Is my problem that I'm trying to return something from Subscribe or the Observable? Is there some other way to return a value? I have Create() in the Call/CC version, but those are really CreateWithDisposable. I couldn't get anything to return an Action. Everything reported an error of trying to return a lambda. If I can't return something out of Create (besides the observable), how is that not still CPS?

    You have to Subscribe to get something out of Create ... or pass it to a variable or something.  I got it to work but I am sure there is a better way.  Here's what I tried:

                Console.WriteLine("\n\nStarting Call/CC sample...\n\n");
                var observable = Observable.Create<bool>(o =>
                {
                    new YesNoMessage("Really delete?").Render()
                        .Subscribe(r =>
                        {
                            var o2 = Observable.Create<Unit>(od =>
                            {
                                if (r) {
                                    new InfoMessage("Deleted!").Render();
                                } else {
                                    new InfoMessage("Deleted action aborted.").Render();
                                }
                                return () => { };
                            });
    
                            o2.Subscribe();
                        }
                    );
                    return () => { };
                });
    
                observable.Subscribe(s => Console.WriteLine(s));
    
                Console.ReadKey();
    • Edited by Richard Hein Thursday, November 26, 2009 8:16 PM removed redundant bits
    Thursday, November 26, 2009 8:10 PM
  • Subscribe is how you do continuation passing style. Since you must use Subscribe, I don't understand how Create is Call/CC. Create creates an anonymous Observable, which is also odd because it does so from a subscriber. If I got back the subscribed-to observable with a way to execute the observer, that might make sense. Also, if the observable was already executing because of the function I passed in so that I could just dispose it and continue, that would make sense, too. I suppose if the InfoMessage type implemented IObserver, I could create the message and pass it into Create, but I would still need to call Subscribe, which seems to be CPS.

    In the case above, I would rather stick with the CPS approach than go with the call/cc approach, which looks horrendous in the working example. Call/cc should simplify the code and make it appear more imperative since it turns the existing call site into the continuation (observer), unless I grossly misunderstand the definition of call/cc.
    Thursday, November 26, 2009 10:20 PM
  • Right... I've implemented what I think is a call/cc example in ASP.NET. It's a simple handler that asks for one number (one form) which is posted back, then asks for a second number which is posted back, and then displays the sum. What I would have liked to achieve was this, but I fell somewhat short of the target:

    public void AskForTwoIntegersAndDisplaySum()
    {
        var q =
            from r1 in GetInteger("Please enter first number")
            from r2 in GetInteger("Please enter second number")
            select r1 + r2;
    
        q.Subscribe(answer =>
        {
            HttpContext.Current.Response.Write("The sum of the first and second number is: " + answer);
        });
    }

    This is what I ended up with in the end:

    public class AskForTwoIntegersAndDisplaySumHandler : IHttpHandler
    {
        // The essence ...  (well, it's a bit of a compromise really; weaving through HttpContext made it messier than what I had in mind)
        public void AskForTwoIntegersAndDisplaySum()
        {
            var q =
                from _ in Observable.Empty<HttpContext>().StartWith(HttpContext.Current)
                from r1 in GetInteger(_, "Please enter first number")
                let number1 = r1.Value
                let c1 = r1.TheContext
                from r2 in GetInteger(c1, "Please enter second number")
                let number2 = r2.Value
                let c2 = r2.TheContext
                select new { Value = number1 + number2, TheContext = c2 };
    
            q.Subscribe(answer =>
            {
                answer.TheContext.Response.Write("The sum of the first and second number is: " + answer.Value);
            });
        }
    
        // The nitty gritty ...
        private static List<IObserver<HttpContext>> Continuations = new List<IObserver<HttpContext>>();
    
        private HttpRequest Request;
        private HttpResponse Response;
        
        public void ProcessRequest(HttpContext context)
        {
            Request = context.Request;
            Response = context.Response;
    
            Response.ContentType = "text/html";
    
            if (Request["continuationid"] == null) {
                AskForTwoIntegersAndDisplaySum();
            } else {
                Continuations[int.Parse(Request["continuationid"])].OnNext(context);
            }
    
            Thread.Sleep(500); // HACK
        }
    
        public static IObservable<Context<int>> GetInteger(HttpContext context, string prompt)
        {
            var sb = new StringBuilder();
            sb.Append("<form>");
            sb.Append("<input type=\"hidden\" name=\"continuationid\" value=\"" + (Continuations.Count) + "\" />");
            sb.Append(prompt + ": <input type=\"text\" name=\"GetInteger\" />");
            sb.Append("<br /><input type=\"submit\" value=\"Submit\" />");
            sb.Append("</form>");
            context.Response.Write(sb.ToString());
    
            return Observable.Create<HttpContext>(observer =>
            {
                Continuations.Add(observer);
    
                return () => {};
            }).Select(cntxt => {
                return new Context<int>(cntxt, int.Parse(cntxt.Request["GetInteger"]));
            });
        }
    
        public bool IsReusable
        {
            get
            {
                return false;
            }
        }
    
        public class Context<T>
        {
            public Context(HttpContext context, T value)
            {
                this.TheContext = context;
                this.Value = value;
            }
    
            public HttpContext TheContext { get; private set; }
            public T Value { get; private set; }
        }
    }

    Looking forward to your comments/thoughts on this.

    You'll notice some pretty horrid kludges.
     1. Weaving through HttpContext is an eyesore; are there any better ways of achieving this?
     2. Because the Rx continuations execute on a different thread, I'm having to resort to the Thread.Sleep hack; I guess I want some kind of SynchronizationContext? Any thoughts on this are appreciated.
    Thursday, November 26, 2009 11:41 PM
  •  > Bingo! Now in order to earn lunch with the Rx team you'll have to explain why it is Create :->

    To quote Wikipedia: “Taking a function f as its only argument, call/cc takes the current continuation (i.e., a "snapshot" of the current control context or control state of the program) as an object and applies f to it”.

    So in our case the Create function “captures” the current state of the program, and will run (or apply) the subscription function to that context. This means that things like closures in the function have access to the ambient world as it was at the time we called Create. And all this is wrapped up in a nice continuation monad that we can execute as often as we want.

    Hopefully I didn’t overtly lie to everyone, but that is what I have been trying to learn over the last couple of days.
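
    The "execute as often as we want" part can be observed directly. This is a sketch under the assumption of the current System.Reactive NuGet package; ResubscribeSketch and CountRuns are hypothetical names.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ResubscribeSketch
{
    // Returns how many times the subscription delegate ran after the given
    // number of Subscribe calls.
    public static int CountRuns(int subscriptions)
    {
        var runs = 0; // captured by the closure passed to Create

        var source = Observable.Create<int>(observer =>
        {
            runs++;                  // the closure sees and mutates the captured variable
            observer.OnNext(runs);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Calling Create alone runs nothing; each Subscribe re-runs the delegate.
        for (var i = 0; i < subscriptions; i++)
            source.Subscribe(_ => { });

        return runs;
    }
}
```

    CountRuns(0) is 0 and CountRuns(3) is 3: the delegate is a recipe that each subscriber triggers again, with the closure carrying the captured state along.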

     

    -Samuel McAravey

    Saturday, November 28, 2009 9:11 PM
  • I finally found a good definition of Call/CC in Mark Lillibridge's article "Unchecked Exceptions can be Strictly More Powerful than Call/CC".

    call/cc : ∀α:Ω. ((∀u:Ω.α→u)→α)→α
    The above is quite difficult to read, but the gist is that it takes a function that takes a continuation -- IObserver<α> in this case, defined as an α → () -- then returns the α back to the call site. (Please correct me if I'm wrong on this; I'm still trying to grasp it and can't quite read Fω completely.) Here's my dilemma: this returns an α whereas Observable.Create returns an IObservable<α>. So we have three options:
    1. We are stuck with a continuation passing style using Subscribe on the returned IObservable, which doesn't make sense if Observable.Create == Call/CC;
    2. The entire call site must be defined within the Observable.Create call and Subscribed to execute, which could work but would likely be very ugly;
    3. We must use NextValue().Get() to retrieve the value from the observable, as I have done below.

    I think the intent is number 2 above, but I would really need to see some examples to understand how that will work. I took a shot at it by trying to modify the Call/CC sample below, but it gets really ugly really fast, and I still don't have a good way of retrieving a value without the use of pre-defining a local variable and using a closure, as Xenocide described. The definition of Call/CC provided by Xenocide is what I expected to find, but I don't see that in Observable.Create. What am I missing? Anyway, here's a working sample (though I have some odd ReadKey behavior I'm still trying to sort through):

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace RxSamples
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Imperative
                Console.WriteLine("Starting imperative sample...\n\n");
    
                Console.WriteLine("Really delete?");
                var response = Console.ReadLine() ?? "NO";
                if (response.ToUpperInvariant() == "YES")
                {
                    Console.WriteLine("Deleted!");
                }
                else
                {
                    Console.WriteLine("Deleted action aborted.");
                }
    
                // CPS
                Console.WriteLine("\n\nStarting CPS sample...\n\n");
                var cpsDisposable =
                    new YesNoMessage("Really delete?").Render()
                        .Subscribe(
                        result =>
                            {
                                if (result)
                                    new InfoMessage("Deleted!").Render().Subscribe().Dispose();
                                else
                                    new InfoMessage("Deleted action aborted.").Render().Subscribe().Dispose();
                            });
    
                Console.ReadKey();
                cpsDisposable.Dispose();
    
                // Call/CC 1
                Console.WriteLine("\n\nStarting Call/CC sample...\n\n");
                if (Observable.CreateWithDisposable<bool>(observer => new YesNoMessage("Really delete?").Render().Subscribe(observer)).NextValue().Get())
                    Observable.CreateWithDisposable<bool>(observer => new InfoMessage("Deleted!").Render().Subscribe(observer)).NextValue().Get();
                else
                    Observable.CreateWithDisposable<bool>(observer => new InfoMessage("Deleted action aborted.").Render().Subscribe(observer)).NextValue().Get();
    
                Console.ReadKey();
            }
        }
    
        class InfoMessage
        {
            protected readonly string _message;
    
            public InfoMessage(string message)
            {
                _message = message;
            }
    
            public virtual IObservable<bool> Render()
            {
                Console.WriteLine(_message);
                return UserResponses().ToObservable()
                    .Select(response => true);
            }
    
            protected IEnumerable<string> UserResponses()
            {
                while(true)
                {
                    yield return Console.ReadLine();
                }
            }
        }
    
        class YesNoMessage : InfoMessage
        {
            public YesNoMessage(string message) : base(message)
            {
            }
    
            public override IObservable<bool> Render()
            {
                Console.WriteLine(_message);
                return UserResponses().ToObservable()
                    .Select(response => response.ToUpperInvariant() == "YES");
            }
        }
    }
    

    • Edited by Ryan RileyMVP Sunday, November 29, 2009 4:25 AM I updated my definition of the call/cc signature.
    Sunday, November 29, 2009 12:43 AM
  • Bingo! Now in order to earn lunch with the Rx team you'll have to explain why it is Create :->

    To quote Wikipedia: “Taking a function f as its only argument, call/cc takes the current continuation (i.e., a "snapshot" of the current control context or control state of the program) as an object and applies f to it”.

    So in our case the Create function “captures” the current state of the program, and will run (or apply) the subscription function to that context. Meaning that things like closures in the function have access to the ambient world as it was at the time we called Create. And all this is wrapped up in a nice continuation monad that we can execute as often as we want.

    Ah! I think I finally see it. The observer in the Observable.Create(observer => ...) is the current call site! (Or is that not correct?) That's why my sample is now working, though I wonder why I'm getting the strange Semaphore behavior. I'm sure I'm doing something wrong. :(
    • Edited by Ryan RileyMVP Monday, November 30, 2009 6:42 PM I'm now questioning whether the passed in observer is the current call site.
    Sunday, November 29, 2009 4:33 AM
  • It has been a while, but I am quite curious to find out the answer to this one. It seems that we were all wrong on this one, so is there an answer?
    Monday, December 21, 2009 1:40 AM
  • My answer as to why Create is call-with-current-continuation:
    Observable.Create sets the function subscribe as the continuation of the function IObservable.Subscribe(observer)

    Observable.Create<T>(subscribe).Subscribe(observer);

    The subscribe continuation jumps back (from the right IObservable<T>.Subscribe to the left continuation subscribe).

    In Rx there are other continuations: OnNext, OnError and OnCompleted. These continuations are going forward, from the left function to the right continuation.

    Monday, December 21, 2009 11:51 AM
  • Hi,

    Wes has a blog post that is pretty helpful at describing CPS, with a line or two about call/cc as well.  I'm still a novice in the functional world but I think it helped my understanding of this stuff tremendously.

    http://blogs.msdn.com/wesdyer/archive/2007/12/22/continuation-passing-style.aspx 

    My understanding of Create as a call/cc function:

    I don't think Subscribe is a requirement.  Any continuation would do.  Create is a call/cc function because it takes a function that can be invoked with whatever continuation appears after it.  (The key being after - the observable that is being created is called with whatever continuations are attached later.)

    The actual continuation could be Subscribe, or it could be any other Rx extension function tacked on to the observable; e.g., filters, projections.  I think whatever code is executed by any observers is also part of the continuation.

    But none of those are necessary.  Perhaps you create an observable that is ready to be called with the current continuation, but there is none, so its marbles just drop off the edge :).  Or perhaps you specify continuations but never start observing; i.e., the function to be called with the current continuation is never called.  Either case doesn't make Create any less of a call/cc implementation.  It's just that the call has no continuation or is never executed in the first place, respectively.

    Or perhaps the continuation changes (a benefit of higher-order functions, especially in CPS it seems).  In that case you can execute the same function with different continuations; i.e., start with one observer, dispose it, and then attach another.

    The glaring difference between CPS and call/cc appears to be that a CPS function will accept any continuation, while a call/cc function will accept a function that accepts the current continuation, whatever it may be; furthermore, the continuation in CPS is explicitly provided to the function so it must be defined before the call, whereas the continuation in call/cc is defined after the call, just like imperative style programming.

    Hope all of this isn't just BS ;)

    - Dave
    http://davesexton.com/blog
    • Edited by Dave Sexton Saturday, January 02, 2010 7:45 AM fixed typo
    Saturday, January 02, 2010 7:43 AM
  • Dave’s hint and Wes Dyer's continuation-passing-style blog post were very helpful.
    With this information my understanding of Create as a call/cc function is now:
    IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe) is similar to CallCC(Action<Action> f).
    CallCC(Return =>
    {
        ...
        // Exit Continuations: Get me out of here
        Return(); //jump out, goto, 
        ...
    });
    In Observable.Create, calling IObserver.OnCompleted() or OnError(e) acts like the return or jump out.
    Example:
    var observable = Observable.Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnCompleted(); //like return, jump out
        observer.OnNext(2); //ignored
        …
    });
    The observable generates only the value 1; the OnNext calls after OnCompleted are ignored.
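    The "OnCompleted acts like return" behaviour can be sketched without Rx itself. The block below is only an illustration: MiniObservable, StoppingObserver, ActionDisposable and RecordingObserver are hypothetical stand-ins written for this example, not the Rx implementation. The guard simply drops every notification that arrives after a terminal call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable.Create; NOT the Rx implementation.
static class MiniObservable
{
    public static IObservable<T> Create<T>(Func<IObserver<T>, Action> subscribe)
    {
        return new AnonymousObservable<T>(subscribe);
    }

    sealed class AnonymousObservable<T> : IObservable<T>
    {
        readonly Func<IObserver<T>, Action> _subscribe;
        public AnonymousObservable(Func<IObserver<T>, Action> subscribe) { _subscribe = subscribe; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Wrap the observer so nothing gets through after OnCompleted/OnError.
            var guarded = new StoppingObserver<T>(observer);
            return new ActionDisposable(_subscribe(guarded));
        }
    }

    sealed class StoppingObserver<T> : IObserver<T>
    {
        readonly IObserver<T> _inner;
        bool _stopped;
        public StoppingObserver(IObserver<T> inner) { _inner = inner; }
        public void OnNext(T value) { if (!_stopped) _inner.OnNext(value); }
        public void OnError(Exception error) { if (!_stopped) { _stopped = true; _inner.OnError(error); } }
        public void OnCompleted() { if (!_stopped) { _stopped = true; _inner.OnCompleted(); } }
    }

    sealed class ActionDisposable : IDisposable
    {
        readonly Action _dispose;
        bool _disposed;
        public ActionDisposable(Action dispose) { _dispose = dispose; }
        public void Dispose() { if (_disposed) return; _disposed = true; _dispose(); }
    }
}

// Records every notification it receives, in order.
sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

static class ReturnDemo
{
    public static List<string> Run()
    {
        var observable = MiniObservable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnCompleted(); // acts like "return" for the subscriber
            observer.OnNext(2);     // still executes, but the guard drops it
            return () => { };       // nothing to clean up
        });
        var recorder = new RecordingObserver<int>();
        observable.Subscribe(recorder).Dispose();
        return recorder.Log;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ReturnDemo.Run())); // OnNext(1), OnCompleted
    }
}
```

    One difference from a real jump is visible here: in this sketch the statements after OnCompleted() still run inside the subscribe function; only their notifications are discarded.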
    • Edited by Steffen Zeidler Tuesday, January 05, 2010 1:31 PM CallCC(Action<Action> f)
    Saturday, January 02, 2010 6:49 PM
  • Dave’s hint and Wes Dyer continuation-passing-style blog was very helpful.
    With this information my understanding of Create as a call/cc function is now:
    IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe) is similar to CallCC(Action<Action> f).
    CallCC(Return =>
    {
        ...
        // Exit Continuations: Get me out of here
        Return(); //jump out, goto, 
        ...
    });
    
    In Observable.Create is the call IObserver.OnCompleted() or OnError(e) like the return or jump out.
    Example:
    var observable = Observable.Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnCompleted(); //like return, jump out
        observer.OnNext(2); //ignored
        …
    });
    
    The observable generates only the value 1, the OnNext calls after OnCompleted are ignored.
    Yes, that's it exactly. However, look back at Wes's description of Call/CC. It returns a value. None of these return a value. The beauty of Call/CC is that you are converting your call site to a function and passing it along; you return to the original call site with the appropriate value. In order to do something similar with what I see in Create, you would have to use a closure over a mutable variable or the NextValue, LatestValue, etc. to get the value back. Or maybe you have to wait for the OnCompleted() and get the LatestValue from that? I don't know which is correct, and that is what confuses me.

    Subscribe is a CPS continuation. You pass in the continuation you wish to occur. Call/CC is intended to not require that, or so I thought. So there must be another way of getting the value you want. NextValue or LatestValue seem to be what I want, but those just seem funny and make the whole thing a bit awkward to use.


    Tuesday, January 05, 2010 1:58 PM
  • Hi Ryan,

    > However, look back at Wes's description of Call/CC. It returns a value. None of these return a value.
    > The beauty of Call/CC is that you are converting your call site to a function and passing it along; you are
    > returning back to the original call site with the appropriate value


    I think that's because Create is actually a continuation monad, since it returns IObservable, which is a monad.  The CLR doesn't support real call/cc.

    - Dave
    http://davesexton.com/blog
    Tuesday, January 05, 2010 4:43 PM
  • EDIT: (I meant IObserver<T>).

    IObserver<T> is the continuation monad, actually.  Call/CC is "hidden" within Create, according to Erik Meijer.  Look earlier in the thread and he says that Create is Call/CC, and in another spot someplace he even said they considered calling it CallCC.
    Tuesday, January 05, 2010 5:31 PM
  • I think that's because Create is actually a continuation monad, since it returns IObservable, which is a monad.  The CLR doesn't support real call/cc.

    - Dave
    http://davesexton.com/blog
    I know call/cc isn't natively implemented in the CLR, but I can't imagine Erik getting excited over a partial solution. I suppose I just need the marble diagram to help me see this more clearly. If it's call/cc, that's great, but all I can see is more CPS. Create looks more like a way to create a custom observable than a simplification of CPS to maintain procedural-looking readability, which is, so far as I know, the biggest benefit of call/cc. If this is all scary-looking closures and enclosed lambdas, I don't see this being quite as exciting as I had hoped.
    Tuesday, January 05, 2010 10:14 PM
  • Hi Richard, 

    Ok that may be so, but I believe that it's the monad that enables call/cc to exist in the first place without any language support.  A consequence of this seems to be a difference in the way the continuation accepts input from the "call", as Ryan pointed out.  In Rx, the call's output value is passed to the continuation via the IObservable monad.

    So it seems to me that Create may be a call/cc implementation, but a subtle difference is that the continuation monad is required (even if it's hidden as IObserver) to enable the value of the call to be passed to its continuation.

    - Dave
    http://davesexton.com/blog
    Tuesday, January 05, 2010 10:16 PM
  • EDIT: (I meant IObserver<T>).

    IObserver<T> is the continuation monad, actually.  Call/CC is "hidden" within Create, according to Erik Meijer.  Look earlier in the thread and he says that Create is Call/CC, and in another spot someplace he even said they considered calling it CallCC.
    I think Dave is actually correct here. IObservable<T> is the continuation monad while IObserver<T> is the continuation. If you take a look at Wes's post "The Marvels of Monads" (http://blogs.msdn.com/wesdyer/archive/2008/01/11/the-marvels-of-monads.aspx) you'll notice that the identity function .ToContinuation() is not the same as the actual continuation function, z => z.ToString().Replace('1', 'a'). Here, we might say .ToObservable() with an OnNext action in Subscribe (instead of directly passing to the query expression). At least, I think that's correct. I keep getting confused on what's what, as well. :)
    • Edited by Ryan RileyMVP Wednesday, January 06, 2010 4:27 PM I didn't accurately reflect Wes's post. I changed the continuation function and noted that the continuation is passed to the query expression, not the Select method. I'm not sure what I was thinking when I wrote this!
    Tuesday, January 05, 2010 10:17 PM
  • To clarify why I'm excited about call/cc, I think we can get more people writing asynchronous, observable apps if Call/CC exists b/c, in the end, the code looks very similar to what they are used to. CPS with Subscribe still looks a little strange to a lot of people, and I personally know many who avoid lambdas and LINQ queries at all costs. A little Call/CC sugar would help the LINQ medicine go down. (Ugh, I can't believe I just typed that... and didn't erase it.)
    Tuesday, January 05, 2010 10:19 PM
  • Take a look at http://channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/ at 54 minutes in, they talk about the continuation monad.  Tell me, if you think they are talking about IObservable<T> or IObserver<T> or both - I might be wrong, probably am, since you need an observable to use an observer anyways.

    Tuesday, January 05, 2010 11:24 PM
  • I tried to simulate return this way as well but I couldn't get it quite right.
    When you set up the observer, the OnNext, OnCompleted and OnError, you are essentially creating a control flow programmatically, since OnNext, OnCompleted and OnError are all you need to model sequential control flow (at least).

    If this was CPS we would have to pass the observer to Subscribe etc..., but we do not have to do that.

    Tuesday, January 05, 2010 11:31 PM
  • Bingo! Now in order to earn lunch with the Rx team you'll have to explain why it is Create :->

    To quote Wikipedia: “Taking a function f as its only argument, call/cc takes the current continuation (i.e., a "snapshot" of the current control context or control state of the program) as an object and applies f to it”.

    So in our case the Create function “captures” the current state of the program, and will run (or apply) the subscription function to that context. Meaning that things like closures in the function have access to the ambient world as it was at the time we called Create. And all this is wrapped up in a nice continuation monad that we can execute as often as we want.

    Hopefully I didn’t overtly lie to everyone, but that is what I have been trying to learn over the last couple of days.

     

    -Samuel McAravey

    It was right here all along. I think you have it backwards, Samuel, and I think Steffen was right below. Given the definition here, call/cc :: f -> cc -> result, so it has to take a function f : Func<IObserver<T>, Action> and apply it to the current continuation cc : IObserver<T> passed into the Subscribe() method, or any of the other overloads. So your actual application has to go into the Subscribe method, which is not typical of call/cc, which captures the call site as its current continuation.

    A full example would be:
    Observable.Create<bool>(observer =>
    {
        try {
            bool value = GetNextValue();
            observer.OnNext(value);
        } catch (Exception ex) {
            observer.OnError(ex);
        } finally {
            return observer.OnCompleted();
        }
    }).Subscribe( // IObserver<T> or individual functions:
        result => Console.WriteLine("I can continue!"),
        exn => Console.WriteLine(exn.Message),
        () => Console.WriteLine("All done"));

    I haven't tried this, but that should be the case, though I'm not sure if I'm doing the return correctly. Should the whole thing be wrapped in a return () => with no return on the observer.OnComplete()? Or should there be a return () => () after the try/catch?
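    On the question of where the return goes: C# does not allow a return statement inside a finally block, and OnCompleted() returns void, so the Action has to be returned after the try/catch. A minimal sketch of a shape that compiles, assuming a hypothetical stand-in named CreateShape.Create with the same delegate signature as Observable.Create (it is not the Rx implementation) and a getNextValue delegate in place of GetNextValue(). It also calls OnCompleted only on the success path, on the assumption that OnError is itself terminal.

```csharp
using System;
using System.Collections.Generic;

static class CreateShape
{
    // Hypothetical stand-in with the same delegate shape as Observable.Create;
    // it is NOT the Rx implementation and adds no safety checks.
    public static IObservable<T> Create<T>(Func<IObserver<T>, Action> subscribe)
    {
        return new Source<T>(subscribe);
    }

    sealed class Source<T> : IObservable<T>
    {
        readonly Func<IObserver<T>, Action> _subscribe;
        public Source(Func<IObserver<T>, Action> subscribe) { _subscribe = subscribe; }
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return new Cleanup(_subscribe(observer));
        }
    }

    sealed class Cleanup : IDisposable
    {
        readonly Action _action;
        public Cleanup(Action action) { _action = action; }
        public void Dispose() { _action(); }
    }

    // Observer built from three delegates, mirroring the three lambdas in the post.
    sealed class DelegateObserver<T> : IObserver<T>
    {
        readonly Action<T> _onNext;
        readonly Action<Exception> _onError;
        readonly Action _onCompleted;
        public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            _onNext = onNext; _onError = onError; _onCompleted = onCompleted;
        }
        public void OnNext(T value) { _onNext(value); }
        public void OnError(Exception error) { _onError(error); }
        public void OnCompleted() { _onCompleted(); }
    }

    public static List<string> Run(Func<bool> getNextValue)
    {
        var log = new List<string>();
        IObservable<bool> source = Create<bool>(observer =>
        {
            try
            {
                bool value = getNextValue();
                observer.OnNext(value);
                observer.OnCompleted();   // success path only
            }
            catch (Exception ex)
            {
                observer.OnError(ex);     // terminal; no OnCompleted afterwards
            }
            return () => { };             // the returned Action is the cleanup; nothing to undo here
        });

        source.Subscribe(new DelegateObserver<bool>(
            result => log.Add("value: " + result),
            exn => log.Add("error: " + exn.Message),
            () => log.Add("done"))).Dispose();
        return log;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(() => true)));
        Console.WriteLine(string.Join(", ", Run(() => { throw new InvalidOperationException("boom"); })));
    }
}
```

    So neither wrapping the whole body in return () => ... nor returning from finally is needed: the body runs when an observer subscribes, and the returned Action is only the unsubscribe logic.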

    Ryan
    Wednesday, January 06, 2010 5:35 AM
  • If this was CPS we would have to pass the observer to Subscribe etc..., but we do not have to do that.

    I don't follow. The extensions to take those functions are just creating an IObserver to pass into the actual IObservable<T>.Subscribe() method, which only takes an IObserver<T>. So it seems to me that you do have to pass the observer to Subscribe. Maybe I missed what you were saying? Or I could be wrong. :)
    Wednesday, January 06, 2010 5:42 AM
  • I've watched that video several times. It's very good, if a bit difficult to grasp at parts. The IObservable interface is the actual monad unit, much as IEnumerable is the monad unit for the list monad in C#. IObserver is a continuation. If you look at F#'s Async.RunWithContinuations, you will see a signature exactly like that of Subscribe(). To verify that IObservable is indeed the monad, you need look at which interface is used as the combinator extension point. For the list monad, IEnumerable is that interface and has SelectMany (Bind) defined on it. IObserver doesn't have a Bind operator, but IObservable does. Also, IObservable is the dual of IEnumerable, not IObserver. The difficulty lies in the fact that one is the continuation monad and the other is a continuation. The words appear almost interchangeable.
    Wednesday, January 06, 2010 5:49 AM
  • This might look cleaner:
    Observable.Create<bool>(observer =>
    {
        try {
            bool value = GetNextValue();
            observer.OnNext(value);
            observer.OnCompleted();
        } catch (Exception ex) {
            observer.OnError(ex);
        }
        return () => { }; // nothing to clean up
    }).Subscribe(result => {
        if (result) { /* do something */ }
    });

    Wednesday, January 06, 2010 5:55 AM
  • advanced sample with call/cc
    var observable = Observable.Create<long>(observer =>
    {
        Action Return = observer.OnCompleted;
        //simulation of source.TakeUntil(other)
        //source: value every second
        IDisposable disposable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(observer.OnNext);
        //other: goto return continuation after 10 seconds
        IDisposable disposable2 = Observable.Interval(TimeSpan.FromSeconds(10)).Subscribe(_ => Return());
        return () => { disposable1.Dispose(); disposable2.Dispose(); };
    });
    var result = observable.ToEnumerable().ToArray();
    Wednesday, January 06, 2010 7:38 AM
  • Hi,

    It seems like the monad and continuation have been identified repeatedly in this thread, so maybe the difficulty is in understanding the call part of call/cc, as implemented by Observable.Create.

    So to state the apparent consensus, IObservable<T> is the monad (e.g., it enables method chaining) and IObserver<T> is the continuation (e.g., it's executed after a value is pushed down the composed monadic pipeline).

    That being said, I'll examine the signature for Observable.Create and see if I can specify exactly where I believe the call part is.

    public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
    


    The subscribe argument is a function that accepts a continuation (IObserver<T>) and returns an Action function.  For now, ignore the Action part.

    The important part is that when Create is invoked the continuation is currently unknown.  The subscribe function accepts the continuation as a parameter.  The subscribe function also takes on the responsibility of calling the continuation when it's ready to push a new value down the monadic pipeline.  It does this by invoking IObserver<T>.OnNext.

    But subscribe is not executed immediately either.  After all, how could it unless there was a continuation to pass to it?  A continuation is only attached when an IObserver<T> subscribes.  Only then can the Create method call the specified subscribe function with the current continuation.
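    The deferred execution described above can be seen in a small sketch. LazyCreate.Create is a hypothetical stand-in with the same delegate shape as Observable.Create (not the Rx implementation), and NamedObserver is a helper invented for the example. The log shows that nothing runs when Create returns, and that the subscribe function runs once for each observer that is attached.

```csharp
using System;
using System.Collections.Generic;

static class LazyCreate
{
    // Hypothetical stand-in for Observable.Create; NOT the Rx implementation.
    public static IObservable<T> Create<T>(Func<IObserver<T>, Action> subscribe)
    {
        return new Source<T>(subscribe);
    }

    sealed class Source<T> : IObservable<T>
    {
        readonly Func<IObserver<T>, Action> _subscribe;
        public Source(Func<IObserver<T>, Action> subscribe) { _subscribe = subscribe; }

        // The stored function is only invoked here, with whichever
        // observer (continuation) happens to be attached.
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return new Cleanup(_subscribe(observer));
        }
    }

    sealed class Cleanup : IDisposable
    {
        readonly Action _action;
        public Cleanup(Action action) { _action = action; }
        public void Dispose() { _action(); }
    }

    sealed class NamedObserver : IObserver<int>
    {
        readonly string _name;
        readonly List<string> _log;
        public NamedObserver(string name, List<string> log) { _name = name; _log = log; }
        public void OnNext(int value) { _log.Add(_name + " got " + value); }
        public void OnError(Exception error) { _log.Add(_name + " failed"); }
        public void OnCompleted() { _log.Add(_name + " completed"); }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        IObservable<int> source = Create<int>(observer =>
        {
            log.Add("subscribe function runs");
            observer.OnNext(42);
            observer.OnCompleted();
            return () => log.Add("cleanup");
        });

        log.Add("Create returned");                              // nothing has run yet
        source.Subscribe(new NamedObserver("A", log)).Dispose(); // first continuation
        source.Subscribe(new NamedObserver("B", log)).Dispose(); // a different continuation
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

    In this sketch the cleanup Action runs when the returned IDisposable is disposed; when exactly Rx invokes it is not something the sketch tries to reproduce.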

    This may be counterintuitive when compared to the formal definition of call/cc.  The CLR and C# do not know about the call/cc semantics of Observable.Create; i.e., the rest of your imperative code can't just be wrapped up into an IObserver<T> implementation (from immediately after the call-site of Create) and get passed to the subscribe function.  I believe this is why a continuation monad is necessary.  It allows us to compose a new segment into the monadic pipeline; specifically, it allows us to specify the continuation in the form of another method (typically, an anonymous method; e.g., lambda expression).

    Each extension method that allows us to compose additional semantics onto the returned IObservable<T> is a continuation (they may call Subscribe, but that's just an implementation detail that we don't have to know about).  These composing functions specify code that will be executed when OnNext is called.  So the Subscribe method is just one example that we can use in our code to add another continuation, and it provides a sort of end cap for the pipeline; i.e., composability is lost since the Subscribe method does not return another IObservable<T>.

    System.IDisposable Subscribe(System.IObserver<T> observer)

    But it does return IDisposable.  This allows the subscription to be unsubscribed at the whims of the caller.  Well this appears to be the purpose of the subscribe function's return value as well.  The one we ignored before :)  It's a function of type Action so it takes no parameters and returns void.  It's apparently called once by Rx when the call/cc will no longer be used; e.g., when OnCompleted or OnError are called.


    --
    Something else interesting to me is that it's also technically possible for Create to invoke the subscribe function immediately, passing an internal IObserver<T> implementation, even if no additional monadic pipeline functions are composed with the returned IObservable<T>.  For example, the Create implementation could be greedy and buffer all of the OnNext calls.  Then it could just play them out whenever the first IObserver<T> subscribes.

    Now imagine a special extension method is created to Buffer the values.  It's technically a subscriber even if the Subscribe method hasn't been called in user code yet.  Perhaps Buffer is also greedy and causes the Create method to start the observable producing values so that it can buffer them.  In this case, a continuation into user code wasn't specified but the call in the call/cc has already been invoked.  (Note: I'm not saying the actual Rx Buffer implementation is greedy, I just thought this might be interesting to point out - I don't know how it is actually implemented.)


    I hope all of that is correct :p

    - Dave
    http://davesexton.com/blog
    Wednesday, January 06, 2010 9:26 AM
  • Dave, I think you are absolutely right. I flipped all the signatures to F# last night to get a better look at what is going on, substituting the name f for subscribe:

    Observable.Create : (f: IObserver<TSource> -> Action) -> IObservable<TSource>
    Observable.CreateWithDisposable : (f: IObserver<TSource> -> IDisposable) -> IObservable<TSource>
    observable.Subscribe : IObserver<TSource> -> IDisposable

    I think this makes it much more clear what's happening inside of Create. (For those who haven't peeked in Reflector, Create internally calls CreateWithDisposable and wraps the Action in an AnonymousDisposable, so they are really the same without having to create your own IDisposable.) Subscribe is the easiest to look at because its signature matches that of the function passed into CreateWithDisposable and thus Create. Another way to look at this is to see how you would manually add a CreateFromEvent extension (known as FromEvent):

    IObservable<TArgs> CreateFromEvent<TArgs>(IEvent<TArgs> event) {
        return Observable.Create<TArgs>(o => {
            Action<object, TArgs> action = (s, e) => {
                if (e.Error != null)
                    o.OnError(e.Error);
                else if (e.Cancelled)
                    o.OnCompleted();
                else
                    o.OnNext(e.Value);
            };
            event += action;
            return () => event -= action;
        });
    }

    Well, that's most of it, anyway, and it should be close to accurate. Create provides the source of observable data called when Subscribe or another retrieval call is made and uses the observer passed into, say, Subscribe().
    • Edited by Ryan RileyMVP Wednesday, January 06, 2010 7:09 PM I changed the signature of the CreateFromEvent method to more accurately reflect the intent.
    Wednesday, January 06, 2010 6:17 PM
  • I thought I might add why I care about Call/CC. Call/CC allows you to write more imperative looking code but without blocking. Someone asked why I wouldn't just write something like this:

    if (new YesNoMessage("Do something?").GetResponse())
        Console.WriteLine("Did something.");
    else
        Console.WriteLine("Didn't do anything.");

    The answer is that you will have to wait on the result of the GetResponse() call before your program can continue. If you are doing a very sequential task and the user isn't able to do anything in your app while you are waiting, that's no big deal. But what if you have a number of components and you want your user to be able to look up some information in another component for the response? Sorry. It's sort of like the AT&T commercials about Verizon not allowing you to both talk and surf the web at the same time.

    So the answer is that you can use continuations. CPS looks kind of ugly, but call/cc usually just adds a keyword:

    if (callcc (() => new YesNoMessage("Do something?").GetResponse()))
        Console.WriteLine("Did something.");
    else
        Console.WriteLine("Didn't do anything.");

    That's pretty awesome. However, as Dave and others have mentioned, the CLR doesn't support this. But what is this doing? You are essentially taking everything after the (callcc (() => new YesNoMessage("Do something?").GetResponse())), including the if, and transforming that into a continuation that is called once GetResponse returns. This means that your program will continue on and return here once it receives a response.

    How does this look with Rx? As best as I can see it, it would look like so:

    Observable.Create<bool>(o => {
        var result = new YesNoMessage("Do something?").GetResponse();
        o.OnNext(result);
        o.OnCompleted();
        return () => { }; // nothing to clean up
    }).Subscribe(result => {
        if (result)
            Console.WriteLine("Did something.");
        else
            Console.WriteLine("Didn't do anything.");
    });

    You have to convert the current continuation (i.e. the rest of your program) yourself and stick it in Subscribe. You can still see most of what you are doing, but it's not quite as nice, especially if you have nested callcc calls:

    if (callcc (() => new YesNoMessage("Do something?").GetResponse()))
        callcc (() => new OkMessageDialog("You did something!").GetResponse());
    else
        callcc (() => new OkMessageDialog("You didn't do anything.").GetResponse());

    In fact, if you create your own continuation type that takes an Action delegate, you'll notice that my example above of Observable.Create really still looks like a CPS call:

    // In class definition for YesNoMessage
    void GetResponse(Action<bool> continuation) {
        var result = // get response from user input
        continuation(result);
    }
    
    new YesNoMessage("Do something?").GetResponse(
        result => {
            if (result)
                Console.WriteLine("Did something.");
            else
                Console.WriteLine("Didn't do anything.");
        });
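    For comparison, the nested callcc example from earlier in this post can be written in the same hand-rolled continuation-passing shape. This is only a sketch: the YesNoMessage and OkMessageDialog classes below are hypothetical, their answers are scripted rather than read from the user, and they write to a shared log instead of showing a dialog.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dialog: "asks" by logging its text, then hands the
// scripted answer to the continuation instead of returning it.
sealed class YesNoMessage
{
    readonly string _text;
    readonly bool _scriptedAnswer;
    readonly List<string> _log;
    public YesNoMessage(string text, bool scriptedAnswer, List<string> log)
    {
        _text = text; _scriptedAnswer = scriptedAnswer; _log = log;
    }
    public void GetResponse(Action<bool> continuation)
    {
        _log.Add(_text);
        continuation(_scriptedAnswer);
    }
}

// Hypothetical dialog with a single OK button: no value to pass along.
sealed class OkMessageDialog
{
    readonly string _text;
    readonly List<string> _log;
    public OkMessageDialog(string text, List<string> log) { _text = text; _log = log; }
    public void GetResponse(Action continuation)
    {
        _log.Add(_text);
        continuation();
    }
}

static class NestedCps
{
    public static List<string> Run(bool answer)
    {
        var log = new List<string>();
        // Each "rest of the program" becomes one more nested lambda.
        new YesNoMessage("Do something?", answer, log).GetResponse(result =>
        {
            if (result)
                new OkMessageDialog("You did something!", log)
                    .GetResponse(() => log.Add("continuing after OK"));
            else
                new OkMessageDialog("You didn't do anything.", log)
                    .GetResponse(() => log.Add("continuing after OK"));
        });
        return log;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(" / ", Run(true)));
        Console.WriteLine(string.Join(" / ", Run(false)));
    }
}
```

    Every additional step adds another level of nesting, which is the readability cost that the callcc form avoids.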

    Now don't get me wrong. I love Rx. You would be hard pressed to compose the above. However, I still don't see how Observable.Create is necessarily call/cc. It looks like it is still CPS to me. I would love to understand this better, especially if my understanding of call/cc is completely wrong.

    Cheers!
    Ryan
    • Edited by Ryan RileyMVP Wednesday, January 06, 2010 7:18 PM I made my make-believe callcc syntax more realistic for C#.
    Wednesday, January 06, 2010 6:41 PM
  • I thought the whole point was so that methods like GetResponse in your example, do not need to have the continuation passed to it.  If it was really all CPS, you'd have to pass the continuation into every method, like you did here:

    new YesNoMessage("Do something?").GetResponse(
        result => {
            if (result)
                Console.WriteLine("Did something.");
            else
                Console.WriteLine("Didn't do anything.");
        });

    So then you'd have to go through all your code and make overloads that take the continuation (i.e., as an Action).  I mentioned Subscribe as an example earlier but it wasn't a great example, because it happens to take an IObserver, but it's not necessarily the SAME continuation ... i.e. it's not the same IObserver used in Create, if you don't want it to be.  So you can change the current continuation.  I don't know though, I am just learning and don't claim to understand yet.

    Wednesday, January 06, 2010 8:07 PM
  • Hi,

    Richard, your reasoning about why Observable.Create is a call/cc implementation seems more like an explanation of how Rx actually works now. Functions are composed together via IObservable and continuations are attached via IObserver.  In your example, if YesNoMessage returns an IObservable then GetResponse would be like Subscribe.  So in fact, it is CPS.  More specifically, Rx uses a continuation monad.  Observable.Create is a different animal I think.

    To explain why Create is call/cc I think we can put it another way: 

    Ryan's example of a true callcc usage shows that the continuation wouldn't have to be passed to the function, if this were supported by .NET.  It's implied that the continuation will be called automatically when the function is finished executing.  Clearly that's more limiting than CPS, whereby the continuation function can be invoked as needed; e.g., to produce multiple values.

    Observable.Create is like callcc because it allows the programmer to specify the method that will be called with whatever continuation is attached later.  The continuation doesn't have to be defined before you call Create.

    Contrasting that to CPS, you must pass the continuation into the method; therefore, it must be created before you call the method.  This is backwards :)

    Since we don't have platform support for call/cc, the "current" part in "current continuation" must be defined explicitly as another method.  So Observable.Create is call/cc but the current continuation is a method, not the actual continuation at the point of the call to Create.

    - Dave

    http://davesexton.com/blog
    Wednesday, January 06, 2010 8:56 PM
  • i.e. it's not the same IObserver used in Create, if you don't want it to be.  So you can change the current continuation.  I don't know though, I am just learning and don't claim to understand yet.

    That's what I initially thought, too. However, if you go look at the assembly in Reflector, you'll find that the observer passed into the Create function is exactly the observer passed to Subscribe. Also, Subscribe is just a nice duality for GetEnumerator; otherwise, you could just add `(observer)` to the end of your query. Subscribe is the method that accepts the continuation for the continuation monad. The observable query you compose still needs the continuation in order to execute, much as an IEnumerable query needs to be iterated.

    This is some hard stuff to grasp. I knew what I was looking for and couldn't find it, then didn't understand what it was when Erik pointed it out. :) I'm still not totally sure, but this forum has been very enlightening and helpful in an exploratory way.
    Wednesday, January 06, 2010 10:22 PM
  • I think I agree with what you are stating. True call/cc transforms the rest of your program after the call/cc statement into a continuation for you. So above, the "if (result) ..." to the end of the method would be picked up and turned into a continuation. For Rx, that would correlate with what is passed into Subscribe. I don't know that I agree that Observable.Create is a different animal or "backwards", though; it just allows more flexibility. Most of the built-in transformations, e.g. FromEvent, FromAsyncPattern, etc., are all collection-like. In other words, you're expecting zero or more responses much like you would when iterating a list. With Observable.Create, you have total control with what you want to pass back to your continuation. So it's more flexible, but you are still working within the continuation monad.

    I think a word picture would be useful here. The example I gave above could be a WinForms app. It asks if you want to do something, to which you must click either Yes or No. Once you click Yes or No, you'll receive a notification in the console. (For the one with multiple callcc calls, you would get a second notification window to which you must click Ok). What call/cc does there is allow you to do that sequence in a non-blocking fashion. In other words, if you wanted to pop that message up there then run some other background task, you could. The user will see the pop-up, while the program will continue executing along its merry way. Once the user responds, the program will jump back to the current continuation, execute, then hop back to where it was before, never missing a beat. Event loop, ftw!

    Without call/cc, you would be waiting for a response the whole time. The beauty of call/cc is that you just add the keyword and a few extra, lambda-related symbols to get the non-blocking behavior. It's very much like the workflow computations in F# where you can just wrap an async { ... } and add a few '!' operators.

    The last example showed something similar with CPS. It's less readable b/c you are passing the continuation along into the method that asks for a response. There's nothing wrong there; it just doesn't flow in a familiar way. I feel the Observable.Create approach is like this latter option. The symbols for wrapping everything are much more obtuse, though you do gain the additional combinators. I'm not sure those are very useful here, though, as call/cc doesn't usually take additional factors into consideration and returns the first value from the "observable."

    I am not as familiar with call-with-current-continuation in Lisp, so I may just be smoking a Ruby pipe. If there is a better way, I'm open to it. I'm satisfied with what is available regardless.
    Wednesday, January 06, 2010 10:42 PM
  • I thought I would take a stab at converting the nested callcc example into Rx:

    // Make-believe syntax
    if (callcc (() => new YesNoMessage("Do something?").GetResponse()))
        callcc (() => new OkMessageDialog("You did something!").GetResponse());
    else
        callcc (() => new OkMessageDialog("You didn't do anything.").GetResponse());
    
    // Rx syntax?
    Observable.Create<bool>(o => {
        var result = new YesNoMessage("Do something?").GetResponse();
        o.OnNext(result);
        o.OnCompleted();
        return () => { };
    }).Subscribe(result => {
        if (result)
            Observable.Create<Unit>(o => {
                new OkMessageDialog("You did something!").GetResponse();
                o.OnCompleted();
                return () => { };
            }).Subscribe(_ => { });
        else
            Observable.Create<Unit>(o => {
                new OkMessageDialog("You didn't do anything!").GetResponse();
                o.OnCompleted();
                return () => { };
            }).Subscribe(_ => { });
    });

    For those in the know: is that right? Is there a simpler way? Also, given this implementation, will I have any memory leaks by not disposing the disposable?
    • Edited by Ryan RileyMVP Thursday, January 07, 2010 3:44 PM I added an additional question I came up with.
    Wednesday, January 06, 2010 10:54 PM
  • Hi Ryan,

    The async property of call/cc is also shared by CPS.  In fact, the Asynchronous Programming Model (APM) with its callback-accepting overload seems just like CPS to me.

    I still think the difference is in the "current" part of call/cc.  Let's rename CPS so that it's clearer:

    call/cc = call with current continuation
    CPS     = call with any     continuation
    


    To me, it seems that it's the fact that CPS requires the continuation to be defined before the call that distinguishes it from call/cc, which requires the continuation to be defined after the call.

    In true call/cc, it seems, the code following the call site is the continuation, whereas CPS requires the function to be defined before it makes the call so that it can pass it as an argument.  This is why I mentioned it seemed backward to me :)

    Furthermore, if both implementations accept a parameter that is the continuation (as is the case in Rx), then call/cc does appear to be more flexible in one sense.  For example, I can change the continuation in the Rx call/cc by simply attaching a different observer at any time; i.e., change the current continuation.  I can't do that with CPS (without passing the continuation by ref ;).

    - Dave
    http://davesexton.com/blog
    Thursday, January 07, 2010 7:57 AM
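    For comparison with a language that does support call/cc, the distinction Dave draws can be sketched in Haskell. This is only an illustration, assuming the Cont monad from the mtl package (Control.Monad.Cont); the names classifyCps and classifyCallCC are made up for the example. In the CPS version the caller has to build the continuation and pass it in, while in the callCC version the continuation k is simply "whatever follows the call site".

```haskell
import Control.Monad (when)
import Control.Monad.Cont (Cont, callCC, runCont)

-- CPS: the caller must build the continuation `k` first and pass it in.
classifyCps :: Int -> (String -> r) -> r
classifyCps n k
  | n < 0     = k "negative"
  | otherwise = k "non-negative"

-- call/cc: `k` is handed to us and stands for the rest of the do block,
-- i.e. the code that follows the call site.
classifyCallCC :: Int -> Cont r String
classifyCallCC n = do
  label <- callCC $ \k -> do
    when (n < 0) $ k "negative"   -- jump straight to the code after callCC
    return "non-negative"
  return ("n is " ++ label)       -- part of the continuation that k captured

main :: IO ()
main = do
  putStrLn (classifyCps (-3) ("n is " ++))
  putStrLn (runCont (classifyCallCC (-3)) id)
  putStrLn (runCont (classifyCallCC 7) id)
```

    Note that the final continuation (id here) is still supplied after the computation is defined, which is the same ordering as attaching an observer with Subscribe.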
  • Hi Dave,

    I like the way you described the relationship in your last post. We are of an accord. :) I also now understand what you mean by backward, and I completely agree. Well said! I would only add that, typically, call/cc only returns one value, hence my thoughts (far) above about NextValue or LatestValue as the call/cc implementation.

    Oops! I just noticed the bit about being able to change the current continuation in call/cc. I don't think that's the general idea for call/cc. All the examples I've ever seen were used to transform the succeeding call site into a continuation and allow the program to continue without blocking. Changing the current continuation, or in this case adding a new observer, seems to break that idea. So that could be another downside to Observable.Create as call/cc.

    Ryan
    Thursday, January 07, 2010 3:42 PM
  • @Dave - Earlier you said that the continuation monad was required and made call/cc possible - I didn't say otherwise, in fact how do we do anything with a continuation without the continuation monad?  We have to represent it as an object or function somehow.

    Maybe the confusion comes because of what we are expecting from the continuation monad.

    According to http://haskell.org/all_about_monads/html/contmonad.html, "The Continuation monad represents computations in continuation-passing style."  Which I found enlightening.  If that's really the case ... then that would imply that we are going to see CPS style code, but wrapped up in the monad itself, bringing the benefits of monads to CPS, allowing control of side effects and compositionality.

    I think I'll try the Haskell example (http://haskell.org/all_about_monads/examples/example18.hs) from that page tonight, if someone else doesn't first.

    Thursday, January 07, 2010 4:28 PM
  • Hi Ryan,

    Yea I see what you mean.  The Rx call/cc appears to be adding flexibility that may not be part of genuine call/cc.  Seems to me like it's a consequence of having the continuation defined manually instead of with actual platform support, and also partly because of the continuation monad format.

    However, I don't agree that call/cc must only return one value.  If the continuation is passed as an argument instead of just being implicitly invoked at the end, then it could be invoked multiple times just like the Rx implementation allows; e.g., calling OnNext in a loop.  I'd imagine this simplifies tail-end recursion as well in languages that natively support call/cc.  I think it would prevent stack overflows.

    - Dave
    http://davesexton.com/blog
    Thursday, January 07, 2010 6:06 PM
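    Dave's point that a continuation passed as an argument can be invoked more than once is easy to see in Haskell. The sketch below assumes mtl's Control.Monad.Cont and uses a made-up helper, each, that calls its continuation once per list element, roughly like calling OnNext in a loop. It uses plain cont rather than callCC, so it illustrates the "multiple invocations" idea rather than call/cc itself.

```haskell
import Control.Monad.Cont (Cont, cont, runCont)

-- Calls the continuation once per element and concatenates the results,
-- much like calling OnNext in a loop.
each :: [a] -> Cont [r] a
each xs = cont $ \k -> concatMap k xs

-- Everything after `each` runs once for every value it produces.
pairs :: Cont [r] (Int, Char)
pairs = do
  n <- each [1, 2]
  c <- each "ab"
  return (n, c)

main :: IO ()
main = print (runCont pairs (\p -> [p]))
-- prints [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
```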
  • Hi Richard,

    > @Dave - Earlier you said that the continuation monad was required and made call/cc possible [snip]

    Well I believe that the monad makes call/cc possible in Rx without platform support.

    > how do we do anything with a continuation without the continuation monad?

    Well my understanding is that the monad enables composition, but CPS doesn't actually require a monad.  I could define a method that accepts a callback function to be invoked with the method's result.  That's CPS without a monad.  The benefit of a continuation monad seems to be that it enables composition of continuations.  Sounds perfect for a reactive framework :)

    RE Haskell: I like that definition but all of this is really starting to make my head hurt.  I wonder when the experts will decide to chime in ;)

    - Dave
    http://davesexton.com/blog
    Thursday, January 07, 2010 6:25 PM
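    The claim that CPS does not need a monad, and that the monad mainly buys composition, can be sketched in Haskell as follows. The functions addCps and squareCps are hypothetical; the only library assumption is cont and runCont from mtl's Control.Monad.Cont.

```haskell
import Control.Monad.Cont (Cont, cont, runCont)

-- Plain CPS: each function takes its callback explicitly. No monad involved.
addCps :: Int -> Int -> (Int -> r) -> r
addCps x y k = k (x + y)

squareCps :: Int -> (Int -> r) -> r
squareCps x k = k (x * x)

-- Composing by hand means nesting the callbacks.
nested :: Int
nested = addCps 1 2 (\s -> squareCps s (\q -> q))

-- Wrapping the same functions in Cont lets >>= (do-notation) do the nesting.
composed :: Cont r Int
composed = do
  s <- cont (addCps 1 2)
  cont (squareCps s)

main :: IO ()
main = do
  print nested                 -- 9
  print (runCont composed id)  -- 9
```

    The two versions compute the same thing; the monadic one just reads top to bottom, which is the same benefit the Rx query operators give over hand-nested callbacks.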
  • @Dave - You are probably right; I suppose I have just never seen an example of callcc that returns multiple values.
    Thursday, January 07, 2010 8:43 PM
  • @Dave - I think your post about the nature of continuations and the continuation monad is spot on. The Haskell type signature is interesting and doesn't seem to quite align with the signature in F omega above. I tried to reconcile these with Create earlier but ended with the monad as IDisposable instead of IObservable. :( I think the Rx guys are enjoying our squirming. :)

    If we look at the Haskell sig, and replace the IO types with Haskell types, we get:

    m a Create((a -> m b) -> m a)

    Given that def, m is IObservable. The subscribe function would then need to be a delegate like this:

    m a Subscribe(a -> m b)

    We don't have a Create with that signature, so I know I am missing something. The F omega version appears to fit better, except that it also seems to return an IO within the subscribe:

    m a Create((something -> (unit -> unit)) -> m a)

    I don't know what that something is, but it could be an IObserver. There is an Action in there, as well, I think. Again, this is a best guess. The hard part is the extra return of m a. I think we are getting close!
  • Create is call/cc: dualism IEnumerable - IDisposable
    IEnumerable<T> Create<T>()
    {
        while (true)
        {
            ...
            if (completed)
            {
                // Exit continuation
                yield break; // -> OnCompleted()
            }
            else if (error)
            {
                throw new Exception(); // -> OnError(e)
            }
            else
            {
                yield return x; // -> OnNext(x)
            }
        }
    }
    Saturday, January 09, 2010 3:06 PM
  • Dave's hint and Wes Dyer's continuation-passing-style blog post were very helpful.
    With this information, my understanding of Create as a call/cc function is now:
    IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe) is similar to CallCC(Action<Action> f).
    CallCC(Return =>
    {
        ...
        // Exit continuation: get me out of here
        Return(); // jump out, like a goto
        ...
    });

    In Observable.Create, the call to IObserver.OnCompleted() or OnError(e) acts like the return or jump out.
    Example:
    var observable = Observable.Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnCompleted(); // like return, jump out
        observer.OnNext(2); // ignored
        …
    });

    The observable generates only the value 1; the OnNext calls after OnCompleted are ignored.

    The problem I have with this is that observer.OnNext(2) is invoked, but the subscriber is no longer receiving notifications.  Is that really the same thing?  If you have a Console.WriteLine before or after observer.OnNext(2) it will execute.  Is that supposed to happen with a real Call/CC?  If so, then I have managed to successfully translate some nice examples; otherwise it's still wrong and requires an if statement, or something else I haven't figured out, to actually prevent the code following observer.OnCompleted() from executing.
    Thursday, January 14, 2010 3:09 PM
  • Richard, the sample you provided is correct. observer.OnCompleted() acts like Return(). It certainly looks ridiculous in the example, as you would never ever receive the second call to observer.OnNext(2). However, the example is just to prove that point. If you follow the Haskell links you gave above, you'll see that if conditions are important and frequently used to determine when and where to jump out.

    Now, if we can just reconcile the signatures, I'll be happy. I think we have a pretty good understanding of the use of Observable.Create as call/cc, but I am not certain as to how the signature quite matches.
    Thursday, January 14, 2010 3:16 PM
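    On Richard's question of what a "real" call/cc does here: in Haskell's continuation monad, at least, invoking the escape continuation abandons the rest of the callCC body, so the statements after it never run. The sketch below is one way to observe that, assuming ContT layered over Writer (from mtl and transformers) purely to record which steps execute; the names body and logLines are illustrative.

```haskell
import Control.Monad.Cont (ContT, callCC, runContT)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Writer (Writer, execWriter, tell)

-- Each step appends a line to the Writer log so we can see what ran.
body :: ContT r (Writer [String]) Int
body = callCC $ \exit -> do
  lift (tell ["before exit"])
  _ <- exit 1                   -- jump out of the callCC block with 1
  lift (tell ["after exit"])    -- skipped: the rest of the block is abandoned
  return 2

-- Run with a final continuation that logs the value it receives.
logLines :: [String]
logLines = execWriter (runContT body (\v -> tell ["result " ++ show v]))

main :: IO ()
main = mapM_ putStrLn logLines
-- prints:
--   before exit
--   result 1
```

    So the Rx version is not quite the same thing: after OnCompleted() the remaining statements in the Create lambda still execute and only the notifications are dropped, which is why an if (or an early return) is needed to skip them.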
  • Ok, so I tried this example in Haskell, translated to C#, does it look right?

    Haskell:

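    import Control.Monad (when)
    import Control.Monad.Cont (Cont, callCC, runCont)

    whatsYourName :: String -> String
    whatsYourName name =
      (`runCont` id) $ do                      -- 1
        response <- callCC $ \exit -> do       -- 2
          validateName name exit               -- 3
          return $ "Welcome, " ++ name ++ "!"  -- 4
        return response                        -- 5

    validateName :: String -> (String -> Cont r ()) -> Cont r ()
    validateName name exit = do
      when (null name) (exit "You forgot to tell me your name!")

    C#:
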
    static void Main(string[] args) {

        Action<string, IObserver<string>> validateName = (name, exit) => {
            if (string.IsNullOrEmpty(name)) {
                exit.OnNext("You forgot to tell me your name!"); // Could use OnError too
                exit.OnCompleted(); // stop all future exit.OnNext()s
            }
        };

        var main =
            Observable.Start(() => Console.WriteLine("What's your name?"))
            .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
            .SelectMany(name => Observable.Create<string>(exit =>
            {
                validateName(name, exit);
                exit.OnNext("Nice to meet you, " + name + "!");
                //exit.OnCompleted(); // not necessary here
                return () => { };
            }))
            .Subscribe(response => Console.WriteLine(response));

        Console.ReadKey();
    }
    Thursday, January 14, 2010 3:56 PM
  • That is excellent, Richard. It really helps me understand why the type signatures are not aligning like I thought they should. Let's match up the methods to the Haskell equivalents:
    • Subscribe() is runCont
    • OnNext() is return
    • OnCompleted() is exit, or the "current continuation." Well, technically, I suppose you need the 1-2 combination of OnNext() followed by OnCompleted(); otherwise you won't return anything. So the IObserver<T> is the current continuation, though that concept is different than other implementations such as Haskell.

    Given that, I don't think we'll arrive at an exact signature mapping between Observable.Create and the other callcc implementations.
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:54 PM
    • Unmarked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:57 PM
    Thursday, January 14, 2010 5:49 PM
  • OnCompleted() is exit, or the "current continuation." Well, technically, I suppose you need the 1-2 combination of OnNext() followed by OnCompleted(); otherwise you won't return anything. So the IObserver<T> is the current continuation, though that concept is different than other implementations such as Haskell.

    The 1-2 OnNext-OnCompleted calls can be condensed into Observable.Return<T>. Most likely that is why it is called return. So it would look something like:


    var main =
        Observable.Start(() => Console.WriteLine("What's your name?"))
        .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
        .SelectMany(name => Observable.CreateWithDisposable<string>(exit =>
            {
                validateName(name, exit);
                return Observable
                    .Return("Nice to meet you, " + name + "!")
                    .Subscribe(exit);
            }))
        .Subscribe(response => Console.WriteLine(response));

    I also agree that we will likely not get a 1-to-1 mapping of C# to something like Haskell simply because the syntax may be different, or because first-class functions are something that C# doesn't have.
    • Edited by McAravey Thursday, January 14, 2010 7:04 PM Can't spell today.
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:54 PM
    Thursday, January 14, 2010 7:03 PM
  • Nice, thanks.  I was messing around with many different ways to write the same thing.  Now, if I try to take the Haskell example and remove all the do notation and $'s I get this, all as one line (please correct it, if it's wrong):

    whatsYourName name = callCC (\exit -> validateName name exit >> return ("Welcome, " ++ name ++ "!") >>= \response -> return response) `runCont` id

    Mapping that back to C# and what we've come up with so far (and removing extra details, making this a Func ... etc...):

    Func<string, string> whatIsYourName = name =>
    {
         return Observable.CreateWithDisposable<string>(exit =>
         {
              validateName(name, exit);
              // return Observable.Return("Nice to meet you, " + name + "!").SelectMany(response => Observable.Return(response)).Subscribe(exit);
              // or
              return Observable.Return("Nice to meet you, " + name + "!").Subscribe(exit);
         }).Single();
    };
    
    Console.WriteLine("What's your name?");
    Console.WriteLine(whatIsYourName(Console.ReadLine()));
    It's not exactly the same, but it's pretty darn close.  I guess >> can map to Observable.Then ... maybe that's the next step?

    Thursday, January 14, 2010 9:24 PM
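    One way to check a hand-desugared version is to run it next to the do-notation original. The sketch below assumes mtl's Control.Monad.Cont; the names withDo and desugared are made up. It keeps the same grouping as the two do blocks. Richard's one-liner parses slightly differently (the lambda swallows the trailing >>= \response -> return response), but since binding into return changes nothing, it should give the same answers.

```haskell
import Control.Monad (when)
import Control.Monad.Cont (Cont, callCC, runCont)

validateName :: String -> (String -> Cont r ()) -> Cont r ()
validateName name exit =
  when (null name) (exit "You forgot to tell me your name!")

-- The do-notation version from the thread.
withDo :: String -> String
withDo name = (`runCont` id) $ do
  response <- callCC $ \exit -> do
    validateName name exit
    return $ "Welcome, " ++ name ++ "!"
  return response

-- Desugared by hand, keeping the same grouping as the do blocks.
desugared :: String -> String
desugared name =
  runCont
    (callCC (\exit -> validateName name exit >> return ("Welcome, " ++ name ++ "!"))
       >>= \response -> return response)
    id

main :: IO ()
main = mapM_ (\n -> print (withDo n, desugared n)) ["", "Ada"]
```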
  • @Ryan:  I'm not sure if Subscribe is runCont, because runCont :: (a -> r) -> r, so it's more like Single or other operators taking a parameter similar to <Func<Action, TResult>, TResult>, then your Call/CC returns TResult as expected.  So then the signatures do all line up, it appears.  I still feel like I am missing something, though.  I don't use id ... so it just ends up being Single() or Single(id => id).  Maybe Subscribe is withCont or one of the other functions ....

    Update: As Xenocide said, in Haskell the return is Observable.Return() in C#.  OnNext passes whatever parameter you give it to the exit continuation, specifically exit.OnNext (in C# we have to use a method to do this, because IObserver<T> is not a function, it's an object), because IObserver<T> actually has 3 (possible) continuations.

    Is this right Rx Team?  What's missing?
    • Edited by Richard Hein Thursday, January 14, 2010 11:16 PM 3 /possible/ continuations
    Thursday, January 14, 2010 10:53 PM
  • You are probably right. I was thinking of it in terms of just running the continuation. There are several functions that will do that, of which Single is one. NextValue and LatestValue were others that looked promising.
    Thursday, January 14, 2010 11:02 PM
  • I don't use id ... so it just ends up being Single() or Single(id => id).  Maybe Subscribe is withCont or one of the other functions ....


    I wanted to quickly mention this while I think of it. I found an explanation from a different example that should help:
    1. Firstly, the (`runCont` id) at the top just means that we run the Cont block that follows with a final continuation of id. This is necessary as the result type of fun doesn't mention Cont.
    Source: http://en.wikibooks.org/wiki/Haskell/Continuation_passing_style
    Thursday, January 14, 2010 11:42 PM
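    To make the role of the final continuation concrete: in mtl, runCont has the type Cont r a -> (a -> r) -> r, so it takes the computation first and then the function to hand the result to. A tiny sketch (the name five is illustrative):

```haskell
import Control.Monad.Cont (Cont, runCont)

five :: Cont r Int
five = return 5

main :: IO ()
main = do
  print (runCont five id)       -- final continuation is id: prints 5
  print (runCont five (* 2))    -- final continuation doubles: prints 10
  putStrLn (runCont five show)  -- final continuation renders a string: prints 5
```

    With id the value comes straight back out, which is why the comparison with a blocking operator such as Single is tempting; with any other function it behaves more like passing a callback to Subscribe.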
  • So for my benefit, and maybe everyone else's, I wanted to take Richard's program and break it down.

    \response -> return response

    This would map to: response => Observable.Return(response)

    >>=

    This is just Observable.SelectMany<T>

    return ("Welcome, " ++ name ++ "!")

    This maps to: Observable.Return("Welcome, " + name + "!")

    >>

    This is equivalent to the C# semicolon ;


    Edit:
    I noticed after posting this that Richard had already worked it out:

              validateName(name, exit);
              // return Observable.Return("Nice to meet you, " + name + "!").SelectMany(response => Observable.Return(response)).Subscribe(exit);
              // or
              return Observable.Return("Nice to meet you, " + name + "!").Subscribe(exit);
    
    Friday, January 15, 2010 12:07 AM
  • I believe Single or First would be appropriate because (`runCont` id) just converts (in C#) from IObservable<T> to T. My choice would be Single since it blocks until it gets an OnCompleted.
    Friday, January 15, 2010 12:30 AM
  • Wow, you guys have made a lot of progress on this.  I would try to rework how you translated the Haskell sample and investigate the inner workings of ValidateName and why it is passed exit.  Some hints: (>>) is SelectMany where you ignore the result of the source.  Return is Observable.Return and exit should have type IObserver<...> and remember CallCC is Create (as we already confirmed).  Furthermore, `runCont` is Subscribe.

    Good luck!  You are really close.
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 1:56 PM
    • Unmarked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:58 PM
    Tuesday, January 19, 2010 8:50 AM
  • My translation of the Haskell sample:

    IObservable<string> whatsYourName(string name)
    {
        return
            Observable.Create<string>(exit =>
            {
                validateName(name, exit);
                Observable.Return("Welcome, " + name + "!").Subscribe(exit);
                return () => { };
            });
    }
    void validateName(string name, IObserver<string> exit)
    {
        if (string.IsNullOrEmpty(name))
        {
            Observable.Return("You forgot to tell me your name!").Subscribe(exit);
        }
    }
    Tuesday, January 19, 2010 10:08 AM
  • I think that is excellent, Steffan, though I would change the Observable.Create thusly:
    using System;
    using System.Linq;
    
    namespace CallCC
    {
        /// <summary>An attempt to understand Call/CC in Rx</summary>
        class Program
        {
            static void Main(string[] args)
            {
                Observable.Start(() => Console.WriteLine("What's your name?"))
                    .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
                    .SelectMany(name => WhatsYourName(name))
                    .Subscribe(Console.WriteLine);
    
                Console.ReadKey();
            }
    
            static IObservable<string> WhatsYourName(string name)
            {
                return Observable.Create<string>(
                    exit =>
                        {
                            ValidateName(name, exit);
                            var disposable = Observable.Return("Welcome, " + name + "!").Subscribe(exit);
                            return disposable.Dispose;
                        });
            }
    
            static void ValidateName(string name, IObserver<string> exit)
            {
                if (string.IsNullOrEmpty(name))
                {
                    Observable.Return("You forgot to tell me your name!").Subscribe(exit);
                }
            }
        }
    }

    Tuesday, January 19, 2010 2:11 PM
  • Try giving the ValidateName function this type instead:

    IObservable<Unit> ValidateName(string name, IObserver<string> exit);

    And try changing WhatsYourName just a bit to simplify it...
    Tuesday, January 19, 2010 5:40 PM
  • So how about something like this:

    static void Main(string[] args)
    {
        do
        {
            var result = from what in Observable.Start(() => Console.WriteLine("What is your name?"))
                         from read in Observable.Start(() => Console.ReadLine())
                         from name in WhatIsYourName(read)
                         from write in Observable.Start(() => Console.WriteLine(name))
                         select write;
    
            result.Subscribe();
        } while (Console.ReadKey().Key != ConsoleKey.Escape);
    }
    
    private static IObservable<string> WhatIsYourName(string name)
    {
        return Observable.CreateWithDisposable<string>(exit =>
             {
                 var result = from unit in ValidateName(name, exit)
                              from message in Observable.Return("Welcome, " + name + "!")
                              select message;
                 return result.Subscribe(exit);
             });
    }
    
    private static IObservable<Unit> ValidateName(string name, IObserver<string> exit)
    {
        return Observable.CreateWithDisposable<Unit>(observer =>
            {
                if (string.IsNullOrEmpty(name))
                {
                    return Observable.Return("You forgot to tell me your name.").Subscribe(exit);
                }
    
                return Observable.Return(new Unit()).Subscribe(observer);
            });
    }
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 7:43 PM
    Tuesday, January 19, 2010 6:53 PM
  • That looks pretty good.  Note the use of SelectMany to compose ValidateName with Observable.Return.

    xs.SelectMany(_ => ys) is like (;)
    • Marked as answer by Ryan RileyMVP Tuesday, January 19, 2010 7:43 PM
    • Unmarked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:56 PM
    Tuesday, January 19, 2010 7:31 PM
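    The same identity holds on the Haskell side, where (>>) is just (>>=) with the bound value ignored. A quick sketch using the list monad, which is the closest analogue of SelectMany over a sequence (the names xs, ys, viaThen and viaBind are illustrative):

```haskell
xs :: [Int]
xs = [1, 2, 3]

ys :: String
ys = "ab"

-- Sequence xs then ys, discarding each value of xs.
viaThen :: String
viaThen = xs >> ys

-- The same thing spelled with bind, like xs.SelectMany(_ => ys).
viaBind :: String
viaBind = xs >>= \_ -> ys

main :: IO ()
main = do
  putStrLn viaThen  -- ababab
  putStrLn viaBind  -- ababab
```

    The values of xs are thrown away but their effect is not: ys is produced once per element of xs, and an empty xs would yield nothing at all.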
  • Slightly different ... using SelectMany to bind ValidateName to the rest:

    using System;
    using System.Linq;
    
    namespace CallCC {
        /// <summary>An attempt to understand Call/CC in Rx</summary>
        class Program {
            static void Main(string[] args) {
                Observable.Start(() => Console.WriteLine("What's your name?"))
                    .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
                    .SelectMany(name => WhatsYourName(name))
                    .Subscribe(Console.WriteLine);
    
                Console.ReadLine();
            }
    
            static IObservable<string> WhatsYourName(string name) {
                return Observable.Create<string>(
                    exit => Observable.Start(() => ValidateName(name, exit))
                        .SelectMany(Observable.Return("Welcome, " + name + "!"))
                        .Subscribe(exit)
                        .Dispose);
            }
    
            static void ValidateName(string name, IObserver<string> exit) {
                if (string.IsNullOrEmpty(name)) {
                    Observable.Return("You forgot to tell me your name!").Subscribe(exit);
                }
            }
        }
    }

    Tuesday, January 19, 2010 7:31 PM
  • Try giving the ValidateName function this type instead:

    IObservable<Unit> ValidateName(string name, IObserver<string> exit);

    And try changing WhatsYourName just a bit to simplify it...
    Xenocide beat me to it, but I also didn't use another "call/cc" inside ValidateName. Should I have?

    using System;
    using System.Linq;
    
    namespace CallCC
    {
        /// <summary>An attempt to understand Call/CC in Rx</summary>
        class Program
        {
            static void Main(string[] args)
            {
                Observable.Start(() => Console.WriteLine("What's your name?"))
                    .SelectMany(_ => Observable.Start<string>(Console.ReadLine))
                    .SelectMany(name => WhatsYourName(name))
                    .Subscribe(Console.WriteLine);
    
                Console.ReadKey();
            }
    
            static IObservable<string> WhatsYourName(string name)
            {
                var response = Observable.Create<string>(exit =>
                    ValidateName(name, exit)
                        .SelectMany(Observable.Return("Welcome, " + name + "!"))
                        .Subscribe(exit)
                        .Dispose);
                return response;
            }
    
            static IObservable<Unit> ValidateName(string name, IObserver<string> exit)
            {
                return Observable.Start(() =>
                    {
                        if (string.IsNullOrEmpty(name))
                            Observable.Return("You forgot to tell me your name!").Subscribe(exit);
                    });
            }
        }
    }

    Tuesday, January 19, 2010 7:41 PM
  • Ok, so now after an interesting discussion and investigation, why is the Create operator really CallCC?
    Tuesday, January 19, 2010 7:46 PM
    >Ok, so now after an interesting discussion and investigation, why is the Create operator really CallCC?
    Given the craziness of the code above, I think this text from the Haskell page on Control.Monad.Cont.Class says it all: :)

    "Abuse of the Continuation monad can produce code that is impossible to understand and maintain."
    Tuesday, January 19, 2010 8:10 PM
    >Ok, so now after an interesting discussion and investigation, why is the Create operator really CallCC?

    Well, the way I see it, Create passes in the "what next" part of the computation. Its power is that it lets us call 'exit' whenever we want, so ValidateName can either jump out immediately with an error message or let the computation continue.

    CallCC also captures the "what next" and passes it into an ordinary function, which can choose when to 'jump out' by invoking the continuation that CallCC passed in.
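
    The 'jump out' behaviour can be sketched without Rx at all. What follows is only a minimal, escape-only sketch in plain C#: CallCC, Escape<T> and EscapeSketch are hypothetical names made up for illustration (they are not part of Rx or the BCL), and an exception stands in for a real continuation, so exit only works while the body is still running.

```csharp
using System;

// Hypothetical carrier for the value handed to `exit`; not an Rx or BCL type.
sealed class Escape<T> : Exception
{
    public readonly object Token;
    public readonly T Value;

    public Escape(object token, T value)
    {
        Token = token;
        Value = value;
    }
}

static class EscapeSketch
{
    // Runs `body`, passing it an `exit` continuation. If the body calls
    // exit(x), the rest of the body is skipped and x becomes the result;
    // otherwise the body's own return value is the result.
    public static T CallCC<T>(Func<Action<T>, T> body)
    {
        var token = new object(); // ties each exit to its own CallCC call
        try
        {
            return body(x => { throw new Escape<T>(token, x); });
        }
        catch (Escape<T> e) when (ReferenceEquals(e.Token, token))
        {
            return e.Value;
        }
    }

    public static string WhatsYourName(string name)
    {
        return CallCC<string>(exit =>
        {
            ValidateName(name, exit);           // may jump out here
            return "Welcome, " + name + "!";    // skipped after an exit
        });
    }

    static void ValidateName(string name, Action<string> exit)
    {
        if (string.IsNullOrEmpty(name))
            exit("You forgot to tell me your name!");
    }
}
```

    With this sketch, EscapeSketch.WhatsYourName("") returns the error message and EscapeSketch.WhatsYourName("Ryan") returns "Welcome, Ryan!". In the Rx versions in this thread, the observer handed to Create plays the role of exit.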

    Tuesday, January 19, 2010 8:10 PM
    >Ok, so now after an interesting discussion and investigation, why is the Create operator really CallCC?

    Create is call/cc because it supplies return semantics to the computation in the form of a continuation.

    - Dave
    http://davesexton.com/blog
    Tuesday, January 19, 2010 8:36 PM
    >Ok, so now after an interesting discussion and investigation, why is the Create operator really CallCC?
    Joking aside, Wikipedia states, "Taking a function f as its only argument, call/cc takes the current continuation (i.e., a "snapshot" of the current control context or control state of the program) as an object and applies f to it."

    The Scheme wiki adds that call/cc allows for more than just "escape continuations." We should be able to jump back in from outside. I don't have a sample for this yet, but I'm starting to see how this could be possible and allow for coroutines to be created.

    What allows Create to work as call/cc is that Subscribe captures the current state as an observer. Once that state is captured, the observer is passed to the function given to Create, and the "jumps" occur when that function calls the observer and we return to the original "state."

    Tuesday, January 19, 2010 8:48 PM
  • So Wes asked why ValidateName has to take the exit parameter, but ValidateName doesn't have to take IObserver<T> which makes this much better:

    namespace CallCC {
        /// <summary>An attempt to understand Call/CC in Rx</summary>
        class Program {
            static void Main(string[] args) {
                do {
    
                    Observable.Start(() => Console.WriteLine("What's your name?"))
                        .SelectMany(_ => Observable.Start(() => Console.ReadLine()))
                        .SelectMany(name => WhatsYourName(name))
                        .SelectMany(response => Observable.Start(() => Console.WriteLine(response)))
                        .Subscribe();
                    
                } while (Console.ReadKey().Key != ConsoleKey.Escape);
            }
    
            static IObservable<string> WhatsYourName(string name) {
                return Observable.Create<string>(exit =>
                {
                    return 
                        Observable.Start(() => {
                            if (string.IsNullOrEmpty(name)) Observable.Return("You forgot to tell me your name.").Subscribe(exit);
                        })
                        .SelectMany<Unit, string>(Observable.Return<string>("Welcome, " + name + "!"))
                        .Subscribe(exit)
                        .Dispose;
                });
            }
    
    
        }
    }
    Obviously I can write ValidateName here without taking anything but name as a parameter ... which is good, because that was bugging me.  In the example above I just inlined it because it's shorter.  However I am making it an IObservable<string> not IObservable<Unit>.
    Tuesday, January 19, 2010 9:27 PM
    In Haskell, callCC has the type
    callCC :: ((a -> m b) -> m a) -> m a

    This is like
    callCC :: ((a -> IObservable<b>) -> IObservable<a>) -> IObservable<a>

    In the whatsYourName sample, this becomes
    callCC :: ((string -> IObservable<Unit>) -> IObservable<string>) -> IObservable<string>

    The signature of Observable.Create is not identical, but we can derive the correspondence:
    The exit function (a -> IObservable<b>) can be built from the observer parameter, as in the validateName function. We subscribe to the middle IObservable<a> instead of returning it. Calling the exit function causes the rest of the computation to be ignored and delivers its argument to the rightmost IObservable<a>, the one returned by Create.

    Exit function k:
    Func<a, IObservable<Unit>> k = x => { Observable.Return(x).Subscribe(observer); return Observable.Return<Unit>(new Unit()); };

    • Edited by Steffen Zeidler Wednesday, January 20, 2010 9:06 PM Exit function k
    Wednesday, January 20, 2010 6:08 PM
  • @Steffen - That looks really good and helps me better understand what was happening in the Haskell definition. I'm curious to know if this is the right answer.
    Wednesday, January 20, 2010 7:06 PM
    My translation of the Haskell should be closer to this, but there are still two Subscribes, and I think there should be only one. I was working towards passing IObserver<string> id to WhatIsYourName in the same way validateName takes it, so that at least it would be a Subscribe(id), but that gives unexpected results:

                Func<string, Func<IObserver<string>, IObservable<Unit>>> validateName = name => exit =>
                {
                    if (string.IsNullOrEmpty(name))
                    {
                        Observable.Return("You forgot to tell me your name!").Subscribe(exit);
                    }
                    return Observable.Return<Unit>(new Unit());
                };
    
                Func<string, IObservable<string>> WhatIsYourName = name =>
                    (Observable.Create<string>(exit =>
                    validateName(name)(exit).SelectMany(/*>>*/
                    Observable.Return<string>("Welcome, " + name + "!")).Subscribe(exit).Dispose)); // Extra Subscribe(exit) and the .Dispose return bug me.
           
                do
                {
    
                    Observable.Start(() => Console.WriteLine("What's your name?")).SelectMany(
                        _ => Observable.Start(() => Console.ReadLine())).SelectMany(
                        name => WhatIsYourName(name)).SelectMany(
                        response => Observable.Start(() => Console.WriteLine(response))).Subscribe();
    
                } while (Console.ReadKey().Key != ConsoleKey.Escape);
    • Edited by Richard Hein Thursday, January 21, 2010 2:28 AM alignment
    • Proposed as answer by Richard Hein Thursday, January 21, 2010 12:19 PM
    Thursday, January 21, 2010 2:27 AM
  • Wes said:

    ...  I would try to rework how you translated the Haskell sample and investigate the inner workings of ValidateName and why it is passed exit. ...
    I am still waiting to find out if
    Func<string, Func<IObserver<string>, IObservable<Unit>>> validateName = name => exit =>
                {
                    if (string.IsNullOrEmpty(name))
                    {
                        Observable.Return("You forgot to tell me your name!").Subscribe(exit);
                    }
                    return Observable.Return<Unit>(new Unit());
                };

    is correct, so that you can call it like validateName(name)(exit), since in Haskell a space is function application. validateName name exit should be a curried function ... right?

    Friday, January 29, 2010 7:14 PM
  • I think a lazy call of exit is a more precise translation.

    IObservable<Unit> validateName(string name, IObserver<string> exit)
    {
      return Observable.Return<Unit>(new Unit()).Do(_ =>
      {
        if (string.IsNullOrEmpty(name))
        {
          Observable.Return("You forgot to tell me your name!").Subscribe(exit);
        }
      });
    }

    >why it is passed exit
    Without exit, validateName can only produce values on the observable that it returns itself. With the exit parameter, it can produce values directly on the observable returned by Create.

    Saturday, January 30, 2010 6:22 PM
    The Haskell signature has a few too many "m"s, which obscures things.
    You might want to have a look at callcc in Scheme and SML as well.
    Sunday, January 31, 2010 10:19 PM
    Owner
  • Standard ML of New Jersey
    val callcc : ('a cont -> 'a) -> 'a
    callcc f : Apply f to the "current continuation". If f invokes this continuation with argument x, it is as if (callcc f) had returned x as a result.

    That is the case with Create. If f in Create invokes Observable.Return(x).Subscribe(observer), it is as if Create had returned Observable.Return(x).
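
    The equivalence above can be checked with a small model. This is only a sketch under stated assumptions: MiniRx.Create, MiniRx.Return, MiniRx.Trace, AnonymousObservable<T>, RecordingObserver<T> and Nothing are hypothetical stand-ins written against the BCL interfaces IObservable<T> and IObserver<T>. They are not the Rx operators, and they ignore scheduling, disposal and the checks the real operators perform.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for Rx operators, built only on the BCL interfaces.
sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        this.subscribe = subscribe;
    }

    // Subscribing simply hands the observer (the "continuation") to the function.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subscribe(observer);
    }
}

// A disposable that does nothing; the model has no resources to release.
sealed class Nothing : IDisposable
{
    public void Dispose() { }
}

// Records every notification so two observables can be compared.
sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

static class MiniRx
{
    public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> f)
    {
        return new AnonymousObservable<T>(f);
    }

    // Return x = OnNext(x) followed by OnCompleted().
    public static IObservable<T> Return<T>(T x)
    {
        return Create<T>(o => { o.OnNext(x); o.OnCompleted(); return new Nothing(); });
    }

    // Subscribes a recorder and renders what it observed as one string.
    public static string Trace<T>(IObservable<T> source)
    {
        var recorder = new RecordingObserver<T>();
        source.Subscribe(recorder);
        return string.Join(", ", recorder.Log);
    }
}
```

    In this model, MiniRx.Trace(MiniRx.Create<int>(observer => MiniRx.Return(42).Subscribe(observer))) and MiniRx.Trace(MiniRx.Return(42)) both give "OnNext(42), OnCompleted": an observer cannot tell the two apart, which matches the "as if (callcc f) had returned x" reading.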

    • Marked as answer by Ryan RileyMVP Thursday, September 08, 2011 11:56 PM
    Monday, February 01, 2010 7:03 AM