From 534c74b873dee79cecde69b2c45b5e00a4bb0f84 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 14 Oct 2024 16:07:26 +0530 Subject: [PATCH] feat: allow option per-stream --- configs/config.toml | 10 ++++++++-- uplink/src/base/serializer/mod.rs | 4 ++-- uplink/src/config.rs | 5 ++++- uplink/tests/serializer.rs | 8 ++++++-- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/configs/config.toml b/configs/config.toml index bf9c491e..70d75393 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -23,7 +23,7 @@ max_stream_count = 10 # All streams will first push the latest packet before pushing historical data in # FIFO order, defaults to false. This solves the problem of bad networks leading to # data being pushed so slow that it is practically impossible to track the device. -live_data_first = true +default_live_data_first = true # MQTT client configuration # @@ -89,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"] # used when there is a network/system failure. # - priority(optional, u8): Higher prioirity streams get to push their data # onto the network first. +# - live_data_first(optional, bool): All streams will first push the latest packet +# before pushing historical data in FIFO order, defaults to false. This solves the +# problem of bad networks leading to data being pushed so slow that it is practically +# impossible to track the device. # # In the following config for the device_shadow stream we set batch_size to 1 and mark -# it as non-persistent. streams are internally constructed as a map of Name -> Config +# it as non-persistent, also setting up live_data_first to enable quick delivery of stats. +# Streams are internally constructed as a map of Name -> Config [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" flush_period = 5 priority = 75 +live_data_first = true # Example using compression [streams.imu] diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index c12efef7..f0bc0c32 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -233,7 +233,7 @@ impl StorageHandler { let mut storage = Storage::new( &stream_config.topic, stream_config.persistence.max_file_size, - config.live_data_first, + stream_config.live_data_first, ); if stream_config.persistence.max_file_count > 0 { let mut path = config.persistence_path.clone(); @@ -264,7 +264,7 @@ impl StorageHandler { Storage::new( &stream.topic, self.config.default_buf_size, - self.config.live_data_first, + self.config.default_live_data_first, ) }) .write(publish) diff --git a/uplink/src/config.rs b/uplink/src/config.rs index 15679599..0c1921b3 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -93,6 +93,8 @@ pub struct StreamConfig { pub persistence: Persistence, #[serde(default)] pub priority: u8, + #[serde(default)] + pub live_data_first: bool, } impl Default for StreamConfig { @@ -104,6 +106,7 @@ impl Default for StreamConfig { compression: Compression::Disabled, persistence: Persistence::default(), priority: 0, + live_data_first: false, } } } @@ -537,7 +540,7 @@ pub struct Config { pub precondition_checks: Option, pub bus: Option, #[serde(default)] - pub live_data_first: bool, + pub default_live_data_first: bool, } impl Config { diff --git a/uplink/tests/serializer.rs b/uplink/tests/serializer.rs index c702948d..948dd77f 100644 --- a/uplink/tests/serializer.rs +++ b/uplink/tests/serializer.rs @@ -223,7 +223,6 @@ async fn prefer_live_data() { let mut config = Config::default(); config.default_buf_size = 1024 * 1024; config.mqtt.max_packet_size = 1024 * 1024; - config.live_data_first = true; let config = Arc::new(config); let (data_tx, data_rx) = bounded(0); let (net_tx, req_rx) = bounded(0); @@ -237,7 +236,12 @@ async fn prefer_live_data() { spawn(async { let mut default = MockCollector::new( "default", - StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() }, + StreamConfig { + topic: "topic/default".to_owned(), + batch_size: 1, + live_data_first: true, + ..Default::default() + }, data_tx, ); for i in 0.. {