[IoT] Aquarium monitor; Azure Stream Analytics
Things are progressing well in my quest to create an Azure connected aquarium monitor! This is post #4 in my blog series about the subject. In the previous post we ended up sending messages with measurements to an Azure IoT hub. In this post we’ll take a look at what happens next, since we’re not actually doing anything with the data now.
As stated before, you can compare the IoT hub with an event hub in Azure, which in turn is like a service bus. Actually, the IoT hub is more like the input side of a service bus. One of the differences is that the IoT hub also acts like a buffer, retaining all the sent messages for a certain amount of time (one day by default). This can help to be more flexible in the back-end, which might not be able to keep up with a huge number of messages. Needless to say we’ll probably won’t hit these kinds of restraints for this project.
Also, a service bus is about hooking up inputs and output in order for sent messages to also be received somewhere. The IoT hub doesn’t do that, it’s only the place where messages can be sent to and read from, there is no rule engine or anything like that, at least not inside of the IoT hub… With the messages being sent to the IoT hub, we now have several things we want to do with them. To start with, we probably want to save them somewhere so we can build a data store. We also want to have some alarms set-up which alarm us whenever certain values go out of bounds. In this case, the best example would be the temperature suddenly dropping (or spiking).
Introducing Azure Stream Analytics
A dispatcher might not be the best comparison for Azure Stream Analytics (ASA), but it does help in explaining how this works. ASA on one hand will read messages and then on the other hand will send certain messages to certain outputs (or: sinks). To use ASA, we need to set-up ASA jobs in the Azure portal. Each job consists of three components:
The concept is very simple. A stream analytics job is connected to one or more inputs. These are configurable, options include an event hub, blob storage or an IoT hub (which we will use).
On the other side, we have outputs. Again, this can be a single one or multiple ones. The outputs will be used to pass data to and include options like Document DB, blob storage, SQL database, service bus, Power BI and more.
Connecting the inputs and outputs together is one or more queries. In a query editor window, we can specify a SQL-based query to select data from the input and send it to one of the outputs.
Okay, cool. So let’s see how we can leverage ASA to write the data we’re sending to the cloud to disk. And by disk, in this case we mean blob storage.
Creating the required components
First we need to make sure we have everything we need in place. For now, we’ll be using blob storage which requires an Azure Storage Account. Setting up a storage account is simple, you give it a name, subscription, resource group and location and you’re good to go. This will give you an empty storage account.
Once created, we need to add a container. The container will be the place where the blobs are stored. To create one, open your storage account and click the “containers” tile. In the pane that appears, click “+ Container” to create one, the only thing you need to specify is the name. Leave it private cause we’ll be using it only inside of this subscription.
Next, open up the instance of the IoT hub you created earlier. Click “all settings” to open up the settings pane and then click “Messaging”. In the pane that shows, you can create different consumer groups which basically are a way to group messages of a specific type. We’ll be using this to set-up the ASA input later on. For this example you’ll only need one but you might want to plan the different messages types you will be using. I’ve got it set-up as follows:
Ok so now we have storage set-up and ready to go, we’ll need to create the ASA job. Head over to “Stream Analytics Jobs” and click “+ Add” to create a new one.
Again pretty straightforward; just enter a job name, choose a subscription, resource group and location and you’re good to go. Once the job has been created, it will be empty.
Creating the ASA job input
When you open up the job, you’ll see the following:
Let’s start with creating a new input, click the “Inputs” tile and click “Add”. The next input screen is a little bit more difficult than the previous ones:
The job we’ll be creating is tasked with storing all of the telemetry data coming in. This will be stored as files in the blob storage container we created.
We’ll start by choosing an input alias. Choose something that makes sense because you’ll be using the same alias to set-up the query afterwards. As source type we’ll chose a data stream and as source itself, an IoT hub. Now we need to specify the name of the hub (as you created it). The shared access policy name and policy key you can find in the IoT hub instance. Tip: you can open multiple instances of the Azure portal to find these kinds of things to avoid having to go back and forth all the time. If you click your IoT hub instance and then click the key icon, you’ll find the access policies. The one you want to use is iothubowner (paste in “Shared access policy name”), click it and use the primary key for “Shared access policy key”. The consumer group is one of the groups we configured above, in this case telemetrycg because we’re storing telemetry messages.
Click “Create” and wait a few seconds for Azure to create your input.
Creating the ASA Job output
Now click “Outputs” and then “Add” to create an output. Choose a name again, keep in mind you’ll need that in your query. This output will be pointing to the storage account we created earlier so now we’ll select Blob storage as Sink. Next we need to specify the storage account, key and container name. The account and key you can also find by opening the storage account in the portal and clicking the key icon. The container name is the one you chose yourself.
The path pattern is used to automatically create a folder structure in which the output files are stored. I chose “devicetelemetry/date”, along with the “YYYY/MM/DD” date format this will return in the following folder structure:
I selected CSV as the output type simply because the remote monitoring sample uses the same format, still need to find out why this is not JSON.
Click Create and wait a little while for the output to get created. Now we have both ends of the channel set-up, we can now write the query to connect the two.
Creating the ASA Job query
Click the Query tile to open up the query editor window. This is similar to what you might know of working in SQL Server Management Studio, although a bit more primitive. I haven’t yet investigated how complex you can (or should) make these queries, since for this example that’s not really needed. Here’s the code of the query:
WITH [StreamData] AS ( SELECT * FROM [DeviceDataStream] WHERE [ObjectType] IS NULL -- Filter out device info and command responses ) SELECT * INTO [Telemetry] FROM [StreamData]
If you know how to read SQL, this should not be difficult. The first bit assigns the input from the [DeviceDataStream] (our Input) to the name [StreamData], but filters out the records with an ObjectType specified. These are the messages I’m using for device identification, which you do not want to store in your telemetry data store.
Next, this entire data set is selected into the [Telemetry] output, which is our blob store. Notice that in this part of the job, the query doesn’t have any clue about whether the output is a blob store, document DB or anything else. Azure just takes care of all of the data transformations required, quite nice!
Do not forget: start the job!
Awesome, we now have an input, output and a query connecting the two. The only step left is to start the job. Should make sense that jobs that are not running will also not process any data. Starting the job is as easy as hitting “Start” and waiting a bit for the job to list as “Running”. It will now be processing data, so once you start sending messages, the job should write those to the blob storage where you can then find it, download the file and check out its content.
To find these files, open your storage account and click “Blobs”. Next select your container and on the right hand side you should see the folder structure beginning with the “devicetelemetry” folder. Navigate down and eventually you’ll see the file you can then download and open in Excel. The above image shows a sample of the data collected from my Raspberry Pi 🙂
Nice to knows
Whenever I think of things that are helpful to know, I’ll add them below. Hope it helps!
- If you made a mistake anywhere in the process, you’ll probably end up with a ASA job listed as “Degraded”. Now there are some logs you can inspect, but unfortunately those do not seem to be available in the new azure portal experience just yes. Instead, head over to https://manage.windowazure.com and find the job there. You should now see the option “Operation Logs” where you’ll find more info about why the job has stopped.
- In the old portal, you can also select the “Sample Data” option for ASA job inputs. You might try to use this to inspect the data coming from the input. Unfortunately this option just doesn’t seem to work. At least I haven’t found a way to get it going, not even with a perfectly functioning ASA job. So whenever you see the message: “There was an error while reading sample input. Please check if the input source is configured correctly and data is in correct format.”, this does not guarantee that there is an actual problem with your input, it might be totally fine. I hope this will get fixed once the “sample data” option makes it into the new portal experience.