Skip to content

Commit

Permalink
chore: Bump networking deps (#2349)
Browse files Browse the repository at this point in the history
## What ❔

Bumps networking deps: hyper, http, reqwest, axum, jsonrpsee, and a few
others.

## Why ❔

hyper is a grown boy now. It's time to use 1.0.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Jul 1, 2024
1 parent 76508c4 commit 7dabdbf
Show file tree
Hide file tree
Showing 19 changed files with 831 additions and 549 deletions.
652 changes: 386 additions & 266 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ categories = ["cryptography"]
anyhow = "1"
assert_matches = "1.5"
async-trait = "0.1"
axum = "0.6.19"
axum = "0.7.5"
backon = "0.4.4"
bigdecimal = "0.3.0"
bincode = "1"
Expand All @@ -111,16 +111,16 @@ envy = "0.4"
ethabi = "18.0.0"
flate2 = "1.0.28"
futures = "0.3"
google-cloud-auth = "0.13.0"
google-cloud-storage = "0.15.0"
google-cloud-auth = "0.16.0"
google-cloud-storage = "0.20.0"
governor = "0.4.2"
hex = "0.4"
http = "0.2.9"
hyper = "0.14.29"
http = "1.1"
hyper = "1.3"
iai = "0.1"
insta = "1.29.0"
itertools = "0.10"
jsonrpsee = { version = "0.21.0", default-features = false }
jsonrpsee = { version = "0.23", default-features = false }
lazy_static = "1.4"
leb128 = "0.2.5"
lru = { version = "0.12.1", default-features = false }
Expand All @@ -138,7 +138,7 @@ prost = "0.12.1"
rand = "0.8"
rayon = "1.3.1"
regex = "1"
reqwest = "0.11"
reqwest = "0.12"
rlp = "0.5"
rocksdb = "0.21.0"
rustc_version = "0.4.0"
Expand All @@ -165,7 +165,7 @@ tikv-jemallocator = "0.5"
tiny-keccak = "2"
tokio = "1"
tower = "0.4.13"
tower-http = "0.4.1"
tower-http = "0.5.2"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.21.0"
Expand Down
4 changes: 2 additions & 2 deletions core/lib/object_store/src/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl From<HttpError> for ObjectStoreError {
.status()
.map_or(false, |status| matches!(status, StatusCode::NOT_FOUND)),
HttpError::Response(response) => response.code == StatusCode::NOT_FOUND.as_u16(),
HttpError::TokenSource(_) => false,
_ => false,
};

if is_not_found {
Expand All @@ -145,7 +145,7 @@ impl From<HttpError> for ObjectStoreError {
has_transient_io_source(err)
|| get_source::<reqwest::Error>(err).is_some_and(is_transient_http_error)
}
HttpError::Response(_) => false,
_ => false,
};
ObjectStoreError::Other {
is_transient,
Expand Down
2 changes: 1 addition & 1 deletion core/lib/web3_decl/src/client/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl L2ClientMetrics {
let status = err
.downcast_ref::<transport::Error>()
.and_then(|err| match err {
transport::Error::RequestFailure { status_code } => Some(*status_code),
transport::Error::Rejected { status_code } => Some(*status_code),
_ => None,
});
let labels = HttpErrorLabels {
Expand Down
2 changes: 1 addition & 1 deletion core/lib/web3_decl/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn wrapping_mock_client() {
Ok("slow")
})
.method("rate_limit", || {
let http_err = transport::Error::RequestFailure { status_code: 429 };
let http_err = transport::Error::Rejected { status_code: 429 };
Err::<(), _>(Error::Transport(http_err.into()))
})
.method("eth_getBlockNumber", || Ok(U64::from(1)))
Expand Down
7 changes: 4 additions & 3 deletions core/node/api_server/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ async fn run_server(
let app = Router::new()
.route("/health", get(check_health))
.with_state(app_health_check);

axum::Server::bind(bind_address)
.serve(app.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.unwrap_or_else(|err| panic!("Failed binding healthcheck server to {bind_address}: {err}"));
axum::serve(listener, app)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for healthcheck server was dropped without sending a signal");
Expand Down
13 changes: 5 additions & 8 deletions core/node/api_server/src/web3/backend_jsonrpsee/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{cell::RefCell, mem, sync::Arc, time::Instant};

use thread_local::ThreadLocal;
use zksync_types::api;
use zksync_web3_decl::{
error::Web3Error,
jsonrpsee::{helpers::MethodResponseResult, MethodResponse},
};
use zksync_web3_decl::{error::Web3Error, jsonrpsee::MethodResponse};

#[cfg(test)]
use super::testonly::RecordedMethodCalls;
Expand Down Expand Up @@ -154,11 +151,11 @@ impl MethodCall<'_> {
self.is_completed = true;
let meta = &self.meta;
let params = &self.params;
match response.success_or_error {
MethodResponseResult::Success => {
API_METRICS.observe_response_size(meta.name, params, response.result.len());
match response.as_error_code() {
None => {
API_METRICS.observe_response_size(meta.name, params, response.as_result().len());
}
MethodResponseResult::Failed(error_code) => {
Some(error_code) => {
API_METRICS.observe_protocol_error(
meta.name,
params,
Expand Down
14 changes: 7 additions & 7 deletions core/node/api_server/src/web3/backend_jsonrpsee/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ mod tests {
use rand::{thread_rng, Rng};
use test_casing::{test_casing, Product};
use zksync_types::api;
use zksync_web3_decl::jsonrpsee::helpers::MethodResponseResult;
use zksync_web3_decl::jsonrpsee::{types::Id, ResponsePayload};

use super::*;

Expand Down Expand Up @@ -366,11 +366,11 @@ mod tests {
}
}

MethodResponse {
result: "{}".to_string(),
success_or_error: MethodResponseResult::Success,
is_subscription: false,
}
MethodResponse::response(
Id::Number(1),
ResponsePayload::success("{}".to_string()),
usize::MAX,
)
};

WithMethodCall::new(
Expand All @@ -394,7 +394,7 @@ mod tests {
assert_eq!(call.metadata.name, "test");
assert!(call.metadata.block_id.is_some());
assert_eq!(call.metadata.block_diff, Some(9));
assert!(call.response.is_success());
assert!(call.error_code.is_none());
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/node/api_server/src/web3/backend_jsonrpsee/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
use std::{mem, sync::Mutex};

use zksync_web3_decl::jsonrpsee::{helpers::MethodResponseResult, MethodResponse};
use zksync_web3_decl::jsonrpsee::MethodResponse;

use super::metadata::MethodMetadata;

#[derive(Debug, Clone)]
pub(crate) struct RecordedCall {
pub metadata: MethodMetadata,
pub response: MethodResponseResult,
pub error_code: Option<i32>,
}

/// Test-only JSON-RPC recorded of all calls passing through `MetadataMiddleware`.
Expand All @@ -24,7 +24,7 @@ impl RecordedMethodCalls {
.expect("recorded calls are poisoned")
.push(RecordedCall {
metadata: metadata.clone(),
response: response.success_or_error,
error_code: response.as_error_code(),
});
}

Expand Down
12 changes: 6 additions & 6 deletions core/node/api_server/src/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ impl ApiServer {
"Overriding max response size to {limit}B for sync method `{method_name}`"
);
let sync_method = sync_method.clone();
MethodCallback::Sync(Arc::new(move |id, params, _max_response_size| {
sync_method(id, params, limit)
MethodCallback::Sync(Arc::new(move |id, params, _max_response_size, ext| {
sync_method(id, params, limit, ext)
}))
}
(MethodCallback::Async(async_method), Some(limit)) => {
Expand All @@ -556,8 +556,8 @@ impl ApiServer {
);
let async_method = async_method.clone();
MethodCallback::Async(Arc::new(
move |id, params, connection_id, _max_response_size| {
async_method(id, params, connection_id, limit)
move |id, params, connection_id, _max_response_size, ext| {
async_method(id, params, connection_id, limit, ext)
},
))
}
Expand All @@ -567,8 +567,8 @@ impl ApiServer {
);
let unsub_method = unsub_method.clone();
MethodCallback::Unsubscription(Arc::new(
move |id, params, connection_id, _max_response_size| {
unsub_method(id, params, connection_id, limit)
move |id, params, connection_id, _max_response_size, ext| {
unsub_method(id, params, connection_id, limit, ext)
},
))
}
Expand Down
25 changes: 8 additions & 17 deletions core/node/api_server/src/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ const POLL_INTERVAL: Duration = Duration::from_millis(50);
async fn setting_response_size_limits() {
let mut rpc_module = RpcModule::new(());
rpc_module
.register_method("test_limited", |params, _ctx| {
.register_method("test_limited", |params, _ctx, _ext| {
let response_size: usize = params.one()?;
Ok::<_, ErrorObjectOwned>("!".repeat(response_size))
})
.unwrap();
rpc_module
.register_method("test_unlimited", |params, _ctx| {
.register_method("test_unlimited", |params, _ctx, _ext| {
let response_size: usize = params.one()?;
Ok::<_, ErrorObjectOwned>("!".repeat(response_size))
})
Expand Down Expand Up @@ -954,7 +954,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_blockNumber");
assert_eq!(calls[0].metadata.block_id, None);
assert_eq!(calls[0].metadata.block_diff, None);
Expand All @@ -965,7 +965,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_getBlockByNumber");
assert_eq!(
calls[0].metadata.block_id,
Expand All @@ -978,7 +978,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert!(calls[0].response.is_success());
assert!(calls[0].error_code.is_none());
assert_eq!(calls[0].metadata.name, "eth_getBlockByNumber");
assert_eq!(
calls[0].metadata.block_id,
Expand All @@ -993,10 +993,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::MethodNotFound.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::MethodNotFound.code()));
assert!(!calls[0].metadata.has_app_error);

ClientT::request::<serde_json::Value, _>(&client, "eth_getBlockByNumber", rpc_params![0])
Expand All @@ -1005,10 +1002,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::InvalidParams.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::InvalidParams.code()));
assert!(!calls[0].metadata.has_app_error);

// Check app-level error.
Expand All @@ -1022,10 +1016,7 @@ impl HttpTest for RpcCallsTracingTest {

let calls = self.tracer.recorded_calls().take();
assert_eq!(calls.len(), 1);
assert_eq!(
calls[0].response.as_error_code(),
Some(ErrorCode::InvalidParams.code())
);
assert_eq!(calls[0].error_code, Some(ErrorCode::InvalidParams.code()));
assert!(calls[0].metadata.has_app_error);

// Check batch RPC request.
Expand Down
6 changes: 4 additions & 2 deletions core/node/contract_verification_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ pub async fn start_server(
let bind_address = config.bind_addr();
let api = RestApi::new(master_connection_pool, replica_connection_pool).into_router();

axum::Server::bind(&bind_address)
.serve(api.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.context("Cannot bind to the specified address")?;
axum::serve(listener, api)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for contract verification server was dropped without sending a signal");
Expand Down
14 changes: 8 additions & 6 deletions core/node/metadata_calculator/src/api_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl AsyncTreeReader {
Ok(Json(response))
}

fn create_api_server(
async fn create_api_server(
self,
bind_address: &SocketAddr,
mut stop_receiver: watch::Receiver<bool>,
Expand All @@ -355,10 +355,11 @@ impl AsyncTreeReader {
.route("/proofs", routing::post(Self::get_proofs_handler))
.with_state(self);

let server = axum::Server::try_bind(bind_address)
.with_context(|| format!("Failed binding Merkle tree API server to {bind_address}"))?
.serve(app.into_make_service());
let local_addr = server.local_addr();
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding Merkle tree API server to {bind_address}"))?;
let local_addr = listener.local_addr()?;
let server = axum::serve(listener, app);
let server_future = async move {
server.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
Expand Down Expand Up @@ -387,7 +388,8 @@ impl AsyncTreeReader {
bind_address: SocketAddr,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
self.create_api_server(&bind_address, stop_receiver)?
self.create_api_server(&bind_address, stop_receiver)
.await?
.run()
.await
}
Expand Down
1 change: 1 addition & 0 deletions core/node/metadata_calculator/src/api_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn merkle_tree_api() {
.await
.unwrap()
.create_api_server(&api_addr, stop_receiver.clone())
.await
.unwrap();
let local_addr = *api_server.local_addr();
let api_server_task = tokio::spawn(api_server.run());
Expand Down
6 changes: 4 additions & 2 deletions core/node/proof_data_handler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ pub async fn run_server(
tracing::debug!("Starting proof data handler server on {bind_address}");
let app = create_proof_processing_router(blob_store, connection_pool, config, commitment_mode);

axum::Server::bind(&bind_address)
.serve(app.into_make_service())
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding proof data handler server to {bind_address}"))?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for proof data handler server was dropped without sending a signal");
Expand Down
5 changes: 3 additions & 2 deletions core/node/proof_data_handler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use axum::{
response::Response,
Router,
};
use hyper::body::HttpBody;
use serde_json::json;
use tower::ServiceExt;
use zksync_basic_types::U256;
Expand Down Expand Up @@ -107,7 +106,9 @@ async fn request_tee_proof_inputs() {

assert_eq!(response.status(), StatusCode::OK);

let body = response.into_body().collect().await.unwrap().to_bytes();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
let json = json
.get("Success")
Expand Down
Loading

0 comments on commit 7dabdbf

Please sign in to comment.