Adding an additional column to data saved in CosmosDB by Azure Data Factory's copy activity


  • I am using Azure Data Factory's copy activity to copy data from a CSV file in Blob storage to CosmosDB (with the SQL API). If I do not import a schema in the sink's linked service, the copy activity reads the headers from the CSV at execution time and saves the data as JSON in CosmosDB. Up to this point it works fine.

    I need to add a batch ID column to the data being written to CosmosDB (the batch ID being a GUID or the pipeline run ID) so that I can track which records were copied together as a batch.

    How can I keep all my source columns, add my batch ID column to them, and save the result in CosmosDB?

    The schema is not fixed and can change on each ADF pipeline trigger, so I cannot import the schema and do a one-to-one column mapping in the copy activity.

    Tuesday, June 26, 2018 6:30 PM

All replies

  • I have dynamic datasets coming in from a source and am copying them into a Data Lake using ADF. I store my select statements in an Azure SQL DB to control what is extracted, so, like you, I have dynamic datasets without a mapped schema. You can do the same, or something similar, and create a dynamic select statement in your copy activity, something like SELECT @{item().sourceTableCustomColumnList}, '@{pipeline().RunId}' AS BatchId FROM @{item().sourceTableName} (the run ID is a string, so inside the query text it goes in quotes and uses the @{...} interpolation form; BatchId is just an example alias).

    You can get much more complex than that if needed. You could do SELECT *, '@{pipeline().RunId}' AS BatchId FROM yourtable.

    Edit: I made the assumption you were using version 2. If you are using version 1, then this does not apply.
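
To make the idea concrete, here is a minimal Python sketch of the query text that a dynamic select statement like this would resolve to at run time. It only mimics the string substitution and is not ADF code. The function name build_select, the BatchId alias, and the sample table and column names are made up for illustration.

```python
def build_select(columns, table, run_id, alias="BatchId"):
    """Return the SQL text produced once the column list, table name,
    and pipeline run ID have been substituted into the template.

    All names here are hypothetical; this only illustrates the expansion.
    """
    column_list = ", ".join(columns) if columns else "*"
    # The run ID is a GUID string, so it is emitted as a quoted SQL literal.
    return f"SELECT {column_list}, '{run_id}' AS {alias} FROM {table}"


if __name__ == "__main__":
    print(build_select(["OrderId", "Amount"], "dbo.Orders",
                       "11111111-2222-3333-4444-555555555555"))
```

Every row returned by the source query then carries the same run ID, which is what lets you group the copied rows by batch afterwards.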

    Tuesday, June 26, 2018 6:42 PM
  • I am using ADF V2. Regarding the SELECT *, @pipeline().RunId ... approach: does it work if my copy activity reads data from a CSV file instead of a SQL table? If yes, what would the select statement be for a CSV file?
    Wednesday, June 27, 2018 10:47 AM
  • I see the dilemma you are referring to if your source object is a csv file.  I don't think there's a way to do it natively in the copy task (I tried fooling the task by using dynamic content on the schema but it seems to get ignored) but I think you still have a few different options. 

    1. You could create a custom activity and write code to do whatever you need.
    2. You could stage the CSV file to an area in a data lake, then execute a U-SQL script to read the data from the file and append the new column with the pipeline RunId. Then output it to a new area in the data lake to be picked up by the rest of your pipeline. To do this, you simply pass a parameter to U-SQL from ADF (you can google how).
    3. Punt!

    I would go with option 2, especially if you're already leveraging Azure Data Lake Store and Analytics.
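
Whichever option you pick, the transformation itself is small. As a rough sketch only (in Python rather than U-SQL or an actual custom activity), this is the kind of logic involved: read a CSV whose headers are not known in advance, keep every source column, and append the batch ID to each row before it is written out as JSON. The function name add_batch_id and the column name batchId are assumptions for illustration, and the batch ID is assumed to be passed in from the pipeline (for example the pipeline RunId).

```python
import csv
import io
import json


def add_batch_id(csv_text, batch_id, column_name="batchId"):
    """Parse CSV text with a header row and return one dict per row,
    each carrying every source column plus the batch ID column.

    The headers are read from the file, so no fixed schema is needed.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    # Refuse to silently overwrite a source column with the same name.
    if reader.fieldnames and column_name in reader.fieldnames:
        raise ValueError(f"source already has a column named {column_name!r}")
    return [{**row, column_name: batch_id} for row in reader]


if __name__ == "__main__":
    sample = "id,name\n1,alpha\n2,beta\n"
    # In a real pipeline the batch ID would come in as a parameter.
    for doc in add_batch_id(sample, "example-run-id"):
        print(json.dumps(doc))
```

All values stay as strings here, as they come out of the CSV; any type conversion would be an extra step.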

    • Proposed as answer by FrankMn Thursday, June 28, 2018 6:42 PM
    Wednesday, June 27, 2018 7:33 PM