Skip to content

Commit

Permalink
clippy + fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Sep 1, 2023
1 parent 37e1252 commit bcb3b98
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl CloudAdapter for InMemoryMockCloudAdapter {
/// Creates a new instance of a CloudAdapter with default settings
fn create_new() -> Result<Self, CloudAdapterError> {
Self::from_config_file(Path::new(env!("OUT_DIR")).join(CONFIG_FILE))
//.map(|r| Box::new(r) as _)
}

/// Sends the signal to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ impl DigitalTwinAdapter for MockDigitalTwinAdapter {
let settings: Settings = serde_json::from_str(settings_content.as_str())
.map_err(DigitalTwinAdapterError::deserialize)?;

Ok(Self::with_uri(
&settings.base_uri_for_digital_twin_server,
))
Ok(Self::with_uri(&settings.base_uri_for_digital_twin_server))
}

/// Gets the info of an entity via an HTTP request.
Expand Down
41 changes: 30 additions & 11 deletions freyja/src/cartographer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use log::{info, warn};

use freyja_contracts::{
conversion::Conversion,
digital_twin_adapter::{DigitalTwinAdapter, GetDigitalTwinProviderRequest, DigitalTwinAdapterError, DigitalTwinAdapterErrorKind},
digital_twin_adapter::{
DigitalTwinAdapter, DigitalTwinAdapterError, DigitalTwinAdapterErrorKind,
GetDigitalTwinProviderRequest,
},
mapping_client::{CheckForWorkRequest, GetMappingRequest, MappingClient},
provider_proxy_request::{
ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender,
Expand All @@ -36,7 +39,9 @@ pub struct Cartographer<TMappingClient, TDigitalTwinAdapter> {
poll_interval: Duration,
}

impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter> Cartographer<TMappingClient, TDigitalTwinAdapter> {
impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter>
Cartographer<TMappingClient, TDigitalTwinAdapter>
{
/// Create a new instance of a Cartographer
///
/// # Arguments
Expand Down Expand Up @@ -72,7 +77,7 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter> Car
.mapping_client
.check_for_work(CheckForWorkRequest {})
.await;

if mapping_client_result.is_err() {
log::error!("Failed to check for mapping work; will try again later. Error: {mapping_client_result:?}");
continue;
Expand All @@ -89,7 +94,7 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter> Car
log::error!("Falied to get mapping from mapping client: {signals_result:?}");
continue;
}

let mut signals = signals_result.unwrap();
let mut failed_signals = Vec::new();

Expand All @@ -103,23 +108,37 @@ impl<TMappingClient: MappingClient, TDigitalTwinAdapter: DigitalTwinAdapter> Car
let populate_result = self.populate_entity(signal).await;

if populate_result.is_err() {
match populate_result.err().unwrap().downcast::<DigitalTwinAdapterError>() {
match populate_result
.err()
.unwrap()
.downcast::<DigitalTwinAdapterError>()
{
Ok(e) if e.kind() == DigitalTwinAdapterErrorKind::EntityNotFound => {
warn!("Entity not found for signal {}", signal.id);
},
}
Ok(e) => {
log::error!("Error fetching entity for signal {}: {e:?}", signal.id);
},
log::error!(
"Error fetching entity for signal {}: {e:?}",
signal.id
);
}
Err(e) => {
log::error!("Error fetching entity for signal {}: {e:?}", signal.id);
},
log::error!(
"Error fetching entity for signal {}: {e:?}",
signal.id
);
}
}

failed_signals.push(signal.id.clone());
}
}

self.signals.sync(signals.into_iter().filter(|s| !failed_signals.contains(&s.id)));
self.signals.sync(
signals
.into_iter()
.filter(|s| !failed_signals.contains(&s.id)),
);
}

