Skip to content

Commit

Permalink
Make data adapters pluggable in freyja_main (#114)
Browse files Browse the repository at this point in the history
Updates the `freyja_main` function and the `freyja_main!` macro to add support for pluggable data adapters.

Closes #19
  • Loading branch information
wilyle authored Jan 19, 2024
1 parent 3e12ab3 commit 4251a2d
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 44 deletions.
6 changes: 5 additions & 1 deletion common/src/data_adapter_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ use crate::{data_adapter::DataAdapterFactory, entity::Entity};
#[async_trait]
pub trait DataAdapterSelector {
/// Registers a `DataAdapterFactory` with this selector.
fn register<TFactory: DataAdapterFactory + Send + Sync + 'static>(
///
/// # Arguments
/// - `factory`: the factory to register
fn register(
&mut self,
factory: Box<dyn DataAdapterFactory + Send + Sync + 'static>,
) -> Result<(), DataAdapterSelectorError>;

/// Updates an existing data adapter to include an entity if possible,
Expand Down
11 changes: 5 additions & 6 deletions freyja/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ proc-macros = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }

grpc-data-adapter = { workspace = true }
http-mock-data-adapter = { workspace = true }
in-memory-mock-data-adapter = { workspace = true }
managed-subscribe-data-adapter = { workspace = true }
mqtt-data-adapter = { workspace = true }

[dev-dependencies]
# Dependencies for testing
mockall = { workspace = true }
Expand All @@ -34,3 +28,8 @@ in-memory-mock-digital-twin-adapter = { workspace = true }
in-memory-mock-mapping-adapter = { workspace = true }
mock-digital-twin-adapter = { workspace = true }
mock-mapping-service-adapter = { workspace = true }
grpc-data-adapter = { workspace = true }
http-mock-data-adapter = { workspace = true }
in-memory-mock-data-adapter = { workspace = true }
managed-subscribe-data-adapter = { workspace = true }
mqtt-data-adapter = { workspace = true }
34 changes: 31 additions & 3 deletions freyja/examples/in-memory-with-fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,46 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use freyja_common::data_adapter::DataAdapterFactory;
use grpc_data_adapter::grpc_data_adapter_factory::GRPCDataAdapterFactory;
use http_mock_data_adapter::http_mock_data_adapter_factory::HttpMockDataAdapterFactory;
use in_memory_mock_cloud_adapter::in_memory_mock_cloud_adapter::InMemoryMockCloudAdapter;
use in_memory_mock_data_adapter::in_memory_mock_data_adapter_factory::InMemoryMockDataAdapterFactory;
use in_memory_mock_digital_twin_adapter::in_memory_mock_digital_twin_adapter::InMemoryMockDigitalTwinAdapter;
use in_memory_mock_mapping_adapter::in_memory_mock_mapping_adapter::InMemoryMockMappingAdapter;
use managed_subscribe_data_adapter::managed_subscribe_data_adapter_factory::ManagedSubscribeDataAdapterFactory;
use mqtt_data_adapter::mqtt_data_adapter_factory::MqttDataAdapterFactory;

// This example shows how you can use the freyja_main function manually rather than using the freyja_main! macro.
// This is useful when you need to do some additional work such as complex adapter setup or dependency resolution before invoking freyja_main.
// The following code is functionally equivalent to the expanded macro.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// This example shows how you can use the freyja_main function manually rather than using the freyja_main! macro.
// This is useful when you need to do some additional work such as complex adapter setup or dependency resolution before invoking freyja_main.
let factories: Vec<Box<dyn DataAdapterFactory + Send + Sync>> = vec![
Box::new(
GRPCDataAdapterFactory::create_new().expect("Could not create GRPCDataAdapterFactory"),
),
Box::new(
HttpMockDataAdapterFactory::create_new()
.expect("Could not create HttpMockDataAdapterFactory"),
),
Box::new(
InMemoryMockDataAdapterFactory::create_new()
.expect("Could not create InMemoryMockDataAdapterFactory"),
),
Box::new(
ManagedSubscribeDataAdapterFactory::create_new()
.expect("Could not create ManagedSubscribeDataAdapterFactory"),
),
Box::new(
MqttDataAdapterFactory::create_new().expect("Could not create MqttDataAdapterFactory"),
),
];

freyja::freyja_main::<
InMemoryMockDigitalTwinAdapter,
InMemoryMockCloudAdapter,
InMemoryMockMappingAdapter,
>()
>(factories)
.await
}
8 changes: 7 additions & 1 deletion freyja/examples/in-memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
// SPDX-License-Identifier: MIT

