Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ks6088ts committed Oct 23, 2024
1 parent 3a34b20 commit 35843cd
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,6 @@ cython_debug/
# Project
*.env
requirements.txt
*.pem
*.crt
*.key
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ server: ## run server
.PHONY: env
env: ## create env files
@sh scripts/create_env_files.sh

.PHONY: mosquitto
mosquitto: ## run mosquitto
cd configs/mosquitto && mosquitto -c tls.conf
12 changes: 12 additions & 0 deletions configs/mosquitto/tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
per_listener_settings true

listener 1883
allow_anonymous true

listener 8883
allow_anonymous true
require_certificate true
cafile chain.pem
certfile localhost.crt
keyfile localhost.key
tls_version tlsv1.2
56 changes: 56 additions & 0 deletions docs/scenarios/2_azure_event_grid_messaging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Azure Event Grid Messaging

This scenario demonstrates how to handle messages from Azure Event Grid.

## Architecture

[![architecture](../assets/2_architecture.png)](../assets/2_architecture.png)

## Setup

```shell
# Create CA certificate and key
step ca init \
--deployment-type standalone \
--name MqttAppSamplesCA \
--dns localhost \
--address 127.0.0.1:443 \
--provisioner MqttAppSamplesCAProvisioner

# Create client certificate and key
CLIENT_NAME=client1

mkdir -p $CLIENT_NAME
step certificate create sample_client $CLIENT_NAME/sample_client.pem $CLIENT_NAME/sample_client.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password --insecure \
--not-after 2400h

# Create chain.pem
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $CLIENT_NAME/chain.pem

# Encoded Certificate
cat ~/.step/certs/intermediate_ca.crt | tr -d "\n"

# Set up mosquitto
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > configs/mosquitto/chain.pem
step certificate create localhost configs/mosquitto/localhost.crt configs/mosquitto/localhost.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Run mosquitto
cd configs/mosquitto
mosquitto -c tls.conf
```

## Edge device

To understand how to use the Azure Event Grid, we provide a script which is supposed to run on the edge device. This demonstrates how to send and receive messages from the Azure Event Grid.

- [Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples)

## Demo
12 changes: 12 additions & 0 deletions eventgrid.env.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
MQTT_HOST_NAME="localhost" or "EVENT_GRID_NAMESPACE_NAME.japaneast-1.ts.eventgrid.azure.net"
MQTT_TCP_PORT="8883"
MQTT_USE_TLS="true"
MQTT_CLEAN_SESSION="true"
MQTT_KEEP_ALIVE_IN_SECONDS="30"
MQTT_CLIENT_ID="sample_client"
MQTT_USERNAME="sample_client"
MQTT_PASSWORD=""
MQTT_CA_FILE="chain.pem"
MQTT_CERT_FILE="sample_client.pem"
MQTT_KEY_FILE="sample_client.key"
MQTT_KEY_FILE_PASSWORD=""
149 changes: 149 additions & 0 deletions scripts/event_grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import logging
import ssl
import time

import paho.mqtt.client as mqtt
import typer
from dotenv import load_dotenv

app = typer.Typer()
logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.DEBUG)


def get_connection_settings(
client_name: str,
host_name: str = "localhost",
):
return {
"MQTT_HOST_NAME": host_name,
"MQTT_USERNAME": client_name,
"MQTT_CLIENT_ID": client_name,
"MQTT_CERT_FILE": f"{client_name}/sample_client.pem",
"MQTT_KEY_FILE": f"{client_name}/sample_client.key",
"MQTT_CA_FILE": f"{client_name}/chain.pem",
"MQTT_TCP_PORT": 8883,
"MQTT_USE_TLS": True,
"MQTT_CLEAN_SESSION": True,
"MQTT_KEEP_ALIVE_IN_SECONDS": 60,
"MQTT_KEY_FILE_PASSWORD": None,
}


def get_mqtt_client(
connection_settings: dict,
):
client = mqtt.Client(
client_id=connection_settings["MQTT_CLIENT_ID"],
clean_session=connection_settings["MQTT_CLEAN_SESSION"],
protocol=mqtt.MQTTv311,
transport="tcp",
)
if "MQTT_USERNAME" in connection_settings:
client.username_pw_set(
username=connection_settings["MQTT_USERNAME"],
password=connection_settings["MQTT_PASSWORD"] if "MQTT_PASSWORD" in connection_settings else None,
)
if connection_settings["MQTT_USE_TLS"]:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_3

if connection_settings["MQTT_CERT_FILE"]:
context.load_cert_chain(
certfile=connection_settings["MQTT_CERT_FILE"],
keyfile=connection_settings["MQTT_KEY_FILE"],
password=connection_settings["MQTT_KEY_FILE_PASSWORD"],
)
if "MQTT_CA_FILE" in connection_settings:
context.load_verify_locations(
cafile=connection_settings["MQTT_CA_FILE"],
)
else:
context.load_default_certs()

client.tls_set_context(context)
return client


def attach_functions(client: mqtt.Client) -> mqtt.Client:
client.on_connect = lambda client, userdata, flags, rc: logger.info(
f"on_connect: client={client}, userdata={userdata}, flags={flags}, rc={rc}"
)
client.on_disconnect = lambda client, userdata, rc: logger.info(
f"on_disconnect: client={client}, userdata={userdata}, rc={rc}"
)
client.on_message = lambda client, userdata, message: logger.info(
f"on_message: client={client}, userdata={userdata}, message={message.payload.decode()}"
)
return client


@app.command()
def publish(
topic: str = "sample/topic1",
payload: str = "Hello, World!",
client_name: str = "client1",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(client_name=client_name)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)

mqtt_client.loop_start()
msg_count = 1
result = mqtt_client.publish(
topic=topic,
payload=payload,
)

while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = mqtt_client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}, status={status}")
msg_count += 1
if msg_count > 5:
break

mqtt_client.loop_stop()


@app.command()
def subscribe(
topic: str = "sample/topic1",
client_name: str = "client1",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(client_name=client_name)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)
mqtt_client.subscribe(topic)
mqtt_client.loop_forever()


if __name__ == "__main__":
load_dotenv("iot_hub.env")
app()

0 comments on commit 35843cd

Please sign in to comment.