Skip to content

Commit

Permalink
run event grid scripts to communicate via MQTT broker of both mosquit…
Browse files Browse the repository at this point in the history
…to and Event Grid
  • Loading branch information
ks6088ts committed Oct 23, 2024
1 parent 35843cd commit e133540
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 43 deletions.
67 changes: 50 additions & 17 deletions docs/scenarios/2_azure_event_grid_messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,82 @@ This scenario demonstrates how to handle messages from Azure Event Grid.

## Setup

Refer to [Quickstart: Publish and subscribe to MQTT messages on Event Grid Namespace with Azure portal](https://learn.microsoft.com/en-us/azure/event-grid/mqtt-publish-and-subscribe-portal) to create an Event Grid Namespace and a topic.

[Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples) provides a sample application to publish and subscribe messages to the Event Grid.

### Create CA certificate and key

```shell
# Create CA certificate and key
step ca init \
--deployment-type standalone \
--name MqttAppSamplesCA \
--dns localhost \
--address 127.0.0.1:443 \
--provisioner MqttAppSamplesCAProvisioner
```

### Create client certificate and key

```shell
CLIENT_DIR=configs/clients
mkdir -p $CLIENT_DIR

# Create client certificate and key
CLIENT_NAME=client1

mkdir -p $CLIENT_NAME
step certificate create sample_client $CLIENT_NAME/sample_client.pem $CLIENT_NAME/sample_client.key \
step certificate create $CLIENT_NAME $CLIENT_DIR/$CLIENT_NAME.pem $CLIENT_DIR/$CLIENT_NAME.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password --insecure \
--no-password \
--insecure \
--not-after 2400h

# Create chain.pem
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $CLIENT_NAME/chain.pem
# Display certificate fingerprint to register the client on Azure Event Grid Namespace
step certificate fingerprint $CLIENT_DIR/$CLIENT_NAME.pem
```

### Create mosquitto configuration

# Encoded Certificate
cat ~/.step/certs/intermediate_ca.crt | tr -d "\n"
```shell
MOSQUITTO_DIR=configs/mosquitto
mkdir -p $MOSQUITTO_DIR

# Set up mosquitto
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > configs/mosquitto/chain.pem
step certificate create localhost configs/mosquitto/localhost.crt configs/mosquitto/localhost.key \
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $MOSQUITTO_DIR/chain.pem

step certificate create localhost $MOSQUITTO_DIR/localhost.crt $MOSQUITTO_DIR/localhost.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Run mosquitto
cd configs/mosquitto
mosquitto -c tls.conf
make mosquitto
```

## Edge device
## Demo

To understand how to use the Azure Event Grid, we provide a script which is supposed to run on the edge device. This demonstrates how to send and receive messages from the Azure Event Grid.
```shell
# If you use localhost, run mosquitto first
make mosquitto

- [Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples)
# Set the host name of the Event Grid Namespace. If you use localhost, set it to localhost.
HOST_NAME=localhost
# HOST_NAME="EVENT_GRID_NAME.japaneast-1.ts.eventgrid.azure.net"

## Demo
# Subscribe the topic
poetry run python scripts/event_grid.py subscribe \
--topic "sample/topic1" \
--client-name client1 \
--host-name $HOST_NAME \
--verbose

# Publish messages to the topic
poetry run python scripts/event_grid.py publish \
--topic "sample/topic1" \
--payload "helloworld" \
--client-name client2 \
--host-name $HOST_NAME \
--verbose
```
46 changes: 20 additions & 26 deletions scripts/event_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@
app = typer.Typer()
logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.DEBUG)


def get_connection_settings(
host_name: str,
client_name: str,
host_name: str = "localhost",
):
) -> dict:
return {
"MQTT_HOST_NAME": host_name,
"MQTT_USERNAME": client_name,
"MQTT_CLIENT_ID": client_name,
"MQTT_CERT_FILE": f"{client_name}/sample_client.pem",
"MQTT_KEY_FILE": f"{client_name}/sample_client.key",
"MQTT_CA_FILE": f"{client_name}/chain.pem",
"MQTT_CERT_FILE": f"configs/clients/{client_name}.pem",
"MQTT_KEY_FILE": f"configs/clients/{client_name}.key",
"MQTT_CA_FILE": "configs/mosquitto/chain.pem",
"MQTT_TCP_PORT": 8883,
"MQTT_USE_TLS": True,
"MQTT_CLEAN_SESSION": True,
Expand Down Expand Up @@ -56,7 +54,7 @@ def get_mqtt_client(
keyfile=connection_settings["MQTT_KEY_FILE"],
password=connection_settings["MQTT_KEY_FILE_PASSWORD"],
)
if "MQTT_CA_FILE" in connection_settings:
if connection_settings["MQTT_HOST_NAME"] == "localhost":
context.load_verify_locations(
cafile=connection_settings["MQTT_CA_FILE"],
)
Expand Down Expand Up @@ -85,12 +83,16 @@ def publish(
topic: str = "sample/topic1",
payload: str = "Hello, World!",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(client_name=client_name)
connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

Expand All @@ -100,25 +102,13 @@ def publish(
)

mqtt_client.loop_start()
msg_count = 1
result = mqtt_client.publish(
topic=topic,
payload=payload,
)

while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = mqtt_client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}, status={status}")
msg_count += 1
if msg_count > 5:
break
logger.info(result)
# fixme: wait for the message to be sent
time.sleep(1)

mqtt_client.loop_stop()

Expand All @@ -127,12 +117,16 @@ def publish(
def subscribe(
topic: str = "sample/topic1",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(client_name=client_name)
connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

Expand All @@ -145,5 +139,5 @@ def subscribe(


if __name__ == "__main__":
load_dotenv("iot_hub.env")
load_dotenv("event_grid.env")
app()

0 comments on commit e133540

Please sign in to comment.