use in_memory_mock_cloud_adapter::in_memory_mock_cloud_adapter::InMemoryMockCloudAdapter;
use in_memory_mock_data_adapter::in_memory_mock_data_adapter_factory::InMemoryMockDataAdapterFactory;
use in_memory_mock_digital_twin_adapter::in_memory_mock_digital_twin_adapter::InMemoryMockDigitalTwinAdapter;
use in_memory_mock_mapping_adapter::in_memory_mock_mapping_adapter::InMemoryMockMappingAdapter;

freyja::freyja_main! {InMemoryMockDigitalTwinAdapter, InMemoryMockCloudAdapter, InMemoryMockMappingAdapter}
freyja::freyja_main! {
InMemoryMockDigitalTwinAdapter,
InMemoryMockCloudAdapter,
InMemoryMockMappingAdapter,
[InMemoryMockDataAdapterFactory],
}
8 changes: 7 additions & 1 deletion freyja/examples/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use http_mock_data_adapter::http_mock_data_adapter_factory::HttpMockDataAdapterFactory;
use in_memory_mock_cloud_adapter::in_memory_mock_cloud_adapter::InMemoryMockCloudAdapter;
use mock_digital_twin_adapter::mock_digital_twin_adapter::MockDigitalTwinAdapter;
use mock_mapping_service_adapter::mock_mapping_service_adapter::MockMappingServiceAdapter;

