How to read a compressed file using a custom extractor?

    Question

  • Hi! Is it possible to read a compressed (gzip) file using a custom extractor? Could you advise me on how to write a custom gzip extractor (maybe with an example)? The built-in extractors supposedly can read compressed data, but how is the file split between nodes?

    Is this a good solution?

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                using (var decompress = new GZipStream(input.BaseStream, CompressionMode.Decompress))
                using (StreamReader sr = new StreamReader(decompress))
                {
                    string line;
                    while ((line = sr.ReadLine()) != null)
                    {
                        // parse the line, set the output columns, then:
                        yield return output.AsReadOnly();
                    }
                }
            }

    Friday, April 22, 2016 3:13 PM

Answers

  • The underlying framework recognizes the ".gz" naming convention and decompresses the file before the Extract call.

    NOTE: When running locally, the runtime did not decompress it. Only the cloud runtime did.

    I suggested to MS that they add a configuration switch to override the convention, because if you write a gzip file the runtime doesn't compress it, so why should it decompress?

    Here is my code; notice the gzip line is commented out.

    using Microsoft.Analytics.Interfaces;
    using Microsoft.Analytics.Types.Sql;
    using System;
    using System.Collections.Generic;
    using System.IO.Compression;
    using System.Linq;
    using System.Text;

    namespace usqldm
    {
        [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
        public class GZipExtractor : IExtractor
        {
            public GZipExtractor()
            {
            }

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                // The cloud runtime has already decompressed the .gz file,
                // so the GZipStream wrapper is not needed there.
                // using (var archive = new GZipStream(input.BaseStream, CompressionMode.Decompress))
                {
                    using (var sr = new System.IO.StreamReader(input.BaseStream))
                    {
                        // Map column names from the header row to their positions.
                        Dictionary<string, int> headers = new Dictionary<string, int>();
                        if (!sr.EndOfStream)
                        {
                            string[] tmp = sr.ReadLine().Split('\t');
                            for (int i = 0; i < tmp.Length; i++)
                            {
                                headers.Add(tmp[i], i);
                            }
                        }

                        // Emit one row per remaining line, matching columns by name.
                        while (!sr.EndOfStream)
                        {
                            var line = sr.ReadLine();
                            var cols = line.Split('\t');

                            foreach (var c in output.Schema)
                            {
                                int colidx;
                                if (headers.TryGetValue(c.Name, out colidx))
                                {
                                    if (colidx < cols.Length)
                                    {
                                        output.Set<object>(c.Name, cols[colidx]);
                                    }
                                }
                            }

                            yield return output.AsReadOnly();
                        }
                    }
                }
            }
        }
    }
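
    Since the local runtime leaves the stream compressed while the cloud runtime has already decompressed it, one option is to sniff the gzip magic bytes (0x1F 0x8B) and only wrap the stream when it is still compressed. A minimal sketch; ConcatStream is a hypothetical helper that stitches the sniffed bytes back onto the forward-only input:

    using System;
    using System.IO;
    using System.IO.Compression;

    // Hypothetical helper: replays the sniffed bytes, then continues with
    // the rest of the original forward-only stream.
    sealed class ConcatStream : Stream
    {
        private readonly Stream first;
        private readonly Stream second;
        private bool firstDone;

        public ConcatStream(Stream first, Stream second)
        {
            this.first = first;
            this.second = second;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (!firstDone)
            {
                int n = first.Read(buffer, offset, count);
                if (n > 0) return n;
                firstDone = true;
            }
            return second.Read(buffer, offset, count);
        }

        public override bool CanRead { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return false; } }
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
        public override void Flush() { }
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
        public override void SetLength(long value) { throw new NotSupportedException(); }
        public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    }

    static class GZipSniffer
    {
        // Returns a readable plain-text stream whether or not the input
        // still starts with the gzip header.
        public static Stream OpenPossiblyGzipped(Stream baseStream)
        {
            int b0 = baseStream.ReadByte();
            int b1 = baseStream.ReadByte();

            var head = new MemoryStream();
            if (b0 >= 0) head.WriteByte((byte)b0);
            if (b1 >= 0) head.WriteByte((byte)b1);
            head.Position = 0;

            Stream stitched = new ConcatStream(head, baseStream);
            return (b0 == 0x1F && b1 == 0x8B)
                ? (Stream)new GZipStream(stitched, CompressionMode.Decompress)
                : stitched;
        }
    }

    Inside Extract, new StreamReader(GZipSniffer.OpenPossiblyGzipped(input.BaseStream)) would then behave the same in both environments.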
    
    
    


    -Brian-

    Friday, April 22, 2016 4:15 PM

All replies

  • Thank you :) So if I want to write a compressed file, should I use code like the one below?

            public override void Output(IRow input, IUnstructuredWriter output)
            {
    
                using (GZipStream gz = new GZipStream(output.BaseStream, CompressionMode.Compress))
                using (StreamWriter sw = new StreamWriter(gz))
                {
                        //save row to file
                }
    
            }



    Friday, April 22, 2016 4:57 PM
  • First, we will be adding gzip compression when writing to a .gz file in one of the upcoming releases.

    Until then, you would need to compress your files in the way you outline, but you would need to run it with AtomicFileProcessing turned on. Otherwise you will only compress sections of your file and probably produce a file that you cannot decompress anymore.
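
    A minimal sketch of such an outputter (untested; it assumes Output is called once per row and that Close() can be overridden to finish the gzip stream):

    using Microsoft.Analytics.Interfaces;
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;

    namespace usqldm
    {
        // AtomicFileProcessing = true keeps the whole file on one vertex,
        // so a single gzip stream can cover all rows.
        [SqlUserDefinedOutputter(AtomicFileProcessing = true)]
        public class GZipOutputter : IOutputter
        {
            private GZipStream gz;
            private StreamWriter sw;

            public override void Output(IRow input, IUnstructuredWriter output)
            {
                // Create the compressed writer on the first row so the output
                // becomes one continuous gzip stream, not one stream per row.
                if (sw == null)
                {
                    gz = new GZipStream(output.BaseStream, CompressionMode.Compress);
                    sw = new StreamWriter(gz);
                }

                // Write the row as a tab-separated line.
                var values = new List<string>();
                foreach (var c in input.Schema)
                {
                    object v = input.Get<object>(c.Name);
                    values.Add(v == null ? "" : v.ToString());
                }
                sw.WriteLine(string.Join("\t", values));
            }

            public override void Close()
            {
                // Dispose the writer so the gzip trailer is flushed out.
                if (sw != null) sw.Dispose();
            }
        }
    }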


    Michael Rys

    Friday, April 22, 2016 5:34 PM
    Moderator
  • If the output is small, your code is fine.

    The code only runs on two CPUs, so it can take a very long time to compress the stream.

    I ran it for 10 hours and then it failed; I was compressing about 100 GB. If I run that through 7-Zip with 32 CPUs set to 'fastest', it takes about 40 minutes.

    Plus, if your parallelism level is above one, you will waste all of the cluster's other nodes while you are compressing.

    Next time I'm planning on trying the PowerShell ADLS cmdlets to combine data with my header files, then streaming the content down to a VM in the same data center and compressing it.

    I'll be waiting for the next release when this is built in.

    BTW: I have looked at several zip libs and none of them can decompress on a forward-only stream. All of them try to jump to the end and/or copy the entry into memory.

    In theory the local headers should be able to provide the details needed to unzip the content. If I have some free time, I was thinking about rewriting DotNetZip to not use the central directory at the end of the file/stream.
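
    For reference, here is roughly what reading a ZIP local file header from a forward-only stream looks like (a minimal sketch; field layout per the PKWARE APPNOTE, error handling omitted):

    using System;
    using System.IO;
    using System.Text;

    static class ZipLocalHeader
    {
        // Reads one local file header; the compressed data follows it.
        public static void ReadOne(Stream s)
        {
            var br = new BinaryReader(s);

            uint sig = br.ReadUInt32();                // "PK\3\4" = 0x04034b50
            if (sig != 0x04034b50)
                throw new InvalidDataException("Not a local file header.");

            br.ReadUInt16();                           // version needed to extract
            ushort flags = br.ReadUInt16();            // general-purpose bit flags
            ushort method = br.ReadUInt16();           // 0 = stored, 8 = deflate
            br.ReadUInt32();                           // last-mod time and date
            br.ReadUInt32();                           // crc-32
            uint compressedSize = br.ReadUInt32();
            uint uncompressedSize = br.ReadUInt32();
            ushort nameLen = br.ReadUInt16();
            ushort extraLen = br.ReadUInt16();

            string name = Encoding.ASCII.GetString(br.ReadBytes(nameLen));
            br.ReadBytes(extraLen);                    // skip the extra field

            // The catch: if bit 3 of the flags is set, the crc and sizes above
            // are zero and only appear in a data descriptor *after* the
            // compressed data, which is exactly what makes forward-only reading hard.
            bool sizesDeferred = (flags & 0x0008) != 0;
            Console.WriteLine("{0}: method={1}, compressed={2}, uncompressed={3}, deferred={4}",
                name, method, compressedSize, uncompressedSize, sizesDeferred);
        }
    }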


    -Brian-

    Monday, April 25, 2016 9:12 PM
  • Hi Brian, if you write a better decompressor that works in a streaming fashion, please share the link here!

    Michael Rys

    Tuesday, April 26, 2016 12:00 AM
    Moderator