Incremental loads using Watermarks in ADF v2

    Question

  • I noticed that the lookup task now allows connections other than Azure (it didn't when I developed my existing v2 pipelines).  I am going back and updating my pipelines to start using watermarks but want to make sure I'm doing so in a way that makes sense.

    I am going to store the Low and High watermarks for each source table in an Azure SQL DB.  In my pipeline I am going to look up the previous High watermark directly before my copy activity.  Then in the copy activity I am going to use a Where clause between the old High watermark and my Scheduled Trigger's windowStart time.  So I am going to grab data between the previous High watermark and today's load start time.  Then I'll store these two values in the watermark table for the next day's run. 

    I think this makes sense logically.  The only thing I don't like is that it doesn't easily allow me to manually run the pipeline for a specific window of time without manually updating the watermark values in the Azure DB.  I was hoping for an easy way to set windowStart and windowEnd parameters using two methods depending on how the pipeline is executed: from the trigger if executed by the trigger, or from the parameter prompts if executed by Debug or Trigger Now.  But I haven't found a way to dynamically set the pipeline parameters within an activity, and using the If Condition activity adds too much complexity. 

    Has anyone found a good way to do incrementals that works well for scheduled runs and manual runs?

    Tuesday, June 12, 2018 2:11 PM

All replies

  • Hi FrankMn,

    If I understand your question correctly, your key point is: how to dynamically pass a value to your pipeline parameter and, after the copy finishes, automatically update the watermark in Azure SQL DB using that parameter's value, in both trigger and debug mode. Here the pipeline parameter is most likely a windowsStart and/or windowsEnd time, right?

    If so, here is one option.

    First, create a new parameter in the pipeline and then use '@pipeline().parameters.windowsStart' as the input of your stored procedure to update the watermark after every copy run.


    Second, in debug mode, pass your specific value to the pipeline parameter 'windowsStart'.

    Last, in trigger mode, take the tumbling window trigger as an example. Pass '@trigger().outputs.windowStartTime' (or '@trigger().outputs.windowEndTime', depending on your requirement) to your pipeline parameter (windowsStart).
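    To see how the two trigger outputs relate, here is a simplified Python model of a daily tumbling window trigger. It only computes window boundaries; the real trigger's delay, retry, and maxConcurrency behaviour is not modelled, and the function name `tumbling_windows` is hypothetical. Because the windows are fixed-size and contiguous, each window's end time is the next window's start time.

```python
from datetime import datetime, timedelta


def tumbling_windows(start_time: datetime, interval: timedelta, count: int):
    """Return `count` consecutive (windowStartTime, windowEndTime) pairs.

    Simplified model only: windows are fixed-size, non-overlapping and
    contiguous, so each window's end is the next window's start.
    """
    windows = []
    for i in range(count):
        window_start = start_time + i * interval
        windows.append((window_start, window_start + interval))
    return windows


for ws, we in tumbling_windows(datetime(2018, 6, 12), timedelta(days=1), 3):
    # ws plays the role of windowStartTime, we of windowEndTime.
    print(ws.isoformat(), "->", we.isoformat())
```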

    In this way, the watermark will be automatically updated to the parameter's input value. Let me know if you run into other questions.

    Doc for your reference: https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-portal

    https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-tumbling-window-trigger#tumbling-window-trigger-type-properties



    • Edited by Wang Zhang Wednesday, June 13, 2018 9:04 AM
    Wednesday, June 13, 2018 9:00 AM
  • Thanks for the response, Wang.  I am already doing exactly what you have described to capture the watermark, and that works perfectly.  The part that is missing is the Copy activity and how it uses the watermark to define the filter scope of the query.  

    For instance, my copy activity has a filter of where WaterMarkColumn >= '@{formatDateTime(activity('LookupOldWaterMarkActivity').output.firstRow.HighWatermarkValue, 'yyyy-MM-dd HH:mm' )}' AND WaterMarkColumn < '@{formatDateTime(pipeline().parameters.windowEnd, 'yyyy-MM-dd HH:mm' )}'

    Because it queries data between the last High Watermark (read by the Lookup activity) and today's trigger start datetime stamp, the lower bound does not take the pipeline parameters into consideration.  So if I manually set the pipeline parameters, they will get captured by the stored procedure but will be ignored by the copy activity.  So the question might be: how can I set the pipeline parameters "windowStart" and "windowEnd" either from the scheduled trigger or from a manual value?  If I could set the pipeline parameters windowStart/windowEnd from the Lookup activity, that would be a solution, but I don't think there's a way to do that yet.


    • Edited by FrankMn Wednesday, June 13, 2018 2:40 PM
    • Proposed as answer by Wang Zhang Wednesday, June 13, 2018 3:23 PM
    • Unproposed as answer by Wang Zhang Wednesday, June 13, 2018 3:31 PM
    Wednesday, June 13, 2018 2:20 PM
  • Hi FrankMn,

    if you want to query data between the last High Watermark and today's trigger start datetime stamp, just use a filter like

    where WaterMarkColumn >= '@{formatDateTime(activity('LookupOldWaterMarkActivity').output.firstRow.HighWatermarkValue, 'yyyy-MM-dd HH:mm' )}' AND WaterMarkColumn < '@{formatDateTime(pipeline().parameters.windowStart, 'yyyy-MM-dd HH:mm' )}'

    This filter means that in each new window, we fetch newly updated data between the last HighWatermarkValue (the last window's start time) and this window's start time. I think it does take the pipeline parameter into consideration.
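    The claim that consecutive windows neither overlap nor leave gaps can be checked with a small Python model of the filter. The rows and dates below are made up for illustration; the predicate `low <= ts < high` is the same half-open range as the `>=` / `<` filter above, so a row stamped exactly on a window boundary is fetched in exactly one window.

```python
from datetime import datetime, timedelta


def rows_in_window(rows, low, high):
    # Same predicate as the copy filter: low <= ts < high (half-open range).
    return [name for name, ts in rows if low <= ts < high]


# Four daily boundaries give three consecutive windows; each window's low
# bound is the previous window's high bound (the stored High watermark).
boundaries = [datetime(2018, 6, 12) + timedelta(days=d) for d in range(4)]
rows = [
    ("a", datetime(2018, 6, 12, 9)),
    ("b", datetime(2018, 6, 13, 0)),  # exactly on a boundary
    ("c", datetime(2018, 6, 14, 23, 59)),
]

fetched = [rows_in_window(rows, lo, hi) for lo, hi in zip(boundaries, boundaries[1:])]
print(fetched)  # [['a'], ['b'], ['c']]
```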

    Is this your requirement?


    • Edited by Wang Zhang Wednesday, June 13, 2018 3:41 PM
    Wednesday, June 13, 2018 3:40 PM
  • Unfortunately, no.  The High value isn't the issue; it's the Low value (which is the previous day's High value).  I can't set that manually because it's already being set by the Lookup activity, not by a pipeline parameter.  I need a way to use either the Lookup activity OR the pipeline parameter.  

    So the low value is set in only one way: 1) Lookup activity - get the previous High watermark value.

    The high value is set by the pipeline parameter windowStart.  windowStart can be set in 2 possible ways: 1) a Scheduled Trigger or Tumbling Window Trigger value; 2) a manual value via Debug/Trigger Now.

    For the low value, there is a disconnect between Lookup activities and pipeline parameters: I can't set a pipeline parameter using the Lookup activity.  The high value works because I can set the pipeline parameter using the trigger.
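    The behaviour being asked for amounts to an "override if supplied, otherwise fall back" rule for the Low value. The Python below is only a model of that desired logic, not ADF code, and `resolve_window` and `manual_low` are hypothetical names. As the thread goes on to confirm, a Lookup activity cannot assign a pipeline parameter, so this rule cannot be implemented by setting the parameter from the Lookup.

```python
from datetime import datetime
from typing import Optional, Tuple


def resolve_window(lookup_high: datetime,
                   window_start: datetime,
                   manual_low: Optional[datetime] = None) -> Tuple[datetime, datetime]:
    """Return (low, high) for the copy filter.

    low:  manual_low if one was supplied (Debug / Trigger Now), otherwise the
          previous High watermark returned by the Lookup activity.
    high: the windowStart pipeline parameter, set by the trigger or manually.
    """
    low = manual_low if manual_low is not None else lookup_high
    if low >= window_start:
        raise ValueError("low watermark must be earlier than windowStart")
    return low, window_start


# Scheduled run: the low bound comes from the Lookup result.
print(resolve_window(datetime(2018, 6, 12), datetime(2018, 6, 13)))
# Manual re-run of a wider window: the supplied low bound wins.
print(resolve_window(datetime(2018, 6, 12), datetime(2018, 6, 13),
                     manual_low=datetime(2018, 6, 1)))
```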

    Wednesday, June 13, 2018 3:49 PM
  • Got it. Let's make it clearer.

    After the copy task finishes in each trigger window, you want to set the Low value to the previous High value, and set the High value to the window start time. 

    The High value is OK. For the Low value, why not set it from the stored High value directly in your stored procedure? We don't have to use a pipeline parameter to update the Low value. 

    Thursday, June 14, 2018 2:00 AM
  • I am already writing the low and high values to an Azure SQL DB using a stored procedure.  I think you're still missing the issue.  The problem is not capturing the values for auditing purposes; the problem is using the values in the query's where clause.  The Low Watermark cannot be overridden if it is being set by a Lookup task, because you can't set a parameter using a Lookup task.

    So if we set LowWaterMarkValue by using a Lookup task and then use that value (activity('LookupOldWaterMarkActivity').output.firstRow.HighWatermarkValue) in the copy activity we can't override that value when running in debug or executing via trigger because we're using the Lookup task property directly and not a pipeline parameter.

    I believe the issue is that we can't set a pipeline parameter by using a Lookup Task.  
    • Edited by FrankMn Thursday, June 14, 2018 2:09 PM
    Thursday, June 14, 2018 2:08 PM
  • Hi FrankMn,

    The Lookup activity in ADF v2 is designed to read and return content from a data source, so I'm afraid it doesn't support setting a pipeline parameter as you mentioned.

    If your question still stands, could you please provide the query in your copy activity? That will help me understand how the Low Watermark is used in your query. In my current understanding, you want to copy data between the high watermark and the pipeline parameter (windowStart), and then update the high watermark and low watermark in a stored procedure.

    Friday, June 15, 2018 3:54 AM
  • I think my workaround is the only available option because, as you confirmed, we can't set or read a parameter in a Lookup task yet.  Hopefully this changes in the future.

    Here is my Copy query statement to show what I am doing.

    select * from @{item().sourceTableName} where @{item().sourceTableWaterMarkColumn} >= '@{formatDateTime(activity('LookupOldWaterMarkActivity').output.firstRow.HighWatermarkValue, 'yyyy-MM-dd HH:mm' )}' AND @{item().sourceTableWaterMarkColumn} < '@{formatDateTime(pipeline().parameters.windowEnd, 'yyyy-MM-dd HH:mm' )}'
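    For a ForEach over table metadata, the templated query above expands per item roughly as in this Python sketch. The dictionary keys mirror item().sourceTableName and item().sourceTableWaterMarkColumn; `old_high` stands in for the Lookup output's firstRow.HighWatermarkValue and `window_end` for pipeline().parameters.windowEnd. The table and column names (dbo.Orders, ModifiedDate, and so on) are hypothetical, and '%Y-%m-%d %H:%M' is assumed to be the strftime equivalent of the 'yyyy-MM-dd HH:mm' pattern passed to formatDateTime, which drops seconds.

```python
from datetime import datetime

# Assumed strftime equivalent of the 'yyyy-MM-dd HH:mm' pattern used with formatDateTime.
ADF_MINUTE_FORMAT = "%Y-%m-%d %H:%M"


def render_copy_query(item: dict, old_high: datetime, window_end: datetime) -> str:
    """Mirror the templated source query for one ForEach item (hypothetical helper)."""
    table = item["sourceTableName"]
    col = item["sourceTableWaterMarkColumn"]
    return (
        f"select * from {table} where {col} >= '{old_high.strftime(ADF_MINUTE_FORMAT)}' "
        f"AND {col} < '{window_end.strftime(ADF_MINUTE_FORMAT)}'"
    )


items = [
    {"sourceTableName": "dbo.Orders", "sourceTableWaterMarkColumn": "ModifiedDate"},
    {"sourceTableName": "dbo.Customers", "sourceTableWaterMarkColumn": "LastUpdated"},
]
for it in items:
    print(render_copy_query(it, datetime(2018, 6, 14), datetime(2018, 6, 15)))
```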

    Friday, June 15, 2018 2:46 PM
  • Well noted.

    Please use the workaround we talked about before. Our dev team will continue investigating the improvement; stay tuned.

    Saturday, June 16, 2018 1:29 AM