tokio::time::sleep(self.poll_interval).await;
Expand Down
55 changes: 31 additions & 24 deletions freyja/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<TCloudAdapter: CloudAdapter> Emitter<TCloudAdapter> {

/// Performs data emissions of the provided signals.
/// Returns the amount of time that the main emitter loop should sleep before the next iteration.
///
///
/// # Arguments
/// - `signals`: The set of signals to emit
async fn emit_data(&self, signals: Vec<Signal>) -> Result<u64, EmitterError> {
Expand Down Expand Up @@ -125,7 +125,8 @@ impl<TCloudAdapter: CloudAdapter> Emitter<TCloudAdapter> {
entity_id: signal.id.clone(),
};

let proxy_result = self.provider_proxy_selector_client
let proxy_result = self
.provider_proxy_selector_client
.send_request_to_provider_proxy_selector(request)
.map_err(EmitterError::provider_proxy_error);

Expand Down Expand Up @@ -157,7 +158,11 @@ impl<TCloudAdapter: CloudAdapter> Emitter<TCloudAdapter> {
let send_to_cloud_result = self.send_to_cloud(signal).await;

if send_to_cloud_result.is_err() {
log::error!("Error sending data to cloud while processing signal {}: {:?}", signal_id, send_to_cloud_result.err());
log::error!(
"Error sending data to cloud while processing signal {}: {:?}",
signal_id,
send_to_cloud_result.err()
);
}
}

Expand Down Expand Up @@ -221,7 +226,10 @@ proc_macros::error! {
mod emitter_tests {
use super::*;
use async_trait::async_trait;
use freyja_contracts::{cloud_adapter::{CloudAdapterError, CloudAdapterErrorKind}, signal::{Emission, EmissionPolicy}};
use freyja_contracts::{
cloud_adapter::{CloudAdapterError, CloudAdapterErrorKind},
signal::{Emission, EmissionPolicy},
};
use mockall::*;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -267,8 +275,7 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
.never();
mock_cloud_adapter.expect_send_to_cloud().never();

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand Down Expand Up @@ -313,9 +320,10 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
mock_cloud_adapter
.expect_send_to_cloud()
.once()
.returning(|_| Ok(CloudMessageResponse { }));
.returning(|_| Ok(CloudMessageResponse {}));

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand Down Expand Up @@ -358,8 +366,7 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
.never();
mock_cloud_adapter.expect_send_to_cloud().never();

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand Down Expand Up @@ -402,8 +409,7 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
.never();
mock_cloud_adapter.expect_send_to_cloud().never();

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand All @@ -423,7 +429,6 @@ mod emitter_tests {
emit_only_if_changed: true,
..Default::default()
},
..Default::default()
},
..Default::default()
};
Expand All @@ -449,9 +454,10 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
mock_cloud_adapter
.expect_send_to_cloud()
.once()
.returning(|_| Ok(CloudMessageResponse { }));
.returning(|_| Ok(CloudMessageResponse {}));

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand All @@ -470,7 +476,6 @@ mod emitter_tests {
emit_only_if_changed: true,
..Default::default()
},
..Default::default()
},
..Default::default()
};
Expand All @@ -496,9 +501,10 @@ mod emitter_tests {
let listener_handler = tokio::spawn(async move { rx.recv().await });

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
mock_cloud_adapter
.expect_send_to_cloud()
.once()
.returning(|_| Ok(CloudMessageResponse { }));
.returning(|_| Ok(CloudMessageResponse {}));

let mut uut = Emitter {
signals: Arc::new(SignalStore::new()),
Expand All @@ -517,7 +523,6 @@ mod emitter_tests {
emit_only_if_changed: true,
..Default::default()
},
..Default::default()
},
..Default::default()
};
Expand All @@ -540,7 +545,8 @@ mod emitter_tests {
let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx);

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
mock_cloud_adapter
.expect_send_to_cloud()
.times(2)
.returning(|_| Err(CloudAdapterErrorKind::Unknown.into()));

Expand Down Expand Up @@ -570,10 +576,11 @@ mod emitter_tests {

let (tx, _) = mpsc::unbounded_channel::<ProviderProxySelectorRequestKind>();
let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx);

let mut mock_cloud_adapter = MockCloudAdapterImpl::new();
mock_cloud_adapter.expect_send_to_cloud()
.returning(|_| Ok(CloudMessageResponse { }));
mock_cloud_adapter
.expect_send_to_cloud()
.returning(|_| Ok(CloudMessageResponse {}));

let test_signal = Signal {
id: ID.to_string(),
Expand Down Expand Up @@ -609,4 +616,4 @@ mod emitter_tests {
assert!(signal.emission.last_emitted_value.is_some());
assert_eq!(signal.emission.next_emission_ms, INTERVAL);
}
}
}

0 comments on commit bcb3b98

Please sign in to comment.