diff --git a/api/openapi.yaml b/api/openapi.yaml new file mode 100644 index 0000000..7228db2 --- /dev/null +++ b/api/openapi.yaml @@ -0,0 +1,160 @@ +openapi: 3.1.0 +info: + title: Drogue Doppelgaegner API + license: + identifier: Apache-2.0 + name: Apache 2.0 + contact: + url: https://drogue.io + version: "0.1.0" + +paths: + '/api/v1alpha1/{application}/{thing}': + get: + responses: + '200': + description: Returns the state of the thing. + '404': + description: The thing could not be found. + '500': + description: An internal error occurred. + content: + 'application/json': + schema: + $ref: '#/components/schemas/ErrorInformation' + + +components: + + parameters: + application: + name: application + in: path + description: The application scope + schema: + type: string + device: + name: thing + in: path + description: The name of the thing + schema: + type: string + + schemas: + + Thing: + type: object + properties: + metadata: + $ref: '#/components/schemas/Metadata' + schema: + $ref: '#/components/schemas/Schema' + + + Metadata: + type: object + properties: + name: + type: string + application: + type: string + generation: + type: number + minimum: 0 + resourceVersion: + type: string + labels: + type: string + additionalProperties: + type: string + annotations: + type: string + additionalProperties: + type: string + + required: + - name + - application + + Schema: + type: object + + ReportedState: + type: object + additionalProperties: + $ref: '#/components/schemas/ReportedFeature' + + DesiredState: + type: object + additionalProperties: + $ref: '#/components/schemas/DesiredFeature' + + SyntheticState: + type: object + additionalProperties: + $ref: '#/components/schemas/SyntheticFeature' + + Reconciliation: + type: object + + ReportedFeature: + type: object + properties: + lastUpdate: + $ref: "Timestamp" + description: "Timestamp of the last update" + value: + $ref: "#/components/schemas/AnyValue" + required: + - lastUpdate + + DesiredFeature: + type: object + properties: + lastUpdate: + $ref: "Timestamp" + description: "Timestamp of the last update" + validUntil: + $ref: "Timestamp" + lastAttempt: + $ref: "Timestamp" + value: + $ref: "#/components/schemas/AnyValue" + method: + type: object + description: The method of reconciling reported vs desired state + required: + - lastUpdate + + SyntheticFeature: + oneOf: + - type: object + properties: + script: + type: string + - type: object + properties: + wasm: + $ref: '#/components/schemas/Blob' + + ErrorInformation: + type: object + properties: + error: + type: string + description: The machine-readable error type. + message: + type: string + description: A human-readable error message. + required: + - error + + Timestamp: + type: string + format: "date-time" + + Blob: + type: string + format: "byte" + + AnyValue: {} diff --git a/ideas/brainstorming.adoc b/ideas/brainstorming.adoc new file mode 100644 index 0000000..123cb9a --- /dev/null +++ b/ideas/brainstorming.adoc @@ -0,0 +1,383 @@ += Some thoughts on digital twins + +NOTE: While I use YAML in this document, this will most likely all be JSON based. + +== Things to figure out + +* <> +* <> +* <> +* <> +* RBAC, Security, ACL, … + +[#state_store] +=== State store + +==== Postgres + +We know it, we love it, it sucks with frequent updates. It is overkill for a KV use-case. Available as-a-service. + +==== TiKV + +KV store, written in Rust, CNCF project. Would mean we would need to take care of operating it. + +==== MongoDB + +Well known, KV store, can be consumed as-a-service, but has license issues. Available as-a-service. + +==== Aerospike + +Looks promising, not open source. Available as-a-service. + +==== Apache Cassandra + +Again, looks promising, open source. But: Java. Kind of available as-a-service. + +==== Redis + +KV store, open source, efficient. Available as-a-service. Could even help with notifications. Plus Redis could help +with the notification process. + +Redis is mostly in-memory. Which isn't bad, actually rather good. It also supports eviction, so we should be able +to have "hot" things in memory, and "cold" things in storage. + +=== Multi-tenancy/multi-application + +It feels a bit like the twin is kind of an add-on, and it would be easier to focus on a single-tenant-instance +deployment model. + +Then again, small tenants might just be overkill for a single Redis/XYZ instance, so sharing that might make sense + +Also, someone needs to read from Kafka. Either through MQTT, or through Kafka directly. So that would be some process +that we need to scheduled too. And that runs all the logic. It might be easier to just schedule one (or more) per +application/tenant. + +Proposal: keep multi-tenant APIs, ensure that we use the data store in multi-tenant way, but focus on a +single-tenant-instance deployment model first. + +[#timers] +=== Timers + +Need: ask to be periodically called + +We could send an event to the topic to ensure that the right processor picks it up. Spams the topic though. On the +"pro" side, this would keep the order of events. Even a timer trigger should be queued. + +Besides, it is a wakeup timer. So not a periodic one. + +[#notifications] +=== Notifications + +Classic event topic, all listeners scan and filter. Pretty noisy: listeners might filter out most of the messages. + +Solution: Use Kafka, and Kafka partitions. First step now, second step later. See: <> + +[#features_or_not] +=== Explicit features vs raw JSON + +Do we need to enforce first level of features? Couldn't it be just a `Value`? + +== Data model + +What do I want to have? + +[source,yaml] +---- +metadata: + name: device-1 + resourceVersion: 8910186e-feb1-11ec-bc27-d45d6455d2cc # random version + generation: 1 # up-ticked with **every** change + labels: + room: "4711" + building: "south" + +internal: + wakeup: "2020-07-06T13:21:00.123Z" + +schema: + # enforce the state section + +reportedState: + feature1: + lastUpdate: "2020-07-06T13:21:00.123Z" + value: {} + feature2: + lastUpdate: "2020-07-06T13:21:00.123Z" + value: "some value" + feature3: + lastUpdate: "2020-07-06T13:21:00.123Z" + value: + value: 12 + type: u32 + feature4: + lastUpdate: "2020-07-06T13:21:00.123Z" + value: + sourceTimestamp: 1234567890 + value: true + quality: good + feature5: + lastUpdate: "2020-07-06T13:21:00.123Z" + value: + some: + complex: "datastructure" + even: + - with + - arrays + +desiredState: + feature2: + lastUpdate: "2020-07-06T13:21:00.123Z" + validUntil: "2020-07-06T13:26:00.123Z" + lastAttempt: "2020-07-06T13:21:00.456Z" + value: "some other value" + state: reconciling # success + method: + script: | + + command: + channel: set-feature2 + payload: + raw: {} + jsonInject: + base: # { "command": {"feature": "feature1", "value": {}}} + command: + feature: feature1 + value: {} + value: .command.value # -> replaces .command.value with the actual value + + +# passive, local calculations +syntheticState: # built on local features, maybe executed on fetch + feature6: + script: "" + wasm: {} # ?? + + +reconciliation: + changed: + - script: | + someScript(); + # example: put in group with "too-high" + if (changed("some-feature") && newState["some-feature"].value > 1000) { + addReference("other-device", "feature-too-high"); + } else { + //removeReference("other-device", "feature-too-high"); + updateState("other-device", [ + {"op": "remove", "path": "/feature-too-high/value", value: [this()]} + ]); + } + - script: | + # example + if (changed("some-feature")) { + updateAggregate("some-device", "some-other-feature", newState["some-feature"]) + } + + periodic: + command: + period: 1m + enabled: false # defaults to true + lastExecution: "2020-07-06T13:21:00.123Z" + script: | + if (desiredState["feature1"].value != state["feature1"].value ) { + sendCommand("set-feature", {"feature": "feature1", "value": desiredState["feature1"].value }); + } else { + disableTimer("command"); + } + +pendingEvents: + - {} + - {} +---- + +== Update process + +=== Processor + +NOTE: The processor runs a single device only on a single process, sequentially. Ensured by Kafka's partition key. + +* Process expired pending and unset events as batch (if no non-expired gap) (way 1, if any) -> ERR: retry (optimization) +* Fetch current state (oplock) +* Apply changes +* Run synthetics +* Run reconciliations +* Commit + ** Way 1 + *** Append pending events (if there are pending events, mark new as unsent) + *** Schedule wakeup if there are pending events + *** Write everything (W1) -> ERR: retry all + *** Send pending events if there had been none in the beginning, or those that are expired -> ERR: skip (to notify) + *** Clear all successfully sent events, and wakeup (optimization) (W2) -> ERR: skip (to notify) + ** Way 2 + *** Send events + *** Write everything (W1) -> ERR: retry all +* Notify listeners (of this device) + * Send to notification server (first version, just send to single pod) + + +==== Committing + +Way 1: +* (-) More complex +* (+) Less wrong/extra events + +Way 2: +* (+) Simpler +* (-) More duplicate events, in the case of persistence errors +* (-) Can spam kafka topic, in case changes cannot get persisted + +[#notifying] +==== Notifying + +* Step 1 +** Simple Kafka topic: notify with "generation" +* Step 2 +** Two layers: Kafka consumer (tied to single partition), Notifier (finds consumer by hash) +** Scaled up by times the number of Kafka partitions +** Further scale up: scale up kafka partitions + +=== Outbox clearer + +* Event on the wire + ** clear from device + ** perform internal direct update # required to unblock the processor queue + ** commit if changed -> might break oplock + +== Schema + +Adding a schema to the schema section would ensure that the data in the `reportedState` section validated by that +schema. + +Having no schema doesn't enforce anything. Applying/updating a schema is only possible if the new schema would validate +too. + +The schema could be JSON Schema. + +NOTE: What about desired and synthetic features? +NOTE: What about stuff like WoT? + +== Updating data + +Push messages: + +[source, yaml] +---- +mode: update # replace +state: + feature1: "value" + feature2: + value: 1 + type: u32 + timestamp: 123456789 + feature3: + some: + complex: + - thing + - with + - arrays + maybe: more + feature4: {} # erase +---- + +== Fetching data + +=== Get the full model + +[source] +---- +GET /api/v1alpha1/things/{application}/{thing} +---- + +[source,yaml] +---- +metadata: + … +state: + … +… +---- + +=== Get state only + +[source] +---- +GET /api/v1alpha1/things/{application}/{thing}/state +---- + +[source,yaml] +---- +feature1: {} +feature2: "some-value" +---- + +=== Get state only (detailed mode) + +[source] +---- +GET /api/v1alpha1/things/{application}/{thing}/state?details=true +---- + +[source,yaml] +---- +feature1: + lastUpdate: "2022…" + value: {} +feature2: + lastUpdate: "2022…" + value: "value" +---- + +=== Subscribe + +[source] +---- +CONNECT /api/v1alpha1/things/{application} +---- + +==== Start subscription + +[source,yaml] +---- +subscribe: + thing: device1 + scope: [] # defaults to ["state"] + details: false + diff: true # initial full state, then JSON patch +---- + +==== Stop subscription + +[source,yaml] +---- +unsubscribe: + thing: device1 +---- + +=== Updating + +==== Full state update + +[source] +---- +POST /api/v1alpha1/things/{application}/{thing} +---- + +==== Sub-resource state update (e.g. state) + +[source] +---- +POST /api/v1alpha1/things/{application}/{thing}/state +---- + +==== Patching + +[source] +---- +PATCH /api/v1alpha1/things/{application}/{thing} +---- + +==== Sub-resource patching (e.g. state) + +[source] +---- +PATCH /api/v1alpha1/things/{application}/{thing}/state +----