Skip to content

Commit

Permalink
Devkelley/find agemo w chariott (#53)
Browse files Browse the repository at this point in the history
* Add chariott support for Managed Subscribe

* Update README with how to run chariott

* fixed spacing errors

* fixed grammar

* Add enum for service's source

* updated readme and fixed whitespace issues

* minor change to discover_service_using_chariott

* Added comments for structs and enum definitions
  • Loading branch information
devkelley authored Oct 17, 2023
1 parent 09c7238 commit 3073a31
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 43 deletions.
2 changes: 2 additions & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ parking_lot = { workspace = true }
prost = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_derive = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
regex = {workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
124 changes: 117 additions & 7 deletions core/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,49 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use config::{Config, File, FileFormat};
use log::debug;
use config::{Config, ConfigError, File, FileFormat};
use core_protobuf_data_access::chariott::service_discovery::core::v1::{
service_registry_client::ServiceRegistryClient, DiscoverRequest,
};
use log::{debug, info};
use serde_derive::Deserialize;
use std::future::Future;
use strum_macros::Display;
use tokio::time::{sleep, Duration};
use tonic::{Request, Status};

/// An identifier used when discovering a service through Chariott.
#[derive(Debug, Deserialize)]
pub struct ServiceIdentifier {
/// The namespace of the service.
pub namespace: String,
/// The name of the service.
pub name: String,
/// The version of the service.
pub version: String,
}

/// An enum representing where to discover a service's URI.
#[derive(Display, Debug, Deserialize)]
pub enum ServiceUriSource {
/// Use the local configuration settings to find the service's URI.
Local { service_uri: String },
/// Use Chariott to discover the service's URI.
Chariott { chariott_uri: String, service_identifier: ServiceIdentifier },
}

/// Load the settings.
///
/// # Arguments
/// * `config_filename` - Name of the config file to load settings from.
pub fn load_settings<T>(config_filename: &str) -> T
pub fn load_settings<T>(config_filename: &str) -> Result<T, ConfigError>
where
T: for<'de> serde::Deserialize<'de>,
{
let config =
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build().unwrap();

let settings: T = config.try_deserialize().unwrap();
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build()?;

settings
config.try_deserialize()
}

/// Retry a function that returns an error.
Expand Down Expand Up @@ -64,6 +88,92 @@ where
last_error
}

/// Use Chariott to discover a service.
///
/// # Arguments
/// * `chariott_uri` - Chariott's URI.
/// * `namespace` - The service's namespace.
/// * `name` - The service's name.
/// * `version` - The service's version.
/// # `expected_communication_kind` - The service's expected communication kind.
/// # `expected_communication_reference` - The service's expected communication reference.
pub async fn discover_service_using_chariott(
chariott_uri: &str,
namespace: &str,
name: &str,
version: &str,
expected_communication_kind: &str,
expected_communication_reference: &str,
) -> Result<String, Status> {
let mut client = ServiceRegistryClient::connect(chariott_uri.to_string())
.await
.map_err(|e| Status::internal(e.to_string()))?;

let request = Request::new(DiscoverRequest {
namespace: namespace.to_string(),
name: name.to_string(),
version: version.to_string(),
});

let response = client.discover(request).await?;

let service = response.into_inner().service.ok_or_else(|| Status::not_found("Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version}"))?;

if service.communication_kind != expected_communication_kind
&& service.communication_reference != expected_communication_reference
{
Err(Status::not_found(
"Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version} that has communication kind '{communication_kind} and communication_reference '{communication_reference}''",
))
} else {
Ok(service.uri)
}
}

/// Get a service's URI from settings or from Chariott.
///
/// # Arguments
/// * `service_uri_source` - Enum providing information on how to get the service URI.
/// # `expected_communication_kind` - The service's expected communication kind.
/// # `expected_communication_reference` - The service's expected communication reference.
pub async fn get_service_uri(
service_uri_source: ServiceUriSource,
expected_communication_kind: &str,
expected_communication_reference: &str,
) -> Result<String, Status> {
let result = match service_uri_source {
ServiceUriSource::Local { service_uri } => {
info!("URI set in settings.");
service_uri
}
ServiceUriSource::Chariott { chariott_uri, service_identifier } => {
info!("Retrieving URI from Chariott.");

execute_with_retry(
30,
Duration::from_secs(1),
|| {
discover_service_using_chariott(
&chariott_uri,
&service_identifier.namespace,
&service_identifier.name,
&service_identifier.version,
expected_communication_kind,
expected_communication_reference,
)
},
Some(format!(
"Attempting to discover service '{}' with chariott.",
service_identifier.name
)),
)
.await?
}
};

Ok(result)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
14 changes: 9 additions & 5 deletions core/invehicle-digital-twin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use core_protobuf_data_access::chariott::service_discovery::core::v1::{
};
use core_protobuf_data_access::invehicle_digital_twin::v1::invehicle_digital_twin_server::InvehicleDigitalTwinServer;
use env_logger::{Builder, Target};
use futures::Future;
use log::{debug, error, info, LevelFilter};
use parking_lot::RwLock;
use std::boxed::Box;
Expand Down Expand Up @@ -89,10 +88,10 @@ async fn register_invehicle_digital_twin_service_with_chariott(
/// 5. Call and return from the block `.add_module()` on the server with the updated middleware and
/// module.
#[allow(unused_assignments, unused_mut)] // Necessary when no extra modules are built.
fn build_server_and_serve<S>(
async fn build_server_and_serve<S>(
addr: SocketAddr,
base_service: S,
) -> impl Future<Output = Result<(), tonic::transport::Error>>
) -> Result<(), Box<dyn std::error::Error>>
where
S: Service<http::Request<Body>, Response = http::Response<BoxBody>, Error = Infallible>
+ NamedService
Expand All @@ -107,7 +106,10 @@ where
// (1) Adds the Managed Subscribe module to the service.
let server = {
// (2) Initialize the Managed Subscribe module, which implements GrpcModule.
let managed_subscribe_module = ManagedSubscribeModule::new();
let managed_subscribe_module = ManagedSubscribeModule::new().await.map_err(|error| {
error!("Unable to create Managed Subscribe module.");
error
})?;

// (3) Create interceptor layer to be added to the server.
let managed_subscribe_layer =
Expand All @@ -117,6 +119,8 @@ where
let current_middleware = server.middleware.clone();
let new_middleware = current_middleware.layer(managed_subscribe_layer);

info!("Initialized Managed Subscribe module.");

// (5) Add the module with the updated middleware stack to the server.
server.add_module(new_middleware, Box::new(managed_subscribe_module))
};
Expand All @@ -125,7 +129,7 @@ where
let builder = server.construct_server().add_service(base_service);

// Start the server.
builder.serve(addr)
builder.serve(addr).await.map_err(|error| error.into())
}

#[tokio::main]
Expand Down
46 changes: 32 additions & 14 deletions core/module/managed_subscribe/src/managed_subscribe_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use core_protobuf_data_access::module::managed_subscribe::v1::{
};

use common::grpc_module::GrpcModule;
use common::utils::{execute_with_retry, load_settings};
use common::utils::{execute_with_retry, get_service_uri, load_settings, ServiceUriSource};
use log::{debug, error, info};
use parking_lot::RwLock;
use serde_derive::Deserialize;
Expand All @@ -37,6 +37,9 @@ use super::managed_subscribe_interceptor::ManagedSubscribeInterceptor;
const CONFIG_FILENAME: &str = "managed_subscribe_settings";
const SERVICE_PROTOCOL: &str = "grpc";

const MANAGED_SUBSCRIBE_COMMUNICATION_KIND: &str = "mqtt_v5";
const MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE: &str = "pubsub.v1.pubsub.proto";

// Managed Subscribe action constants.
const PUBLISH_ACTION: &str = "PUBLISH";
const STOP_PUBLISH_ACTION: &str = "STOP_PUBLISH";
Expand All @@ -58,43 +61,58 @@ pub enum TopicAction {
Delete,
}

/// Settings retrieved from a configuration file.
#[derive(Debug, Deserialize)]
pub struct ConfigSettings {
/// Where to host the Managed Subscribe module.
pub base_authority: String,
pub managed_subscribe_uri: String,
pub chariott_uri: Option<String>,
/// Where to retrieve the Managed Subscribe Service URI from.
pub managed_subscribe_uri_source: ServiceUriSource,
}

/// Struct that handles communication with the Managed Subscribe service.
#[derive(Clone, Debug)]
pub struct ManagedSubscribeModule {
/// The URI of the Managed Subscribe service.
pub managed_subscribe_uri: String,
/// The URI of the Managed Subscribe module.
pub service_uri: String,
/// The protocol used to communicate with the Managed Subscribe module.
pub service_protocol: String,
/// Shared store for the Managed Subscribe module.
pub store: Arc<RwLock<ManagedSubscribeStore>>,
}

impl Default for ManagedSubscribeModule {
fn default() -> Self {
Self::new()
}
}

impl ManagedSubscribeModule {
/// Creates a new managed subscribe module object.
pub fn new() -> Self {
pub async fn new() -> Result<Self, Status> {
// Get module information from the configuration settings.
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME);
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME).map_err(|error| {
Status::internal(format!(
"Unable to load 'Managed Subscribe' config with error: {error}."
))
})?;
let endpoint = config.base_authority;
let service_uri = format!("http://{endpoint}"); // Devskim: ignore DS137138

let store = Arc::new(RwLock::new(ManagedSubscribeStore::new()));

ManagedSubscribeModule {
managed_subscribe_uri: config.managed_subscribe_uri,
info!("Getting Managed Subscribe URI.");

// Get the uri of the managed subscribe service from settings or Chariott.
let managed_subscribe_uri = get_service_uri(
config.managed_subscribe_uri_source,
MANAGED_SUBSCRIBE_COMMUNICATION_KIND,
MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE,
)
.await?;

Ok(ManagedSubscribeModule {
managed_subscribe_uri,
service_uri,
service_protocol: SERVICE_PROTOCOL.to_string(),
store,
}
})
}

/// Creates a new managed subscribe interceptor that shares data with the current instance of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@
# Example: "0.0.0.0:80"
base_authority: <<value>>

# The URI that the Managed Subscribe service listens on for requests.
managed_subscribe_uri: <<value>>
# Information for how to get the Managed Subscribe URI. Only one source can be uncommented at a time.
managed_subscribe_uri_source:

# The URI that the Chariott service listens on for requests.
# If you wish to use Chariott to discover Agemo, then uncomment this setting.
# chariott_uri: <<value>>
# The Managed Subscribe URI will be retrieved from this settings file.
# 'service_uri' - The URI that the Managed Subscribe service listens on for requests.
Local:
service_uri: <<value>>

# The Managed Subscribe URI will be discovered through Chariott.
# 'chariott_uri' - The URI that the Chariott service listens on for requests.
# 'service_identifier' - The service identifier for the Managed Subscribe service.
# Chariott:
# chariott_uri: <<value>>
# service_identifier:
# namespace: <<value>>
# name: <<value>>
# version: <<value>>
3 changes: 3 additions & 0 deletions samples/managed_subscribe/.accepted_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ https
Ibeji
InVehicle
invehicle
md
MQTT
pubsub
repo
sdv
svg
TopicManagementCB
uri
Expand Down
Loading

0 comments on commit 3073a31

Please sign in to comment.