Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Serializer pushes latest data first if configured to do so #365

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ default_buf_size = 1024 # 1KB
# Maximum number of data streams that can be accepted by uplink
max_stream_count = 10

# All streams will first push the latest packet before pushing historical data in
# FIFO order; defaults to false. This solves the problem of bad networks leading to
# data being pushed so slowly that it is practically impossible to track the device.
default_live_data_first = true

# MQTT client configuration
#
# Required Parameters
Expand Down Expand Up @@ -84,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# used when there is a network/system failure.
# - priority(optional, u8): Higher priority streams get to push their data
# onto the network first.
# - live_data_first(optional, bool): The stream will first push its latest packet
# before pushing historical data in FIFO order; defaults to false. This solves the
# problem of bad networks leading to data being pushed so slowly that it is practically
# impossible to track the device.
#
# In the following config for the device_shadow stream we set batch_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
# it as non-persistent, also setting up live_data_first to enable quick delivery of stats.
# Streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75
live_data_first = true

# Example using compression
[streams.imu]
Expand Down
55 changes: 45 additions & 10 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use flume::{bounded, Receiver, RecvError, Sender, TrySendError};
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use lz4_flex::frame::FrameEncoder;
use rumqttc::*;
use thiserror::Error;
Expand Down Expand Up @@ -133,11 +133,17 @@ impl MqttClient for AsyncClient {

// Wrapper around `storage::Storage` that can additionally hold the most recently written
// publish in memory, so that `read` hands it out before anything read from `inner`.
pub struct Storage {
// Underlying storage that publishes are serialized into and read back from.
inner: storage::Storage,
// When true, `write` keeps the incoming publish in `latest_data` and serializes the
// previously held one (if any) into `inner` instead.
live_data_first: bool,
// Publish currently held back from `inner`; taken by `read` first, and moved into
// `inner` by `flush`.
latest_data: Option<Publish>,
}

impl Storage {
pub fn new(name: impl Into<String>, max_file_size: usize) -> Self {
Self { inner: storage::Storage::new(name, max_file_size) }
pub fn new(name: impl Into<String>, max_file_size: usize, live_data_first: bool) -> Self {
Self {
inner: storage::Storage::new(name, max_file_size),
live_data_first,
latest_data: None,
}
}

pub fn set_persistence(
Expand All @@ -151,6 +157,13 @@ impl Storage {
// Stores the provided publish packet by serializing it into storage, after setting its pkid to 1.
// If the write buffer is full, it is flushed/written onto disk based on config.
pub fn write(&mut self, mut publish: Publish) -> Result<Option<u64>, storage::Error> {
if self.live_data_first {
let Some(previous) = self.latest_data.replace(publish) else { return Ok(None) };
publish = previous;
} else if self.latest_data.is_some() {
warn!("Latest data should be unoccupied if not using the live data first scheme");
}

publish.pkid = 1;
if let Err(e) = publish.write(self.inner.writer()) {
error!("Failed to fill disk buffer. Error = {e}");
Expand All @@ -170,6 +183,10 @@ impl Storage {
// ## Panic
// When any packet other than a publish is deserialized.
pub fn read(&mut self, max_packet_size: usize) -> Option<Publish> {
if let Some(publish) = self.latest_data.take() {
return Some(publish);
}

// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
// This leads to force switching to normal mode. Increasing max_payload_size to bypass this
match Packet::read(self.inner.reader(), max_packet_size) {
Expand All @@ -184,6 +201,15 @@ impl Storage {

// Ensures all data is written into persistence, when configured.
//
// Any publish still held in `latest_data` is first taken out, given pkid 1 and serialized
// through `inner.writer()`, before `inner.flush()` is called; the result of `inner.flush()`
// is returned as is.
// If serializing the held publish fails, the error is logged and `Ok(None)` is returned
// without calling `inner.flush()`.
// NOTE(review): in that failure case the held publish has already been taken out of
// `latest_data` and is dropped — confirm this is intended.
pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
// Write live cache to disk when flushing
if let Some(mut publish) = self.latest_data.take() {
// Same pkid that `write` assigns before serializing a publish.
publish.pkid = 1;
if let Err(e) = publish.write(self.inner.writer()) {
error!("Failed to fill disk buffer. Error = {e}");
return Ok(None);
}
}

self.inner.flush()
}
}
Expand All @@ -202,8 +228,11 @@ impl StorageHandler {
// NOTE: persist action_status if not configured otherwise
streams.insert("action_status".into(), config.action_status.clone());
for (stream_name, stream_config) in streams {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
let mut storage = Storage::new(
&stream_config.topic,
stream_config.persistence.max_file_size,
stream_config.live_data_first,
);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
path.push(&stream_name);
Expand All @@ -229,7 +258,13 @@ impl StorageHandler {
match self
.map
.entry(stream.to_owned())
.or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size))
.or_insert_with(|| {
Storage::new(
&stream.topic,
self.config.default_buf_size,
self.config.default_live_data_first,
)
})
.write(publish)
{
Ok(Some(deleted)) => {
Expand Down Expand Up @@ -907,7 +942,7 @@ pub mod tests {
fn read_write_storage() {
let config = Arc::new(default_config());

let mut storage = Storage::new("hello/world", 1024);
let mut storage = Storage::new("hello/world", 1024, false);
let mut publish = Publish::new(
"hello/world",
QoS::AtLeastOnce,
Expand Down Expand Up @@ -1024,7 +1059,7 @@ pub mod tests {
.storage_handler
.map
.entry(Arc::new(Default::default()))
.or_insert(Storage::new("hello/world", 1024));
.or_insert(Storage::new("hello/world", 1024, false));

let (stream_name, stream_config) = (
"hello",
Expand Down Expand Up @@ -1082,7 +1117,7 @@ pub mod tests {
topic: "hello/world".to_string(),
..Default::default()
}))
.or_insert(Storage::new("hello/world", 1024));
.or_insert(Storage::new("hello/world", 1024, false));

let (stream_name, stream_config) = (
"hello",
Expand Down Expand Up @@ -1194,7 +1229,7 @@ pub mod tests {
priority: 0,
..Default::default()
}))
.or_insert(Storage::new("topic/default", 1024));
.or_insert(Storage::new("topic/default", 1024, false));
default.write(publish("topic/default".to_string(), 0)).unwrap();
default.write(publish("topic/default".to_string(), 2)).unwrap();

Expand Down
5 changes: 5 additions & 0 deletions uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct StreamConfig {
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
#[serde(default)]
pub live_data_first: bool,
}

impl Default for StreamConfig {
Expand All @@ -104,6 +106,7 @@ impl Default for StreamConfig {
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
live_data_first: false,
}
}
}
Expand Down Expand Up @@ -536,6 +539,8 @@ pub struct Config {
pub logging: Option<LogcatConfig>,
pub precondition_checks: Option<PreconditionCheckerConfig>,
pub bus: Option<BusConfig>,
#[serde(default)]
pub default_live_data_first: bool,
}

impl Config {
Expand Down
60 changes: 56 additions & 4 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use flume::bounded;
use rumqttc::{Publish, QoS, Request};
use tempdir::TempDir;
use tokio::{runtime::Runtime, spawn};
use tokio::{runtime::Runtime, spawn, time::sleep};

use uplink::{
base::{bridge::Payload, serializer::Serializer},
Expand Down Expand Up @@ -83,18 +83,18 @@ async fn preferential_send_on_network() {
};

// write packets for one, two and top onto disk
let mut one = Storage::new("topic/one", 1024 * 1024);
let mut one = Storage::new("topic/one", 1024 * 1024, false);
one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap();
one.write(publish("topic/one".to_string(), 4)).unwrap();
one.write(publish("topic/one".to_string(), 5)).unwrap();
one.flush().unwrap();

let mut two = Storage::new("topic/two", 1024 * 1024);
let mut two = Storage::new("topic/two", 1024 * 1024, false);
two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap();
two.write(publish("topic/two".to_string(), 3)).unwrap();
two.flush().unwrap();

let mut top = Storage::new("topic/top", 1024 * 1024);
let mut top = Storage::new("topic/top", 1024 * 1024, false);
top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap();
top.write(publish("topic/top".to_string(), 1)).unwrap();
top.write(publish("topic/top".to_string(), 2)).unwrap();
Expand Down Expand Up @@ -216,3 +216,55 @@ async fn fifo_data_push() {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}

#[tokio::test]
// Ensures that live data is pushed first if configured to do so.
//
// A producer sends sequence numbers 0, 1, 2, ... every 250ms while nothing reads from the
// mocked network for the first 750ms. The publishes are then expected to arrive in the
// order 0, 2, 1: i.e. the newest packet (2) ahead of the older one (1).
// NOTE(review): sequence 0 presumably comes first because it was already being handed to
// the network before 1 and 2 arrived — confirm against the serializer implementation.
// NOTE(review): the expected order depends on the 250ms/750ms timings below.
async fn prefer_live_data() {
let mut config = Config::default();
config.default_buf_size = 1024 * 1024;
config.mqtt.max_packet_size = 1024 * 1024;
// Storages created without a per-stream config entry pick up this default.
config.default_live_data_first = true;
let config = Arc::new(config);
// Zero-capacity channels for incoming data and for the mocked network.
let (data_tx, data_rx) = bounded(0);
let (net_tx, req_rx) = bounded(0);
let (metrics_tx, _metrics_rx) = bounded(1);
let client = MockClient { net_tx };
let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap();

// start serializer in the background
// It runs on a dedicated thread with its own tokio runtime.
thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start()));

// Producer task: sends an ever increasing sequence number every 250ms.
spawn(async {
let mut default = MockCollector::new(
"default",
StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
data_tx,
);
for i in 0.. {
default.send(i).await.unwrap();
sleep(Duration::from_millis(250)).await;
}
});

// Do not read from the network yet, so that data can pile up in the serializer.
sleep(Duration::from_millis(750)).await;
// First publish received is expected to carry sequence 0.
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

// Next comes sequence 2, the latest data, ahead of the older sequence 1.
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

// Only then is the older sequence 1 delivered.
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}
Loading