[Azure] Adding more intelligence to Stream Analytics queries

If you’ve read my previous blog post on Azure Stream Analytics, you know how Stream Analytics can be used to process all sorts of incoming data and send the end result to one or multiple outputs. This is particularly useful for ensuring the right data is saved, manipulating the data before saving, or filtering out only the data you’re interested in. And that last category is what I used it for: notifications! The query I used previously is not very dynamic, though. Here’s a snippet:

SELECT Stream.deviceId,
     'temperature1' as ReadingType,
     Stream.temperature1 as Reading,
     CASE WHEN (Ref.Temperature1Min IS NOT null AND Stream.temperature1 < Ref.Temperature1Min) THEN Ref.Temperature1Min
     ELSE Ref.Temperature1Max END AS Threshold,
     Stream.EventEnqueuedUtcTime AS [Time]
FROM [IoTHubStream] Stream
JOIN [DeviceRulesBlob] Ref ON Stream.deviceId = Ref.DeviceId
WHERE
     (Ref.Temperature1Min IS NOT null AND Stream.temperature1 < Ref.Temperature1Min) OR
     (Ref.Temperature1Max IS NOT null AND Stream.temperature1 > Ref.Temperature1Max)

Works, but what if we start adding more sensor values? Hmm, we’d need to change the query each time. Not really what we want to do, right? Time for a better solution.

A more dynamic solution

That better solution starts with a better way of sending the data to Azure. Any data structure that prescribes a fixed set of fields is going to be a pain in the end if you know the data you’re going to send might change in the future. Instead, you want to choose a data structure that is dynamic to begin with. Here’s an example:

{
	"ObjectType": "Telemetry",
	"DeviceId": "repsaj-neptune-win10pi",
	"SensorData": [{
		"SensorName": "FlowIn",
		"Value": 1172.6177074353293
	}, {
		"SensorName": "FlowOut",
		"Value": 1285.5341503329269
	}, {
		"SensorName": "temperature1",
		"Value": 28.334656342787042
	}, {
		"SensorName": "temperature2",
		"Value": 20.774892884434617
	}, {
		"SensorName": "pH",
		"Value": 6.8432815807141747
	}]
}

Instead of hardcoding the structure, I’ve introduced a SensorData array which can hold as many sensors as we wish. Adding one is now very easy: just add another item to the array and you’re done.
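To make that concrete, here’s a minimal Python sketch of how a device could build such a message. The `build_telemetry` helper and the extra `salinity` sensor are made up for illustration; only the field names come from the example above.

```python
import json


def build_telemetry(device_id, readings):
    """Build a telemetry message with one SensorData item per reading."""
    return {
        "ObjectType": "Telemetry",
        "DeviceId": device_id,
        "SensorData": [
            {"SensorName": name, "Value": value}
            for name, value in readings.items()
        ],
    }


readings = {"temperature1": 28.3, "pH": 6.8}
message = build_telemetry("repsaj-neptune-win10pi", readings)

# Adding a sensor only changes the input data, not the message-building code.
readings["salinity"] = 35.1  # hypothetical extra sensor
extended = build_telemetry("repsaj-neptune-win10pi", readings)

print(json.dumps(extended, indent=2))
```

The message shape stays the same no matter how many sensors the device reports, which is exactly what lets the query stay untouched later on.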

Reference data

In the previous post I showed how to use reference data to store thresholds. We apply the same approach to the reference data, as follows:

{
	"deviceId": "repsaj-neptune-win10pi",
	"sensorRules": [{
		"sensorName": "pH",
		"threshold": 6.5,
		"operator": "<"
	}, {
		"sensorName": "temperature2",
		"threshold": 35.0,
		"operator": ">"
	}, {
		"sensorName": "temperature1",
		"threshold": 21.0,
		"operator": "<"
	}, {
		"sensorName": "temperature1",
		"threshold": 27.0,
		"operator": ">"
	}]
}

As you can see, the reference data now uses the same approach. Instead of storing a rigid set of sensor values, I’ve introduced a dynamic array with items that define the sensorName, threshold and operator. Of course, it’s important for the sensorName to match the name used in the incoming data.
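Storing the operator as data means a rule can be evaluated with a simple lookup. Here’s a rough Python sketch of that idea; the `COMPARATORS` table and `rule_triggers` function are my own illustrative names, not part of Stream Analytics, and the measured value is made up.

```python
import operator

# Map the operator strings from the reference data to comparison functions.
COMPARATORS = {"<": operator.lt, ">": operator.gt, "=": operator.eq}


def rule_triggers(rule, value):
    """Return True when `value` crosses the threshold described by `rule`."""
    if rule.get("threshold") is None:
        return False  # no threshold configured, so nothing to check
    compare = COMPARATORS.get(rule["operator"])
    if compare is None:
        return False  # unknown operator: ignore the rule
    return compare(value, rule["threshold"])


rules = [
    {"sensorName": "temperature1", "threshold": 21.0, "operator": "<"},
    {"sensorName": "temperature1", "threshold": 27.0, "operator": ">"},
]

# A reading of 28.33 is not below 21.0, but it is above 27.0.
results = [rule_triggers(rule, 28.33) for rule in rules]
print(results)  # [False, True]
```

Note how the two temperature1 rules together describe a minimum and a maximum, which replaces the Temperature1Min and Temperature1Max columns from the old query.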


The last part of the puzzle is the query. So how are we going to process these structures in Stream Analytics? Well, there’s a very nifty trick for that called CROSS APPLY. Using CROSS APPLY, we can flatten an array into multiple rows, one per array element. And we can do this twice: once for the stream data and once for the reference data. The query then looks like this:

WITH [SensorData] AS (
    SELECT
        event.deviceId AS deviceId,
        sensorElement.ArrayValue.sensorName AS sensorName,
        sensorElement.ArrayValue.value AS sensorValue,
        thresholdElement.ArrayValue.threshold AS threshold,
        thresholdElement.ArrayValue.operator AS operator
    FROM [IoTHubStream] AS event
    JOIN [DeviceRulesBlob] ref ON event.deviceId = ref.deviceId
    CROSS APPLY GetArrayElements(event.sensorData) AS sensorElement
    CROSS APPLY GetArrayElements(ref.sensorRules) AS thresholdElement
    WHERE sensorElement.ArrayValue.sensorName = thresholdElement.ArrayValue.sensorName AND
          NOT(thresholdElement.ArrayValue.threshold IS NULL) AND
          -- Group the OR branches so the name and NULL checks apply to all of them
          (
              (thresholdElement.ArrayValue.operator = '>' AND
               sensorElement.ArrayValue.value > thresholdElement.ArrayValue.threshold) OR
              (thresholdElement.ArrayValue.operator = '<' AND
               sensorElement.ArrayValue.value < thresholdElement.ArrayValue.threshold) OR
              (thresholdElement.ArrayValue.operator = '=' AND
               thresholdElement.ArrayValue.threshold = sensorElement.ArrayValue.value)
          )
)

-- Forward every column of the step above to the output
SELECT *
INTO [DeviceRulesMonitoring]
FROM SensorData

Note the following things:

  • We first JOIN the reference data. This ensures that each incoming record is matched to the reference record with the correct deviceId. This way we can store multiple reference records for different devices.
  • Next, the CROSS APPLY in combination with GetArrayElements is used to get the sensorData array and represent it as ‘sensorElement’. Note that this object only has two values: ArrayValue and ArrayIndex. To get to the values of the array object, use ArrayValue.
  • A second CROSS APPLY is used to transpose the sensorRules from the threshold record.
  • And then there’s a WHERE clause with a bunch of statements. Here we make sure that the sensor names match, that the threshold value isn’t NULL and that the measured value is smaller than, greater than or exactly equal to the threshold, depending on the rule’s operator.
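If the double CROSS APPLY is hard to picture, the steps above can be mimicked in a few lines of Python. This is only a sketch of the row-by-row logic, not how Stream Analytics runs the query internally. The function names are mine, the keys follow the spelling used in the query, and the readings are made-up values chosen so that two rules fire.

```python
from itertools import product


def get_array_elements(array):
    """Mimic GetArrayElements: one record per item, with its index and value."""
    return [{"ArrayIndex": i, "ArrayValue": v} for i, v in enumerate(array)]


def matches(sensor, rule):
    """Mirror the WHERE clause: same name, non-null threshold, operator holds."""
    if sensor["sensorName"] != rule["sensorName"] or rule["threshold"] is None:
        return False
    op, value, threshold = rule["operator"], sensor["value"], rule["threshold"]
    return ((op == ">" and value > threshold) or
            (op == "<" and value < threshold) or
            (op == "=" and value == threshold))


def evaluate(event, reference_rows):
    """Return one alert row per (sensor, rule) pair that passes the filter."""
    alerts = []
    for ref in reference_rows:
        if ref["deviceId"] != event["deviceId"]:  # JOIN ... ON deviceId
            continue
        # Two CROSS APPLYs: every sensor reading paired with every rule.
        pairs = product(get_array_elements(event["sensorData"]),
                        get_array_elements(ref["sensorRules"]))
        for sensor_element, rule_element in pairs:
            sensor = sensor_element["ArrayValue"]
            rule = rule_element["ArrayValue"]
            if matches(sensor, rule):
                alerts.append({
                    "deviceId": event["deviceId"],
                    "sensorName": sensor["sensorName"],
                    "sensorValue": sensor["value"],
                    "threshold": rule["threshold"],
                    "operator": rule["operator"],
                })
    return alerts


event = {
    "deviceId": "repsaj-neptune-win10pi",
    "sensorData": [
        {"sensorName": "temperature1", "value": 28.33},
        {"sensorName": "temperature2", "value": 20.77},
        {"sensorName": "pH", "value": 6.4},
    ],
}
reference_rows = [{
    "deviceId": "repsaj-neptune-win10pi",
    "sensorRules": [
        {"sensorName": "pH", "threshold": 6.5, "operator": "<"},
        {"sensorName": "temperature2", "threshold": 35.0, "operator": ">"},
        {"sensorName": "temperature1", "threshold": 21.0, "operator": "<"},
        {"sensorName": "temperature1", "threshold": 27.0, "operator": ">"},
    ],
}]

alerts = evaluate(event, reference_rows)
for alert in alerts:
    print(alert)
```

With these values, temperature1 crosses its upper threshold and pH drops below its lower one, so two alert rows come out. A new sensor with a matching rule would flow through the same loop without touching the code.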

Awesome! So now if we add a new sensor to the telemetry data and a matching rule to the reference data, the query will process it without any need to change it! The output looks something like this:

