ASA Delay Investigation

  • Question

  • Hi everyone,

    I am running a relatively complex query in ASA (compared to all the examples I found) and seeing a large event-processing delay: about 11 s from the IoT Hub event enqueued time until the ASA event output. I'm trying to find out where this delay comes from. What I've found so far:

    • Changing out of order policy from the default 5s to 0 reduces the query delay to about 6s.
    • Using PARTITION BY PartitionId when reading from IoT Hub input gives just under 1s reduction. (Even if only one IoT Hub partition is actually filled with data)
    • Adding extra streaming units doesn't have any effect (resource utilization was 10% before adding units, no backlogged input events).

    Particularly I've got the following questions:

    • Can the execution of query steps be partitioned / parallelized? (this article says it's possible, but not exactly how)
    • Why does TIMESTAMP BY customTimestamp delay execution? (the answer to this forum entry says it does) How to work around this (I need the timestamp for a JOIN ON DATEDIFF)?
    • Do reference data JOINs delay execution? (I'm reading reference data from BLOB storage)
    • Is writing to the first output in my query affected by writing to other outputs?

    Some more infos about my query and application:

    • 1 stream input (IoT Hub)
    • 1 reference data input (BLOB)
    • 4 stream outputs (BLOB, Azure Function, 2x PowerBI)
    • query has several steps
    • event frequency is about 6 events/sec from 3 different devices

    Thanks for your help!

    Monday, January 20, 2020 3:23 PM

All replies

  • Hello awo-SYS,

    Please share the query and we will see if we can make more sense of it. Below are the answers to your questions.

    1.) "Changing out of order policy from the default 5s to 0 reduces the query delay to about 6s."

    1.a) When using the 5 second out of order policy there will always be a 5 second delay as the job is waiting for possible out of order events. This delay is expected.
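    To see why a fixed tolerance translates into a fixed latency, here is a toy watermark model (an illustrative sketch under simplified assumptions, not how ASA is implemented internally):

```python
# Toy watermark-based reorder buffer -- a sketch of the general technique,
# NOT ASA's actual implementation. It shows why a 5 s out-of-order
# tolerance adds roughly 5 s of latency even for perfectly ordered input.
def reorder_buffer(events, tolerance):
    """events: event times (seconds) in arrival order. Returns
    (watermark, event_time) pairs recording the watermark at which each
    event became eligible for output: an event is released only once the
    watermark (highest event time seen so far) is `tolerance` ahead of it."""
    buffered, emitted = [], []
    watermark = float("-inf")
    for t in events:
        watermark = max(watermark, t)
        buffered.append(t)
        ready = sorted(e for e in buffered if e <= watermark - tolerance)
        buffered = [e for e in buffered if e > watermark - tolerance]
        for e in ready:
            emitted.append((watermark, e))
    return emitted

# Perfectly ordered events, one per second, with a 5 s tolerance:
# event 0 is released only once the watermark reaches 5, event 1 at 6, ...
print(reorder_buffer([0, 1, 2, 3, 4, 5, 6, 7], tolerance=5))
# → [(5, 0), (6, 1), (7, 2)]
```

    With tolerance 0 every event is released as soon as it arrives, which is consistent with the ~5 s reduction you observed after changing the policy.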

     

    2.)Using PARTITION BY PartitionId when reading from IoT Hub input gives just under 1s reduction. (Even if only one IoT Hub partition is actually filled with data)

    2.a) Whether or not including PARTITION BY PartitionId will improve performance depends on your inputs and outputs. Inputs must be partitioned (when setting up an IoT hub or Event hub, you select the number of partitions, up to 32). Outputs must also support partitioning in order to take advantage of scaling; some, such as PowerBI, do not.

     

    3.)Adding extra streaming units doesn't have any effect (resource utilization was 10% before adding units, no backlogged input events).
    3.a) We must determine the cause of the delay before increasing the streaming units. If the delay is not due to high resource utilization, increasing the SUs will not have any impact.

     

    4.)Can the execution of query steps be partitioned / parallelized? (this article says it's possible, but not exactly how)

    4.a) Yes, if your inputs and outputs both support partitioning, and you include PARTITION BY PartitionId at each query step (or switch to compatibility level 1.2), you can take advantage of scaling.
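    As a rough sketch of that pattern at compatibility level 1.1 (input/output names and fields here are placeholders, not from this thread):

```sql
-- Sketch of an embarrassingly parallel job; every step repeats
-- PARTITION BY PartitionId so work stays within one input partition.
WITH Step1 AS (
    SELECT PartitionId, COUNT(*) AS eventCount
    FROM [my-partitioned-input] PARTITION BY PartitionId
    GROUP BY TumblingWindow(minute, 3), PartitionId
)
SELECT *
INTO [my-partitioned-output]   -- the output must also support partitioning
FROM Step1 PARTITION BY PartitionId
```

    Note that the GROUP BY must include PartitionId as well, otherwise the aggregation forces a merge across partitions and the step is no longer parallel.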

     

    5.)Why does TIMESTAMP BY customTimestamp delay execution? (the answer to this forum entry says it does) How to work around this (I need the timestamp for a JOIN ON DATEDIFF)?

    5.a) I am not aware of Timestamp By causing any delays in processing.

     

    6.) Do reference data JOINs delay execution? (I'm reading reference data from BLOB storage)

    6.a) Depending on the number of joins and the volume of data, reference data joins could cause some delay.



    7.) Is writing to the first output in my query affected by writing to other outputs?

    7.a) If the query is not partitioned then all query steps will be delayed based on the slowest output. For example, if PowerBI is the slowest output, all other outputs will be held back.


    Thanks Himanshu


    Friday, January 24, 2020 6:59 PM
  • Hi Himanshu,

    Thank you for your extensive reply! Here's my query (sorry, it's a big one...):

    WITH 
    /* separate input streams */
    ActSpeed AS(
      SELECT 
        * 
      FROM 
        iothub 
        /* read IoT Hub partitions in parallel (only for performance) */
        PARTITION BY PartitionId
        /* use event creation timestamp rather than ASA arrival timestamp 
        (important for JOIN ON DATEDIFF) */
        TIMESTAMP BY [iothub-creation-time-utc] 
      WHERE 
        TransformedMeasurementName LIKE 'fanActualSpeed%' 
    ), 
    TargSpeed AS(
      SELECT 
        * 
      FROM 
        iothub 
        /* read IoT Hub partitions in parallel (only for performance) */
        PARTITION BY PartitionId
        /* use event creation timestamp rather than ASA arrival timestamp 
        (important for JOIN ON DATEDIFF) */
        TIMESTAMP BY [iothub-creation-time-utc] 
      WHERE 
        TransformedMeasurementName LIKE 'fanTargetSpeed%' 
    ), 
    
    /* join actual value and target value streams */
    SpeedSet AS(
      SELECT 
        /* additionally, forward metadata (stream ID, timestamps)*/
        A.[iothub-creation-time-utc] AS [timestamp],
        A.ModbusModuleTimestamp AS edgetime1,
        A.ProcessingModuleTimestamp AS edgetime2,
        A.EventEnqueuedUtcTime AS iothubtime,
        A.EventProcessedUtcTime AS asatime,
        UDF.getCurrentTime('') AS asatime2,
        A.TransformedMeasurementValue AS actualspeed,
        T.TransformedMeasurementValue AS targetspeed,
        faninputvoltage = T.TransformedMeasurementValue2 / 1000,
        stream = 'speed',
        A.UnitID AS UnitID
      FROM 
        ActSpeed A 
        JOIN TargSpeed T 
          /* event rate = 1/500ms -> maximal allowed skew = 500ms/2 */
          ON DATEDIFF(millisecond, A, T) BETWEEN -250 AND 250
            /* separate event streams by unit ID */
            AND A.UnitID = T.UnitID
    ), 
    
    /* check, if data has arrived within the last 15 seconds */
    /* sub-query inspired by
      https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#periodically-output-values 
    */
    SparseInputTest AS(
      SELECT 
        COUNT(*) AS eventCount, 
        TopOne() OVER (ORDER BY [timestamp] DESC) AS LastEvent
      FROM SpeedSet
      /* assuming, that within 15s, there are some events under normal conditions */
      GROUP BY HOPPINGWINDOW(second, 15, 1)
    ),
    
    /* value preprocessing for fault/critical/warning evaluation*/
    /* check, if fault condition is true */
    FaultTest AS(
      SELECT 
        S.UnitID AS UnitID,
        S.[timestamp] AS [timestamp],
        UDF.compare(S.actualspeed, S.targetspeed, R.fault) AS ComparisonResult 
      FROM 
        SpeedSet S 
        JOIN [referencedata-speed] R 
          ON S.stream = R.blockName 
    ), 
    /* determine start of condition being true */
    FaultCriteria AS(
      SELECT 
        UnitID,
        [timestamp],
        ComparisonResult,
        /* find last time, at which condition wasn't true */
        LAST([timestamp]) OVER(
          /* separate event streams by unit ID */
          PARTITION BY UnitID
          LIMIT DURATION(minute, 1)
          WHEN ComparisonResult != 1) AS start 
      FROM 
        FaultTest 
    ), 
    /* check, if critical condition is true */
    CriticalTest AS(
      SELECT 
        S.UnitID AS UnitID,
        S.[timestamp] AS [timestamp],
        UDF.compare(S.actualspeed, S.targetspeed, R.critical) AS ComparisonResult 
      FROM 
        SpeedSet S 
        JOIN [referencedata-speed] R 
          ON S.stream = R.blockName 
    ), 
    /* determine start of condition being true */
    CriticalCriteria AS(
      SELECT 
        UnitID,
        [timestamp],
        ComparisonResult,
        /* find last time, at which condition wasn't true */
        LAST([timestamp]) OVER(
          /* separate event streams by unit ID */
          PARTITION BY UnitID
          LIMIT DURATION(minute, 1)
          WHEN ComparisonResult != 1) AS start 
      FROM 
        CriticalTest 
    ), 
    /* check, if warning condition is true */
    WarningTest AS(
      SELECT 
        S.[timestamp] AS [timestamp],
        UDF.getCurrentTime('') AS asatime3,
        S.UnitID AS UnitID,
        UDF.compare(S.actualspeed, S.targetspeed, R.warning) AS ComparisonResult 
      FROM 
        SpeedSet S 
        JOIN [referencedata-speed] R 
          ON S.stream = R.blockName 
    ), 
    /* determine start of condition being true */
    WarningCriteria AS(
      SELECT 
        [timestamp],
        UnitID,
        asatime3,
        UDF.getCurrentTime('') AS asatime4,
        ComparisonResult,
        /* find last time, at which condition wasn't true */
        LAST([timestamp]) OVER(
          /* separate event streams by unit ID */
          PARTITION BY UnitID
          LIMIT DURATION(minute, 1) 
          WHEN ComparisonResult != 1) AS start
      FROM 
        WarningTest 
    ), 
    
    /* status generation from preprocessed values and thresholds */
    Result AS(
      SELECT 
        *,
        /* determine durations of conditions being true */
        faultDuration = CASE 
          WHEN F.ComparisonResult = 1 
          THEN DATEDIFF(millisecond, F.start, F.[timestamp]) 
          ELSE 0 
        END,
        criticalDuration = CASE 
          WHEN C.ComparisonResult = 1 
          THEN DATEDIFF(millisecond, C.start, C.[timestamp]) 
          ELSE 0 
        END,
        warningDuration = CASE 
          WHEN W.ComparisonResult = 1 
          THEN DATEDIFF(millisecond, W.start, W.[timestamp]) 
          ELSE 0 
        END,
        state = CASE 
          /* check, which status condition is true and holds for long enough time */
          /* if input stream is sparse, set status to unknown */
          WHEN S.eventCount < 7 OR R.noReport = 1 THEN 5 /*'unknown'*/
          WHEN F.ComparisonResult = 1 AND 
            DATEDIFF(millisecond, F.start, F.[timestamp]) >= R.fault.[duration_s]*1000
            THEN 4 /*'fault'*/
          WHEN C.ComparisonResult = 1 AND 
            DATEDIFF(millisecond, C.start, C.[timestamp]) >= R.critical.[duration_s]*1000
            THEN 3 /*'critical'*/
          WHEN W.ComparisonResult = 1 AND 
            DATEDIFF(millisecond, W.start, W.[timestamp]) >= R.warning.[duration_s]*1000
            THEN 2 /*'warning'*/
          ELSE 1 /*'good'*/
        END,
        /* additionally, forward metadata (stream ID, timestamps)*/
        S.LastEvent.[timestamp] AS [timestamp],
        S.LastEvent.edgetime1 AS edgetime1,
        S.LastEvent.edgetime2 AS edgetime2,
        S.LastEvent.iothubtime AS iothubtime,
        S.LastEvent.asatime AS asatime,
        S.LastEvent.asatime2 AS asatime2,
        W.asatime3 AS asatime3,
        W.asatime4 AS asatime4,
        UDF.getCurrentTime('') AS asatime5,
        S.LastEvent.stream AS stream,
        S.LastEvent.UnitID AS UnitID
      /* source queries: status condition queries... */
      FROM 
        SparseInputTest S
      JOIN WarningCriteria AS W 
        ON DATEDIFF(millisecond, S, W) BETWEEN -250 AND 250 
          /* separate event streams by unit ID */
          AND W.UnitID = S.LastEvent.UnitID
      JOIN CriticalCriteria AS C 
        ON DATEDIFF(millisecond, S, C) BETWEEN -250 AND 250 
          /* separate event streams by unit ID */
          AND C.UnitID = S.LastEvent.UnitID
      JOIN FaultCriteria AS F 
        ON DATEDIFF(millisecond, S, F) BETWEEN -250 AND 250 
          /* separate event streams by unit ID */
          AND F.UnitID = S.LastEvent.UnitID
      /* ...joined with reference data */
      JOIN [referencedata-speed] R 
        ON S.LastEvent.stream = R.blockName 
    ) 
    
    /* output results to db */
    SELECT *, UDF.getCurrentTime('') AS asatime6
    INTO [blobstorage-results] 
    FROM Result 
    
    /* output data on state change to status function */
    SELECT 
      UnitID,
      stream,
      state AS status,
      LAG(state, 1) OVER(
        /* separate event streams by unit ID */
        PARTITION BY UnitID
        LIMIT DURATION(second, 10)) AS previousStatus,
      [timestamp],
      edgetime1,
      edgetime2,
      iothubtime,
      asatime,
      asatime2,
      asatime3,
      asatime4,
      asatime5,
      UDF.getCurrentTime('') AS asatime7
    INTO [function-status] 
    FROM Result 
    WHERE 
      /* send event only on state change */
      LAG(state, 1) OVER(
        /* separate event streams by unit ID */
        PARTITION BY UnitID
        /* last state may be 15s old (see SparseInputTest) 
        -> duration here must be bigger */
        LIMIT DURATION(second, 20)) <> state AND 
      /* prevent state change from fault/unknown back to lower state */
      (state > LAG(state, 1) OVER(PARTITION BY UnitID LIMIT DURATION(second, 20)) OR
        (LAG(state, 1) OVER(PARTITION BY UnitID LIMIT DURATION(second, 20))) <= 3)
    
    /* output results to PowerBI */
    SELECT 
      UnitID,
      state,
      W.ComparisonResult AS warning,
      C.ComparisonResult AS critical,
      F.ComparisonResult AS fault,
      warningDuration,
      criticalDuration,
      faultDuration,
      S.eventCount AS eventCount,
      [timestamp] 
    INTO [powerbi-results] 
    FROM Result 
    
    /* forward raw inputs to PowerBI */
    SELECT 
      UnitID,
      [timestamp],
      actualspeed,
      targetspeed,
      faninputvoltage 
    INTO [powerbi-raw] 
    FROM SpeedSet 

    Especially regarding "7.) Is writing to the first output in my query affected by writing to other outputs?", I'm wondering how to parallelize writing to outputs by partitioning... Can I write to multiple outputs in parallel per query execution?

    Do you have any other ideas concerning my special query?

    Thanks! Axel

    Monday, February 3, 2020 3:36 PM
  • Btw.: As you can see, I'm taking several timestamps inside the query. Here's an example timestamp sequence:

    "asatime":  "2020-02-03T15:23:21.9321421Z",
    "asatime2": "2020-02-03T15:23:27.349Z",
    "asatime3": "2020-02-03T15:23:27.352Z",
    "asatime4": "2020-02-03T15:23:27.353Z",
    "asatime5": "2020-02-03T15:23:27.358Z",
    "asatime7": "2020-02-03T15:23:27.359Z"

    As you can see, the biggest delay doesn't occur during query execution, but between EventProcessedUtcTime and the first timestamp taken inside the query (>5s). How can that be, if I changed all event-ordering policies to 0?
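    For reference, the per-hop gaps in that sequence can be computed with a short script (a sketch; `parse_iso` is a helper I'm introducing here, and the values are copied from the output above):

```python
from datetime import datetime, timezone

def parse_iso(ts: str) -> datetime:
    # Python's %f accepts at most 6 fractional digits, so trim extras and the 'Z'.
    ts = ts.rstrip("Z")
    if "." in ts:
        head, frac = ts.split(".")
        ts = f"{head}.{frac[:6]}"
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)

# Timestamp sequence copied from the post above.
stamps = [
    ("asatime",  "2020-02-03T15:23:21.9321421Z"),
    ("asatime2", "2020-02-03T15:23:27.349Z"),
    ("asatime3", "2020-02-03T15:23:27.352Z"),
    ("asatime4", "2020-02-03T15:23:27.353Z"),
    ("asatime5", "2020-02-03T15:23:27.358Z"),
    ("asatime7", "2020-02-03T15:23:27.359Z"),
]

for (prev_name, prev_ts), (name, ts) in zip(stamps, stamps[1:]):
    gap = (parse_iso(ts) - parse_iso(prev_ts)).total_seconds()
    print(f"{prev_name} -> {name}: {gap:.3f} s")
# The first gap (asatime -> asatime2) dominates at ~5.417 s;
# every later hop is single-digit milliseconds.
```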

    Thanks again!


    • Edited by awo-SYS Monday, February 3, 2020 3:41 PM
    Monday, February 3, 2020 3:40 PM
  • Any updates on this?
    Monday, February 17, 2020 2:07 PM
  • Hello awo-SYS,

    For a deeper investigation and immediate assistance on this issue, if you have a support plan, you may file a support ticket; otherwise, please send an email to azcommunity@microsoft.com with the details below, so that we can create a one-time free support ticket for you to work closely on this matter.

    Thread URL: https://social.msdn.microsoft.com/Forums/en-US/f7225022-af12-4557-a1f8-8255241e22ff/asa-delay-investigation?forum=AzureStreamAnalytics



    Subscription ID:
    Subject: Attn Himanshu

    Please let me know once you have done the same.

    Tuesday, February 18, 2020 3:02 PM