How can I prevent this query from leaking memory?

    Question

  • I have written some code which turns the FileSystemWatcher's Changed event into an observable sequence.

    My goal is to split all file system changes into separate streams, one per file, and throttle each of them. I don't care about the data in the group; I just want each file to become a separate stream. The GroupBy() method gives me an observable sequence of observable sequences, but it's leaking memory.

    For example, if I have 10 different files which each change 3 times in half a second, I only want one notification per file.

    How can I solve this problem? Thanks for your time.

    FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") {
        EnableRaisingEvents = true,
        NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
    };
    
    void Main()
    {
        var fileSystemEventStream = 
            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (
                    _ => _watcher.Changed += _, 
                    _ => _watcher.Changed -= _
                )
                .ObserveOn(ThreadPoolScheduler.Instance)
                .SubscribeOn(ThreadPoolScheduler.Instance)
                .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
                ;
    
        var res = 
            from fileGroup in fileSystemEventStream
            from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
            select file;
    
        res.Subscribe(
            ReceiveFsFullPath, 
            exception => {
                Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace);
            });
    
        Console.Read();
    }
    
    void ReceiveFsFullPath(string s){
        Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine(s);
    }


    The universe is mostly hydrogen and ignorance.


    Monday, August 05, 2013 9:22 AM

Answers

  • Hi,

    I think you've misunderstood how GroupBy works. Internally, it looks something like this: IDictionary<IKey, IObservable<Item>>

    It doesn't buffer any items at all. It just stores observables by key. The observables that it stores do not buffer items either.

    The problem with GroupBy is that the keys are never removed.  Every new file will cause a new group to be added.  If you have a growing number of files, then the internal dictionary will grow as well.

    GroupByUntil is the same as GroupBy except it allows you to specify a duration for each key.  In the example that I provided, I'm using the deleted event of the corresponding file to remove groups.

    Try running my example program, it works.

    > Although looking at the documentation it says that once a group expires and an item with the same key value is encountered, the group is reborn?

    It means that when the duration elapses for a key the group is removed, though not permanently.  If the same key is encountered again, a new group is created for that key.  That's probably the behavior that you'd want.
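    That rebirth behavior is easy to see in a small sketch (in-memory values rather than file events; assumes Rx 2.x / System.Reactive):

```csharp
using System;
using System.Reactive.Linq;

class GroupRebirthSketch
{
    static void Main()
    {
        // The duration selector is the group itself limited to one element,
        // so each group closes as soon as it receives its first item; the
        // next occurrence of the same key then opens a brand-new group.
        Observable.Repeat("a", 3)
            .GroupByUntil(x => x, x => x, group => group.Take(1))
            .Subscribe(group => Console.WriteLine("Group opened: " + group.Key));
        // Expect "Group opened: a" three times: the key expires and is
        // reborn on each occurrence.
    }
}
```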

    - Dave


    http://davesexton.com/blog

    • Marked as answer by Vince Panuccio Tuesday, August 06, 2013 11:29 AM
    Tuesday, August 06, 2013 5:21 AM

All replies

  • So the code looks fine and I believe that it works. What is the problem you are facing? i.e. what is the perceived behaviour and what is the expected behaviour?

    Cheers

    Lee

    p.s. Normally the variable name '_' indicates that you don't use it. I would suggest changing your usage of it to 'h' for handler, or even just 'handler'.

    Oh, and normally I would put the SubscribeOn before the ObserveOn, but that is just a style thing. It helps me read the code as it would flow, i.e. you subscribe first and then you observe values. However, in this case I don't think adding a SubscribeOn adds any value at all, so it can be removed.


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Monday, August 05, 2013 9:42 AM Adding ObserveOn/SubscribeOn comments
    Monday, August 05, 2013 9:38 AM
  • Hi Lee,

    Thanks for your prompt response.

    Reading back over my question, it appears I actually put the question in the subject line.

    The problem with this code is that the GroupBy() method causes it to "leak" memory over time. I need the functionality of GroupBy() because I want to create a unique observable sequence for each new file encountered, but I don't care so much about the group itself.


    The universe is mostly hydrogen and ignorance.

    Monday, August 05, 2013 10:58 AM
  • Hi,

    What do you mean by "I don't care so much about the group"? It seems like grouping is exactly what you want.

    Note that Rxx provides a few extensions for FileSystemWatcher that you may find useful; there are labs that illustrate their usage.

    As for leaking memory, are you referring to the fact that once a group is opened it never closes?

    Use GroupByUntil instead.  For example, you could close a group when its file is deleted:

    using System;
    using System.IO;
    using System.Reactive.Linq;
    using System.Threading.Tasks;
    
    namespace RxLabs.Net45
    {
      class FileSystemWatcherGroupingLab
      {
        internal static void Main()
        {
          const string fileExtension = ".fswlab";
          var directory = Path.GetTempPath();
    
          var watcher = new FileSystemWatcher(directory, "*" + fileExtension)
          {
            EnableRaisingEvents = true,
            NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size | NotifyFilters.FileName
          };
    
          var changes = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            eh => watcher.Changed += eh,
            eh => watcher.Changed -= eh)
            .Select(e => e.EventArgs.FullPath)
            .Select(file => Path.GetFileName(file));    // Simplifies diagnostic output
    
          var deletes = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            eh => watcher.Deleted += eh,
            eh => watcher.Deleted -= eh)
            .Select(e => e.EventArgs.FullPath)
            .Select(file => Path.GetFileName(file));    // Simplifies diagnostic output
    
          var groups = changes
            .Do(file => WriteLine("File changed: {0}", file, ConsoleColor.DarkGray))
            .GroupByUntil(
              file => file,
              file => file,
              group => deletes.Where(file => file == group.Key)
                .Do(file => WriteLine("Removing group for file: {0}", file, ConsoleColor.Magenta)))
            .Do(group => WriteLine("Created group for file: {0}", group.Key, ConsoleColor.DarkGreen));
    
          var files =
            from fileGroup in groups
            from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
              .TimeInterval()    // For diagnostic output only
            select file;
    
          using (files.Subscribe(file => WriteLine(
            file.Interval + " -> " + file.Value,
            file.Interval < TimeSpan.FromSeconds(1) ? ConsoleColor.Red : ConsoleColor.Green)))
          using (Observable.Range(1, 10).Select(fileNumber => Observable.StartAsync(
            async cancel =>
            {
              var name = fileNumber + fileExtension;
              var file = Path.Combine(directory, name);
    
              if (!cancel.IsCancellationRequested)
              {
                try
                {
                  WriteLine("Creating file: {0}", name, ConsoleColor.DarkGreen);
    
                  using (var stream = File.CreateText(file))
                  {
                    for (var i = 1; i < 10 && !cancel.IsCancellationRequested; i++)
                    {
                      await stream.WriteLineAsync("Line #" + i);
                      await stream.FlushAsync();
                      await Task.Delay(TimeSpan.FromSeconds(fileNumber * i * .1), cancel);
                    }
                  }
                }
                finally
                {
                  WriteLine("Deleting file: {0}", name, ConsoleColor.Magenta);
    
                  File.Delete(file);
                }
              }
            }))
            .Merge()
            .Subscribe())
          {
            Console.ReadLine();
          }
        }
    
        private static readonly object gate = new object();
    
        private static void WriteLine(string format, ConsoleColor color = ConsoleColor.Gray)
        {
          WriteLine(format, null, color);
        }
    
        private static void WriteLine(string format, object arg = null, ConsoleColor color = ConsoleColor.Gray)
        {
          lock (gate)
          {
            Console.ForegroundColor = color;
            Console.WriteLine(format, arg);
            Console.ResetColor();
          }
        }
      }
    }

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Monday, August 05, 2013 4:51 PM Formatting
    Monday, August 05, 2013 4:50 PM
  • Sorry, to further clarify: do you mean that, because each new event for a new file creates a new sequence, you may have an ever-increasing number of internal sequences? If that is the problem, then logically how do you see it being solved? i.e. should the sequence just complete, and thus clean itself up, after some given criteria? If so, what criteria would you plan to use:

    • After a given period of inactivity (i.e. after 5 seconds where a file is not modified, just terminate the sequence)?
    • When the file is deleted?

    I am sure we can come up with something if you can be more specific about your problem.
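    For the inactivity option, one possible sketch (assuming Rx 2.x and a stand-in `changes` subject of full paths, not the exact FileSystemWatcher pipeline above) lets each group act as its own duration selector:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class InactivityGroupSketch
{
    static void Main()
    {
        // Hypothetical stand-in for the FileSystemWatcher stream: a subject
        // of full paths, so the sketch runs without touching the disk.
        var changes = new Subject<string>();

        var throttledPerFile = changes
            .GroupByUntil(
                path => path,
                path => path,
                // The group, throttled, fires 5 seconds after the group's
                // last event, which closes the group and removes its key;
                // a later event for the same path opens a fresh group, so
                // the internal key dictionary cannot grow without bound.
                group => group.Throttle(TimeSpan.FromSeconds(5)))
            .SelectMany(group => group.Throttle(TimeSpan.FromSeconds(1)));

        using (throttledPerFile.Subscribe(Console.WriteLine))
        {
            changes.OnNext(@"d:\a.txt");
            changes.OnNext(@"d:\a.txt");   // coalesced by the 1-second throttle
            Console.ReadLine();
        }
    }
}
```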

    Regards

    Lee Campbell


    Lee Campbell http://LeeCampbell.blogspot.com


    • Edited by LeeCampbell Monday, August 05, 2013 5:02 PM
    Monday, August 05, 2013 5:02 PM
  • What I mean by "I don't care so much about the group" is that when a change happens to a file, the group itself is irrelevant; what is relevant is that a file changed. I'm using the group so I can create a new observable sequence for each file, but I don't care about the items in the group, make sense? All I care about is the file which changed and that it occurs on its own sequence.

    I'm also not worried about the number of internal sequences (created by each unique file); I'm only worried about the group growing over time.

    Now that I think about it, the solution would almost be like GroupBy(), except that the internal dictionary maintaining the items in the group would never contain more than one item. As an item is added to the group, the last item is removed.

    I will look at GroupByUntil(). Although looking at the documentation it says that once a group expires and an item with the same key value is encountered, the group is reborn?


    The universe is mostly hydrogen and ignorance.


    Tuesday, August 06, 2013 4:38 AM
  • Oh, seems like a misunderstanding then.

    With LINQ over IEnumerable sequences, you have a key and a group with all the items matching that key in it. I was led to believe (probably because of the way LINQ on IEnumerable works) that each group accumulates over time, but what you're saying is that GroupBy() simply creates a sequence of sequences based on the key and doesn't actually accumulate like it does with IEnumerable.

    If that's the case then I think the name GroupBy is misleading.

    Thanks Lee & Dave for your time and for the code samples. I believe GroupByUntil() is actually what I'm after.

    Vince


    The universe is mostly hydrogen and ignorance.

    Tuesday, August 06, 2013 11:28 AM
  • Hi Vince,

    > GroupBy() simply creates a sequence of sequences based on the key and doesn't actually accumulate like it does with IEnumerable.

    Correct.  It's reactive, not interactive.

    The enumerable GroupBy has to buffer all items before it returns; otherwise, the source may change before you enumerate the results, which could break the groupings!

    The observable GroupBy pushes new observables as it receives new keys and redirects items into the correct observables based on their key.  There's no need to buffer items.
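    The difference is easy to see with in-memory values (a sketch, not tied to the file-watcher code; assumes Rx 2.x):

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

class GroupByPushSketch
{
    static void Main()
    {
        // Enumerable GroupBy: every item is buffered before the groups
        // are handed back, so each group is complete when you see it.
        foreach (var g in Enumerable.Range(0, 4).GroupBy(x => x % 2))
            Console.WriteLine("IEnumerable group " + g.Key + ": " + string.Join(",", g));

        // Observable GroupBy: a group is pushed the moment its key first
        // appears, and subsequent items are routed into the already-open
        // group as they arrive; nothing is buffered.
        Observable.Range(0, 4)
            .GroupBy(x => x % 2)
            .Subscribe(group =>
            {
                Console.WriteLine("Observable group opened: " + group.Key);
                group.Subscribe(x => Console.WriteLine("  key " + group.Key + " got " + x));
            });
    }
}
```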

    > If that's the case then I think the name GroupBy is misleading.

    It's the duality between IObservable<T> and IEnumerable<T> that makes the name appropriate.  For example, you can define the xs term in the following query as either IObservable<T> or IEnumerable<T> without having to change the query at all!

    var xs = GetSequence(); // IObservable<T> or IEnumerable<T>
    
    var query =
        from x in xs
        group x by x.Name;

     

    - Dave


    http://davesexton.com/blog

    Tuesday, August 06, 2013 2:08 PM
  • Logically it might make sense (the duality), but to a newcomer like myself it suggests the same behavior as the IEnumerable<T> equivalent.

    No big deal, I get it now. Hopefully the next guy asking the same question stumbles across this post.


    The universe is mostly hydrogen and ignorance.

    Wednesday, August 07, 2013 7:11 AM