Azure Data Factory pipeline into compressed Parquet file: “java.lang.OutOfMemoryError:Java heap space”

  • Question

  • I have run into the following problem. I have a pipeline that reads data from an MS SQL Server and stores it in a file in a BLOB container in Azure Storage. The file is in Parquet (Apache Parquet) format.
    When the “sink” (output) file is stored compressed (snappy or gzip, it does not matter) AND the file is large enough (more than 50 MB), the pipeline fails with the following message:

    "errorCode": "2200",
        "message": "Failure happened on 'Sink' side. ErrorCode=UserErrorJavaInvocationException,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=An error occurred when invoking java, message: java.lang.OutOfMemoryError:Java heap space\ntotal entry:11\r\njava.util.ArrayDeque.doubleCapacity(Unknown Source)\r\njava.util.ArrayDeque.addFirst(Unknown Source)\r\njava.util.ArrayDeque.push(Unknown Source)\r\norg.apache.parquet.io.ValidatingRecordConsumer.endField(ValidatingRecordConsumer.java:108)\r\norg.apache.parquet.example.data.GroupWriter.writeGroup(GroupWriter.java:58)\r\norg.apache.parquet.example.data.GroupWriter.write(GroupWriter.java:37)\r\norg.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:87)\r\norg.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:37)\r\norg.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123)\r\norg.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:292)\r\ncom.microsoft.datatransfer.bridge.parquet.ParquetBatchWriter.addRows(ParquetBatchWriter.java:60)\r\n,Source=Microsoft.DataTransfer.Common,''Type=Microsoft.DataTransfer.Richfile.JniExt.JavaBridgeException,Message=,Source=Microsoft.DataTransfer.Richfile.HiveOrcBridge,'",
        "failureType": "UserError",
        "target": "Work_Work"
    }

    The "Work_Work" is name of a Copy Data activity in the pipeline.
    If I turn the compression off (the generated BLOB file is uncompressed), the error does not happen.
    Is this the error described in https://docs.microsoft.com/en-us/azure/data-factory/format-parquet: the “...If you copy data to/from Parquet format using Self-hosted Integration Runtime and hit error saying "An error occurred when invoking java, message: java.lang.OutOfMemoryError:Java heap space", you can add an environment variable _JAVA_OPTIONS in the machine that hosts the Self-hosted IR to adjust the min/max heap size for JVM to empower such copy, then rerun the pipeline...”?
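
    For reference, I believe the fix that page describes is an environment variable added on the machine that hosts the Self-hosted IR, something like the example below (Xms is the initial JVM heap size, Xmx the maximum; I assume the exact values are only illustrative and depend on the machine’s memory):

        _JAVA_OPTIONS = -Xms256m -Xmx16g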

    If it is, have I understood correctly that I have to do the following:
    go to the server where the “Self-hosted Integration Runtime” runs (I still have no idea what that is) and increase the max heap size for the JVM. Is this correct?
    If so, my next question is: how large should the max heap size be? My pipeline can generate a file whose size will be 30 GB.
    What max heap size can guarantee that such a file will not cause the failure?

    Monday, September 16, 2019 7:54 PM

All replies

  • Hello VicSotnikov and thank you for your inquiry.

    A self-hosted integration runtime is a piece of software you install on a local machine / on-premises server / a VM you own that allows Data Factory to access non-Azure resources, e.g. a file on your computer or a database at your place of work.

    Since you were unaware of any Self-Hosted Integration Runtime (SHIR), first check whether one is being used.  Go to the two Linked Services involved in your copy activity.  See picture below (UI).

    Please let me know what is selected in the highlighted area.  If it is anything other than 'AutoResolveIntegrationRuntime' (the default), then this is something you have control over.

    Monday, September 16, 2019 11:09 PM
    Moderator
  • Hi,

    “Please let me know what is selected in the highlighted area” – OK, here is what I have:
    The Copy Data activity in the pipeline has a source dataset and a sink dataset. The sink dataset is a BLOB Parquet file in Azure Blob Storage. Its Linked Service has the default value Connect via integration runtime: 'AutoResolveIntegrationRuntime'.
    The source dataset has type SQL Server. Its Linked Service has Connect via integration runtime: 'NameRelatedToMyCompany'. Sorry, I cannot tell you the real 'NameRelatedToMyCompany' because of a non-disclosure agreement; it is the name of my company plus the word ‘Gateway’. So, judging by the name, this is “something I have (or at least may have) control over”; do you agree?

    Then I have the following questions:
    1. Are you sure that the above error is related to this “source” Linked Service (SQL Server)? I ask because the error message above contains the string "Failure happened on 'Sink' side". The word ‘sink’ confuses me: as I said, the ‘sink’ dataset is a BLOB Parquet file with the default 'AutoResolveIntegrationRuntime'.

    2. What are my next actions? I do not have direct access to that 'NameRelatedToMyCompany' object (is it a separate server?); only the admins of my company do. What should I ask the admins to test/check/change on that 'NameRelatedToMyCompany' to fix my problem?

    Thanks in advance.

    Tuesday, September 17, 2019 10:45 AM
  • Thank you for the detailed information.

    There is a known issue when Parquet is used as the sink AND the SHIR version is below 3.20.7159.1. The solution is to upgrade the SHIR to the latest version. Upgrade/update settings can be configured in the UI: go to 'Connections' > 'Integration Runtimes' > edit your integration runtime > 'Auto Update'. You can find the version number by clicking Monitor instead of Edit on your integration runtime.

    If this does not apply/help, I will provide you with further steps.

    On point #1.

    This would not be the first time that the error message is misleading, if not outright wrong.  (With so many interacting services, things get complicated very quickly, and just passing exceptions becomes non-trivial.)

    On point #2.

    You could implement partitioning.  This would break up your monolithic file, allowing for a number of benefits, including:

    • Ability to leverage parallelism when copying
    • Faster searching when using a distributed system such as HDInsight (which includes Spark, Hadoop, and more) or when running data warehouse queries.

    Here is what I suspect is happening:

        The integration runtime on the source side (your on-premises SQL Server) is trying to place the entirety of your data in a buffer before encoding it as Parquet and sending it to blob storage.  30 GB is a significant amount for some machines to hold in memory.

    I am escalating internally to verify my understanding.  More suggestions will follow.  In case the product group takes an interest, could you please provide me with the pipeline run ID?  If you are not comfortable posting the run ID in this forum, I can provide an email address if that works better for you.

    You mentioned this happens when the sink is compressed AND the file size > 50 MB.  Does this happen if you do not compress?  By compress, do you mean that you compress the file after it is converted to Parquet, or do you consider Parquet itself a form of compression?

    If you do not want to partition your data, there are other options.

    Tuesday, September 17, 2019 8:01 PM
    Moderator
  • Hi,
    Unfortunately, right now I cannot get to the computer where the SHIR is installed to check its version. For the last 10 days the problem has not happened any more (with the same data sizes, and even with significantly larger ones: up to 17 GB for the uncompressed file and 4 GB for the same file compressed with snappy). So my management has decided to set the issue aside for now, hoping that it will not happen again.
    However, as it may happen again, I will use the opportunity to ask you for as much information as I can; maybe I will have to come back to this problem in the future.

    So:
    1. You asked for the “pipeline run ID”. Sorry, I do not have one: I did not save it on September 6th, when the problem happened, and for the last 3 days all the runs have been OK. As all of this is done in Debug mode (the pipeline is not yet published, so I just click the Debug button in the pipeline to run it), it seems it is not possible to find the pipeline run ID in some “history list”. Or am I wrong, and it is possible to look into the history of pipeline runs made in Debug mode? If so, please tell me.

    2. You asked: “You mentioned this happens when the sink is compressed AND the file size > 50 MB.  Does this happen if you do not compress?  By compress, do you mean that you compress the file after it is converted to Parquet, or do you consider Parquet itself a form of compression?”

    I meant the following: I have a Copy Data activity whose “sink dataset” is a Parquet file in BLOB storage. On September 6th I ran the pipeline with Compression type: none on the dataset's Connection page. It worked OK and generated a Parquet file of about 50 MB. Then I changed the Compression type from none to snappy and ran the pipeline again. The pipeline failed with the “java.lang.OutOfMemoryError:Java heap space” error.
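
    (In case it helps, I believe the sink dataset definition behind that Connection page looks roughly like the JSON below; the linked service, container, and file names here are placeholders, not my real ones:)

        {
            "name": "SinkParquetDataset",
            "properties": {
                "type": "Parquet",
                "linkedServiceName": {
                    "referenceName": "MyBlobStorageLinkedService",
                    "type": "LinkedServiceReference"
                },
                "typeProperties": {
                    "location": {
                        "type": "AzureBlobStorageLocation",
                        "container": "output",
                        "fileName": "work_work.parquet"
                    },
                    "compressionCodec": "snappy"
                }
            }
        }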

    3. You said “you could implement partitioning”. Yes, I get the idea, but I do not see how to implement it. Here is what I see in my “sink dataset”:


    Where can I specify that the Parquet file should be partitioned?

     
    Wednesday, September 18, 2019 4:48 PM
    Since you indicated that the same job that was failing is now succeeding, even with higher loads, this suggests the cause could be related to multiple jobs running at the same time.  There is a setting that lets you specify the maximum number of concurrent jobs on the node.  You do not need direct access to the machine to set this.  Below is a screenshot from my Data Factory UI.

    I believe reducing the number of concurrent jobs will be enough to safeguard your jobs.

    If you are still interested in partitioning, that will be a little more complicated with your current setup.

    One approach would be to take advantage of your source being SQL.  Identify a column with a small number of unique values you wouldn't mind iterating through, or choose a condition you don't mind filtering on (e.g. Date > 2019-01-01 vs. Date <= 2019-01-01).  Parameterize the SQL query in your source to use your chosen filter, and place the copy activity inside a ForEach activity, along the lines of the sketch below.
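
    As a rough sketch only (the dataset, table, column, and parameter names below are made up, so substitute your own), the ForEach with a parameterized source query could look something like this in the pipeline JSON:

        {
            "name": "CopyByPartition",
            "type": "ForEach",
            "typeProperties": {
                "items": {
                    "value": "@pipeline().parameters.partitionList",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "Work_Work_Partition",
                        "type": "Copy",
                        "inputs": [ { "referenceName": "SourceSqlServerDataset", "type": "DatasetReference" } ],
                        "outputs": [ { "referenceName": "SinkParquetDataset", "type": "DatasetReference" } ],
                        "typeProperties": {
                            "source": {
                                "type": "SqlServerSource",
                                "sqlReaderQuery": "SELECT * FROM dbo.MyTable WHERE PartitionKey = '@{item()}'"
                            },
                            "sink": {
                                "type": "ParquetSink"
                            }
                        }
                    }
                ]
            }
        }

    In a real pipeline you would also parameterize the sink dataset's file name or folder path per iteration, so that each partition lands in its own file instead of overwriting the previous one.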

    Another approach would be to use Mapping Data Flow (it has turnkey partitioning out of the box).  Unfortunately, Mapping Data Flow is unable to accept data directly from on-premises.  The work-around there would be to temporarily load the data into blob storage as a flat file where Mapping Data Flow is able to work on it.  This, however, would defeat the purpose, since you are trying to move to blob anyway.

    Wednesday, September 18, 2019 11:38 PM
    Moderator
  • VicSotnikov, please let me know if you need any more help.
    Friday, September 20, 2019 11:34 PM
    Moderator
  • Since I have not heard back, I will assume my suggestion helped you find a solution.
    Tuesday, September 24, 2019 6:01 PM
    Moderator