Skip to content

Commit

Permalink
feat: global exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
TheEdward162 committed Mar 20, 2024
1 parent 9d15b0c commit 679fb80
Show file tree
Hide file tree
Showing 28 changed files with 334 additions and 224 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/comlink_wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use wasm_abi::{
AbiResultRepr, Handle, MessageExchange, MessageExchangeFfiFn, Ptr, Size, StaticMessageExchange
AbiResultRepr, Handle, MessageExchange, MessageExchangeFfiFn, Ptr, Size, StaticMessageExchange,
};

pub mod typescript_parser;
Expand Down
2 changes: 1 addition & 1 deletion core/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jsonschema = { workspace = true }
base64 = { workspace = true }
url = { workspace = true }

sf_std = { path = "../host_to_core_std", package = "host_to_core_std" }
sf_std = { path = "../host_to_core_std", package = "host_to_core_std", features = ["global_exchange"] }
map_std = { path = "../core_to_map_std", package = "core_to_map_std" }
interpreter_js = { path = "../interpreter_js" }
comlink = { path = "../comlink" }
Expand Down
41 changes: 19 additions & 22 deletions core/core/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::{
collections::HashMap,
io::Read,
time::{Duration, Instant}, ops::Deref,
ops::Deref,
time::{Duration, Instant},
};

use url::Url;

use sf_std::{
unstable::{http::{HttpCallError, HttpRequest}, provider::ProviderJson, fs::read_in},
HeaderName, HeadersMultiMap, abi::{MessageExchange, StreamExchange},
unstable::{
fs,
http::{GlobalHttpRequest as HttpRequest, HttpCallError},
provider::ProviderJson,
},
HeaderName, HeadersMultiMap,
};

use super::digest;
Expand Down Expand Up @@ -104,30 +109,20 @@ impl<E: std::fmt::Debug> std::fmt::Debug for DocumentCacheEntry<E> {
}
}

