Skip to content

Commit

Permalink
make sure live data never becomes stale
Browse files Browse the repository at this point in the history
  • Loading branch information
amokfa committed Nov 28, 2024
1 parent dccc109 commit 5d39851
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
6 changes: 5 additions & 1 deletion tools/utils/src/push_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ struct ShadowPayload {
async fn main() {
let stream = std::env::args().nth(1).unwrap_or_else(|| "c2c_can".to_string());
let port = std::env::args().nth(2).unwrap_or_else(|| "127.0.0.1:5050".to_string());
let rate = std::env::args().nth(3)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(1000);
let interval = Duration::from_micros(1_000_000 / rate);
let mut framed = Framed::new(TcpStream::connect(port).await.unwrap(), LinesCodec::new());
let mut idx = 0;
loop {
Expand All @@ -46,6 +50,6 @@ async fn main() {
};
let data_s = serde_json::to_string(&data).unwrap();
framed.send(data_s).await.unwrap();
sleep(Duration::from_micros(500)).await;
sleep(interval).await;
}
}
12 changes: 6 additions & 6 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ pub struct Serializer<C: MqttClient> {
/// Read and reset in `send_stream_metrics`
stream_metrics: HashMap<String, StreamMetrics>,
/// a monotonically increasing counter
/// assigned to each packet placed in the live data slot
/// when fetching packets, we sort by this and return the oldest live data
/// used to track when was the last
/// when fetching packets, we sort by this and return the live data that has the most stale data
/// if this isn't done, live data for a high frequency stream can block live data for other streams
live_data_counter: usize,
sorted_storages: BTreeMap<Arc<StreamConfig>, (Box<dyn storage::Storage>, Option<Publish>, usize)>,
Expand Down Expand Up @@ -232,9 +232,11 @@ impl<C: MqttClient> Serializer<C> {
/// Prioritize live data over saved data
/// Prioritize old live data over new live data, to ensure live data for all the streams is pushed
fn fetch_next_packet_from_storage(&mut self) -> Option<(Publish, Arc<StreamConfig>)> {
if let Some((sk, (_, live_data, _))) = self.sorted_storages.iter_mut()
if let Some((sk, (_, live_data, live_data_version))) = self.sorted_storages.iter_mut()
.filter(|(_, (_, live_data, _))| live_data.is_some())
.min_by_key(|(_, (_, _, live_data_version))| *live_data_version) {
self.live_data_counter += 1;
*live_data_version = self.live_data_counter;
return Some((live_data.take().unwrap(), sk.clone()));
}
let mut cursor = BTreeCursorMut::new(&mut self.sorted_storages);
Expand Down Expand Up @@ -269,16 +271,14 @@ impl<C: MqttClient> Serializer<C> {

fn write_publish_to_storage(&mut self, sk: Arc<StreamConfig>, mut publish: Publish) {
publish.pkid = 1;
self.live_data_counter += 1;
if ! self.sorted_storages.contains_key(&sk) {
self.sorted_storages.insert(sk.clone(), (self.create_storage_for_stream(&sk), None, 0));
}

let mut packet_to_write = Some(publish);
let (storage, live_data, live_data_version) = self.sorted_storages.get_mut(&sk).unwrap();
let (storage, live_data, _) = self.sorted_storages.get_mut(&sk).unwrap();
if self.config.prioritize_live_data {
std::mem::swap(&mut packet_to_write, live_data);
*live_data_version = self.live_data_counter;
}
if let Some(publish) = packet_to_write {
match storage.write_packet(publish) {
Expand Down

0 comments on commit 5d39851

Please sign in to comment.