diff --git a/Cargo.toml b/Cargo.toml index 57a1ad3e94..e8a29291f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ static-openssl = ["openssl/vendored", "openssl-probe"] dummy-driver = ['ya-dummy-driver'] erc20-driver = ['ya-erc20-driver'] tos = [] -framework-test = ['ya-exe-unit/framework-test'] +framework-test = ['ya-exe-unit/framework-test', 'ya-activity/framework-test'] # Temporary to make goth integration tests work central-net = ['ya-net/central-net'] packet-trace-enable = [ @@ -236,7 +236,7 @@ members = [ # this entry is needed to make sqlx version >=0.5.9 work with diesel 1.4.* # diesel 1.4.* supports up to 0.23.0, but sqlx 0.5.9 requires 0.22.0 # sqlx 0.5.10 need 0.23.2, so 0.5.9 is last version possible -derive_more = "0.99.11" +derive_more = "0.99" erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" } erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" } erc20_payment_lib_common = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" } diff --git a/core/activity/Cargo.toml b/core/activity/Cargo.toml index d3e0fc8d6f..3511fbbfb1 100644 --- a/core/activity/Cargo.toml +++ b/core/activity/Cargo.toml @@ -4,6 +4,9 @@ version = "0.4.0" authors = ["Golem Factory "] edition = "2018" +[features] +framework-test = ['ya-gsb-http-proxy/framework-test'] + [dependencies] ya-core-model = { version = "0.9", features = ["activity", "market"] } ya-client-model = { version = "0.6", features = ["sgx"] } diff --git a/core/activity/src/http_proxy.rs b/core/activity/src/http_proxy.rs index d0c06a1b97..06e7bb5fde 100644 --- a/core/activity/src/http_proxy.rs +++ b/core/activity/src/http_proxy.rs @@ -44,9 +44,12 @@ async fn proxy_http_request( let activity_id = path_activity_url.activity_id; let path = path_activity_url.url; - let result = authorize_activity_executor(&db, id.identity, &activity_id, Role::Requestor).await; + let result = + authorize_activity_initiator(&db, id.identity, &activity_id, Role::Requestor).await; if let Err(e) = result { - log::error!("Authorize error {}", e); + log::info!( + "Proxy authorize error (currently not authorized requests are not rejected): {e}" + ); } let agreement = get_activity_agreement(&db, &activity_id, Role::Requestor).await?; diff --git a/exe-unit/components/gsb-http-proxy/Cargo.toml b/exe-unit/components/gsb-http-proxy/Cargo.toml index a9b50d0393..c399b7e02b 100644 --- a/exe-unit/components/gsb-http-proxy/Cargo.toml +++ b/exe-unit/components/gsb-http-proxy/Cargo.toml @@ -4,34 +4,37 @@ version = "0.1.0" edition = "2021" +[features] +framework-test = [] + [dependencies] ya-service-bus = { workspace = true } ya-counters = { path = "../counters" } ya-client-model = "0.6" ya-core-model = { version = "^0.9" } -thiserror = "1.0" -serde = { version = "1.0", features = ["derive"] } -chrono = "0.4" -http = "1.0" -serde_json = "1.0" -tokio = { version = "1.35", features = ["full"] } -reqwest = { version = "0.11", features = ["json", "stream"] } -log = { version = "0.4", features = [] } -async-stream = "0.3" -futures = { version = "0.3", features = [] } -futures-core = "0.3" -serde_derive = "1.0" actix = "0.13" actix-http = "3" actix-web = "4" actix-rt = "2.7" anyhow = "1.0" -rand = "0.8.5" -hex = "0.4.3" -env_logger = "0.10.2" +async-stream = "0.3" bytes = "1.6.0" -derive_more = "0.99.17" +chrono = "0.4" +derive_more = { workspace = true } +env_logger = "0.10.2" +futures = { version = "0.3", features = [] } +futures-core = "0.3" +hex = "0.4.3" +http = "1.0" +log = { version = "0.4", features = [] } +rand = { workspace = true } +reqwest = { version = "0.11", features = ["json", "stream"] } +serde = { version = "1.0", features = ["derive"] } +serde_derive = "1.0" +serde_json = "1.0" +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } [dev-dependencies] mockito = "1.2" diff --git a/exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs b/exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs index 9d53d71f61..2e9c5dcb80 100644 --- a/exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs +++ b/exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs @@ -53,9 +53,9 @@ impl GsbToHttpProxy { pub fn bind(&mut self, gsb_path: &str) -> Handle { let this = self.clone(); - bus::bind(gsb_path, move |message: GsbHttpCallMessage| { + bus::bind_with_caller(gsb_path, move |caller, message: GsbHttpCallMessage| { let mut this = this.clone(); - async move { Ok(this.pass(message).await) } + async move { Ok(this.pass(caller, message).await) } }) } @@ -67,10 +67,13 @@ impl GsbToHttpProxy { }) } - pub async fn pass(&mut self, message: GsbHttpCallMessage) -> GsbHttpCallResponse { + pub async fn pass( + &mut self, + caller: String, + message: GsbHttpCallMessage, + ) -> GsbHttpCallResponse { let url = format!("{}{}", self.base_url, message.path); - log::info!("Gsb to http call - Url: {url}"); - + let path = message.path.clone(); let mut counters = self.counters.clone(); let method = match Method::from_bytes(message.method.to_uppercase().as_bytes()) { @@ -82,9 +85,10 @@ impl GsbToHttpProxy { ) } }; - let builder = Self::create_request_builder(method, &url, message.headers, message.body); + let builder = + Self::create_request_builder(method.clone(), &url, message.headers, message.body); - log::debug!("Calling {}", &url); + log::info!("Gsb proxy http call {method} to {url} from {caller}"); let response_handler = counters.on_request(); let response = builder .send() @@ -98,18 +102,27 @@ impl GsbToHttpProxy { response_handler.on_response(); match response.bytes().await { Ok(bytes) => { + log::info!( + "GSB http proxy: response for {method} `{path}` status: {status_code}" + ); GsbHttpCallResponse::new(bytes.to_vec(), response_headers, status_code) } - Err(err) => GsbHttpCallResponse::with_message( - format!("Error in response: {err}").into_bytes(), - StatusCode::INTERNAL_SERVER_ERROR.as_u16(), - ), + Err(err) => { + log::info!("GSB http proxy: response for {method} `{path}` status: {status_code}, error: {err}"); + GsbHttpCallResponse::with_message( + format!("Error in response: {err}").into_bytes(), + StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + ) + } } } - Err(err) => GsbHttpCallResponse::with_message( - format!("Error in response: {err}").into_bytes(), - StatusCode::INTERNAL_SERVER_ERROR.as_u16(), - ), + Err(err) => { + log::info!("GSB http proxy: error calling {method} `{path}`: {err}"); + GsbHttpCallResponse::with_message( + format!("Error in response: {err}").into_bytes(), + StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + ) + } } } @@ -272,8 +285,9 @@ mod tests { let mut requests_counter = gsb_call.requests_counter(); let mut requests_duration_counter = gsb_call.requests_duration_counter(); + let caller = "0x0000000000000000000000000000000000000000".to_string(); let message = message(); - let response = gsb_call.pass(message).await; + let response = gsb_call.pass(caller.clone(), message).await; let mut headers = vec![]; @@ -377,9 +391,10 @@ mod tests { async fn run_10_requests(mut gsb_call_proxy: GsbToHttpProxy) { let message = message(); + let caller = "0x0000000000000000000000000000000000000000".to_string(); for _ in 0..10 { let message = message.clone(); - let response = gsb_call_proxy.pass(message).await; + let response = gsb_call_proxy.pass(caller.clone(), message).await; assert_eq!("response".as_bytes(), response.body.msg_bytes); } } diff --git a/exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs b/exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs index a8be103500..66c54fdb7e 100644 --- a/exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs +++ b/exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs @@ -33,6 +33,15 @@ impl HttpToGsbProxy { bus_addr: bus_addr.to_string(), } } + + pub fn endpoint(&self) -> bus::Endpoint { + match &self.binding { + BindingMode::Local => bus::service(&self.bus_addr), + BindingMode::Net(binding) => ya_net::from(binding.from) + .to(binding.to) + .service(&self.bus_addr), + } + } } pub struct HttpToGsbProxyResponse { @@ -74,44 +83,52 @@ impl HttpToGsbProxy { }; let msg = GsbHttpCallMessage { - method, - path, + method: method.clone(), + path: path.clone(), body, headers: Headers::default().filter(&headers), }; - let response = match &self.binding { - BindingMode::Local => bus::service(&self.bus_addr).call(msg).await, - BindingMode::Net(binding) => { - ya_net::from(binding.from) - .to(binding.to) - .service(&self.bus_addr) - .call(msg) - .await - } - }; + let endpoint = self.endpoint(); - let result = response.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e))); + log::info!("Proxy http {msg} call to [{}]", endpoint.addr()); + let result = endpoint + .call(msg) + .await + .unwrap_or_else(|e| Err(HttpProxyStatusError::from(e))); match result { - Ok(r) => HttpToGsbProxyResponse { - body: actix_web::web::Bytes::from(r.body.msg_bytes) - .try_into_bytes() - .map_err(|_| { - Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string()) - }), - status_code: r.header.status_code, - response_headers: r.header.response_headers, - }, - Err(err) => HttpToGsbProxyResponse { - body: actix_web::web::Bytes::from(format!("Error: {}", err)) - .try_into_bytes() - .map_err(|_| { - Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string()) - }), - status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(), - response_headers: HashMap::new(), - }, + Ok(r) => { + log::info!( + "Http proxy: response for {method} `{path}` call to [{}]: status: {}", + endpoint.addr(), + r.header.status_code + ); + HttpToGsbProxyResponse { + body: actix_web::web::Bytes::from(r.body.msg_bytes) + .try_into_bytes() + .map_err(|_| { + Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string()) + }), + status_code: r.header.status_code, + response_headers: r.header.response_headers, + } + } + Err(err) => { + log::warn!( + "Http proxy: error calling {method} `{path}` at [{}]: error: {err}", + endpoint.addr() + ); + HttpToGsbProxyResponse { + body: actix_web::web::Bytes::from(format!("Error: {err}")) + .try_into_bytes() + .map_err(|_| { + Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string()) + }), + status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + response_headers: HashMap::new(), + } + } } } diff --git a/exe-unit/components/gsb-http-proxy/src/message.rs b/exe-unit/components/gsb-http-proxy/src/message.rs index 7442d47d0c..322d415587 100644 --- a/exe-unit/components/gsb-http-proxy/src/message.rs +++ b/exe-unit/components/gsb-http-proxy/src/message.rs @@ -1,11 +1,15 @@ use crate::error::HttpProxyStatusError; use crate::response::{GsbHttpCallResponse, GsbHttpCallResponseStreamChunk}; + +use derive_more::Display; use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; + use ya_service_bus::{RpcMessage, RpcStreamMessage}; -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Display)] #[serde(rename_all = "camelCase")] +#[display(fmt = "{method} `{path}`")] pub struct GsbHttpCallMessage { pub method: String, pub path: String, diff --git a/goth_tests/e2e/vm/assets/test_e2e_outbound_perf/image/outbound-bench/Cargo.toml b/goth_tests/e2e/vm/assets/test_e2e_outbound_perf/image/outbound-bench/Cargo.toml index 5f17dba732..423b43a06c 100644 --- a/goth_tests/e2e/vm/assets/test_e2e_outbound_perf/image/outbound-bench/Cargo.toml +++ b/goth_tests/e2e/vm/assets/test_e2e_outbound_perf/image/outbound-bench/Cargo.toml @@ -10,6 +10,6 @@ clap = { version = "4.3.4", features = ["derive"] } rand = "0.8.5" serde = { version = "1.0.164", features = ["derive"] } serde_json = "1" -tokio = { version = "1.28.2", features = ["full"] } -reqwest = "0.11.18" +tokio = { version = "1", features = ["full"] } +reqwest = "0.11" anyhow = "1.0.71" \ No newline at end of file