Skip to content

Commit

Permalink
feat: handle acks, change data format
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Feb 21, 2022
1 parent b0d9506 commit 8201d52
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 26 deletions.
63 changes: 41 additions & 22 deletions doppelgaenger-input/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use bson::Bson;
use cloudevents::binding::rdkafka::MessageExt;
use cloudevents::event::ExtensionValue;
use cloudevents::{AttributesReader, Data, Event};
use bson::{Bson, Document};
use cloudevents::{
binding::rdkafka::MessageExt, event::ExtensionValue, AttributesReader, Data, Event,
};
use config::{Config, Environment};
use futures_util::stream::StreamExt;
use indexmap::IndexMap;
use mongodb::bson::doc;
use mongodb::options::{ClientOptions, UpdateOptions};
use mongodb::{Client, Database};
use rdkafka::config::FromClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::util::DefaultRuntime;
use mongodb::{
bson::doc,
options::{ClientOptions, UpdateOptions},
Client, Database,
};
use rdkafka::consumer::CommitMode;
use rdkafka::{
config::FromClientConfig,
consumer::{Consumer, StreamConsumer},
util::DefaultRuntime,
};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::HashMap;
Expand Down Expand Up @@ -91,14 +96,22 @@ impl Processor {
loop {
match stream.next().await.map(|r| {
r.map_err::<anyhow::Error, _>(|err| err.into())
.and_then(|msg| msg.to_event().map_err(|err| err.into()))
.and_then(|msg| {
msg.to_event()
.map_err(|err| err.into())
.map(|evt| (msg, evt))
})
}) {
None => break,
Some(Ok(event)) => {
if let Err(err) = self.handle(event).await {
Some(Ok(msg)) => {
if let Err(err) = self.handle(msg.1).await {
log::info!("Failed to handle event: {}", err);
break;
} else if let Err(err) = self.consumer.commit_message(&msg.0, CommitMode::Async)
{
log::info!("Failed to ack: {err}");
break;
}
// need to think about ACKing events
}
Some(Err(err)) => {
log::warn!("Failed to receive from Kafka: {err}");
Expand All @@ -119,21 +132,27 @@ impl Processor {

let collection = self.db.collection::<ThingState>(&event.application);

// let features: Bson = Bson::try_from(event.features)?;

//let features = bson::Document::try_from(event.features)?;
let features: Bson = event
.features
.try_into()
.map_err(|err| TwinEventError::Conversion(format!("Failed to convert: {err}")))?;
let mut update = Document::new();
for (k, v) in event.features {
let v: Bson = v.try_into()?;
update.insert(format!("features.{k}.properties"), v);
}

let update = doc! {
"$setOnInsert": {
"creationTimestamp": "$currentDate"
},
"$inc": {
"revision": 1,
},
"$set": features,
"$set": update,
"$set": {
"lastUpdateTimestamp": "$currentDate"
}
};

log::info!("Request update: {:#?}", update);

collection
.update_one(
doc! {
Expand Down
2 changes: 1 addition & 1 deletion doppelgaenger-websocket/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ <h1>Dashboard example</h1>
setState("Connecting");

let address = location.origin.replace(/^http/, 'ws') + '/socket';
if (socketOverride !== undefined) {
if (typeof socketOverride !== 'undefined') {
address = socketOverride;
}
const websocket = new WebSocket(address);
Expand Down
6 changes: 3 additions & 3 deletions doppelgaenger-websocket/templates/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ function updateDevice(update) {

const content = $(`<dl></dl>`);
for (const feature of Object.keys(state.features).sort()) {
const properties = state.features[feature]
content.append($(`<dt>${feature}</dt><dd><code>${JSON.stringify(properties, null, 2)}</code></dd>`))
const properties = state.features[feature].properties;
content.append($(`<dt>${feature}</dt><dd><code>${JSON.stringify(properties, null, 2)}</code></dd>`));
}

// update
card.find(".card-text").empty().append(content)
card.find(".card-text").empty().append(content);
card.addClass("updated");
setTimeout(() => {
card.removeClass("updated");
Expand Down

0 comments on commit 8201d52

Please sign in to comment.