diff --git a/.gitignore b/.gitignore index 899e077..a55c35b 100644 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,6 @@ cython_debug/ # Project *.env requirements.txt +*.pem +*.crt +*.key diff --git a/Makefile b/Makefile index 1bda522..c6660c9 100644 --- a/Makefile +++ b/Makefile @@ -132,3 +132,7 @@ server: ## run server .PHONY: env env: ## create env files @sh scripts/create_env_files.sh + +.PHONY: mosquitto +mosquitto: ## run mosquitto + cd configs/mosquitto && mosquitto -c tls.conf diff --git a/configs/mosquitto/tls.conf b/configs/mosquitto/tls.conf new file mode 100644 index 0000000..dd2ae59 --- /dev/null +++ b/configs/mosquitto/tls.conf @@ -0,0 +1,12 @@ +per_listener_settings true + +listener 1883 +allow_anonymous true + +listener 8883 +allow_anonymous true +require_certificate true +cafile chain.pem +certfile localhost.crt +keyfile localhost.key +tls_version tlsv1.2 diff --git a/docs/scenarios/2_azure_event_grid_messaging.md b/docs/scenarios/2_azure_event_grid_messaging.md new file mode 100644 index 0000000..799ea6b --- /dev/null +++ b/docs/scenarios/2_azure_event_grid_messaging.md @@ -0,0 +1,56 @@ +# Azure Event Grid Messaging + +This scenario demonstrates how to handle messages from Azure Event Grid. + +## Architecture + +[![architecture](../assets/2_architecture.png)](../assets/2_architecture.png) + +## Setup + +```shell +# Create CA certificate and key +step ca init \ + --deployment-type standalone \ + --name MqttAppSamplesCA \ + --dns localhost \ + --address 127.0.0.1:443 \ + --provisioner MqttAppSamplesCAProvisioner + +# Create client certificate and key +CLIENT_NAME=client1 + +mkdir -p $CLIENT_NAME +step certificate create sample_client $CLIENT_NAME/sample_client.pem $CLIENT_NAME/sample_client.key \ + --ca ~/.step/certs/intermediate_ca.crt \ + --ca-key ~/.step/secrets/intermediate_ca_key \ + --no-password --insecure \ + --not-after 2400h + +# Create chain.pem +cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $CLIENT_NAME/chain.pem + +# Encoded Certificate +cat ~/.step/certs/intermediate_ca.crt | tr -d "\n" + +# Set up mosquitto +cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > configs/mosquitto/chain.pem +step certificate create localhost configs/mosquitto/localhost.crt configs/mosquitto/localhost.key \ + --ca ~/.step/certs/intermediate_ca.crt \ + --ca-key ~/.step/secrets/intermediate_ca_key \ + --no-password \ + --insecure \ + --not-after 2400h + +# Run mosquitto +cd configs/mosquitto +mosquitto -c tls.conf +``` + +## Edge device + +To understand how to use the Azure Event Grid, we provide a script which is supposed to run on the edge device. This demonstrates how to send and receive messages from the Azure Event Grid. + +- [Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples) + +## Demo diff --git a/eventgrid.env.template b/eventgrid.env.template new file mode 100644 index 0000000..a74a2de --- /dev/null +++ b/eventgrid.env.template @@ -0,0 +1,12 @@ +MQTT_HOST_NAME="localhost" or "EVENT_GRID_NAMESPACE_NAME.japaneast-1.ts.eventgrid.azure.net" +MQTT_TCP_PORT="8883" +MQTT_USE_TLS="true" +MQTT_CLEAN_SESSION="true" +MQTT_KEEP_ALIVE_IN_SECONDS="30" +MQTT_CLIENT_ID="sample_client" +MQTT_USERNAME="sample_client" +MQTT_PASSWORD="" +MQTT_CA_FILE="chain.pem" +MQTT_CERT_FILE="sample_client.pem" +MQTT_KEY_FILE="sample_client.key" +MQTT_KEY_FILE_PASSWORD="" diff --git a/scripts/event_grid.py b/scripts/event_grid.py new file mode 100644 index 0000000..11de51f --- /dev/null +++ b/scripts/event_grid.py @@ -0,0 +1,149 @@ +import logging +import ssl +import time + +import paho.mqtt.client as mqtt +import typer +from dotenv import load_dotenv + +app = typer.Typer() +logger = logging.getLogger(__name__) + +logging.basicConfig(level=logging.DEBUG) + + +def get_connection_settings( + client_name: str, + host_name: str = "localhost", +): + return { + "MQTT_HOST_NAME": host_name, + "MQTT_USERNAME": client_name, + "MQTT_CLIENT_ID": client_name, + "MQTT_CERT_FILE": f"{client_name}/sample_client.pem", + "MQTT_KEY_FILE": f"{client_name}/sample_client.key", + "MQTT_CA_FILE": f"{client_name}/chain.pem", + "MQTT_TCP_PORT": 8883, + "MQTT_USE_TLS": True, + "MQTT_CLEAN_SESSION": True, + "MQTT_KEEP_ALIVE_IN_SECONDS": 60, + "MQTT_KEY_FILE_PASSWORD": None, + } + + +def get_mqtt_client( + connection_settings: dict, +): + client = mqtt.Client( + client_id=connection_settings["MQTT_CLIENT_ID"], + clean_session=connection_settings["MQTT_CLEAN_SESSION"], + protocol=mqtt.MQTTv311, + transport="tcp", + ) + if "MQTT_USERNAME" in connection_settings: + client.username_pw_set( + username=connection_settings["MQTT_USERNAME"], + password=connection_settings["MQTT_PASSWORD"] if "MQTT_PASSWORD" in connection_settings else None, + ) + if connection_settings["MQTT_USE_TLS"]: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_3 + + if connection_settings["MQTT_CERT_FILE"]: + context.load_cert_chain( + certfile=connection_settings["MQTT_CERT_FILE"], + keyfile=connection_settings["MQTT_KEY_FILE"], + password=connection_settings["MQTT_KEY_FILE_PASSWORD"], + ) + if "MQTT_CA_FILE" in connection_settings: + context.load_verify_locations( + cafile=connection_settings["MQTT_CA_FILE"], + ) + else: + context.load_default_certs() + + client.tls_set_context(context) + return client + + +def attach_functions(client: mqtt.Client) -> mqtt.Client: + client.on_connect = lambda client, userdata, flags, rc: logger.info( + f"on_connect: client={client}, userdata={userdata}, flags={flags}, rc={rc}" + ) + client.on_disconnect = lambda client, userdata, rc: logger.info( + f"on_disconnect: client={client}, userdata={userdata}, rc={rc}" + ) + client.on_message = lambda client, userdata, message: logger.info( + f"on_message: client={client}, userdata={userdata}, message={message.payload.decode()}" + ) + return client + + +@app.command() +def publish( + topic: str = "sample/topic1", + payload: str = "Hello, World!", + client_name: str = "client1", + verbose: bool = False, +): + if verbose: + logging.basicConfig(level=logging.DEBUG) + + connection_settings = get_connection_settings(client_name=client_name) + mqtt_client = get_mqtt_client(connection_settings) + mqtt_client = attach_functions(mqtt_client) + + mqtt_client.connect( + host=connection_settings["MQTT_HOST_NAME"], + port=connection_settings["MQTT_TCP_PORT"], + ) + + mqtt_client.loop_start() + msg_count = 1 + result = mqtt_client.publish( + topic=topic, + payload=payload, + ) + + while True: + time.sleep(1) + msg = f"messages: {msg_count}" + result = mqtt_client.publish(topic, msg) + # result: [0, 1] + status = result[0] + if status == 0: + print(f"Send `{msg}` to topic `{topic}`") + else: + print(f"Failed to send message to topic {topic}, status={status}") + msg_count += 1 + if msg_count > 5: + break + + mqtt_client.loop_stop() + + +@app.command() +def subscribe( + topic: str = "sample/topic1", + client_name: str = "client1", + verbose: bool = False, +): + if verbose: + logging.basicConfig(level=logging.DEBUG) + + connection_settings = get_connection_settings(client_name=client_name) + mqtt_client = get_mqtt_client(connection_settings) + mqtt_client = attach_functions(mqtt_client) + + mqtt_client.connect( + host=connection_settings["MQTT_HOST_NAME"], + port=connection_settings["MQTT_TCP_PORT"], + ) + mqtt_client.subscribe(topic) + mqtt_client.loop_forever() + + +if __name__ == "__main__": + load_dotenv("iot_hub.env") + app()