pub struct DocumentCache<E, Me: MessageExchange, Se: StreamExchange> {
message_exchange: Me,
stream_exchange: Se,
pub struct DocumentCache<E> {
map: HashMap<String, DocumentCacheEntry<E>>,
cache_duration: Duration,
registry_url: Url,
user_agent: Option<String>,
}
impl<E, Me: MessageExchange, Se: StreamExchange> DocumentCache<E, Me, Se> {
impl<E> DocumentCache<E> {
const FILE_URL_PREFIX: &'static str = "file://";
const HTTP_URL_PREFIX: &'static str = "http://";
const HTTPS_URL_PREFIX: &'static str = "https://";
const BASE64_URL_PREFIX: &'static str = "data:;base64,";

pub fn new(
cache_duration: Duration,
registry_url: Url,
user_agent: Option<String>,
message_exchange: Me,
stream_exchange: Se
) -> Self {
pub fn new(cache_duration: Duration, registry_url: Url, user_agent: Option<String>) -> Self {
Self {
message_exchange,
stream_exchange,
map: HashMap::new(),
cache_duration,
registry_url,
Expand All @@ -136,7 +131,9 @@ impl<E, Me: MessageExchange, Se: StreamExchange> DocumentCache<E, Me, Se> {
}

pub fn get_key_value(&self, url: &str) -> Option<(&str, &E)> {
self.map.get_key_value(url).map(|(k, e)| (k.deref(), &e.data))
self.map
.get_key_value(url)
.map(|(k, e)| (k.deref(), &e.data))
}

pub fn cache<PostProcessError: std::error::Error>(
Expand Down Expand Up @@ -198,29 +195,29 @@ impl<E, Me: MessageExchange, Se: StreamExchange> DocumentCache<E, Me, Se> {

fn cache_file<PostProcessError: std::error::Error>(
&self,
url: &str
url: &str,
) -> Result<Vec<u8>, DocumentCacheError<PostProcessError>> {
match url.strip_prefix(Self::FILE_URL_PREFIX) {
None => Err(DocumentCacheError::FileLoadFailed(
url.to_string(),
std::io::ErrorKind::NotFound.into(),
)),
Some(path) => read_in(path, &self.message_exchange, &self.stream_exchange)
Some(path) => fs::read(path)
.map_err(|err| DocumentCacheError::FileLoadFailed(path.to_string(), err)),
}
}

fn cache_http<PostProcessError: std::error::Error>(
&self,
url: &str,
user_agent: Option<&str>
user_agent: Option<&str>,
) -> Result<Vec<u8>, DocumentCacheError<PostProcessError>> {
let mut headers = HeadersMultiMap::new();
if let Some(user_agent) = user_agent {
headers.insert(HeaderName::from("user-agent"), vec![user_agent.to_string()]);
}

let mut response = HttpRequest::fetch_in("GET", url, &headers, &Default::default(), None, &self.message_exchange, &self.stream_exchange)
let mut response = HttpRequest::fetch("GET", url, &headers, &Default::default(), None)
.and_then(|v| v.into_response())
.map_err(|err| DocumentCacheError::HttpLoadFailed(url.to_string(), err))?;

Expand All @@ -247,7 +244,7 @@ impl<E, Me: MessageExchange, Se: StreamExchange> DocumentCache<E, Me, Se> {
}
}
}
impl<E: std::fmt::Debug, Me: MessageExchange, Se: StreamExchange> std::fmt::Debug for DocumentCache<E, Me, Se> {
impl<E: std::fmt::Debug> std::fmt::Debug for DocumentCache<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DocumentCache")
.field("map", &self.map)
Expand Down
2 changes: 1 addition & 1 deletion core/core/src/exception.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use sf_std::unstable::exception::{PerformException, PerformExceptionErrorCode};

use comlink::json_schema_validator::JsonSchemaValidatorError;
use super::cache::DocumentCacheError;
use comlink::json_schema_validator::JsonSchemaValidatorError;

impl<PostProcessError: std::error::Error> From<DocumentCacheError<PostProcessError>>
for PerformException
Expand Down
101 changes: 52 additions & 49 deletions core/core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::{collections::BTreeMap, str::FromStr};

use sf_std::{unstable::{
use sf_std::unstable::{
exception::PerformException,
perform::{PerformInput, set_perform_output_result_in, set_perform_output_error_in, set_perform_output_exception_in},
perform::{
set_perform_output_error, set_perform_output_exception, set_perform_output_result,
PerformInput,
},
HostValue,
}, abi::{MessageExchange, StreamExchange}};
};

use comlink::json_schema_validator::JsonSchemaValidator;
use interpreter_js::JsInterpreter;
use map_std::unstable::{
security::{prepare_provider_parameters, prepare_security_map},
services::prepare_services_map,
MapValue, MapValueObject,
};
use comlink::json_schema_validator::JsonSchemaValidator;

use crate::{
exception::FromJsonSchemaValidationError,
metrics::PerformMetricsData,
};
use crate::{exception::FromJsonSchemaValidationError, metrics::PerformMetricsData};

pub use sf_std;

mod cache;
mod config;
Expand All @@ -41,25 +43,23 @@ use self::{
};

#[derive(Debug)]
pub struct OneClientCore<Me: MessageExchange, Se: StreamExchange> {
message_exchange: Me,
stream_exchange: Se,
profile_cache: DocumentCache<ProfileCacheEntry, Me, Se>,
provider_cache: DocumentCache<ProviderJsonCacheEntry, Me, Se>,
map_cache: DocumentCache<MapCacheEntry, Me, Se>,
pub struct OneClientCore {
profile_cache: DocumentCache<ProfileCacheEntry>,
provider_cache: DocumentCache<ProviderJsonCacheEntry>,
map_cache: DocumentCache<MapCacheEntry>,
security_validator: JsonSchemaValidator,
parameters_validator: JsonSchemaValidator,
mapstd_config: MapStdImplConfig,
}
impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static> OneClientCore<Me, Se> {
impl OneClientCore {
const MAP_STDLIB_JS: &'static str = include_str!("../assets/js/map_std.js");
const SECURITY_VALUES_JSON_SCHEMA: &'static str =
include_str!("../assets/schemas/security_values.json");
const PARAMETERS_VALUES_JSON_SCHEMA: &'static str =
include_str!("../assets/schemas/parameters_values.json");

// TODO: Use thiserror and define specific errors
pub fn new(config: &CoreConfiguration, message_exchange: Me, stream_exchange: Se) -> anyhow::Result<Self> {
pub fn new(config: &CoreConfiguration) -> anyhow::Result<Self> {
tracing::info!(target: "@user", config = ?config);

crate::observability::log_metric!(Init);
Expand All @@ -69,22 +69,16 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
config.cache_duration,
config.registry_url.clone(),
Some(config.user_agent.clone()),
message_exchange.clone(),
stream_exchange.clone()
),
provider_cache: DocumentCache::new(
config.cache_duration,
config.registry_url.clone(),
Some(config.user_agent.clone()),
message_exchange.clone(),
stream_exchange.clone()
),
map_cache: DocumentCache::new(
config.cache_duration,
config.registry_url.clone(),
Some(config.user_agent.clone()),
message_exchange.clone(),
stream_exchange.clone()
),
security_validator: JsonSchemaValidator::new(
&serde_json::Value::from_str(Self::SECURITY_VALUES_JSON_SCHEMA)
Expand All @@ -101,8 +95,6 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
log_http_transactions_body_max_size: config.user_log_http_body_max_size,
user_agent: config.user_agent.clone(),
},
message_exchange,
stream_exchange,
})
}

Expand Down Expand Up @@ -154,15 +146,15 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
pub fn perform(&mut self) {
#[cfg(feature = "core_mock")]
{
return crate::mock::perform(self.message_exchange.clone());
return crate::mock::perform();
}

// we can't send metrics if we don't even know the profile and provider urls
let perform_input = match PerformInput::take_in(&self.message_exchange) {
let perform_input = match PerformInput::take() {
Ok(i) => i,
Err(exception) => {
set_perform_output_exception_in(exception.into(), &self.message_exchange);
return
set_perform_output_exception(exception.into());
return;
}
};

Expand All @@ -184,15 +176,15 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
map_url = metrics_data.map_url,
map_content_hash = metrics_data.map_content_hash
);

match map_result {
Ok(result) => set_perform_output_result_in(self.map_value_to_host_value(result), &self.message_exchange),
Err(error) => set_perform_output_error_in(self.map_value_to_host_value(error), &self.message_exchange)
Ok(result) => set_perform_output_result(self.map_value_to_host_value(result)),
Err(error) => set_perform_output_error(self.map_value_to_host_value(error)),
}
}
Err(exception) => {
tracing::error!(target: "@user", "Perform failed unexpectedly: {}", exception);

tracing::debug!(perform_metrics = ?metrics_data);
crate::observability::log_metric!(
Perform
Expand All @@ -207,15 +199,23 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
map_content_hash = metrics_data.map_content_hash
);

set_perform_output_exception_in(exception, &self.message_exchange)
set_perform_output_exception(exception)
}
}
}

fn perform_impl<'metrics, 'me: 'metrics>(&'me mut self, perform_input: PerformInput, metrics_data: &'metrics mut PerformMetricsData<'me>) -> Result<Result<MapValue, MapValue>, PerformException> {
fn perform_impl<'metrics, 'me: 'metrics>(
&'me mut self,
perform_input: PerformInput,
metrics_data: &'metrics mut PerformMetricsData<'me>,
) -> Result<Result<MapValue, MapValue>, PerformException> {
// first cache documents
self.profile_cache.cache(&perform_input.profile_url, ProfileCacheEntry::from_data)?;
self.provider_cache.cache(&perform_input.provider_url, ProviderJsonCacheEntry::from_data)?;
self.profile_cache
.cache(&perform_input.profile_url, ProfileCacheEntry::from_data)?;
self.provider_cache.cache(
&perform_input.provider_url,
ProviderJsonCacheEntry::from_data,
)?;
self.map_cache.cache(&perform_input.map_url, |data| {
// TODO: this is temporary, should be extracted from the map manifest
let file_name = perform_input.map_url.split('/').last().unwrap().to_string();
Expand Down Expand Up @@ -273,7 +273,10 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
})?;

// parse provider json
let (provider_url, provider_entry) = self.provider_cache.get_key_value(&perform_input.provider_url).unwrap();
let (provider_url, provider_entry) = self
.provider_cache
.get_key_value(&perform_input.provider_url)
.unwrap();
// TODO: validate provider json with json schema, to verify OneClient will understand it?

metrics_data.provider_url = provider_url;
Expand All @@ -284,27 +287,27 @@ impl<Me: MessageExchange + Clone + 'static, Se: StreamExchange + Clone + 'static
let mut provider_parameters = prepare_provider_parameters(&provider_entry.provider_json);
provider_parameters.append(&mut map_parameters);
let map_parameters = provider_parameters;
let map_security = prepare_security_map(
&provider_entry.provider_json,
&perform_input.map_security
)?;
let map_security =
prepare_security_map(&provider_entry.provider_json, &perform_input.map_security)?;
let map_services = prepare_services_map(&provider_entry.provider_json, &map_parameters)?;

let (profile_url, profile_entry) = self.profile_cache.get_key_value(&perform_input.profile_url).unwrap();
let (profile_url, profile_entry) = self
.profile_cache
.get_key_value(&perform_input.profile_url)
.unwrap();
metrics_data.profile_url = profile_url;
metrics_data.profile_content_hash = Some(&profile_entry.content_hash);

// start interpreting stdlib and then map code
// TODO: should this be here or should we hold an instance of the interpreter in global state
// and clear per-perform data each time it is called?
let mut interpreter = JsInterpreter::new(MapStdImpl::new(
self.mapstd_config.to_owned(),
self.message_exchange.clone(),
self.stream_exchange.clone()
))?;
let mut interpreter = JsInterpreter::new(MapStdImpl::new(self.mapstd_config.to_owned()))?;
interpreter.eval_code("map_std.js", Self::MAP_STDLIB_JS)?;

let (map_url, map_entry) = self.map_cache.get_key_value(&perform_input.map_url).unwrap();
let (map_url, map_entry) = self
.map_cache
.get_key_value(&perform_input.map_url)
.unwrap();
metrics_data.map_url = map_url;
metrics_data.map_content_hash = Some(&map_entry.content_hash);

Expand Down
Loading

0 comments on commit 679fb80

Please sign in to comment.