Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick 8f56310fae4e1ad9d262b33e05dbcd5393f0e67c and aba6f6f4433d1f29986ec2c190e8c50628748f1a into 1.8 #11052

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 174 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ parking_lot = "0.12.0"
paste = "1.0.7"
percent-encoding = "2.1.0"
pin-project = "1.0.10"
poem = { version = "=1.3.55", features = ["anyhow", "rustls"] }
poem = { version = "=1.3.59", features = ["anyhow", "rustls"] }
poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] }
poem-openapi-derive = "=2.0.11"
pprof = { version = "0.11", features = ["flamegraph", "protobuf-codec"] }
Expand Down
2 changes: 1 addition & 1 deletion api/src/accept_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use poem::{web::Accept, FromRequest, Request, RequestBody, Result};
/// Accept types from input headers
///
/// Determines the output type of each API
#[derive(PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum AcceptType {
/// Convert and resolve types to JSON
Json,
Expand Down
59 changes: 34 additions & 25 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
page::determine_limit,
response::{
Expand Down Expand Up @@ -66,14 +66,13 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account")?;
self.context
.check_api_output_enabled("Get account", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
None,
None,
)?;
account.account(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
account.account(&accept_type)
})
.await
}

/// Get account resources
Expand Down Expand Up @@ -113,14 +112,19 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account_resources")?;
self.context
.check_api_output_enabled("Get account resources", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.resources(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.resources(&accept_type)
})
.await
}

/// Get account modules
Expand Down Expand Up @@ -160,14 +164,19 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account_modules")?;
self.context
.check_api_output_enabled("Get account modules", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.modules(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.modules(&accept_type)
})
.await
}
}

Expand Down
5 changes: 3 additions & 2 deletions api/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
generate_error_response, generate_success_response,
response::{InternalError, ServiceUnavailableError},
ApiTags,
Expand Down Expand Up @@ -83,7 +83,8 @@ impl BasicApi {
/// If not provided, the healthcheck will always succeed
duration_secs: Query<Option<u32>>,
) -> HealthCheckResult<HealthCheckSuccess> {
let ledger_info = self.context.get_latest_ledger_info()?;
let context = self.context.clone();
let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?;

// If we have a duration, check that it's close to the current time, otherwise it's ok
if let Some(duration) = duration_secs.0 {
Expand Down
31 changes: 20 additions & 11 deletions api/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
response::{BasicResponse, BasicResponseStatus, BasicResultWith404},
ApiTags,
Expand All @@ -16,6 +16,7 @@ use poem_openapi::{
use std::sync::Arc;

/// API for block transactions and information
#[derive(Clone)]
pub struct BlocksApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -51,11 +52,15 @@ impl BlocksApi {
fail_point_poem("endpoint_get_block_by_height")?;
self.context
.check_api_output_enabled("Get block by height", &accept_type)?;
self.get_by_height(
accept_type,
block_height.0,
with_transactions.0.unwrap_or_default(),
)
let api = self.clone();
api_spawn_blocking(move || {
api.get_by_height(
accept_type,
block_height.0,
with_transactions.0.unwrap_or_default(),
)
})
.await
}

/// Get blocks by version
Expand Down Expand Up @@ -87,11 +92,15 @@ impl BlocksApi {
fail_point_poem("endpoint_get_block_by_version")?;
self.context
.check_api_output_enabled("Get block by version", &accept_type)?;
self.get_by_version(
accept_type,
version.0,
with_transactions.0.unwrap_or_default(),
)
let api = self.clone();
api_spawn_blocking(move || {
api.get_by_version(
accept_type,
version.0,
with_transactions.0.unwrap_or_default(),
)
})
.await
}
}

Expand Down
13 changes: 13 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1280,3 +1280,16 @@ pub struct GasLimitCache {
last_updated_epoch: Option<u64>,
block_gas_limit: Option<u64>,
}

/// This function just calls tokio::task::spawn_blocking with the given closure and in
/// the case of an error when joining the task converts it into a 500.
pub async fn api_spawn_blocking<F, T, E>(func: F) -> Result<T, E>
where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: InternalError + Send + 'static,
{
tokio::task::spawn_blocking(func)
.await
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
}
34 changes: 22 additions & 12 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::{
accept_type::AcceptType,
accounts::Account,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
page::Page,
response::{
Expand All @@ -27,6 +27,7 @@ use poem_openapi::{
};
use std::sync::Arc;

#[derive(Clone)]
pub struct EventsApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -75,14 +76,18 @@ impl EventsApi {
);

// Ensure that account exists
let account = Account::new(self.context.clone(), address.0, None, None, None)?;
account.verify_account_or_object_resource()?;
self.list(
account.latest_ledger_info,
accept_type,
page,
EventKey::new(creation_number.0 .0, address.0.into()),
)
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
accept_type,
page,
EventKey::new(creation_number.0 .0, address.0.into()),
)
})
.await
}

