Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs for GSB http proxy #3304

Merged
merged 6 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static-openssl = ["openssl/vendored", "openssl-probe"]
dummy-driver = ['ya-dummy-driver']
erc20-driver = ['ya-erc20-driver']
tos = []
framework-test = ['ya-exe-unit/framework-test']
framework-test = ['ya-exe-unit/framework-test', 'ya-activity/framework-test']
# Temporary to make goth integration tests work
central-net = ['ya-net/central-net']
packet-trace-enable = [
Expand Down Expand Up @@ -236,7 +236,7 @@ members = [
# this entry is needed to make sqlx version >=0.5.9 work with diesel 1.4.*
# diesel 1.4.* supports up to 0.23.0, but sqlx 0.5.9 requires 0.22.0
# sqlx 0.5.10 need 0.23.2, so 0.5.9 is last version possible
derive_more = "0.99.11"
derive_more = "0.99"
erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
erc20_payment_lib_common = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
Expand Down
3 changes: 3 additions & 0 deletions core/activity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version = "0.4.0"
authors = ["Golem Factory <[email protected]>"]
edition = "2018"

[features]
framework-test = ['ya-gsb-http-proxy/framework-test']

[dependencies]
ya-core-model = { version = "0.9", features = ["activity", "market"] }
ya-client-model = { version = "0.6", features = ["sgx"] }
Expand Down
7 changes: 5 additions & 2 deletions core/activity/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ async fn proxy_http_request(
let activity_id = path_activity_url.activity_id;
let path = path_activity_url.url;

let result = authorize_activity_executor(&db, id.identity, &activity_id, Role::Requestor).await;
let result =
authorize_activity_initiator(&db, id.identity, &activity_id, Role::Requestor).await;
prawilny marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = result {
log::error!("Authorize error {}", e);
log::info!(
"Proxy authorize error (currently not authorized requests are not rejected): {e}"
);
}

let agreement = get_activity_agreement(&db, &activity_id, Role::Requestor).await?;
Expand Down
35 changes: 19 additions & 16 deletions exe-unit/components/gsb-http-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,37 @@ version = "0.1.0"
edition = "2021"


[features]
framework-test = []

[dependencies]
ya-service-bus = { workspace = true }
ya-counters = { path = "../counters" }
ya-client-model = "0.6"
ya-core-model = { version = "^0.9" }

thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
chrono = "0.4"
http = "1.0"
serde_json = "1.0"
tokio = { version = "1.35", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "stream"] }
log = { version = "0.4", features = [] }
async-stream = "0.3"
futures = { version = "0.3", features = [] }
futures-core = "0.3"
serde_derive = "1.0"
actix = "0.13"
actix-http = "3"
actix-web = "4"
actix-rt = "2.7"
anyhow = "1.0"
rand = "0.8.5"
hex = "0.4.3"
env_logger = "0.10.2"
async-stream = "0.3"
bytes = "1.6.0"
derive_more = "0.99.17"
chrono = "0.4"
derive_more = { workspace = true }
env_logger = "0.10.2"
futures = { version = "0.3", features = [] }
futures-core = "0.3"
hex = "0.4.3"
http = "1.0"
log = { version = "0.4", features = [] }
rand = { workspace = true }
reqwest = { version = "0.11", features = ["json", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
mockito = "1.2"
Expand Down
49 changes: 32 additions & 17 deletions exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl GsbToHttpProxy {

pub fn bind(&mut self, gsb_path: &str) -> Handle {
let this = self.clone();
bus::bind(gsb_path, move |message: GsbHttpCallMessage| {
bus::bind_with_caller(gsb_path, move |caller, message: GsbHttpCallMessage| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that it aims to resolve golemfactory/ya-runtime-ai#105?

If so, how does it solve the problem if we don't make use of that parameter in pass() apart from logging? Or do we only want to log the messages for now and fix it after collecting more data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't. The problem mentioned is solved here: 27f9350

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do we solve the problem by switching authorize_activity_executor() to authorize_activity_initiator() and log to ensure that there are no problems with it after the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's solves the problem with spam that reports incorrectly that something is wrong when it isn't.
I don't want to block possibility of using other yagna as proxy, so the call is not rejected even if it is from incorrect identity

let mut this = this.clone();
async move { Ok(this.pass(message).await) }
async move { Ok(this.pass(caller, message).await) }
})
}

Expand All @@ -67,10 +67,13 @@ impl GsbToHttpProxy {
})
}

pub async fn pass(&mut self, message: GsbHttpCallMessage) -> GsbHttpCallResponse {
pub async fn pass(
&mut self,
caller: String,
message: GsbHttpCallMessage,
) -> GsbHttpCallResponse {
let url = format!("{}{}", self.base_url, message.path);
log::info!("Gsb to http call - Url: {url}");

let path = message.path.clone();
let mut counters = self.counters.clone();

let method = match Method::from_bytes(message.method.to_uppercase().as_bytes()) {
Expand All @@ -82,9 +85,10 @@ impl GsbToHttpProxy {
)
}
};
let builder = Self::create_request_builder(method, &url, message.headers, message.body);
let builder =
Self::create_request_builder(method.clone(), &url, message.headers, message.body);

log::debug!("Calling {}", &url);
log::info!("Gsb proxy http call {method} to {url} from {caller}");
let response_handler = counters.on_request();
let response = builder
.send()
Expand All @@ -98,18 +102,27 @@ impl GsbToHttpProxy {
response_handler.on_response();
match response.bytes().await {
Ok(bytes) => {
log::info!(
"GSB http proxy: response for {method} `{path}` status: {status_code}"
);
GsbHttpCallResponse::new(bytes.to_vec(), response_headers, status_code)
}
Err(err) => GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
),
Err(err) => {
log::info!("GSB http proxy: response for {method} `{path}` status: {status_code}, error: {err}");
GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
)
}
}
}
Err(err) => GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
),
Err(err) => {
log::info!("GSB http proxy: error calling {method} `{path}`: {err}");
GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
)
}
}
}

