Is there any way for this GroupBy to not leak memory?

  • Question

  • The following fragment will rapidly reach the memory limit.

     

    Observable.Repeat(0).Select(_ => Guid.NewGuid()).GroupBy(g => g).Select(g => g.Take(1).ToArray()).Subscribe()
    


    I would have thought that the inner Take(1), by completing after the first element, would let GroupBy stop grouping on that key and free up memory. Is this not the case? Is it by design rather than a bug, or am I simply doing it wrong?

     

    Here's my custom implementation:

        public static IObservable<IList<T>> GroupByIdAndLength<T, TKey>(this IObservable<T> src, Func<T, TKey> keySelector, Func<T, int> lenSelector)
        {
          return Observable.Create<IList<T>>(obsvr =>
            {
              // One dictionary per subscription: key -> (expected length, elements collected so far).
              var dict = new Dictionary<TKey, Tuple<int, List<T>>>();
              var disp = src.Subscribe(nval =>
                      {
                        Tuple<int, List<T>> element;
                        var key = keySelector(nval);
                        if (dict.TryGetValue(key, out element))
                        {
                          element.Item2.Add(nval);
    
                          if (element.Item2.Count == element.Item1)
                          {
                            // Group is complete: drop the key so its memory can be reclaimed.
                            dict.Remove(key);
                            obsvr.OnNext(element.Item2);
                          }
                        }
                        else
                        {
                          int len = lenSelector(nval);
                          List<T> list = new List<T>() { nval };
                          if (len == 1)
                          {
                            // Single-element group: emit it immediately, never store it.
                            obsvr.OnNext(list);
                          }
                          else
                          {
                            element = new Tuple<int, List<T>>(len, list);
                            dict.Add(key, element);
                          }
                        }
                      },
                      obsvr.OnError,       // forward errors from the source
                      obsvr.OnCompleted);  // forward completion from the source
              return () =>
                {
                  disp.Dispose();
                  dict.Clear();
                };
            }
          );
        }
    
    


     


    --Scott W.
    http://weblogs.asp.net/sweinstein
    Thursday, June 23, 2011 3:11 PM
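To see why an operator like the one above stays bounded where plain GroupBy does not, here is a rough, Rx-free model of the same bookkeeping, written as a C# 9+ top-level program using only the base class library. `GroupByIdAndLengthModel` is a hypothetical name, the source is an `IEnumerable<T>` instead of an observable, and the peak number of open groups is reported through an extra `out` parameter; treat it as a sketch of the idea, not as the operator itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, Rx-free model of the GroupByIdAndLength bookkeeping: the first element
// for a key opens a group, lenSelector says how many elements that group expects, and
// the key is removed from the dictionary as soon as the group is complete.
static List<List<T>> GroupByIdAndLengthModel<T, TKey>(
    IEnumerable<T> source,
    Func<T, TKey> keySelector,
    Func<T, int> lenSelector,
    out int maxOpenGroups)
{
    var open = new Dictionary<TKey, (int Expected, List<T> Items)>();
    var completed = new List<List<T>>();
    maxOpenGroups = 0;

    foreach (var item in source)
    {
        var key = keySelector(item);
        if (open.TryGetValue(key, out var group))
        {
            group.Items.Add(item);
            if (group.Items.Count == group.Expected)
            {
                open.Remove(key);               // evict: this key no longer costs memory
                completed.Add(group.Items);
            }
        }
        else
        {
            int expected = lenSelector(item);
            var items = new List<T> { item };
            if (expected == 1)
                completed.Add(items);           // single-element group: never stored
            else
                open.Add(key, (expected, items));
        }
        maxOpenGroups = Math.Max(maxOpenGroups, open.Count);
    }
    return completed;
}

// 100,000 unique keys, each group expecting exactly 2 elements that arrive back to back.
var source = Enumerable.Range(0, 100_000).SelectMany(i => new[] { i, i });
var groups = GroupByIdAndLengthModel(source, x => x, _ => 2, out int peak);
Console.WriteLine($"{groups.Count} groups, at most {peak} open at once");
// prints "100000 groups, at most 1 open at once"
```

Because each key is removed as soon as its expected length is reached, only one group is ever open here. Note that a key whose group never fills up would still stay in the dictionary for the life of the subscription.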

Answers

  • This behavior is by design, but you could use GroupByUntil, something like:

    Observable.Repeat(0).Select(_ => Guid.NewGuid()).GroupByUntil(g => g, g => g)

     

    Thursday, June 23, 2011 10:21 PM
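Why does a duration of `g => g` not swallow each group's first element? The Rx-free C# sketch below (a C# 9+ top-level program) models `GroupByUntil(g => g, g => g)` under one stated assumption: GroupByUntil hands a new group to the consumer before it subscribes to the duration, and then pushes the element to the group's observers in subscription order. The `map` and the (OnNext, OnCompleted) tuples are illustrative stand-ins, not Rx types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, heavily simplified model of GroupByUntil(g => g, g => g); NOT Rx's
// actual implementation. A group is just a list of (OnNext, OnCompleted) observers.
var map = new Dictionary<Guid, List<(Action<Guid> OnNext, Action OnCompleted)>>();
int received = 0, completed = 0;

void OnSourceNext(Guid element)
{
    Guid key = element;                                  // keySelector: g => g
    if (!map.TryGetValue(key, out var group))
    {
        group = new List<(Action<Guid> OnNext, Action OnCompleted)>();
        map.Add(key, group);

        // Assumed order 1: the consumer receives the new group and subscribes first.
        group.Add((_ => received++, () => completed++));

        // Assumed order 2: the duration (the group itself, per g => g) subscribes next.
        // When it sees an element it completes the group and forgets the key.
        group.Add((_ =>
        {
            foreach (var o in group.ToArray())
                o.OnCompleted();
            map.Remove(key);
        }, () => { }));
    }

    // The element reaches observers in subscription order, so the consumer sees it
    // before the duration closes the group.
    foreach (var observer in group.ToArray())
        observer.OnNext(element);
}

for (int i = 0; i < 10_000; i++)
    OnSourceNext(Guid.NewGuid());

Console.WriteLine($"received={received}, completed={completed}, open groups={map.Count}");
// prints "received=10000, completed=10000, open groups=0"
```

Under that assumption every group delivers its one element, completes, and is dropped from the map, so no group outlives the element that created it, however many distinct keys arrive.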

All replies

  • Hi, Scott,

    As Wes wrote, this behavior is the only way GroupBy can work. You are probably missing two key points:

    1. Your Take(1) does NOT end the GroupBy subscription. It only ends subscriptions to the individual group observables that GroupBy creates; GroupBy's own subscription to Select(_ => Guid.NewGuid()) never ends.
    2. GroupBy has to keep an internal record of every group it has ever produced, because it needs that record to route each subsequent element from the source sequence to the right group. So it holds a container of references to all groups (presumably some kind of lookup keyed by the group key). Since you group by unique keys, the number of groups grows without bound, hence the memory leak.
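Both points can be illustrated with a deliberately simplified, Rx-free model of a GroupBy routing table, written as a C# 9+ top-level program. The dictionary-of-callbacks design is an assumption for illustration, not Rx's actual internals; each group's only observer mimics `Take(1)` by unsubscribing after its first element.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a GroupBy routing table; NOT Rx's implementation.
// Each group is a list of OnNext callbacks, keyed by the group key.
var groups = new Dictionary<Guid, List<Action<Guid>>>();
int delivered = 0;

void OnSourceNext(Guid element)
{
    Guid key = element;                                  // keySelector: g => g
    if (!groups.TryGetValue(key, out var observers))
    {
        observers = new List<Action<Guid>>();
        groups.Add(key, observers);                      // new group, remembered for routing

        // The group's only observer mimics g.Take(1): it unsubscribes after one element.
        Action<Guid> takeOne = null!;
        takeOne = _ => { delivered++; observers.Remove(takeOne); };
        observers.Add(takeOne);
    }
    foreach (var observer in observers.ToArray())        // copy: observers may unsubscribe
        observer(element);
}

for (int i = 0; i < 10_000; i++)
    OnSourceNext(Guid.NewGuid());

// Every observer is gone, but the routing entries are not.
Console.WriteLine($"delivered={delivered}, groups still tracked={groups.Count}");
// prints "delivered=10000, groups still tracked=10000"
```

Every observer has unsubscribed, yet the table still holds one entry per distinct key, because nothing tells the router that a key will never be seen again.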

    In your sample, the same result can be achieved with a simpler query:

    Observable.Repeat(0).Select(_ => Guid.NewGuid()).Select(g => new[] { g }).Subscribe()
    


    In a real-world scenario, if you do need to group an observable, there are two cases:

    1. You have a finite source observable. Then GroupBy's behavior is not a problem: it creates as many groups as it needs and releases them all as soon as the source ends.
    2. You have an infinite source. Then you cannot use plain GroupBy unless the set of possible keys is bounded, for the reasons given above. You need to place some limit on the grouping process, for example by following Wes's advice.

    By the way, your GroupByIdAndLength operator is analogous to GroupByUntil.

    Friday, June 24, 2011 1:59 AM