Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Bump networking deps #2349

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
652 changes: 386 additions & 266 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ categories = ["cryptography"]
anyhow = "1"
assert_matches = "1.5"
async-trait = "0.1"
axum = "0.6.19"
axum = "0.7.5"
backon = "0.4.4"
bigdecimal = "0.3.0"
bincode = "1"
Expand All @@ -111,16 +111,16 @@ envy = "0.4"
ethabi = "18.0.0"
flate2 = "1.0.28"
futures = "0.3"
google-cloud-auth = "0.13.0"
google-cloud-storage = "0.15.0"
google-cloud-auth = "0.16.0"
google-cloud-storage = "0.20.0"
governor = "0.4.2"
hex = "0.4"
http = "0.2.9"
hyper = "0.14.29"
http = "1.1"
hyper = "1.3"
iai = "0.1"
insta = "1.29.0"
itertools = "0.10"
jsonrpsee = { version = "0.21.0", default-features = false }
jsonrpsee = { version = "0.23", default-features = false }
lazy_static = "1.4"
leb128 = "0.2.5"
lru = { version = "0.12.1", default-features = false }
Expand All @@ -138,7 +138,7 @@ prost = "0.12.1"
rand = "0.8"
rayon = "1.3.1"
regex = "1"
reqwest = "0.11"
reqwest = "0.12"
rlp = "0.5"
rocksdb = "0.21.0"
rustc_version = "0.4.0"
Expand All @@ -165,7 +165,7 @@ tikv-jemallocator = "0.5"
tiny-keccak = "2"
tokio = "1"
tower = "0.4.13"
tower-http = "0.4.1"
tower-http = "0.5.2"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.21.0"
Expand Down
4 changes: 2 additions & 2 deletions core/lib/object_store/src/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl From<HttpError> for ObjectStoreError {
.status()
.map_or(false, |status| matches!(status, StatusCode::NOT_FOUND)),
HttpError::Response(response) => response.code == StatusCode::NOT_FOUND.as_u16(),
HttpError::TokenSource(_) => false,
_ => false,
};

