Skip to content

Commit

Permalink
Add Update manager how-to guide and concept (#340)
Browse files Browse the repository at this point in the history
[#339] Add Update manager how-to guide and concept

Signed-off-by: Kristiyan Gostev <[email protected]>
  • Loading branch information
k-gostev authored Jun 5, 2024
1 parent 22c8da9 commit 6874c0e
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 33 deletions.
2 changes: 1 addition & 1 deletion quickstart/hono_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def on_message(self, event):
print('[got response]')
response = json.loads(event.message.body)
print(json.dumps(response, indent=2))
if response["status"] == 204:
if 200 <= response["status"] <= 299:
print('[ok]', command)
else:
print('[error]')
Expand Down
237 changes: 237 additions & 0 deletions quickstart/hono_commands_um.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0

import getopt
import json
import os
import signal
import sys
import threading
import time
import uuid

from string import Template
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

alpine_container_template = """
{
"id": "alpine",
"version": "latest",
"config": [
{
"key": "image",
"value": "docker.io/library/alpine:latest"
},
{
"key": "restartPolicy",
"value": "no"
}
]
},"""

containers_desired_state = Template("""
{
"topic": "$namespace/$name/things/live/messages/apply",
"headers": {
"content-type": "application/json",
"correlation-id": "$correlation_id",
"response-required": true
},
"path": "/features/UpdateManager/inbox/messages/apply",
"value": {
"activityId": "$activity_id",
"desiredState": {
"domains": [
{
"id": "containers",
"config": [],
"components": [
{
"id": "influxdb",
"version": "2.7.1",
"config": [
{
"key": "image",
"value": "docker.io/library/influxdb:$influxdb_version"
}
]
},
$alpine_container
{
"id": "hello-world",
"version": "latest",
"config": [
{
"key": "image",
"value": "docker.io/library/hello-world:latest"
},
{
"key": "restartPolicy",
"value": "no"
}
]
}
]
}
]
}
}
}
""")

containers_desired_state_clean_up = Template("""
{
"topic": "$namespace/$name/things/live/messages/apply",
"headers": {
"content-type": "application/json",
"correlation-id": "$correlation_id",
"response-required": true
},
"path": "/features/UpdateManager/inbox/messages/apply",
"value": {
"activityId": "$activity_id",
"desiredState": {
"domains": [
{
"id": "containers",
"config": [],
"components": []
}
]
}
}
}
""")

um_refresh_state = Template("""
{
"topic": "$namespace/$name/things/live/messages/apply",
"headers": {
"content-type": "application/json",
"correlation-id": "$correlation_id",
"response-required": true
},
"path": "/features/UpdateManager/inbox/messages/refresh",
"value": {
"activityId": "$activity_id",
}
}
""")


class CommandResponsesHandler(MessagingHandler):
def __init__(self, server, address):
super(CommandResponsesHandler, self).__init__()
self.server = server
self.address = address

def on_start(self, event):
conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
event.container.create_receiver(conn, self.address)
print('[connected]')

def on_message(self, event):
print('[got response]')
response = json.loads(event.message.body)
print(json.dumps(response, indent=2))
if response["status"] == 204:
print('[ok]', "um")
else:
print('[error]')
event.receiver.close()
event.connection.close()

def on_connection_closed(self, event):
print('[connection closed]')
os.kill(os.getpid(), signal.SIGINT)


class CommandsInvoker(MessagingHandler):
def __init__(self, server, address):
super(CommandsInvoker, self).__init__()
self.server = server
self.address = address

def on_start(self, event):
conn = event.container.connect(self.server, sasl_enabled=True, allowed_mechs="PLAIN", allow_insecure_mechs=True,
user="consumer@HONO", password="verysecret")
event.container.create_sender(conn, self.address)

def on_sendable(self, event):
print('[sending command]')
correlation_id = str(uuid.uuid4())
namespaced_id = device_id.split(':', 1)
activity_id = str(uuid.uuid4())

influxdb_version = "1.8.4"
alpine_container = alpine_container_template
if operation == "update":
influxdb_version = "1.8.5"
alpine_container = ""
if operation == "clean":
payload = containers_desired_state_clean_up.substitute(namespace=namespaced_id[0],
name=namespaced_id[1],
correlation_id=correlation_id,
activity_id=activity_id)
else:
payload = containers_desired_state.substitute(namespace=namespaced_id[0], name=namespaced_id[1],
correlation_id=correlation_id,
influxdb_version=influxdb_version,
alpine_container=alpine_container,
activity_id=activity_id)
print(json.dumps(json.loads(payload), indent=2))
msg = Message(body=payload, address='{}/{}'.format(address, device_id),
content_type="application/json",
subject="um", reply_to=reply_to_address, correlation_id=correlation_id, id=str(uuid.uuid4()))
event.sender.send(msg)
event.sender.close()
event.connection.close()
print('[sent]')


# Parse command line args
options, reminder = getopt.getopt(sys.argv[1:], 't:d:o:')
opts_dict = dict(options)
tenant_id = os.environ.get("TENANT") or opts_dict['-t']
device_id = os.environ.get("DEVICE_ID") or opts_dict['-d']
operation = opts_dict['-o']

# AMQP global configurations
uri = 'amqps://hono.eclipseprojects.io:15671'
address = 'command/{}'.format(tenant_id)
reply_to_address = 'command_response/{}/replies'.format(tenant_id)
print('[starting] demo update manager app for tenant [{}], device [{}] at [{}]'.format(tenant_id, device_id, uri))

# Create command invoker and handler
response_handler = Container(CommandResponsesHandler(uri, reply_to_address))
commands_invoker = Container(CommandsInvoker(uri, address))
thread = threading.Thread(target=lambda: response_handler.run(), daemon=True)
thread.start()
# Give it some time to link
time.sleep(2)
# Send the command
commands_invoker.run()


def handler(signum, frame):
print('[stopping] demo update manager app for tenant [{}], device [{}] at [{}]'.format(tenant_id, device_id, uri))
response_handler.stop()
thread.join(timeout=5)
print('[stopped]')
exit(0)


signal.signal(signal.SIGINT, handler)
while True:
pass
20 changes: 14 additions & 6 deletions quickstart/hono_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@ def on_start(self, event):
print('[connected]')

def on_message(self, event):
print('[event received]')
if event.message.body is not None:
print(json.dumps(json.loads(event.message.body), indent=2))
else:
print('<empty>')
body = json.loads(event.message.body)
if topic_filter != "" and topic_filter != body['topic']:
return
print('[event received]')
print(json.dumps(body, indent=2))
else:
print('[empty event received]')


# Parse command line args
options, reminder = getopt.getopt(sys.argv[1:], 't:')
tenant_id = dict(options).get('-t') or os.environ["TENANT"]
options, reminder = getopt.getopt(sys.argv[1:], 't:f:')
opts_dict = dict(options)
tenant_id = os.environ.get("TENANT") or opts_dict['-t']
if '-f' in opts_dict:
topic_filter = opts_dict['-f']
else:
topic_filter = ""

uri = 'amqps://hono.eclipseprojects.io:15671'
address = 'event/{}'.format(tenant_id)
Expand Down
35 changes: 35 additions & 0 deletions web/site/content/docs/concepts/update-manager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
title: "Update manager"
type: docs
description: >
Empower the edge device for OTA updates.
weight: 2
---

Update manager enables a lightweight core component which is capable to easily perform complex OTA update scenarios on a target device. The following capabilities are provided:

* **Lightweight** - consists of a single core component which orchestrates the update process
* **Flexible deployment** - supports different deployment models - natively, as an executable or container
* **Unified API** - all domain agents utilize a unified Update Agent API for interacting with the Update Manager
* **MQTT Connectivity** - the connectivity and communication between the Update Manager and domain agent is MQTT-based
* **Multi-domain integration** - easily integrates, scales and performs complex update operations across multiple domains
* **Default update agents** - integrates with the Kanto provided out-of-the box domain update agent implementation for deployment of container into the Kanto container management
* **Pluggable architecture** - provides an extensible model for plug-in custom orchestration logic
* **Asynchronous updates** - asynchronous and independent update process across the different domains
* **Multi-staged updates** - the update process is organized into different stages
* **Configurable** - offers a variety of configuration options for connectivity, supported domains, message reporting and etc

![Update manager](/kanto/images/docs/concepts/update-manager.png)

### How it works

The update process is initiated by sending the desired state specification as an MQTT message towards the device, which is handled by the Update Manager component.

The desired state specification in the scope of the Update Manager is a JSON-based document, which consists of multiple component definitions per domain, representing the desired state to be applied on the target device.
A component in the same context means a single, atomic and updatable unit, for example, OCI-compliant container, software application or firmware image.

Each domain agent is a separate and independent software component, which implements the Update Agent API for interaction with the Update Manager and manages the update logic for concrete domain. For example - container management.

The Update Manager, operating as a coordinator, is responsible for processing the desired state specification, distributing the split specification across the different domain agents, orchestrating the update process via MQTT-based commands, collecting and consolidating the feedback responses from the domain update agents, and reporting the final result of the update campaign to the backend.

As extra features and capabilities, the Update Manager enables reboot of the host after the update process is completed and reporting of the current state of the system to the backend.
3 changes: 2 additions & 1 deletion web/site/content/docs/getting-started/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ container-management.service \
software-update.service \
file-upload.service \
file-backup.service \
system-metrics.service
system-metrics.service \
kanto-update-manager.service
```

All listed services must be in an active running state.
Expand Down
Loading

0 comments on commit 6874c0e

Please sign in to comment.