Direct Methods with IoT Hub in C#

There is a new preview in town: with it you can invoke a direct method on a device. Currently only MQTT devices are supported in this scenario. There is a nice article with some Node.js samples. When Roman Kiss posted on the Azure Forum that he would like to write his simulated device in C#, I thought this might be a nice opportunity to figure out why this does not work.

Well the answer is pretty simple: It is not yet implemented in the C# SDK.

But being me, I decided to make the “impossible” possible (just for the fun of it). First I pulled the complete preview of the Azure IoT SDKs from GitHub. Then I spent some time figuring out what the Node.js implementation does. I love debugging JavaScript *sigh*.

And then I quickly modded (aka hacked) the Microsoft.Azure.Devices.Client (be aware that this is not an optimal solution). These are the changes I made:

Microsoft.Azure.Devices.Client – MqttIotHubAdapter

sealed class MqttIotHubAdapter : ChannelHandlerAdapter
...
const string TelemetryTopicFormat = "devices/{0}/messages/events/";
// ADDED =>
const string MethodTopicFilterFormat = "$iothub/methods/POST/#";
const string MethodTopicFormat = "$iothub/methods/res/{0}/?$rid={1}";
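
To see why a single subscription with the `#` wildcard is enough to catch every method call, here is a minimal stand-alone sketch of MQTT filter matching. It is not part of the SDK: `MatchesFilter` is a hypothetical, simplified helper, and the method name `writeLine` and request id `1` are made-up sample values. It uses top-level statements, so it assumes C# 9 or later.

```csharp
using System;

const string MethodTopicFilterFormat = "$iothub/methods/POST/#";

// Simplified MQTT topic filter check (illustration only):
// '#' matches all remaining levels, '+' matches exactly one level.
bool MatchesFilter(string filter, string topic)
{
    string[] f = filter.Split('/');
    string[] t = topic.Split('/');
    for (int i = 0; i < f.Length; i++)
    {
        if (f[i] == "#") return true;          // multi-level wildcard: rest matches
        if (i >= t.Length) return false;       // topic is shorter than the filter
        if (f[i] != "+" && f[i] != t[i]) return false;
    }
    return f.Length == t.Length;
}

// Hypothetical incoming topic for a method call.
string incoming = "$iothub/methods/POST/writeLine/?$rid=1";
Console.WriteLine(MatchesFilter(MethodTopicFilterFormat, incoming));
Console.WriteLine(MatchesFilter(MethodTopicFilterFormat, "devices/myDeviceId/messages/events/"));
```

The first call reports a match and the second does not, which is why method calls and regular cloud-to-device messages need separate subscriptions.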

Microsoft.Azure.Devices.Client – MqttIotHubAdapter – Connect Function

This was the most difficult part to find out, because I did not expect this “hack”. Expect the unexpected!
async void Connect(IChannelHandlerContext context)
{
...
var connectPacket = new ConnectPacket
{
ClientId = this.deviceId,
HasUsername = true,
// CHANGED => You need to add this weird suffix to make it work!
Username = this.iotHubHostName + "/" + this.deviceId + "/DeviceClientType=azure-iot-device%2F1.1.0-dtpreview&api-version=2016-09-30-preview",
HasPassword = !string.IsNullOrEmpty(this.password),

Microsoft.Azure.Devices.Client – MqttIotHubAdapter – SubscribeAsync Function
Here I added the method topic subscription!

async Task SubscribeAsync(IChannelHandlerContext context)
{
if (this.IsInState(StateFlags.Receiving) || this.IsInState(StateFlags.Subscribing))
{
return;
}

this.stateFlags |= StateFlags.Subscribing;

this.subscribeCompletion = new TaskCompletionSource();
string topicFilter = CommandTopicFilterFormat.FormatInvariant(this.deviceId);
var subscribePacket = new SubscribePacket(Util.GetNextPacketId(), new SubscriptionRequest(topicFilter, this.mqttTransportSettings.ReceivingQoS));
System.Diagnostics.Debug.WriteLine($"Topic filter: {topicFilter}");
await Util.WriteMessageAsync(context, subscribePacket, ShutdownOnWriteErrorHandler);
await this.subscribeCompletion.Task;

// ADDED => We are using the const I declared earlier to construct the topicFilter
this.subscribeCompletion = new TaskCompletionSource();
topicFilter = MethodTopicFilterFormat.FormatInvariant(this.deviceId);
System.Diagnostics.Debug.WriteLine($"Topic filter: {topicFilter}");
subscribePacket = new SubscribePacket(Util.GetNextPacketId(), new SubscriptionRequest(topicFilter, this.mqttTransportSettings.ReceivingQoS/*QualityOfService.AtMostOnce*/));
await Util.WriteMessageAsync(context, subscribePacket, ShutdownOnWriteErrorHandler);
await this.subscribeCompletion.Task;
// <= ADDED

}
Microsoft.Azure.Devices.Client – MqttIotHubAdapter – SendMessageAsync Function
Since we want to acknowledge the method call, the response has to be published to the method response topic instead of the telemetry topic, so we need to modify this too:
async Task SendMessageAsync(IChannelHandlerContext context, Message message)
{
// CHANGED => For our publish message we need to send to a different topic
string topicName = null;
if (message.Properties.ContainsKey("methodName"))
topicName = string.Format(MethodTopicFormat, message.Properties["status"], message.Properties["requestID"]);
else
topicName = string.Format(TelemetryTopicFormat, this.deviceId);
// <= CHANGED

PublishPacket packet = await Util.ComposePublishPacketAsync(context, message, this.mqttTransportSettings.PublishToServerQoS, topicName);
...
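
To make the topic selection easier to follow, here is a minimal stand-alone sketch of the same branch, outside the SDK. `SelectTopic` is a hypothetical helper, the property values are sample data, and the code assumes C# 9 or later for top-level statements.

```csharp
using System;
using System.Collections.Generic;

const string TelemetryTopicFormat = "devices/{0}/messages/events/";
const string MethodTopicFormat = "$iothub/methods/res/{0}/?$rid={1}";

// Hypothetical stand-alone version of the branch in SendMessageAsync:
// a message that carries "methodName" is treated as a method response.
string SelectTopic(IDictionary<string, string> properties, string deviceId)
{
    if (properties.ContainsKey("methodName"))
        return string.Format(MethodTopicFormat, properties["status"], properties["requestID"]);
    return string.Format(TelemetryTopicFormat, deviceId);
}

// Sample properties as the simulated device would set them on its response.
var response = new Dictionary<string, string>
{
    ["methodName"] = "writeLine",
    ["requestID"] = "1",
    ["status"] = "200",
};
Console.WriteLine(SelectTopic(response, "myDeviceId"));                         // $iothub/methods/res/200/?$rid=1
Console.WriteLine(SelectTopic(new Dictionary<string, string>(), "myDeviceId")); // devices/myDeviceId/messages/events/
```

The request id from the incoming call has to be echoed back in the response topic, which is why the device copies the received properties onto its reply.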
Microsoft.Azure.Devices.Client – MqttTransportHandler – ReceiveAsync Function
Since a method call arrives without a lockToken, we must not enqueue null in our completionQueue:
public override async Task<Message> ReceiveAsync(TimeSpan timeout)
{
...
Message message;
lock (this.syncRoot)
{
this.messageQueue.TryDequeue(out message);
message.LockToken = message.LockToken;
// Changed line below to exclude LockTokens that are null #HACK better check if it is a Method message
if ((message.LockToken != null)&&(this.qos == QualityOfService.AtLeastOnce) )
{
this.completionQueue.Enqueue(message.LockToken);
}
...
Microsoft.Azure.Devices.Client – Util – ComposePublishPacketAsync
A small change here prevents this method from “destroying” the topic name we carefully constructed earlier.
public static async Task<PublishPacket> ComposePublishPacketAsync(IChannelHandlerContext context, Message message, QualityOfService qos, string topicName)
{
var packet = new PublishPacket(qos, false, false);

// MODIFIED ==>
if (message.Properties.ContainsKey("methodName"))
packet.TopicName = topicName; // Make sure to keep our Topic Name
else
packet.TopicName = PopulateMessagePropertiesFromMessage(topicName, message);
// <== MODIFIED
...

Microsoft.Azure.Devices.Client – Util – PopulateMessagePropertiesFromPacket
And finally we need to populate our method messages with properties like requestID, methodName and verb, which we extract from the topic name:
public static void PopulateMessagePropertiesFromPacket(Message message, PublishPacket publish)
{
message.LockToken = publish.QualityOfService == QualityOfService.AtLeastOnce ? publish.PacketId.ToString() : null;

// MODIFIED ==>
Dictionary<string, string> properties = null;
if (publish.TopicName.StartsWith("$iothub/methods"))
{
var segments = publish.TopicName.Split('/');
properties = UrlEncodedDictionarySerializer.Deserialize(segments[4].Replace("?$rid", "requestID"), 0);
properties.Add("methodName", segments[3]);
properties.Add("verb", segments[2]);
}
else
properties = UrlEncodedDictionarySerializer.Deserialize(publish.TopicName, publish.TopicName.NthIndexOf('/', 0, 4) + 1);
// <== MODIFIED

foreach (KeyValuePair<string, string> property in properties)
{
...
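
The topic parsing above can be tried in isolation. The following is a minimal sketch under a few assumptions: it does not use the SDK's UrlEncodedDictionarySerializer but a hand-written split, `ParseMethodTopic` is a hypothetical helper, and the topic is sample data. It assumes C# 9 or later for top-level statements.

```csharp
using System;
using System.Collections.Generic;

// Stand-alone sketch: splits "$iothub/methods/POST/{methodName}/?$rid={requestID}"
// into the same three properties the modded SDK attaches to the message.
Dictionary<string, string> ParseMethodTopic(string topicName)
{
    string[] segments = topicName.Split('/');
    var properties = new Dictionary<string, string>
    {
        ["verb"] = segments[2],
        ["methodName"] = segments[3],
    };

    // The last segment carries the request id as "?$rid=<value>".
    const string ridPrefix = "?$rid=";
    if (segments.Length > 4 && segments[4].StartsWith(ridPrefix, StringComparison.Ordinal))
        properties["requestID"] = Uri.UnescapeDataString(segments[4].Substring(ridPrefix.Length));

    return properties;
}

// Hypothetical topic for a method named "writeLine" with request id 1.
var parsed = ParseMethodTopic("$iothub/methods/POST/writeLine/?$rid=1");
Console.WriteLine($"{parsed["verb"]} {parsed["methodName"]} {parsed["requestID"]}"); // POST writeLine 1
```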

Building the simulated device with the modded Microsoft.Azure.Devices.Client SDK
Just create a new Console application and reference the modded SDK:
 
using Microsoft.Azure.Devices.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace DeviceClientCS
{
class Program
{
private static async void ReceiveCloudToDeviceMessageAsync(DeviceClient client,
string theDeviceID)
{
Console.WriteLine($"Receiving messages from Cloud for device {theDeviceID}");
while (true)
{
Message receivedMessage = await client.ReceiveAsync();
if (receivedMessage == null) continue;

Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"Received method ({receivedMessage.Properties["methodName"]}): {Encoding.ASCII.GetString(receivedMessage.GetBytes())} for device {theDeviceID} - Verb: {receivedMessage.Properties["verb"]}");
Console.ResetColor();

// ACKNOWLEDGE the method call; the response text is sent as the message body
byte[] msg = Encoding.ASCII.GetBytes("Input was written to log.");
Message respondMethodMessage = new Message(msg);
foreach (KeyValuePair<string, string> kv in receivedMessage.Properties)
respondMethodMessage.Properties.Add(kv.Key, kv.Value);
respondMethodMessage.Properties.Add("status", "200");
await client.SendEventAsync(respondMethodMessage);
}
}


static void Main(string[] args)
{
string deviceID= "myDeviceId";
string connectionString = "<Your device connection string goes here>";

DeviceClient client = DeviceClient.CreateFromConnectionString(connectionString, deviceID, TransportType.Mqtt);
ReceiveCloudToDeviceMessageAsync(client, deviceID);
Console.ReadLine();
}
}
}
 
And here is a final screenshot of my results:
[Screenshot: Result]
Cheers
AndiP