if is_not_found {
Expand All @@ -145,7 +145,7 @@ impl From<HttpError> for ObjectStoreError {
has_transient_io_source(err)
|| get_source::<reqwest::Error>(err).is_some_and(is_transient_http_error)
}
HttpError::Response(_) => false,
_ => false,
};
ObjectStoreError::Other {
is_transient,
Expand Down
2 changes: 1 addition & 1 deletion core/lib/web3_decl/src/client/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl L2ClientMetrics {
let status = err
.downcast_ref::<transport::Error>()
.and_then(|err| match err {
transport::Error::RequestFailure { status_code } => Some(*status_code),
transport::Error::Rejected { status_code } => Some(*status_code),
_ => None,
});
let labels = HttpErrorLabels {
Expand Down
2 changes: 1 addition & 1 deletion core/lib/web3_decl/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn wrapping_mock_client() {
Ok("slow")
})
.method("rate_limit", || {
let http_err = transport::Error::RequestFailure { status_code: 429 };
let http_err = transport::Error::Rejected { status_code: 429 };
Err::<(), _>(Error::Transport(http_err.into()))
})
.method("eth_getBlockNumber", || Ok(U64::from(1)))
Expand Down
7 changes: 4 additions & 3 deletions core/node/api_server/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ async fn run_server(
let app = Router::new()
.route("/health", get(check_health))
.with_state(app_health_check);

axum::Server::bind(bind_address)
.serve(app.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.unwrap_or_else(|err| panic!("Failed binding healthcheck server to {bind_address}: {err}"));
axum::serve(listener, app)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for healthcheck server was dropped without sending a signal");
Expand Down
13 changes: 5 additions & 8 deletions core/node/api_server/src/web3/backend_jsonrpsee/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{cell::RefCell, mem, sync::Arc, time::Instant};

use thread_local::ThreadLocal;
use zksync_types::api;
use zksync_web3_decl::{
error::Web3Error,
jsonrpsee::{helpers::MethodResponseResult, MethodResponse},
};
use zksync_web3_decl::{error::Web3Error, jsonrpsee::MethodResponse};

#[cfg(test)]
use super::testonly::RecordedMethodCalls;
Expand Down Expand Up @@ -154,11 +151,11 @@ impl MethodCall<'_> {
self.is_completed = true;
let meta = &self.meta;
let params = &self.params;
match response.success_or_error {
MethodResponseResult::Success => {
API_METRICS.observe_response_size(meta.name, params, response.result.len());
match response.as_error_code() {
None => {
API_METRICS.observe_response_size(meta.name, params, response.as_result().len());
}
MethodResponseResult::Failed(error_code) => {
Some(error_code) => {
API_METRICS.observe_protocol_error(
meta.name,
params,
Expand Down
14 changes: 7 additions & 7 deletions core/node/api_server/src/web3/backend_jsonrpsee/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ mod tests {
use rand::{thread_rng, Rng};
use test_casing::{test_casing, Product};
use zksync_types::api;
use zksync_web3_decl::jsonrpsee::helpers::MethodResponseResult;
use zksync_web3_decl::jsonrpsee::{types::Id, ResponsePayload};

use super::*;

Expand Down Expand Up @@ -366,11 +366,11 @@ mod tests {
}
}

MethodResponse {
result: "{}".to_string(),
success_or_error: MethodResponseResult::Success,
is_subscription: false,
}
MethodResponse::response(
Id::Number(1),
ResponsePayload::success("{}".to_string()),
usize::MAX,
)
};

WithMethodCall::new(
Expand All @@ -394,7 +394,7 @@ mod tests {
assert_eq!(call.metadata.name, "test");
assert!(call.metadata.block_id.is_some());
assert_eq!(call.metadata.block_diff, Some(9));
assert!(call.response.is_success());
assert!(call.error_code.is_none());
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/node/api_server/src/web3/backend_jsonrpsee/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

use std::{mem, sync::Mutex};

use zksync_web3_decl::jsonrpsee::{helpers::MethodResponseResult, MethodResponse};
use zksync_web3_decl::jsonrpsee::MethodResponse;

use super::metadata::MethodMetadata;

#[derive(Debug, Clone)]
pub(crate) struct RecordedCall {
pub metadata: MethodMetadata,
pub response: MethodResponseResult,
pub error_code: Option<i32>,
}

/// Test-only JSON-RPC recorded of all calls passing through `MetadataMiddleware`.
Expand All @@ -24,7 +24,7 @@ impl RecordedMethodCalls {
.expect("recorded calls are poisoned")
.push(RecordedCall {
metadata: metadata.clone(),
response: response.success_or_error,
error_code: response.as_error_code(),
});
}

Expand Down
12 changes: 6 additions & 6 deletions core/node/api_server/src/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ impl ApiServer {
"Overriding max response size to {limit}B for sync method `{method_name}`"
);
let sync_method = sync_method.clone();
MethodCallback::Sync(Arc::new(move |id, params, _max_response_size| {
sync_method(id, params, limit)
MethodCallback::Sync(Arc::new(move |id, params, _max_response_size, ext| {
sync_method(id, params, limit, ext)
}))
}
(MethodCallback::Async(async_method), Some(limit)) => {
Expand All @@ -556,8 +556,8 @@ impl ApiServer {
);
let async_method = async_method.clone();
MethodCallback::Async(Arc::new(
move |id, params, connection_id, _max_response_size| {
async_method(id, params, connection_id, limit)
move |id, params, connection_id, _max_response_size, ext| {
async_method(id, params, connection_id, limit, ext)
},
))
}
Expand All @@ -567,8 +567,8 @@ impl ApiServer {
);
let unsub_method = unsub_method.clone();
MethodCallback::Unsubscription(Arc::new(
move |id, params, connection_id, _max_response_size| {
unsub_method(id, params, connection_id, limit)
move |id, params, connection_id, _max_response_size, ext| {
unsub_method(id, params, connection_id, limit, ext)
},
))
}
Expand Down
25 changes: 8 additions & 17 deletions core/node/api_server/src/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ const POLL_INTERVAL: Duration = Duration::from_millis(50);
async fn setting_response_size_limits() {
let mut rpc_module = RpcModule::new(());
rpc_module
.register_method("test_limited", |params, _ctx| {
.register_method("test_limited", |params, _ctx, _ext| {
let response_size: usize = params.one()?;
Ok::<_, ErrorObjectOwned>("!".repeat(response_size))
})
.unwrap();
rpc_module
.register_method("test_unlimited", |params, _ctx| {
.register_method("test_unlimited", |params, _ctx, _ext| {
let response_size: usize = params.one()?;
Ok::<_, ErrorObjectOwned>("!".repeat(response_size))
})
Expand Down Expand Up @@ -954,7 +954,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_blockNumber");
assert_eq!(calls[0].metadata.block_id, None);
assert_eq!(calls[0].metadata.block_diff, None);
Expand All @@ -965,7 +965,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_getBlockByNumber");
assert_eq!(
calls[0].metadata.block_id,
Expand All @@ -978,7 +978,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_getBlockByNumber");
assert_eq!(
calls[0].metadata.block_id,
Expand All @@ -993,10 +993,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::MethodNotFound.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::MethodNotFound.code()));
assert!(!calls[0].metadata.has_app_error);

ClientT::request::<serde_json::Value, _>(&client, "eth_getBlockByNumber", rpc_params![0])
Expand All @@ -1005,10 +1002,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::InvalidParams.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::InvalidParams.code()));
assert!(!calls[0].metadata.has_app_error);

// Check app-level error.
Expand All @@ -1022,10 +1016,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::InvalidParams.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::InvalidParams.code()));
assert!(calls[0].metadata.has_app_error);

// Check batch RPC request.
Expand Down
6 changes: 4 additions & 2 deletions core/node/contract_verification_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ pub async fn start_server(
let bind_address = config.bind_addr();
let api = RestApi::new(master_connection_pool, replica_connection_pool).into_router();

axum::Server::bind(&bind_address)
.serve(api.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.context("Cannot bind to the specified address")?;
axum::serve(listener, api)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for contract verification server was dropped without sending a signal");
Expand Down
14 changes: 8 additions & 6 deletions core/node/metadata_calculator/src/api_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl AsyncTreeReader {
Ok(Json(response))
}

fn create_api_server(
async fn create_api_server(
self,
bind_address: &SocketAddr,
mut stop_receiver: watch::Receiver<bool>,
Expand All @@ -355,10 +355,11 @@ impl AsyncTreeReader {
.route("/proofs", routing::post(Self::get_proofs_handler))
.with_state(self);

let server = axum::Server::try_bind(bind_address)
.with_context(|| format!("Failed binding Merkle tree API server to {bind_address}"))?
.serve(app.into_make_service());
let local_addr = server.local_addr();
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding Merkle tree API server to {bind_address}"))?;
let local_addr = listener.local_addr()?;
let server = axum::serve(listener, app);
let server_future = async move {
server.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
Expand Down Expand Up @@ -387,7 +388,8 @@ impl AsyncTreeReader {
bind_address: SocketAddr,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
self.create_api_server(&bind_address, stop_receiver)?
self.create_api_server(&bind_address, stop_receiver)
.await?
.run()
.await
}
Expand Down
1 change: 1 addition & 0 deletions core/node/metadata_calculator/src/api_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn merkle_tree_api() {
.await
.unwrap()
.create_api_server(&api_addr, stop_receiver.clone())
.await
.unwrap();
let local_addr = *api_server.local_addr();
let api_server_task = tokio::spawn(api_server.run());
Expand Down
6 changes: 4 additions & 2 deletions core/node/proof_data_handler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ pub async fn run_server(
tracing::debug!("Starting proof data handler server on {bind_address}");
let app = create_proof_processing_router(blob_store, connection_pool, config, commitment_mode);

axum::Server::bind(&bind_address)
.serve(app.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding proof data handler server to {bind_address}"))?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for proof data handler server was dropped without sending a signal");
Expand Down
5 changes: 3 additions & 2 deletions core/node/proof_data_handler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use axum::{
response::Response,
Router,
};
use hyper::body::HttpBody;
use serde_json::json;
use tower::ServiceExt;
use zksync_basic_types::U256;
Expand Down Expand Up @@ -107,7 +106,9 @@ async fn request_tee_proof_inputs() {

assert_eq!(response.status(), StatusCode::OK);

let body = response.into_body().collect().await.unwrap().to_bytes();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
let json = json
.get("Success")
Expand Down
Loading
Loading