Errors after Data Lake Analytics Update

    Question

  • Hi!

    A few weeks ago I created a library for Data Lake Analytics, and it has worked correctly for dozens of scripts, but today I have a problem with scripts that worked correctly earlier. When I try to extract data from a large file, where the data is in a row-oriented format like the one below:

    col1|col2|col3|col4|col5|col6|col7|col8|

    col1|col2|col3|col4|col5|col6|col7|col8|

    col1|col2|col3|col4|col5|col6|col7|col8|

    col1|col2|col3|col4|col5|col6|col7|col8|

    the code throws an exception which suggests that the file is being split incorrectly.

    Extractor code:

     public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                using (StreamReader sr = new StreamReader(input.BaseStream))
                {
                    string currentLine;
    
                    if (output.Schema.Count == _columnCount)
                    {
                        while ((currentLine = sr.ReadLine()) != null)
                        {
                            String[] substrings = currentLine.Split('|');
                            if (substrings.Length < _columnCount)
                            {
                                String Message = "";
                                foreach (String s in substrings)
                                {
                                    Message += s;
                                    Message += " ";
                                }
    
                                throw new Exception(Message);
                            }
    
                            output.Set<object>(output.Schema[0].Name, substrings[0]);
                            output.Set<object>(output.Schema[1].Name, substrings[1]);
                            output.Set<object>(output.Schema[2].Name, substrings[2]);
                            output.Set<object>(output.Schema[3].Name, substrings[3]);
                            output.Set<object>(output.Schema[4].Name, substrings[4]);
                            output.Set<object>(output.Schema[5].Name, substrings[5]);
                            output.Set<object>(output.Schema[6].Name, substrings[6]);
                            output.Set<object>(output.Schema[7].Name, substrings[7]);
    
                            yield return output.AsReadOnly();
                        }
                    }
                    else
                    {
                        throw new Exception("Incorrect number of columns");
                    }
                }
            }

    What changed after the DLA update? I need help urgently. The file was uploaded as row-oriented using Visual Studio.


    Sunday, August 7, 2016 1:42 PM

All replies

  • We released the bug fix that allows built-in and custom extractors to work on files where the row boundary is not aligned with the extent boundary.

    Unfortunately, that seems to affect custom extractors that are coded against the BaseStream level and that, by using ReadLine(), implicitly assume the two boundaries are aligned.

    What you should do (and should have done before) is code against the right abstraction level in the UDO, splitting rows with the input.Split() method, which automatically takes care of finding the end of a row. E.g., the pattern should be:

    // _row_delim = _encoding.GetBytes(row_delim);

    foreach (Stream current in input.Split(_row_delim))
    {
        using (StreamReader streamReader = new StreamReader(current, _encoding))
        {
            string[] array = streamReader.ReadToEnd().Split(new string[] { _col_delim }, StringSplitOptions.None);
            // ... set the output columns from array and yield the row ...
        }
    }

    If you need to continue reading at the BaseStream level for some reason, then you should implement your own ReadLine that uses the new UDO model's ability to peek into the first 4 MB of the next extent. You will also need to handle the case where the next extent starts with less data than that.
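
    For illustration only, here is a minimal sketch of such a byte-level line reader. The helper name ReadLineFromStream and the hard-coded '\n' delimiter are assumptions, and the look-ahead into the next extent is something the platform provides through the stream itself rather than anything this code does:

    using System.Collections.Generic;
    using System.IO;
    using System.Text;

    internal static class RawLineReader
    {
        // Hypothetical helper: reads raw bytes from the stream until the next '\n'
        // (or end of input) and decodes them with the given encoding.
        public static string ReadLineFromStream(Stream stream, Encoding encoding)
        {
            var bytes = new List<byte>();
            int b;
            while ((b = stream.ReadByte()) != -1 && b != '\n')
            {
                bytes.Add((byte)b);
            }
            if (bytes.Count == 0 && b == -1)
            {
                return null; // end of input
            }
            // Drop a trailing '\r' so "\r\n"-delimited data is handled as well.
            if (bytes.Count > 0 && bytes[bytes.Count - 1] == (byte)'\r')
            {
                bytes.RemoveAt(bytes.Count - 1);
            }
            return encoding.GetString(bytes.ToArray());
        }
    }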


    Michael Rys

    Monday, August 8, 2016 7:59 PM
    Moderator
  • Is it possible to identify the file encoding in extractor code?
    Tuesday, August 9, 2016 6:32 AM
  • I wrote something like this:

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                if (output.Schema.Count == _columnCount)
                {
                    Stream[] streams = input.Split(Encoding.Default.GetBytes("\n")).ToArray();
                    foreach (Stream stream in streams)
                    {
                        using(StreamReader streamReader = new StreamReader(stream, Encoding.Default))
                        {
                            String[] substrings = streamReader.ReadToEnd().Split(new string[] { "|" }, StringSplitOptions.None);
                            output.Set<object>(output.Schema[0].Name, substrings[0]);
                            output.Set<object>(output.Schema[1].Name, substrings[1]);
                            output.Set<object>(output.Schema[2].Name, substrings[2]);
                            output.Set<object>(output.Schema[3].Name, substrings[3]);
                            output.Set<object>(output.Schema[4].Name, substrings[4]);
                            output.Set<object>(output.Schema[5].Name, substrings[5]);
                            output.Set<object>(output.Schema[6].Name, substrings[6]);
                            output.Set<object>(output.Schema[7].Name, substrings[7]);
                            yield return output.AsReadOnly();
                        }
                    }
                }
                else
                {
                    throw new Exception("Incorrect number of columns");
                }
            }

    but the code throws an exception: "Stream was not readable". What should I do? And what should I also do for extractors with AtomicFileProcessing set to true? Similar extractor code there throws a System.OutOfMemoryException.

    Tuesday, August 9, 2016 5:01 PM
  • I see a couple of problems with your code:

    1. The stream interface only allows you to read a stream once. Since you already read it with ToArray() the first time, the stream has been consumed by the time you read it again.
    2. Because you get the whole file as the input when you set AtomicFileProcessing to true, and you then materialize everything with ToArray(), you will run out of memory (see the attribute sketch after this list).
    3. Never use Encoding.Default in server-side code, since it will choose whatever encoding the OS happens to use as its default.
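
    As a side note on the AtomicFileProcessing point: whether the extractor sees the whole file or individual extents is controlled by the SqlUserDefinedExtractor attribute on the extractor class. Below is a minimal sketch of a non-atomic declaration; the class name NonAtomicExtractor and the empty Extract body are placeholders only:

    using System.Collections.Generic;
    using Microsoft.Analytics.Interfaces;

    namespace msdn
    {
        // Non-atomic (the default): the engine splits the file into extents and runs
        // the extractor in parallel, so the whole file never has to fit in memory.
        [SqlUserDefinedExtractor(AtomicFileProcessing = false)]
        public class NonAtomicExtractor : IExtractor
        {
            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                // Row-by-row extraction as in the full example below.
                yield break;
            }
        }
    }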

    Here is the code I would suggest to build on for your given input data:

    using Microsoft.Analytics.Interfaces;
    using Microsoft.Analytics.Types.Sql;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.IO;

    namespace msdn
    {
        public class MSDNExtractor : IExtractor
        {
            private Encoding _encoding;
            private byte[] _row_delim;
            private string _col_delim;

            public MSDNExtractor(Encoding encoding = null, string row_delim = "\r\n", string col_delim = "|")
            {
                this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
                this._row_delim = this._encoding.GetBytes(row_delim);
                this._col_delim = col_delim;
            }

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                foreach (Stream currentline in input.Split(this._row_delim))
                {
                    using (StreamReader streamReader = new StreamReader(currentline, this._encoding))
                    {
                        string[] substrings = streamReader.ReadToEnd().Split(new string[] { this._col_delim }, StringSplitOptions.None);

                        // let's drop the last dummy column after the last separator
                        if (substrings.Length != output.Schema.Count + 1)
                        {
                            throw new Exception("Incorrect number of columns");
                        }

                        for (int i = 0; i < substrings.Length - 1; i++)
                        {
                            output.Set<object>(i, substrings[i]);
                        }

                        yield return output.AsReadOnly();
                    }
                }
            }
        }
    }
    

    This code will only stream through each row once, and should work with the right splitting.

    You can then use it in a script as follows (I assume the above is in a Visual Studio U-SQL code-behind file):

    @data =
        EXTRACT col1 string,
                col2 string,
                col3 string,
                col4 string,
                col5 string,
                col6 string,
                col7 string,
                col8 string
        FROM "msdn.csv"
        USING new msdn.MSDNExtractor(encoding : Encoding.[ASCII]);
    
    OUTPUT @data
    TO "msdnres.csv"
    USING Outputters.Csv();


    Michael Rys

    Tuesday, August 9, 2016 8:00 PM
    Moderator
  • Thank you for your help :) You resolved all my problems :)
    Thursday, August 11, 2016 6:01 PM