Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Sep 1, 2023
1 parent 693415f commit 86a62c3
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 49 deletions.
85 changes: 42 additions & 43 deletions freyja/src/cartographer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use freyja_contracts::{
provider_proxy_request::{
ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender,
},
signal::{Emission, EmissionPolicy, Signal, Target},
signal::{EmissionPolicy, Target, SignalPatch},
};

/// Manages mappings from the mapping service
Expand Down Expand Up @@ -47,6 +47,8 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>
/// # Arguments
/// - `signals`: the shared signal store
/// - `mapping_client`: the client for the mapping service
/// - `digital_twin_client`: the client for the digital twin service
/// - `provider_proxy_selector_client`: the client for the provider proxy selector
/// - `poll_interval`: the interval at which the cartographer should poll for changes
pub fn new(
signals: Arc<SignalStore>,
Expand All @@ -66,11 +68,13 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>

/// Run the cartographer. This will do the following in a loop:
///
/// 1. Check to see if the mapping service has more work. If not, skip to step 5
/// 2. Send the new inventory to the mapping service
/// 3. Get the new mapping from the mapping service
/// 4. Update the shared map for the emitter
/// 5. Sleep until the next iteration
/// 1. Check to see if the mapping service has more work. If not, skip to the last step
/// 1. ~~Send the new inventory to the mapping service~~
/// 1. Get the new mapping from the mapping service
/// 1. Query the digital twin service for entity information
/// 1. Create or update provider proxies for the new entities
/// 1. Update the signal store with the new data
/// 1. Sleep until the next iteration
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
let mapping_client_result = self
Expand All @@ -89,23 +93,20 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>
// TODO: will this notion of checking and sending inventory exist?
// self.mapping_client.send_inventory(SendInventoryRequest { inventory: self.known_providers.clone() }).await?;

let signals_result = self.get_mapping_as_signals().await;
if signals_result.is_err() {
log::error!("Falied to get mapping from mapping client: {signals_result:?}");
let patches_result = self.get_mapping_as_signal_patches().await;
if patches_result.is_err() {
log::error!("Falied to get mapping from mapping client: {patches_result:?}");
continue;
}

let mut signals = signals_result.unwrap();
let mut patches = patches_result.unwrap();
let mut failed_signals = Vec::new();

// Some of these API calls are not really necessary, but this code gets executed
// infrequently enough that the sub-optimal performance is not a major concern.
// If Ibeji had a bulk find_by_id API there would be even less of a concern.
// TODO: punt stuff to the dt client and we call find_all here
// TODO: if there's a bulk api for providers then there probably needs to be a bulk api for proxies
// TODO: handle errors
for signal in signals.iter_mut() {
let populate_result = self.populate_entity(signal).await;
for patch in patches.iter_mut() {
// Many of the API calls in populate_entity are probably unnecessary, but this code gets executed
// infrequently enough that the sub-optimal performance is not a major concern.
// A bulk find_by_id API in the digital twin service would make this a non-issue
let populate_result = self.populate_entity(patch).await;

if populate_result.is_err() {
match populate_result
Expand All @@ -114,28 +115,28 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>
.downcast::<DigitalTwinAdapterError>()
{
Ok(e) if e.kind() == DigitalTwinAdapterErrorKind::EntityNotFound => {
warn!("Entity not found for signal {}", signal.id);
warn!("Entity not found for signal {}", patch.id);
}
Ok(e) => {
log::error!(
"Error fetching entity for signal {}: {e:?}",
signal.id
patch.id
);
}
Err(e) => {
log::error!(
"Error fetching entity for signal {}: {e:?}",
signal.id
patch.id
);
}
}

failed_signals.push(signal.id.clone());
failed_signals.push(patch.id.clone());
}
}

self.signals.sync(
signals
patches
.into_iter()
.filter(|s| !failed_signals.contains(&s.id)),
);
Expand All @@ -145,37 +146,35 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>
}
}

/// Gets the mapping from the mapping client and returns a corresponding list of signals.
async fn get_mapping_as_signals(
/// Gets the mapping from the mapping client and returns a corresponding list of signal patches.
async fn get_mapping_as_signal_patches(
&self,
) -> Result<Vec<Signal>, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<Vec<SignalPatch>, Box<dyn std::error::Error + Send + Sync>> {
Ok(self
.mapping_client
.get_mapping(GetMappingRequest {})
.await?
.map
.into_iter()
.map(|(id, entry)| Signal {
.map(|(id, entry)| SignalPatch {
id,
// this gets populated later, set to default for now
source: Default::default(),
target: Target {
metadata: entry.target,
},
emission: Emission {
policy: EmissionPolicy {
interval_ms: entry.interval_ms,
emit_only_if_changed: entry.emit_on_change,
conversion: Conversion::default(),
},
..Default::default()
emission_policy: EmissionPolicy {
interval_ms: entry.interval_ms,
emit_only_if_changed: entry.emit_on_change,
conversion: Conversion::default(),
},
..Default::default()
})
.collect())
}

async fn populate_entity(
&self,
signal: &mut Signal,
signal: &mut SignalPatch,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
signal.source = self
.digital_twin_client
Expand Down Expand Up @@ -297,22 +296,22 @@ mod cartographer_tests {
poll_interval: Duration::from_secs(1),
};

let result = uut.get_mapping_as_signals().await;
let result = uut.get_mapping_as_signal_patches().await;
assert!(result.is_ok());
let mut signals = result.unwrap();
assert_eq!(signals.len(), 1);
let signal = signals.pop().unwrap();
assert_eq!(signal.id, ID.to_string());
assert_eq!(signal.target.metadata, test_map_entry.target);
assert_eq!(
signal.emission.policy.interval_ms,
signal.emission_policy.interval_ms,
test_map_entry.interval_ms
);
assert_eq!(
signal.emission.policy.emit_only_if_changed,
signal.emission_policy.emit_only_if_changed,
test_map_entry.emit_on_change
);
assert_eq!(signal.emission.policy.conversion, test_map_entry.conversion);
assert_eq!(signal.emission_policy.conversion, test_map_entry.conversion);
}

#[tokio::test]
Expand All @@ -327,7 +326,7 @@ mod cartographer_tests {
protocol: "protocol".to_string(),
};

let test_signal = &mut Signal {
let test_signal_patch = &mut SignalPatch {
id: ID.to_string(),
..Default::default()
};
Expand All @@ -353,12 +352,12 @@ mod cartographer_tests {
poll_interval: Duration::from_secs(1),
};

let result = uut.populate_entity(test_signal).await;
let result = uut.populate_entity(test_signal_patch).await;
let join_result = listener_handler.await;

assert!(result.is_ok());
assert!(join_result.is_ok());
assert_eq!(test_signal.source, test_entity);
assert_eq!(test_signal_patch.source, test_entity);

let proxy_request = join_result.unwrap();
assert!(proxy_request.is_some());
Expand Down
7 changes: 3 additions & 4 deletions freyja/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use freyja_contracts::{

const DEFAULT_SLEEP_INTERVAL_MS: u64 = 1000;

/// Data emitter for the digital twin sync project
/// Emits sensor data at regular intervals as defined by the map
/// Emits sensor data at regular intervals as configured in the store
pub struct Emitter<TCloudAdapter> {
/// The shared signal store
signals: Arc<SignalStore>,
Expand All @@ -38,7 +37,7 @@ pub struct Emitter<TCloudAdapter> {
}

impl<TCloudAdapter: CloudAdapter> Emitter<TCloudAdapter> {
/// Creates a new instance of emitter
/// Creates a new instance of the Emitter
///
/// # Arguments
/// - `signals`: the shared signal store
Expand Down Expand Up @@ -172,7 +171,7 @@ impl<TCloudAdapter: CloudAdapter> Emitter<TCloudAdapter> {
}
}

/// Applies conversion implicitly to a signal value and sends it to the cloud
/// Applies a conversion implicitly to a signal value and sends it to the cloud
///
/// # Arguments
/// - `signal`: The signal to emit
Expand Down
2 changes: 0 additions & 2 deletions freyja/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.target(Target::Stdout)
.init();

// Setup shared resources
let signal_store = Arc::new(SignalStore::new());
let (tx_provider_proxy_selector_request, rx_provider_proxy_selector_request) =
mpsc::unbounded_channel::<ProviderProxySelectorRequestKind>();
Expand All @@ -76,7 +75,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// Setup emitter
let signal_values_queue: Arc<SegQueue<SignalValue>> = Arc::new(SegQueue::new());

let emitter = Emitter::new(
signal_store.clone(),
CloudAdapterImpl::create_new().unwrap(),
Expand Down

0 comments on commit 86a62c3

Please sign in to comment.