Thread safety of processing files in parallel

  • Question

  • User-153404742 posted

    Hi,

    I haven't worked much with threads or with running tasks in parallel, but I'm trying to process multiple files together as they come in. I have a FileSystemWatcher object that puts files into a ConcurrentQueue as they arrive, and a separate worker thread that calls the function to process the files. In that function I'm using Parallel.ForEach to process each file. My question is: are the variables inside the ProcessFile function thread safe? My code is below:

      
    #region Fields

    private readonly ConcurrentQueue<string> files = new ConcurrentQueue<string>();
    private Thread thread;
    private readonly EventWaitHandle waitHandle = new AutoResetEvent(true);
    private static readonly object lockObject = new object();

    #endregion

    public void EnqueueFile(string path)
    {
        // Queue the file
        //lock (lockObject)
        //{
        files.Enqueue(path);
        //}

        // Initialize and start the worker thread when the first file is queued
        // or when it has been stopped and thus terminated.
        if (thread == null || shouldStop)
        {
            thread = new Thread(new ThreadStart(Work));
            thread.Start();
        }
        // If the thread is waiting then start it
        else if (thread.ThreadState == ThreadState.WaitSleepJoin)
        {
            waitHandle.Set();
        }
    }

    private void Work()
    {
        while (!shouldStop)
        {
            string path = String.Empty;
            lock (lockObject)
            {
                if (files.Count > 0)
                {
                    ProcessQueue(files);
                }
            }

            // If there are no files left to process, wait for the next one
            // to come into the queue from the FileSystemWatcher
            string tryFile;
            if (!files.TryPeek(out tryFile))
            {
                waitHandle.WaitOne();
            }
        }
    }

    public void ProcessQueue(ConcurrentQueue<string> queue)
    {
        try
        {
            Parallel.ForEach(queue, new ParallelOptions
                {
                    MaxDegreeOfParallelism = 5 //System.Environment.ProcessorCount
                },
                q =>
                {
                    string f;
                    if (queue.TryDequeue(out f))
                    {
                        ProcessFile(f);
                    }
                });
        }
        catch (Exception ex)
        {
            //log error
        }
    }
    

    Question #1: Do I need to lock the object when queuing the files in the ConcurrentQueue?

    Question #2: The ProcessFile(f) function reads the file and creates a DataTable that will be inserted/saved/updated in the database table. If I drop in 10 files at once, the FileSystemWatcher will add all 10 to the queue within a second, and the ProcessQueue function will run at most 5 in parallel, so 5 files will be processing at the same time. Will all the variables and DataTable objects created in their own tasks be thread safe? The processing is really quick: about 1-2 seconds for each file.
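
    For comparison, the watcher-to-worker hand-off described above could also be sketched with a BlockingCollection<string>. This is a swapped-in technique, not the code posted here: FileWorker, its constructor parameters, and Complete are hypothetical names, and the processFile delegate stands in for ProcessFile. Each worker blocks until a file is available, so no wait handle or thread-state check is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical sketch: a fixed number of workers drain a blocking queue.
public sealed class FileWorker
{
    private readonly BlockingCollection<string> files =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly Action<string> processFile;
    private readonly Task[] consumers;

    public FileWorker(Action<string> processFile, int workerCount)
    {
        this.processFile = processFile;
        consumers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            // LongRunning hints that the task blocks for long periods.
            consumers[i] = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
        }
    }

    // Safe to call from any thread, e.g. a FileSystemWatcher event handler.
    public void EnqueueFile(string path) => files.Add(path);

    // Signals that no more files will arrive and waits for the queued ones.
    public void Complete()
    {
        files.CompleteAdding();
        Task.WaitAll(consumers);
    }

    private void Consume()
    {
        // Blocks while the queue is empty; ends after CompleteAdding
        // once the queue has been drained.
        foreach (string path in files.GetConsumingEnumerable())
        {
            processFile(path);
        }
    }
}
```

    With workerCount set to 5, at most five files are processed at once, which mirrors MaxDegreeOfParallelism = 5 in the posted code. An exception thrown by processFile would end that worker, so real code would likely catch and log inside the loop.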

    Monday, June 20, 2016 6:25 PM

All replies

  • User765422875 posted

    The concurrent queue uses an underlying list of small arrays and lock-free System.Threading.Interlocked operations on the head and tail arrays.  This allows for thread-safe operations without the need for heavy locks.

    #1) No, you don't need a lock when you call Enqueue.

    #2) The DataTable is thread safe for multithreaded read operations. You must synchronize any write operations.
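
    As a minimal sketch of point #1, assuming a hypothetical EnqueueDemo helper and made-up file names (neither comes from the code in the question), several threads can call Enqueue on the same ConcurrentQueue<string> with no lock around the call:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class EnqueueDemo
{
    // Enqueues producerCount * itemsPerProducer hypothetical paths from
    // several threads at once, with no lock around Enqueue.
    public static int EnqueueFromManyThreads(int producerCount, int itemsPerProducer)
    {
        var files = new ConcurrentQueue<string>();

        Parallel.For(0, producerCount, p =>
        {
            for (int i = 0; i < itemsPerProducer; i++)
            {
                files.Enqueue($"file-{p}-{i}.csv"); // hypothetical file name
            }
        });

        // If no enqueue was lost to a race, this equals producerCount * itemsPerProducer.
        return files.Count;
    }
}
```

    Calling EnqueueDemo.EnqueueFromManyThreads(8, 1000) should return 8000, since the queue itself serializes concurrent enqueues.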

    Monday, June 20, 2016 7:11 PM
  • User-153404742 posted

    Thanks. Write operations to the file or to the database? If 5 files are processing in parallel, each will have created its own DataTable, and each table will insert and/or update records in the same database table. This is thread safe because the DataTable is thread safe, correct? I don't understand which write operation needs to be synchronized or what is meant by that.

    Monday, June 20, 2016 7:40 PM
  • User765422875 posted

    You did not mention write operations to the file. You mentioned reading from files and creating a datatable from the reads.

    "The ProcessFile(f) function reads the file and creates a dataTable that will be inserted/saved/updated in the database table"

    The write operation on the DataTable is not thread safe and therefore you will need to synchronize that code. It's not clear how you are updating the database after the DataTable is created.

    That being said, regarding writing to the database: if different threads using different connections issue different commands to the same database, you should be OK with the multi-threaded approach you've described.

    Monday, June 20, 2016 9:03 PM
  • User-153404742 posted

    I am only reading from the file and creating a DataTable from it; I never write to the file.

    The DataTable object is created inside the ProcessFile(f) function when it's called. Since that variable is local, it should be thread safe, as it should be stored on the stack...? I create a new connection object depending on where the file data needs to be stored, because depending on the file data it will go to a different database. I build the DataTable by reading rows from the CSV file passed to ProcessFile(f) (f is the CSV file I will read). Once done, the DataTable is passed to a stored procedure as a parameter. I'm not reading each file into one master DataTable; each file I read creates its own DataTable and passes it as a parameter to a stored procedure that inserts/updates the data in the SQL table.
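
    A minimal sketch of this arrangement, under stated assumptions: BuildTable and ProcessAll are hypothetical names, in-memory string arrays stand in for the CSV files, and the two-column layout is made up. The local variable holds only a reference; the DataTable object itself lives on the heap. What keeps it private to one thread is that no other thread ever receives that reference.

```csharp
using System;
using System.Data;
using System.Threading.Tasks;

public static class PerFileTableDemo
{
    // Hypothetical stand-in for ProcessFile: builds a DataTable that only
    // the calling thread can reach, so Rows.Add is not wrapped in a lock.
    public static DataTable BuildTable(string[] csvLines)
    {
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Columns.Add("Value", typeof(int));

        foreach (string line in csvLines)
        {
            string[] parts = line.Split(',');
            table.Rows.Add(parts[0], int.Parse(parts[1]));
        }
        return table;
    }

    // Processes several in-memory "files" in parallel, at most five at a
    // time, and returns the row count of each per-file table.
    public static int[] ProcessAll(string[][] files)
    {
        var rowCounts = new int[files.Length];
        Parallel.For(0, files.Length,
            new ParallelOptions { MaxDegreeOfParallelism = 5 },
            i =>
            {
                DataTable table = BuildTable(files[i]); // local to this iteration
                rowCounts[i] = table.Rows.Count;        // each index is written by one iteration only
            });
        return rowCounts;
    }
}
```

    The sketch shares nothing mutable between iterations except the result array, and each iteration writes to its own slot.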

    Monday, June 20, 2016 10:24 PM
  • User765422875 posted

    Reading from the file is thread safe, but adding rows to a DataTable is not. The MSDN documentation is clear about this: the type is safe only for multithreaded read operations. You must synchronize the adding of rows whenever more than one thread can touch the same DataTable. To be safe, synchronize any modification of a shared DataTable inside that Parallel.ForEach.

    All you have to do is lock the operations on the DataTable.
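
    A minimal sketch of that locking, assuming a single DataTable that is deliberately shared by all parallel iterations. SharedTableDemo, FillSharedTable, and the one-column layout are hypothetical and are not taken from the code in the question.

```csharp
using System;
using System.Data;
using System.Threading.Tasks;

public static class SharedTableDemo
{
    // Adds rowCount rows to one shared DataTable from parallel iterations.
    // Every write goes through the same lock object, so only one thread
    // modifies the table at a time.
    public static int FillSharedTable(int rowCount)
    {
        var gate = new object();
        var shared = new DataTable();
        shared.Columns.Add("Id", typeof(int));

        Parallel.For(0, rowCount,
            new ParallelOptions { MaxDegreeOfParallelism = 5 },
            i =>
            {
                lock (gate)
                {
                    shared.Rows.Add(i);
                }
            });

        return shared.Rows.Count;
    }
}
```

    The lock is only needed because the table is shared here. A DataTable that is created and filled by a single thread is never written concurrently.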

    Does all of this make sense?

    Tuesday, June 21, 2016 2:37 PM
  • User36583972 posted

    Hi inkaln,

    I found some links, listed below, that discuss a problem similar to yours. You can refer to them.

    Read and process files in parallel C#:

    http://stackoverflow.com/questions/20928705/read-and-process-files-in-parallel-c-sharp

    C# Parallel.foreach - Making variables thread safe:

    http://stackoverflow.com/questions/26975318/c-sharp-parallel-foreach-making-variables-thread-safe

    Parallel Loops:

    https://msdn.microsoft.com/en-us/library/ff963552.aspx

    Best Regards,

    Yohann Lu

    • Marked as answer by Anonymous Thursday, October 7, 2021 12:00 AM
    Wednesday, June 22, 2016 2:46 AM