diff --git a/freyja/src/cartographer.rs b/freyja/src/cartographer.rs index c1fe6550..910cb674 100644 --- a/freyja/src/cartographer.rs +++ b/freyja/src/cartographer.rs @@ -18,7 +18,7 @@ use freyja_contracts::{ provider_proxy_request::{ ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, }, - signal::{Emission, EmissionPolicy, Signal, Target}, + signal::{EmissionPolicy, Target, SignalPatch}, }; /// Manages mappings from the mapping service @@ -47,6 +47,8 @@ impl /// # Arguments /// - `signals`: the shared signal store /// - `mapping_client`: the client for the mapping service + /// - `digital_twin_client`: the client for the digital twin service + /// - `provider_proxy_selector_client`: the client for the provider proxy selector /// - `poll_interval`: the interval at which the cartographer should poll for changes pub fn new( signals: Arc, @@ -66,11 +68,13 @@ impl /// Run the cartographer. This will do the following in a loop: /// - /// 1. Check to see if the mapping service has more work. If not, skip to step 5 - /// 2. Send the new inventory to the mapping service - /// 3. Get the new mapping from the mapping service - /// 4. Update the shared map for the emitter - /// 5. Sleep until the next iteration + /// 1. Check to see if the mapping service has more work. If not, skip to the last step + /// 1. ~~Send the new inventory to the mapping service~~ + /// 1. Get the new mapping from the mapping service + /// 1. Query the digital twin service for entity information + /// 1. Create or update provider proxies for the new entities + /// 1. Update the signal store with the new data + /// 1. Sleep until the next iteration pub async fn run(&self) -> Result<(), Box> { loop { let mapping_client_result = self @@ -89,23 +93,20 @@ impl // TODO: will this notion of checking and sending inventory exist? // self.mapping_client.send_inventory(SendInventoryRequest { inventory: self.known_providers.clone() }).await?; - let signals_result = self.get_mapping_as_signals().await; - if signals_result.is_err() { - log::error!("Falied to get mapping from mapping client: {signals_result:?}"); + let patches_result = self.get_mapping_as_signal_patches().await; + if patches_result.is_err() { + log::error!("Falied to get mapping from mapping client: {patches_result:?}"); continue; } - let mut signals = signals_result.unwrap(); + let mut patches = patches_result.unwrap(); let mut failed_signals = Vec::new(); - // Some of these API calls are not really necessary, but this code gets executed - // infrequently enough that the sub-optimal performance is not a major concern. - // If Ibeji had a bulk find_by_id API there would be even less of a concern. - // TODO: punt stuff to the dt client and we call find_all here - // TODO: if there's a bulk api for providers then there probably needs to be a bulk api for proxies - // TODO: handle errors - for signal in signals.iter_mut() { - let populate_result = self.populate_entity(signal).await; + for patch in patches.iter_mut() { + // Many of the API calls in populate_entity are probably unnecessary, but this code gets executed + // infrequently enough that the sub-optimal performance is not a major concern. + // A bulk find_by_id API in the digital twin service would make this a non-issue + let populate_result = self.populate_entity(patch).await; if populate_result.is_err() { match populate_result @@ -114,28 +115,28 @@ impl .downcast::() { Ok(e) if e.kind() == DigitalTwinAdapterErrorKind::EntityNotFound => { - warn!("Entity not found for signal {}", signal.id); + warn!("Entity not found for signal {}", patch.id); } Ok(e) => { log::error!( "Error fetching entity for signal {}: {e:?}", - signal.id + patch.id ); } Err(e) => { log::error!( "Error fetching entity for signal {}: {e:?}", - signal.id + patch.id ); } } - failed_signals.push(signal.id.clone()); + failed_signals.push(patch.id.clone()); } } self.signals.sync( - signals + patches .into_iter() .filter(|s| !failed_signals.contains(&s.id)), ); @@ -145,37 +146,35 @@ impl } } - /// Gets the mapping from the mapping client and returns a corresponding list of signals. - async fn get_mapping_as_signals( + /// Gets the mapping from the mapping client and returns a corresponding list of signal patches. + async fn get_mapping_as_signal_patches( &self, - ) -> Result, Box> { + ) -> Result, Box> { Ok(self .mapping_client .get_mapping(GetMappingRequest {}) .await? .map .into_iter() - .map(|(id, entry)| Signal { + .map(|(id, entry)| SignalPatch { id, + // this gets populated later, set to default for now + source: Default::default(), target: Target { metadata: entry.target, }, - emission: Emission { - policy: EmissionPolicy { - interval_ms: entry.interval_ms, - emit_only_if_changed: entry.emit_on_change, - conversion: Conversion::default(), - }, - ..Default::default() + emission_policy: EmissionPolicy { + interval_ms: entry.interval_ms, + emit_only_if_changed: entry.emit_on_change, + conversion: Conversion::default(), }, - ..Default::default() }) .collect()) } async fn populate_entity( &self, - signal: &mut Signal, + signal: &mut SignalPatch, ) -> Result<(), Box> { signal.source = self .digital_twin_client @@ -297,7 +296,7 @@ mod cartographer_tests { poll_interval: Duration::from_secs(1), }; - let result = uut.get_mapping_as_signals().await; + let result = uut.get_mapping_as_signal_patches().await; assert!(result.is_ok()); let mut signals = result.unwrap(); assert_eq!(signals.len(), 1); @@ -305,14 +304,14 @@ mod cartographer_tests { assert_eq!(signal.id, ID.to_string()); assert_eq!(signal.target.metadata, test_map_entry.target); assert_eq!( - signal.emission.policy.interval_ms, + signal.emission_policy.interval_ms, test_map_entry.interval_ms ); assert_eq!( - signal.emission.policy.emit_only_if_changed, + signal.emission_policy.emit_only_if_changed, test_map_entry.emit_on_change ); - assert_eq!(signal.emission.policy.conversion, test_map_entry.conversion); + assert_eq!(signal.emission_policy.conversion, test_map_entry.conversion); } #[tokio::test] @@ -327,7 +326,7 @@ mod cartographer_tests { protocol: "protocol".to_string(), }; - let test_signal = &mut Signal { + let test_signal_patch = &mut SignalPatch { id: ID.to_string(), ..Default::default() }; @@ -353,12 +352,12 @@ mod cartographer_tests { poll_interval: Duration::from_secs(1), }; - let result = uut.populate_entity(test_signal).await; + let result = uut.populate_entity(test_signal_patch).await; let join_result = listener_handler.await; assert!(result.is_ok()); assert!(join_result.is_ok()); - assert_eq!(test_signal.source, test_entity); + assert_eq!(test_signal_patch.source, test_entity); let proxy_request = join_result.unwrap(); assert!(proxy_request.is_some()); diff --git a/freyja/src/emitter.rs b/freyja/src/emitter.rs index 3e00eae6..93a0ed71 100644 --- a/freyja/src/emitter.rs +++ b/freyja/src/emitter.rs @@ -21,8 +21,7 @@ use freyja_contracts::{ const DEFAULT_SLEEP_INTERVAL_MS: u64 = 1000; -/// Data emitter for the digital twin sync project -/// Emits sensor data at regular intervals as defined by the map +/// Emits sensor data at regular intervals as configured in the store pub struct Emitter { /// The shared signal store signals: Arc, @@ -38,7 +37,7 @@ pub struct Emitter { } impl Emitter { - /// Creates a new instance of emitter + /// Creates a new instance of the Emitter /// /// # Arguments /// - `signals`: the shared signal store @@ -172,7 +171,7 @@ impl Emitter { } } - /// Applies conversion implicitly to a signal value and sends it to the cloud + /// Applies a conversion implicitly to a signal value and sends it to the cloud /// /// # Arguments /// - `signal`: The signal to emit diff --git a/freyja/src/main.rs b/freyja/src/main.rs index 8339bc59..ab18759c 100644 --- a/freyja/src/main.rs +++ b/freyja/src/main.rs @@ -57,7 +57,6 @@ async fn main() -> Result<(), Box> { .target(Target::Stdout) .init(); - // Setup shared resources let signal_store = Arc::new(SignalStore::new()); let (tx_provider_proxy_selector_request, rx_provider_proxy_selector_request) = mpsc::unbounded_channel::(); @@ -76,7 +75,6 @@ async fn main() -> Result<(), Box> { // Setup emitter let signal_values_queue: Arc> = Arc::new(SegQueue::new()); - let emitter = Emitter::new( signal_store.clone(), CloudAdapterImpl::create_new().unwrap(),