Aggregation operators blocking feels wrong...

  • Question

  • Just as a disclaimer, it's worth knowing that I developed a somewhat similar (though obviously much less polished and much less featureful) framework called Push LINQ a while ago. It feels like anything Push LINQ can do, Reactive LINQ should be able to do.

    Let's say we have a text file, and we want to know the total number of lines, the minimum line length and the maximum line length. I already have a method which will return me an IEnumerable<string> for a file, so I should be able to just observe the lines and get the min/max/count at the end, right?

    First problem: when I subscribe to an IObservable created from an IEnumerable, it will start iterating for that one subscriber. That's not handy - I want three subscribers. I found a way to fix that: use a Subject<string>, add each of the three subscribers, then subscribe the subject to the observable. It's a bit clunky though, and it feels like there ought to be a better way. I sort of want a deferred observable from the IEnumerable<T>: one which lets me subscribe multiple things, and then hit "go". I could build one myself, of course, but I suspect there's already one somewhere in the framework. Thoughts?

    The other problem is somewhat bigger: if I call Observable.Count, that's going to return an int immediately. That means it's got to block until it's received everything. At this point we've lost a lot of the benefit of using a push model, IMO. In Push LINQ, I return a Future<int> instead: you hook up all your observers, remember your futures, push all the data through the pipeline, and then ask for the results when everything's finished. The file is only read once, and all is well. I see that Future<T> has been removed from PFX now, but I've so far failed to find the blog post explaining why...

    Now, Wes, Erik et al are much smarter than me - which suggests there's a better way of handling this in Rx. Would anyone care to shine some light on why my train of thought is bad/wrong/stupid/illogical?


    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Friday, January 15, 2010 11:58 AM

Answers

  • The reason that they are blocking is simply for parity with LINQ to Objects.  Older internal versions of Rx used to have non-blocking versions as well.  We definitely think these are interesting and useful, and perhaps they will make a comeback.

    Also, you can usually write your code without resorting to subjects and tasks and semicolons.

    Let me know if there are any other concerns.
    Tuesday, January 19, 2010 3:08 AM

All replies

  • Hi Jon,

    > First problem: when I subscribe to an IObservable created from an IEnumerable, it will start iterating for that one subscriber.
    > That's not handy - I want three subscribers.  I found a way to fix that: use a Subject<string>, [snip]


    Hmm, good idea to subscribe the three aggregates before attaching Subject<T> to the source.

    Essentially what you need is a non-greedy dispatcher (Subject<T> seems to be a greedy dispatcher) - something that accepts multiple subscriptions and pushes the values to each as they appear, without subscribing to the source until activated.  I haven't seen anything like that in Rx though.  Perhaps it would be fairly simple to create a non-greedy ActivatedSubject<T> that attaches to the source only when a specific method, such as Activate(), is called.
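    To make the idea concrete, here is a minimal sketch of such a non-greedy, activation-based subject. ActivatedSubject is the hypothetical name from the paragraph above, and the DelegateObserver helper is purely for illustration - neither is part of Rx:

```csharp
using System;
using System.Collections.Generic;

// Sketch of the hypothetical non-greedy subject: subscriptions are collected
// up front, and the source is enumerated exactly once, when Activate() is
// called, pushing each value to every subscriber.
public sealed class ActivatedSubject<T> : IObservable<T>
{
    private readonly IEnumerable<T> source;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public ActivatedSubject(IEnumerable<T> source)
    {
        this.source = source;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    public void Activate()
    {
        try
        {
            foreach (var item in source)
                foreach (var observer in observers.ToArray())
                    observer.OnNext(item);
        }
        catch (Exception e)
        {
            foreach (var observer in observers.ToArray())
                observer.OnError(e);
            return;
        }
        foreach (var observer in observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Tiny helper observer, only so the sketch is self-contained.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action onCompleted = null)
    {
        this.onNext = onNext;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { if (onCompleted != null) onCompleted(); }
}
```

    With this, all three aggregate subscriptions can be attached before anything touches the file, and Activate() plays the role of the "go" button Jon describes.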

    > The other problem is somewhat bigger: if I call Observable.Count, that's going to return an int immediately. [snip]

    You could use a closure and keep the count in a call to Subscribe.  It's not as elegant as a Count that returns a Future<T> (I like your idea) but it'll work.  Perhaps a new LazyCount() extension method would be useful to provide this alternative.

    > I see that Future<T> has been removed from PFX now [snip]

    I think it's been superseded by Lazy<T> in .NET 4.0.

    http://msdn.microsoft.com/en-us/library/dd642331(VS.100).aspx

    Edit: I was just looking for some definitive evidence about this replacement and found the following instead (read the comments):

    https://connect.microsoft.com/VisualStudio/feedback/ViewFeedback.aspx?FeedbackID=513279&wa=wsignin1.0 
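    For anyone unfamiliar with it, Lazy<T> defers a computation until first access and caches the single result - though unlike Future<T> it has no asynchronous completion story of its own. A minimal illustration:

```csharp
using System;

class LazyDemo
{
    static void Main()
    {
        int evaluations = 0;

        // The factory runs only on first access to .Value, and only once.
        var lazyTotal = new Lazy<int>(() => { evaluations++; return 40 + 2; });

        Console.WriteLine(lazyTotal.IsValueCreated); // False
        Console.WriteLine(lazyTotal.Value);          // 42
        Console.WriteLine(lazyTotal.Value);          // 42 (factory not re-run)
        Console.WriteLine(evaluations);              // 1
    }
}
```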

    - Dave

    • Edited by Dave Sexton Friday, January 15, 2010 1:47 PM Added link about Lazy<T>
    Friday, January 15, 2010 1:33 PM
  • Does this http://social.msdn.microsoft.com/Forums/en-US/rx/thread/0663ac98-60b0-4b88-9c7e-3b2c75c971d8/ help for Observable.Count (Wes's post at the bottom re: Observable.Defer)?   "It takes a function that returns an observable and invokes the function at the time of subscription for each subscription."  Except that you want to invoke the function on OnCompleted, not at the time of subscription....
    Friday, January 15, 2010 1:34 PM
  • > First problem: when I subscribe to an IObservable created from an IEnumerable, it will start iterating for that one subscriber. That's not handy - I want three subscribers. [snip]

    Operators like .Publish() or .Replay() allow you to share source values and side-effects between many observables...

    > The other problem is somewhat bigger: if I call Observable.Count, that's going to return an int immediately. That means it's got to block until it's received everything. At this point we've lost a lot of the benefit of using a push model, IMO.

    It's all fine while you stay inside the monad; the problems start when you try to get out of it :)
    Friday, January 15, 2010 1:55 PM
  • Hi,

    > Operators like .Publish() or .Replay() allow you to share source values and side-effects between many observables...

    Publish will create a Subject<T> that is greedy.  If you add multiple subscriptions, the latter ones will miss values pushed to the former.  What Jon needs is a non-greedy Subject<T> that waits for activation.

    Replay will replay the entire sequence to each subscription.  Jon wants to avoid this - the goal is to only enumerate the sequence once.  (Please correct me if I'm wrong.)

    - Dave
    http://davesexton.com/blog
    Friday, January 15, 2010 2:00 PM
  • Just in case anyone's interested, I've now blogged about this whole experience. Comments very welcome, of course.
    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Saturday, January 16, 2010 8:18 PM
  • Hi Jon,

    The most powerful aggregation function, Aggregate, is already nearly available in an asynchronous version: Observable.Scan().  Actually, Aggregate uses Scan internally. :)  Scan() differs from Aggregate, of course, in that it returns/generates an updated aggregation value for each incoming value.
    But I fully agree with you that Count, All, Any, First, FirstOrDefault, Single, SingleOrDefault, Last, LastOrDefault and IsEmpty should be provided as non-blocking methods as well. The most natural way, for me, would be to return IObservable<>s which generate a value at the point where the blocking versions would return. If I wanted the blocking versions, I could always use Observable.ToEnumerable() and call First() and all the other methods on that. As soon as an asynchronous Last became available, it would of course be easy to turn Scan() into the real asynchronous counterpart of Aggregate, using Scan().AsyncLast()...
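    The relationship Andreas describes - Aggregate being essentially the last value of Scan - can be seen with the enumerable analogues. The Scan extension below is a hand-written sketch (LINQ to Objects has no Scan), not an Rx API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ScanDemo
{
    // Enumerable analogue of Rx's Scan: yields every intermediate
    // accumulator value rather than only the final one.
    public static IEnumerable<TAcc> Scan<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    static void Main()
    {
        var values = new[] { 3, 1, 4, 1, 5 };

        // Scan produces a running aggregate...
        Console.WriteLine(string.Join(", ", values.Scan(0, (a, x) => a + x)));
        // 3, 4, 8, 9, 14

        // ...and Aggregate is just its final value.
        Console.WriteLine(values.Aggregate(0, (a, x) => a + x)); // 14
    }
}
```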

    Andreas
    Saturday, January 16, 2010 11:42 PM
  • Hi Jon,

    I've read your blog post but I still think there's an easier way to do the aggregations so that the source is only enumerated once: use the Do method and closures.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    
    namespace ReactiveProgrammingConsole
    {
      class Program
      {
        static void Main()
        {
          int min = int.MaxValue, max = int.MinValue, count = 0;
    
          var aggregates = WhenLineProduced()
            .Select(line => line.Length)
            .Do(length => min = Math.Min(min, length))
            .Do(length => max = Math.Max(max, length))
            .Do(_ => count++)
            .Finally(() => Console.WriteLine("Count: {0}, Min: {1}, Max: {2}", count, min, max));
    
          using (aggregates.Subscribe())
          {
            Console.ReadKey();
          }
        }
    
        private static IObservable<string> WhenLineProduced()
        {
          return ProduceLines().ToObservable();
        }
    
        private static IEnumerable<string> ProduceLines()
        {
          Console.WriteLine("Yield line 1");
          yield return "line 1 - length=18";
          Console.WriteLine("Yield line 2");
          yield return "line 2 - length= 19";
          Console.WriteLine("Yield line 3");
          yield return "line 3 - length=  20";
          Console.WriteLine("Yield line 4");
          yield return "line 4 - length=   21";
          Console.WriteLine("Yield line 5");
          yield return "line 5 - length=    22";
        }
      }
    }
    


    - Dave
    http://davesexton.com/blog
    Saturday, January 16, 2010 11:58 PM
  • Hi Jon,

    Ok, the grouping example was decidedly harder to solve in a more elegant way.  But I think I've found something if you're willing to defer the aggregate computation until after the grouping, at the point of consumption.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    
    namespace ReactiveProgrammingConsole
    {
      class Program
      {
        static void Main()
        {
          var lineGroups = from line in WhenLineProduced()
                           group line by line.Length into grouped
                           select grouped;
    
          foreach (var group in lineGroups.ToEnumerable())
            Console.WriteLine("Length: {0}; Count: {1}", group.Key, group.Count());
    
          Console.ReadKey();
        }
    
        private static IObservable<string> WhenLineProduced()
        {
          return ProduceLines().ToObservable();
        }
    
        private static IEnumerable<string> ProduceLines()
        {
          Console.WriteLine("Yield line 1");
          yield return "line 1 - length=18";
          Console.WriteLine("Yield line 2");
          yield return "line 2 - length= 19";
          Console.WriteLine("Yield line 3");
          yield return "line 3 - length=18";
          Console.WriteLine("Yield line 4");
          yield return "line 4 - length=  20";
          Console.WriteLine("Yield line 5");
          yield return "line 5 - length= 19";
        }
      }
    }
    


    - Dave
    http://davesexton.com/blog
    Sunday, January 17, 2010 12:27 AM
  • Hi,

    I thought MostRecentValue or LatestValue might be usable here,
    but I may be misunderstanding them.
    I don't yet understand well enough how MostRecentValue/LatestValue behave.

    // sorry, my english is based on machine translation.

    public class PushCollection<T> : IObservable<T>
    {
        Subject<T> subject = new Subject<T>();
        IEnumerable<T> source;
    
        public PushCollection(IEnumerable<T> source)
        {
            this.source = source;
        }
    
        public void Force()
        {
            foreach (var item in source)
            {
                try { subject.OnNext(item); }
                catch (Exception e) { subject.OnError(e); return; }
            }
            subject.OnCompleted();
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }
    }
    
    public static class Extensions
    {
        public static PushCollection<T> ToPushCollection<T>(this IEnumerable<T> source)
        {
            return new PushCollection<T>(source);
        }
    
        public static IPropertyGetter<int> DeferredSum(this IObservable<int> source)
        {
            return source.Scan((x, y) => x + y).MostRecentValue(0);
        }
       
        public static IPropertyGetter<int> DeferredMax(this IObservable<int> source)
        {
            return source.Scan((x, y) => (x > y) ? x : y).MostRecentValue(0);
        }
    
        public static IPropertyGetter<int> DeferredCount<T>(this IObservable<T> source)
        {
            // source.Scan(0, (count, _) => ++count) doesn't work; I don't know why.
            var count = 0;
            return source.Do(_ => ++count).Select(_ => count).MostRecentValue(0);
        }
    }
    
    // Example
    class Program
    {
        static void Main(string[] args)
        {
            var range = Enumerable.Range(1, 10).ToPushCollection();
            var deferredSum = range.DeferredSum();
            var deferredMax = range.DeferredMax();
            var deferredCount = range.DeferredCount();
            range.Force(); // raise
            Console.WriteLine(deferredSum.Get()); // 55
            Console.WriteLine(deferredMax.Get()); // 10
            Console.WriteLine(deferredCount.Get()); // 10
           
        }
    }

    My CodePlex Project:http://linqjs.codeplex.com/
    Sunday, January 17, 2010 12:30 AM
  • Andreas,

    I wondered about making the non-blockers return IObservable<> instead of Task<>. I couldn't quite work out how I'd be able to get the value in a "just fetch it after everything's finished" kind of way - it's possible that a mixture of Defer and Return would do that though. I'm still woefully inexperienced at the moment :)

    Jon

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Sunday, January 17, 2010 8:03 AM
  • Thanks Dave - that's definitely useful. I hadn't seen "Do" before. I suspect it's tricky to turn that into something which can be examined in a "pull" fashion after the fact, but that's not too much of a problem. It would be nice to keep the side-effects of each operator isolated, but I'm not sure how at the moment.
    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Sunday, January 17, 2010 8:07 AM
  • No, I definitely don't want to defer the aggregation. The idea is that there shouldn't be any reason for this to be a problem on a data source with hundreds of millions of lines... it really should only have to keep track of the groups it knows about and the count within each group. You should never need more than one line in memory.

    Basically this is showing the same problem as in LINQ to Objects - you can easily write a query which does what you need, but it ends up loading the whole file (or whatever) into memory, which isn't nice :(

    Jon

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Sunday, January 17, 2010 8:09 AM
  • Excellent - that's nice. I think I'd call it "MaxSoFar" and "CountSoFar" etc - it can give results before the sequence is complete, which is good in some situations and bad in others. I think your count could be implemented as:

    source.Scan(0, (count, _) => count + 1).MostRecentValue(0);

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Sunday, January 17, 2010 8:13 AM
  • I think we're all working out how to use this API in the right way. Here's an attempt at a non-blocking Last() which returns an IObservable.

    public static IObservable<T> Last<T>(this IObservable<T> source)
    {
        T last = default(T);
        bool hasValue = false;
        return Observable.CreateWithDisposable<T>(observer =>
        {
            var disp = source.Subscribe(
                x => { last = x; hasValue = true; },
                () =>
                {
                    if (hasValue)
                    {
                        observer.OnNext(last);
                    }
                    observer.OnCompleted();
                });
            return disp;
        });
    }

    And test code to verify:
    var xs = Enumerable.Range(1, 1000).ToObservable();
    var xsdelay = Observable.Delay(xs, new TimeSpan(0, 0, 5));
    xs.Scan((a, n) => a + 1).Last().Subscribe(x => Console.WriteLine("Last = {0}", x));
    xsdelay.Scan((a, n) => a + 1).Last().Subscribe(x => Console.WriteLine("Last (delayed) = {0}", x));
    Console.ReadKey();
    



    --Scott W.
    http://weblogs.asp.net/sweinstein
    Sunday, January 17, 2010 4:31 PM
  • Hi Jon,

    > No, I definitely don't want to defer the aggregation.

    At least the file is only read once - each line is placed into the correct group as it's read.  But yes, I acknowledge that keeping the groups in memory does not solve the problem you raised.

    Though, a closure would do the trick again:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
      class Program
      {
        static void Main()
        {
          var counts = new ConcurrentDictionary<int, int>();
    
          var lines = WhenLineProduced()
            .Do(line => counts.AddOrUpdate(line.Length, 1, (_, v) => ++v))
            .Finally(() =>
            {
              foreach (var count in counts)
                Console.WriteLine("Length: {0}; Count: {1}", count.Key, count.Value);
            });
    
          using (lines.Subscribe())
          {
            Console.ReadKey();
          }
        }
    
        private static IObservable<string> WhenLineProduced()
        {
          return ProduceLines().ToObservable();
        }
    
        private static IEnumerable<string> ProduceLines()
        {
          Console.WriteLine("Yield line 1");
          yield return "line 1 - length=18";
          Console.WriteLine("Yield line 2");
          yield return "line 2 - length= 19";
          Console.WriteLine("Yield line 3");
          yield return "line 3 - length=18";
          Console.WriteLine("Yield line 4");
          yield return "line 4 - length=  20";
          Console.WriteLine("Yield line 5");
          yield return "line 5 - length= 19";
        }
      }
    }
    


    - Dave
    • Edited by Dave Sexton Sunday, January 17, 2010 4:56 PM Fixed code formatting
    Sunday, January 17, 2010 4:54 PM
  • Hi Jon,

    > It would be nice to encapsulate the side-effects of each operator isolated [snip]

    Could you please elaborate on this?


    > I hadn't seen "Do" before. I suspect it's tricky to turn that into something which can be examined in a "pull" fashion after the fact [snip]

    I'm not sure what you mean here - since it's an IObservable it's already working in a pull fashion.  Maybe it's the parameterless Subscribe that's bothering you?  Do you instead expect to wrap this into an IObservable<T> that produces only one element, where T is a value that contains all of the aggregates?  In that case, I'm not sure that IObservable<T> is the most specific type that can be used.

    Instead, we can use Lazy<T> to represent the pull nature.  And we can even move the closures into Subscribe instead of Do if you prefer that.  Perhaps the following code would be more acceptable:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
      class Program
      {
        static void Main()
        {
          var lazy = Aggregate(WhenLineProduced());
    
          var aggregates = lazy.Value;
    
          Console.WriteLine("Count: {0}, Min: {1}, Max: {2}", aggregates.Item1, aggregates.Item2, aggregates.Item3);
    
          Console.ReadKey();
        }
    
        private static Lazy<Tuple<int, int, int>> Aggregate(IObservable<string> lines)
        {
          var ev = new System.Threading.ManualResetEvent(false);
          int min = int.MaxValue, max = int.MinValue, count = 0;
    
          lines
            .Finally(() => ev.Set())
            .Subscribe(line =>
            {
              int length = line.Length;
              min = Math.Min(min, length);
              max = Math.Max(max, length);
              count++;
            });
    
          return new Lazy<Tuple<int, int, int>>(() =>
            {
              using (ev)
              {
                ev.WaitOne();
                return new Tuple<int, int, int>(count, min, max);
              }
            });
        }
    
        private static IObservable<string> WhenLineProduced()
        {
          return ProduceLines().ToObservable();
        }
    
        private static IEnumerable<string> ProduceLines()
        {
          Console.WriteLine("Yield line 1");
          yield return "line 1 - length=18";
          Console.WriteLine("Yield line 2");
          yield return "line 2 - length= 19";
          Console.WriteLine("Yield line 3");
          yield return "line 3 - length=  20";
          Console.WriteLine("Yield line 4");
          yield return "line 4 - length=   21";
          Console.WriteLine("Yield line 5");
          yield return "line 5 - length=    22";
        }
      }
    }
    


    - Dave
    http://davesexton.com/blog
    Sunday, January 17, 2010 5:38 PM
  • > > It would be nice to keep the side-effects of each operator isolated [snip]
    >
    > Could you please elaborate on this?

    Sure. If you look at your solution, it's got variables being side-effected by the "Do". The version in my blog post has each of those side-effected variables set up within their own methods (FutureMin, FutureMax etc), which appeals somewhat more: we can look at each method, examine the side effect and deem it okay, without having to worry about it in a more global context.

    > > I hadn't seen "Do" before. I suspect it's tricky to turn that into something which can be examined in a "pull" fashion after the fact [snip]
    >
    > I'm not sure what you mean here - since it's an IObservable it's already working in a pull fashion.

    No, it's already working in a push fashion. It pushes the results out to the console. IEnumerable is the pull model for sequences. Again we could use side effects and mutate captured variables to fetch the value later, but I prefer something that encapsulates the "this will have a value in the future" concept.

    Lazy<T> and a ManualResetEvent do indeed do the job, but that's really just the same as me using Task<T> and a TaskCompletionSource<T> in my blog post, isn't it? (I'd argue that the task-based version is slightly simpler.)

    Not that I don't appreciate all these suggestions, don't get me wrong... but by the time you've introduced the Lazy<T>, I'm not sure we've gained much. My point is that I'd expect to see this "future aggregation" as part of the framework. I still feel I'm missing something.

    Btw, isn't the Finally going to give you a value even when there's an error? I would expect a prematurely-ended sequence to result in an exception rather than completion, which is why I've just used the "triple action" overload for FutureAggregate in my blog post. Is there an advantage to Finally that I've missed?

    Jon

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    • Proposed as answer by Ryan Riley, MVP Friday, September 16, 2011 6:15 AM
    Monday, January 18, 2010 1:02 AM
  • > > No, I definitely don't want to defer the aggregation.
    >
    > At least the file is only read once - each line is placed into the correct group as it's read.  But yes, I acknowledge that keeping the groups in memory does not solve the problem you raised.
    >
    > Though, a closure would do the trick again:
    > <snip>

    Sure, that would work - but at the cost of rewriting both GroupBy and Count, effectively. What happened to the compositional beauty of the framework? :) As soon as you've got future aggregation, you can use the normal grouping (including doing it in a query expression, if there's other stuff you want) and you're back to elegance.

    The more of the framework you're willing to rewrite the way you want it to work, the more you'll be able to do exactly what you want - but I was hoping to not have to rewrite anything :)

    Jon

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Monday, January 18, 2010 1:05 AM
  • Hi Jon,

    > The version in my blog post has each of those side-effected variables set up within their own methods [snip]

    Yep, I was just trying to show how easy it is to produce the same result without library support.  Of course you know this - I'm certainly not questioning your knowledge of C# - but I understood your original issue with Rx as not being able to do this, whereas I felt using closures actually made it very easy.  After all, it's pretty straightforward and not much code.

    > No, it's already working in a push fashion.

    Of course it is.  I don't know how I accidentally reversed it - but thanks for correcting me :p

    > Lazy<T> and a ManualReset event does indeed do the job, but that's really just the same as me using Task<T> [snip]

    Hmm, well actually the reason why I made that very complicated method was to solve a non-issue.  I thought you were asking about how to make the already-a-push model into a push model.  I just accidentally switched the terminology somehow and misunderstood pull for push :)

    Frankly, I think both solutions are overkill.  I still think my original closure solution is the most elegant without library support.  Wouldn't mind library support though - I think Andreas' extensions are quite nice:

    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/a2e4ea29-491c-498b-9ec5-a2a9d7ceb742

    > Not that I don't appreciate all these suggestions, don't get me wrong... but by the time you've
    > introduced the Lazy<T>, I'm not sure we've gained much.  [snip]


    Agreed.  My misunderstanding.

    > Btw, isn't the Finally going to give you a value even when there's an error?

    In my Lazy<T> example, the Finally is required to unblock the Lazy<T> function.

    If you wanted to add error handling to my Aggregate method, you could simply accept an onError continuation as a parameter and pass it to the Subscribe function.

    - Dave
    http://davesexton.com/blog
    Monday, January 18, 2010 2:03 AM
  • Hi Jon,

    > Sure, that would work - but at the cost of rewriting both GroupBy and Count, effectively.

    Hehe, yes.  Having library support would be nice - like I said from the very beginning, I liked your Future<T> idea.

    But you have to admit that using a closure is quite simple.  When you say "rewriting" it sounds like a lot of work - but it's clearly not.

    - Dave
    http://davesexton.com/blog
    Monday, January 18, 2010 2:07 AM
  • It's fairly simple - but I'd rather take a bit of extra time to have something reusable and composable than have one-off solutions throughout a codebase. Personal preference, I guess :)

    Jon

    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Monday, January 18, 2010 6:29 AM
  • How about using ReplaySubject as in

          ReplaySubject<long> sc = new ReplaySubject<long>();
    
          var ies = Observable.Interval( TimeSpan.FromMilliseconds( 250))
                               .Take(16);
          ies.Subscribe( sc);
          Console.WriteLine( sc.Count().ToString());
          Console.WriteLine( sc.Aggregate<long>((x,y) => Math.Max(x,y)).ToString());
          Console.WriteLine( sc.Aggregate<long>((x,y) => Math.Min(x,y)).ToString());
    
    Graeme



    Monday, January 18, 2010 10:16 AM
  • Should simply use:

          ReplaySubject<long> sc = Observable.Interval( TimeSpan.FromMilliseconds( 250))
                               .Take(16).Record();
    

    Monday, January 18, 2010 12:46 PM
  • Hi Graeme,

    Jon's trying to avoid the memory consumption.  I'm just assuming, but ReplaySubject probably buffers all of the contents so that it can be replayed.

    - Dave
    http://davesexton.com/blog
    Monday, January 18, 2010 2:18 PM
  • Hi Jon

    I agree it would be nice to have non-blocking aggregation results.

    Is this a solution for your first problem?

    Task<int> min = null;
    Task<int> max = null;
    Task<int> count = null;
    var source = ReadLines("../../Program.cs");
    source.ToObservable(Scheduler.Now).Select(line => line.Length).Publish(xs =>
    {
        min = xs.FutureMin();
        max = xs.FutureMax();
        count = xs.FutureCount();
        return xs;
    }).Subscribe();
    Console.WriteLine("Count: {0}, Min: {1}, Max: {2}", count.Result, min.Result, max.Result);

     

    var source = ReadLines("../../Program.cs");
    var groups = from line in source.ToObservable(Scheduler.Now)
                 group line.Length by line.Length into grouped
                 select new { Length = grouped.Key, Count = grouped.FutureCount() };
    foreach (var group in groups.ToEnumerable())
    {
        Console.WriteLine("Length: {0}; Count: {1}", group.Length, group.Count.Result);
    }
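    FutureMin, FutureMax and FutureCount above are the extensions from Jon's blog post, not part of Rx. A rough sketch of how such a FutureCount might be written with TaskCompletionSource<T>, as Jon describes - the observer class here is an assumption, included only to keep the sketch self-contained:

```csharp
using System;
using System.Threading.Tasks;

public static class FutureAggregates
{
    // Sketch of a non-blocking Count: returns immediately with a Task<int>
    // that completes (or faults) only when the source observable does.
    public static Task<int> FutureCount<T>(this IObservable<T> source)
    {
        var tcs = new TaskCompletionSource<int>();
        int count = 0;
        source.Subscribe(new CountObserver<T>(
            _ => count++,
            e => tcs.TrySetException(e),
            () => tcs.TrySetResult(count)));
        return tcs.Task;
    }

    private sealed class CountObserver<T> : IObserver<T>
    {
        private readonly Action<T> onNext;
        private readonly Action<Exception> onError;
        private readonly Action onCompleted;

        public CountObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.onNext = onNext;
            this.onError = onError;
            this.onCompleted = onCompleted;
        }

        public void OnNext(T value) { onNext(value); }
        public void OnError(Exception error) { onError(error); }
        public void OnCompleted() { onCompleted(); }
    }
}
```

    Because the task is only completed in OnCompleted, the file is still read just once: all the futures are hooked up first, the data is pushed through, and .Result is asked for at the end.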
    Monday, January 18, 2010 3:47 PM
  • Hi Jon,

    >  [snip] I'd rather take a bit of extra time to have something reusable and composable than have one-off solutions throughout a codebase.

    Touché. :)

    I guess what bothers me about your solution is the TPL.  Rx, to me, is supposed to hide Task<T> and the like to simplify composability.  My solutions were attempts at using Rx to extend Rx.

    That said, your solution is better than mine for keeping Rx composable while encapsulating reusable algorithms, but I think Andreas' solution is really good (already linked above) and might be the best so far.  Do you see any problems with it?

    - Dave
    http://davesexton.com/blog
    Monday, January 18, 2010 4:32 PM
  • Yes, selecting them all in one go does solve the "immediate consumption" problem - as well as meaning you don't require that extra step.

    It still feels like a method called "Subscribe" is really doing a bit too much, but that's probably just my prejudice, having written a framework which behaves in a different way.

    Jon


    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Monday, January 18, 2010 7:42 PM
  • > > [snip] I'd rather take a bit of extra time to have something reusable and composable than have one-off solutions throughout a codebase.
    >
    > Touché. :)

    Sorry, that sounded snippier than it was meant to. I am keen on composability though - particularly from a framework which touts that as one of its strengths :)

    > I guess what bothers me about your solution is the TPL.  Rx, to me, is supposed to hide Task<T> and the like to simplify composability.  My solutions were attempts at using Rx to extend Rx.
    >
    > That said, your solution is better than mine for keeping Rx composable while encapsulating reusable algorithms, but I think Andreas' solution is really good (already linked above) and might be the best so far.  Do you see any problems with it?

    Only that I don't know enough Rx to really understand it at more than a skin-deep level at the moment - I'm still very new to this :)

    I assume I could then call the normal Single method on those observables to get the value after I believe they should be done... I may give that a try another time. It's not obvious to me whether AsyncSubject ends up using extra threads - this should all be doable within a single thread. However, that's really not much of a problem.

    It's great to see so many bright and keen developers willing to help me overcome these initial hurdles. Sadly, at the moment I've only got enough time to learn "just enough" Rx to write about it in my book - and that's almost entirely in terms of how the LINQ pattern can still be applied even to a completely dual data model - but hopefully I'll be able to get properly into Rx when the book's done...

    Jon

     


    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Monday, January 18, 2010 7:50 PM
  • The reason that they are blocking is simply for parity with LINQ to Objects.  Older internal versions of Rx used to have non-blocking versions as well.  We definitely think these are interesting and useful, and perhaps they will make a comeback.

    Also, you can usually write your code without resorting to subjects and tasks and semicolons.

    Let me know if there are any other concerns.
    Tuesday, January 19, 2010 3:08 AM
  • I missed the memory consumption requirement. So what goes on in the background if you simply do:

          var sa = Observable.Interval( TimeSpan.FromMilliseconds( 250))
                               .Take(16);
          
          var q = Observable.Merge(
            Observable.Start(() => sa.Aggregate<long,long>(0,(x,y) => x+1).ToString()),
            Observable.Start(() => sa.Aggregate<long>((x,y) => Math.Max(x,y)).ToString()),
            Observable.Start(() => sa.Aggregate<long>((x,y) => Math.Min(x,y)).ToString())
            );
          
          q.Subscribe( s => Console.WriteLine( s));
    
    Graeme

    Tuesday, January 19, 2010 10:48 AM
  • Hi Graeme,

    Well that solution has other problems.  First, each aggregate will restart sa.  That means the file will be read 3 times.  Secondly, the output is a sequence with three values, which are the results of the three aggregates.  The problem is that they could appear in any order because of Merge, so you'll have no way of identifying which value was calculated by which aggregate.

    It does seem, however, to kind of solve the problem with memory consumption because it will only process a single line at a time.  Although, since each aggregate will be running on a separate thread you'll actually have 3 lines in memory at once instead of just one.

    - Dave
    http://davesexton.com/blog
    Tuesday, January 19, 2010 6:14 PM
  • Just curious... why not compute count, min, and max all at once?

    var q = sa.Aggregate(
        new { count = 0, max = long.MinValue, min = long.MaxValue },
        (x, y) => new { count = x.count + 1, max = Math.Max(x.max, y), min = Math.Min(x.min, y) });

    q.Subscribe(s => Console.WriteLine(s));

    ??
    Tuesday, January 19, 2010 6:24 PM
  • Hi Wes,

    Isn't that roughly the same as my closure solution?

    - Dave
    http://davesexton.com/blog
    Tuesday, January 19, 2010 7:10 PM
  • Yes with one caveat.  You need to wrap both the definition of the observable and the variables that are closed over with an Observable.Defer otherwise multiple subscriptions will share the variables which is probably not what is intended.
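    To make Wes's caveat concrete: Defer re-runs a factory for every subscription, so variables closed over inside the factory become per-subscription state instead of being shared by all subscribers. The hand-rolled Defer below is only a sketch of the mechanism - in Rx itself, Observable.Defer plays this role:

```csharp
using System;

public static class DeferDemo
{
    // Hand-rolled equivalent of Observable.Defer: the factory runs once per
    // subscription, so any variables it closes over are fresh for each
    // subscriber rather than shared across all of them.
    public static IObservable<T> Defer<T>(Func<IObservable<T>> factory)
    {
        return new DeferredObservable<T>(factory);
    }

    private sealed class DeferredObservable<T> : IObservable<T>
    {
        private readonly Func<IObservable<T>> factory;

        public DeferredObservable(Func<IObservable<T>> factory)
        {
            this.factory = factory;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Fresh observable (and fresh closure state) for every subscriber.
            return factory().Subscribe(observer);
        }
    }
}
```

    Applied to the anonymous-type aggregate above, the definition and its closed-over variables would go inside the factory, so a second subscription starts from count = 0 again instead of continuing from the first subscriber's totals.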
    Tuesday, January 19, 2010 7:33 PM
  • And now a follow-up to that blog post...


    Jon


    Web site: http://www.pobox.com/~skeet Blog: http://www.msmvps.com/jon.skeet
    Tuesday, January 19, 2010 8:51 PM