diff --git a/tools/utils/src/push_data.rs b/tools/utils/src/push_data.rs
index 0c6bdb59..d2aa49b4 100644
--- a/tools/utils/src/push_data.rs
+++ b/tools/utils/src/push_data.rs
@@ -25,6 +25,10 @@ struct ShadowPayload {
 async fn main() {
     let stream = std::env::args().nth(1).unwrap_or_else(|| "c2c_can".to_string());
     let port = std::env::args().nth(2).unwrap_or_else(|| "127.0.0.1:5050".to_string());
+    let rate = std::env::args().nth(3)
+        .and_then(|s| s.parse::<u64>().ok())
+        .unwrap_or(1000);
+    let interval = Duration::from_micros(1_000_000 / rate);
     let mut framed = Framed::new(TcpStream::connect(port).await.unwrap(), LinesCodec::new());
     let mut idx = 0;
     loop {
@@ -46,6 +50,6 @@ async fn main() {
         };
         let data_s = serde_json::to_string(&data).unwrap();
         framed.send(data_s).await.unwrap();
-        sleep(Duration::from_micros(500)).await;
+        sleep(interval).await;
     }
 }
diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index 7fd4c0aa..cec919cd 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -168,8 +168,8 @@ pub struct Serializer {
     /// Read and reset in `send_stream_metrics`
     stream_metrics: HashMap<String, StreamMetrics>,
     /// a monotonically increasing counter
-    /// assigned to each packet placed in the live data slot
-    /// when fetching packets, we sort by this and return the oldest live data
+    /// used to track when each stream's live data slot was last fetched
+    /// when fetching packets, we sort by this and return the stalest live data
     /// if this isn't done, live data for a high frequency stream can block live data for other streams
     live_data_counter: usize,
     sorted_storages: BTreeMap<Arc<StreamConfig>, (Box<dyn Storage>, Option<Publish>, usize)>,
@@ -232,9 +232,11 @@ impl Serializer {
     /// Prioritize live data over saved data
     /// Prioritize old live data over new live data, to ensure live data for all the streams is pushed
     fn fetch_next_packet_from_storage(&mut self) -> Option<(Publish, Arc<StreamConfig>)> {
-        if let Some((sk, (_, live_data, _))) = self.sorted_storages.iter_mut()
+        if let Some((sk, (_, live_data, live_data_version))) = self.sorted_storages.iter_mut()
             .filter(|(_, (_, live_data, _))| live_data.is_some())
             .min_by_key(|(_, (_, _, live_data_version))| *live_data_version) {
+            self.live_data_counter += 1;
+            *live_data_version = self.live_data_counter;
             return Some((live_data.take().unwrap(), sk.clone()));
         }
         let mut cursor = BTreeCursorMut::new(&mut self.sorted_storages);
@@ -269,16 +271,14 @@
 
     fn write_publish_to_storage(&mut self, sk: Arc<StreamConfig>, mut publish: Publish) {
         publish.pkid = 1;
-        self.live_data_counter += 1;
         if ! self.sorted_storages.contains_key(&sk) {
             self.sorted_storages.insert(sk.clone(), (self.create_storage_for_stream(&sk), None, 0));
         }
 
         let mut packet_to_write = Some(publish);
-        let (storage, live_data, live_data_version) = self.sorted_storages.get_mut(&sk).unwrap();
+        let (storage, live_data, _) = self.sorted_storages.get_mut(&sk).unwrap();
         if self.config.prioritize_live_data {
             std::mem::swap(&mut packet_to_write, live_data);
-            *live_data_version = self.live_data_counter;
         }
         if let Some(publish) = packet_to_write {
             match storage.write_packet(publish) {