/// Get events by event handle
Expand Down Expand Up @@ -137,9 +142,14 @@ impl EventsApi {
limit.0,
self.context.max_events_page_size(),
);
let account = Account::new(self.context.clone(), address.0, None, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
self.list(account.latest_ledger_info, accept_type, page, key)

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
.await
}
}

Expand Down
7 changes: 4 additions & 3 deletions api/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
response::{BasicResponse, BasicResponseStatus, BasicResult},
ApiTags,
};
Expand Down Expand Up @@ -36,7 +36,7 @@ impl IndexApi {

let node_role = self.context.node_role();

match accept_type {
api_spawn_blocking(move || match accept_type {
AcceptType::Json => {
let index_response = IndexResponse::new(
ledger_info.clone(),
Expand All @@ -53,6 +53,7 @@ impl IndexApi {
let index_response = IndexResponseBcs::new(ledger_info.clone(), node_role);
BasicResponse::try_from_bcs((index_response, &ledger_info, BasicResponseStatus::Ok))
},
}
})
.await
}
}
60 changes: 40 additions & 20 deletions api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{
accept_type::AcceptType,
context::api_spawn_blocking,
failpoint::fail_point_poem,
response::{
api_forbidden, build_not_found, module_not_found, resource_not_found, table_item_not_found,
Expand Down Expand Up @@ -35,6 +36,7 @@ use poem_openapi::{
use std::{convert::TryInto, sync::Arc};

/// API for retrieving individual state
#[derive(Clone)]
pub struct StateApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -76,12 +78,17 @@ impl StateApi {
fail_point_poem("endpoint_get_account_resource")?;
self.context
.check_api_output_enabled("Get account resource", &accept_type)?;
self.resource(
&accept_type,
address.0,
resource_type.0,
ledger_version.0.map(|inner| inner.0),
)

let api = self.clone();
api_spawn_blocking(move || {
api.resource(
&accept_type,
address.0,
resource_type.0,
ledger_version.0.map(|inner| inner.0),
)
})
.await
}

/// Get account module
Expand Down Expand Up @@ -117,7 +124,11 @@ impl StateApi {
fail_point_poem("endpoint_get_account_module")?;
self.context
.check_api_output_enabled("Get account module", &accept_type)?;
self.module(&accept_type, address.0, module_name.0, ledger_version.0)
let api = self.clone();
api_spawn_blocking(move || {
api.module(&accept_type, address.0, module_name.0, ledger_version.0)
})
.await
}

/// Get table item
Expand Down Expand Up @@ -160,12 +171,16 @@ impl StateApi {
fail_point_poem("endpoint_get_table_item")?;
self.context
.check_api_output_enabled("Get table item", &accept_type)?;
self.table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
let api = self.clone();
api_spawn_blocking(move || {
api.table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
})
.await
}

/// Get raw table item
Expand Down Expand Up @@ -207,12 +222,16 @@ impl StateApi {
self.context
.check_api_output_enabled("Get raw table item", &accept_type)?;

self.raw_table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
let api = self.clone();
api_spawn_blocking(move || {
api.raw_table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
})
.await
}

/// Get raw state value.
Expand Down Expand Up @@ -250,7 +269,8 @@ impl StateApi {
self.context
.check_api_output_enabled("Get raw state value", &accept_type)?;

self.raw_value(&accept_type, request.0, ledger_version.0)
let api = self.clone();
api_spawn_blocking(move || api.raw_value(&accept_type, request.0, ledger_version.0)).await
}
}

Expand Down
Loading
Loading