Expand Down Expand Up @@ -272,8 +285,9 @@ mod tests {
let mut requests_counter = gsb_call.requests_counter();
let mut requests_duration_counter = gsb_call.requests_duration_counter();

let caller = "0x0000000000000000000000000000000000000000".to_string();
let message = message();
let response = gsb_call.pass(message).await;
let response = gsb_call.pass(caller.clone(), message).await;

let mut headers = vec![];

Expand Down Expand Up @@ -377,9 +391,10 @@ mod tests {

async fn run_10_requests(mut gsb_call_proxy: GsbToHttpProxy) {
let message = message();
let caller = "0x0000000000000000000000000000000000000000".to_string();
for _ in 0..10 {
let message = message.clone();
let response = gsb_call_proxy.pass(message).await;
let response = gsb_call_proxy.pass(caller.clone(), message).await;
assert_eq!("response".as_bytes(), response.body.msg_bytes);
}
}
Expand Down
79 changes: 48 additions & 31 deletions exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ impl HttpToGsbProxy {
bus_addr: bus_addr.to_string(),
}
}

pub fn endpoint(&self) -> bus::Endpoint {
match &self.binding {
BindingMode::Local => bus::service(&self.bus_addr),
BindingMode::Net(binding) => ya_net::from(binding.from)
.to(binding.to)
.service(&self.bus_addr),
}
}
}

pub struct HttpToGsbProxyResponse<T> {
Expand Down Expand Up @@ -74,44 +83,52 @@ impl HttpToGsbProxy {
};

let msg = GsbHttpCallMessage {
method,
path,
method: method.clone(),
path: path.clone(),
body,
headers: Headers::default().filter(&headers),
};

let response = match &self.binding {
BindingMode::Local => bus::service(&self.bus_addr).call(msg).await,
BindingMode::Net(binding) => {
ya_net::from(binding.from)
.to(binding.to)
.service(&self.bus_addr)
.call(msg)
.await
}
};
let endpoint = self.endpoint();

let result = response.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e)));
log::info!("Proxy http {msg} call to [{}]", endpoint.addr());
let result = endpoint
.call(msg)
.await
.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e)));

match result {
Ok(r) => HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(r.body.msg_bytes)
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: r.header.status_code,
response_headers: r.header.response_headers,
},
Err(err) => HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(format!("Error: {}", err))
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
response_headers: HashMap::new(),
},
Ok(r) => {
log::info!(
"Http proxy: response for {method} `{path}` call to [{}]: status: {}",
endpoint.addr(),
r.header.status_code
);
HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(r.body.msg_bytes)
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: r.header.status_code,
response_headers: r.header.response_headers,
}
}
Err(err) => {
log::warn!(
"Http proxy: error calling {method} `{path}` at [{}]: error: {err}",
endpoint.addr()
);
HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(format!("Error: {err}"))
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
response_headers: HashMap::new(),
prawilny marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion exe-unit/components/gsb-http-proxy/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::error::HttpProxyStatusError;
use crate::response::{GsbHttpCallResponse, GsbHttpCallResponseStreamChunk};

use derive_more::Display;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;

use ya_service_bus::{RpcMessage, RpcStreamMessage};

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Display)]
#[serde(rename_all = "camelCase")]
#[display(fmt = "{method} `{path}`")]
pub struct GsbHttpCallMessage {
pub method: String,
pub path: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ clap = { version = "4.3.4", features = ["derive"] }
rand = "0.8.5"
serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.28.2", features = ["full"] }
reqwest = "0.11.18"
tokio = { version = "1", features = ["full"] }
reqwest = "0.11"
anyhow = "1.0.71"
Loading