freyja::freyja_main! {MockDigitalTwinAdapter, InMemoryMockCloudAdapter, MockMappingServiceAdapter}
freyja::freyja_main! {
MockDigitalTwinAdapter,
InMemoryMockCloudAdapter,
MockMappingServiceAdapter,
[HttpMockDataAdapterFactory],
}
2 changes: 1 addition & 1 deletion freyja/src/cartographer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ mod cartographer_tests {

#[async_trait]
impl DataAdapterSelector for DataAdapterSelector {
fn register<TFactory: DataAdapterFactory + Send + Sync + 'static>(&mut self) -> Result<(), DataAdapterSelectorError>;
fn register(&mut self, factory: Box<dyn DataAdapterFactory + Send + Sync + 'static>) -> Result<(), DataAdapterSelectorError>;
async fn create_or_update_adapter(&self, entity: &Entity) -> Result<(), DataAdapterSelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), DataAdapterSelectorError>;
}
Expand Down
13 changes: 8 additions & 5 deletions freyja/src/data_adapter_selector_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ impl DataAdapterSelectorImpl {
#[async_trait]
impl DataAdapterSelector for DataAdapterSelectorImpl {
/// Registers a `DataAdapterFactory` with this selector.
fn register<TFactory: DataAdapterFactory + Send + Sync + 'static>(
///
/// # Arguments
/// - `factory`: the factory to register
fn register(
&mut self,
factory: Box<dyn DataAdapterFactory + Send + Sync + 'static>,
) -> Result<(), DataAdapterSelectorError> {
let factory =
TFactory::create_new().map_err(DataAdapterSelectorError::data_adapter_error)?;
self.factories.push(Box::new(factory));
self.factories.push(factory);
Ok(())
}

Expand Down Expand Up @@ -232,7 +234,8 @@ mod data_adapter_selector_tests {
async fn handle_start_data_adapter_request_return_err_test() {
let signals: Arc<SignalStore> = Arc::new(SignalStore::new());
let mut uut = DataAdapterSelectorImpl::new(signals);
uut.register::<GRPCDataAdapterFactory>().unwrap();
uut.register(Box::new(GRPCDataAdapterFactory::create_new().unwrap()))
.unwrap();

let entity = Entity {
id: String::from(AMBIENT_AIR_TEMPERATURE_ID),
Expand Down
2 changes: 1 addition & 1 deletion freyja/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ mod emitter_tests {

#[async_trait]
impl DataAdapterSelector for DataAdapterSelector {
fn register<TFactory: DataAdapterFactory + Send + Sync + 'static>(&mut self) -> Result<(), DataAdapterSelectorError>;
fn register(&mut self, factory: Box<dyn DataAdapterFactory + Send + Sync + 'static>) -> Result<(), DataAdapterSelectorError>;
async fn create_or_update_adapter(&self, entity: &Entity) -> Result<(), DataAdapterSelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), DataAdapterSelectorError>;
}
Expand Down
21 changes: 9 additions & 12 deletions freyja/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,22 @@ use emitter::Emitter;
use freyja_common::{
cloud_adapter::CloudAdapter,
cmd_utils::{get_log_level, parse_args},
data_adapter::DataAdapterFactory,
data_adapter_selector::DataAdapterSelector,
digital_twin_adapter::DigitalTwinAdapter,
mapping_adapter::MappingAdapter,
signal_store::SignalStore,
};

use grpc_data_adapter::grpc_data_adapter_factory::GRPCDataAdapterFactory;
use http_mock_data_adapter::http_mock_data_adapter_factory::HttpMockDataAdapterFactory;
use in_memory_mock_data_adapter::in_memory_mock_data_adapter_factory::InMemoryMockDataAdapterFactory;
use managed_subscribe_data_adapter::managed_subscribe_data_adapter_factory::ManagedSubscribeDataAdapterFactory;
use mqtt_data_adapter::mqtt_data_adapter_factory::MqttDataAdapterFactory;

use crate::data_adapter_selector_impl::DataAdapterSelectorImpl;

pub async fn freyja_main<
TDigitalTwinAdapter: DigitalTwinAdapter,
TCloudAdapter: CloudAdapter,
TMappingAdapter: MappingAdapter,
>() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
>(
factories: Vec<Box<dyn DataAdapterFactory + Send + Sync>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = parse_args(env::args()).expect("Failed to parse args");

// Setup logging
Expand All @@ -51,11 +48,11 @@ pub async fn freyja_main<
let signal_store = Arc::new(SignalStore::new());

let mut data_adapter_selector = DataAdapterSelectorImpl::new(signal_store.clone());
data_adapter_selector.register::<GRPCDataAdapterFactory>()?;
data_adapter_selector.register::<HttpMockDataAdapterFactory>()?;
data_adapter_selector.register::<InMemoryMockDataAdapterFactory>()?;
data_adapter_selector.register::<ManagedSubscribeDataAdapterFactory>()?;
data_adapter_selector.register::<MqttDataAdapterFactory>()?;
for factory in factories.into_iter() {
data_adapter_selector
.register(factory)
.expect("Could not register factory");
}

let data_adapter_selector = Arc::new(Mutex::new(data_adapter_selector));

Expand Down
16 changes: 15 additions & 1 deletion proc_macros/src/freyja_main/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,27 @@ pub(crate) fn generate(ir: FreyjaMainOutput) -> TokenStream {
dt_adapter_type,
cloud_adapter_type,
mapping_adapter_type,
data_adapter_factory_types,
},
} = ir;

quote! {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
freyja::freyja_main::<#dt_adapter_type, #cloud_adapter_type, #mapping_adapter_type>().await
use freyja_common::data_adapter::DataAdapterFactory;
let factories: Vec<Box<dyn DataAdapterFactory + Send + Sync>> = vec![
#(Box::new(
#data_adapter_factory_types::create_new()
.expect(concat!("Could not create ", stringify!(#data_adapter_factory_types)))
)),*
];

freyja::freyja_main::<
#dt_adapter_type,
#cloud_adapter_type,
#mapping_adapter_type
>(factories)
.await
}
}
}
76 changes: 66 additions & 10 deletions proc_macros/src/freyja_main/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: MIT

use proc_macro2::TokenStream;
use syn::bracketed;
use syn::parse::{Parse, ParseStream};
use syn::{punctuated::Punctuated, Ident, Token};

Expand All @@ -21,6 +22,7 @@ pub(crate) struct FreyjaMainArgs {
pub dt_adapter_type: Ident,
pub cloud_adapter_type: Ident,
pub mapping_adapter_type: Ident,
pub data_adapter_factory_types: Vec<Ident>,
}

impl Parse for FreyjaMainArgs {
Expand All @@ -30,16 +32,36 @@ impl Parse for FreyjaMainArgs {
///
/// - `input`: the input stream
fn parse(input: ParseStream) -> syn::Result<Self> {
let mut args = Punctuated::<Ident, Token![,]>::parse_terminated(input)?.into_iter();

if args.len() != 3 {
panic!("Expected exactly three arguments to freyja_main");
let dt_adapter_type = input.parse::<Ident>().unwrap();
let _ = input.parse::<Token![,]>().unwrap();
let cloud_adapter_type = input.parse::<Ident>().unwrap();
let _ = input.parse::<Token![,]>().unwrap();
let mapping_adapter_type = input.parse::<Ident>().unwrap();
let _ = input.parse::<Token![,]>().unwrap();

let data_adapter_content;
let _ = bracketed!(data_adapter_content in input);
let data_adapter_factory_types =
Punctuated::<Ident, Token![,]>::parse_terminated(&data_adapter_content)
.unwrap()
.into_iter()
.collect();

let trailing_comma_result = if !input.is_empty() {
Some(input.parse::<Token![,]>())
} else {
None
};

if !input.is_empty() || trailing_comma_result.is_some_and(|r| r.is_err()) {
panic!("Unexpected tokens at end of input");
}

Ok(FreyjaMainArgs {
dt_adapter_type: args.next().unwrap(),
cloud_adapter_type: args.next().unwrap(),
mapping_adapter_type: args.next().unwrap(),
dt_adapter_type,
cloud_adapter_type,
mapping_adapter_type,
data_adapter_factory_types,
})
}
}
Expand All @@ -56,25 +78,33 @@ mod freyja_main_parse_tests {
let foo_ident = format_ident!("Foo");
let bar_ident = format_ident!("Bar");
let baz_ident = format_ident!("Baz");
let factory_idents = vec![format_ident!("DA1"), format_ident!("DA2")];
let factory_idents_clone = factory_idents.clone();

let input = quote! { #foo_ident, #bar_ident, #baz_ident };
let input = quote! { #foo_ident, #bar_ident, #baz_ident, [#(#factory_idents),*] };
let output = parse(input);

assert_eq!(output.dt_adapter_type, foo_ident);
assert_eq!(output.cloud_adapter_type, bar_ident);
assert_eq!(output.mapping_adapter_type, baz_ident);
for ident in factory_idents.iter() {
assert!(output.data_adapter_factory_types.contains(ident));
}

// Now try a different order
let input = quote! { #baz_ident, #foo_ident, #bar_ident };
let input = quote! { #baz_ident, #foo_ident, #bar_ident, [#(#factory_idents_clone),*] };
let output = parse(input);

assert_eq!(output.dt_adapter_type, baz_ident);
assert_eq!(output.cloud_adapter_type, foo_ident);
assert_eq!(output.mapping_adapter_type, bar_ident);
for ident in factory_idents {
assert!(output.data_adapter_factory_types.contains(&ident));
}
}

#[test]
fn parse_panics_with_incorrect_number_of_arguments() {
fn parse_panics_with_invalid_input() {
let foo_ident = format_ident!("Foo");
let bar_ident = format_ident!("Bar");
let baz_ident = format_ident!("Baz");
Expand All @@ -88,4 +118,30 @@ mod freyja_main_parse_tests {
let result = catch_unwind(|| parse(input));
assert!(result.is_err());
}

#[test]
fn parse_accepts_trailing_comma() {
let foo_ident = format_ident!("Foo");
let bar_ident = format_ident!("Bar");
let baz_ident = format_ident!("Baz");
let factory_idents = vec![format_ident!("DA1"), format_ident!("DA2")];

let input = quote! { #foo_ident, #bar_ident, #baz_ident, [#(#factory_idents),*], };
let result = catch_unwind(|| parse(input));
assert!(result.is_ok());
}

#[test]
fn parse_panics_with_invalid_trailing_content() {
let foo_ident = format_ident!("Foo");
let bar_ident = format_ident!("Bar");
let baz_ident = format_ident!("Baz");
let factory_idents = vec![format_ident!("DA1"), format_ident!("DA2")];
let qux_ident = format_ident!("Qux");

let input =
quote! { #foo_ident, #bar_ident, #baz_ident, [#(#factory_idents),*], #qux_ident };
let result = catch_unwind(|| parse(input));
assert!(result.is_err());
}
}
12 changes: 10 additions & 2 deletions proc_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn error(ts: TokenStream) -> TokenStream {
///
/// *FreyjaMainPredicate*:
///
/// &nbsp;&nbsp;&nbsp;&nbsp;*DigitalTwinAdapterType* `,` *CloudAdapterType* `,` *MappingClientType*
/// &nbsp;&nbsp;&nbsp;&nbsp;*DigitalTwinAdapterType* `,` *CloudAdapterType* `,` *MappingAdapterType* `, [` *DataAdapterFactoryTypeList* `]`
///
/// *DigitalTwinAdapterType*:
///
Expand All @@ -93,7 +93,15 @@ pub fn error(ts: TokenStream) -> TokenStream {
///
/// &nbsp;&nbsp;&nbsp;&nbsp;IDENTIFIER
///
/// *MappingClientType*:
/// *MappingAdapterType*:
///
/// &nbsp;&nbsp;&nbsp;&nbsp;IDENTIFIER
///
/// *DataAdapterFactoryTypeList*:
///
/// &nbsp;&nbsp;&nbsp;&nbsp;*DataAdapterFactoryType* (`,` *DataAdapterFactoryTypeList*)
///
/// *DataAdapterFactoryType*:
///
/// &nbsp;&nbsp;&nbsp;&nbsp;IDENTIFIER
///
Expand Down

0 comments on commit 4251a2d

Please sign in to comment.