Skip to content

Commit

Permalink
[Node API] Use spawn_blocking for synchronous work (#10643)
Browse files Browse the repository at this point in the history
  • Loading branch information
banool authored Oct 24, 2023
1 parent 9bf597e commit 8f56310
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 265 deletions.
2 changes: 1 addition & 1 deletion api/src/accept_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use poem::{web::Accept, FromRequest, Request, RequestBody, Result};
/// Accept types from input headers
///
/// Determines the output type of each API
#[derive(PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum AcceptType {
/// Convert and resolve types to JSON
Json,
Expand Down
59 changes: 34 additions & 25 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
page::determine_limit,
response::{
Expand Down Expand Up @@ -66,14 +66,13 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account")?;
self.context
.check_api_output_enabled("Get account", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
None,
None,
)?;
account.account(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
account.account(&accept_type)
})
.await
}

/// Get account resources
Expand Down Expand Up @@ -113,14 +112,19 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account_resources")?;
self.context
.check_api_output_enabled("Get account resources", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.resources(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.resources(&accept_type)
})
.await
}

/// Get account modules
Expand Down Expand Up @@ -160,14 +164,19 @@ impl AccountsApi {
fail_point_poem("endpoint_get_account_modules")?;
self.context
.check_api_output_enabled("Get account modules", &accept_type)?;
let account = Account::new(
self.context.clone(),
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.modules(&accept_type)

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
account.modules(&accept_type)
})
.await
}
}

Expand Down
5 changes: 3 additions & 2 deletions api/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
generate_error_response, generate_success_response,
response::{InternalError, ServiceUnavailableError},
ApiTags,
Expand Down Expand Up @@ -83,7 +83,8 @@ impl BasicApi {
/// If not provided, the healthcheck will always succeed
duration_secs: Query<Option<u32>>,
) -> HealthCheckResult<HealthCheckSuccess> {
let ledger_info = self.context.get_latest_ledger_info()?;
let context = self.context.clone();
let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?;

// If we have a duration, check that it's close to the current time, otherwise it's ok
if let Some(duration) = duration_secs.0 {
Expand Down
31 changes: 20 additions & 11 deletions api/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
response::{BasicResponse, BasicResponseStatus, BasicResultWith404},
ApiTags,
Expand All @@ -16,6 +16,7 @@ use poem_openapi::{
use std::sync::Arc;

/// API for block transactions and information
#[derive(Clone)]
pub struct BlocksApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -51,11 +52,15 @@ impl BlocksApi {
fail_point_poem("endpoint_get_block_by_height")?;
self.context
.check_api_output_enabled("Get block by height", &accept_type)?;
self.get_by_height(
accept_type,
block_height.0,
with_transactions.0.unwrap_or_default(),
)
let api = self.clone();
api_spawn_blocking(move || {
api.get_by_height(
accept_type,
block_height.0,
with_transactions.0.unwrap_or_default(),
)
})
.await
}

/// Get blocks by version
Expand Down Expand Up @@ -87,11 +92,15 @@ impl BlocksApi {
fail_point_poem("endpoint_get_block_by_version")?;
self.context
.check_api_output_enabled("Get block by version", &accept_type)?;
self.get_by_version(
accept_type,
version.0,
with_transactions.0.unwrap_or_default(),
)
let api = self.clone();
api_spawn_blocking(move || {
api.get_by_version(
accept_type,
version.0,
with_transactions.0.unwrap_or_default(),
)
})
.await
}
}

Expand Down
13 changes: 13 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,3 +1297,16 @@ pub struct GasLimitCache {
last_updated_epoch: Option<u64>,
block_gas_limit: Option<u64>,
}

/// This function just calls tokio::task::spawn_blocking with the given closure and in
/// the case of an error when joining the task converts it into a 500.
pub async fn api_spawn_blocking<F, T, E>(func: F) -> Result<T, E>
where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: InternalError + Send + 'static,
{
tokio::task::spawn_blocking(func)
.await
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
}
34 changes: 22 additions & 12 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::{
accept_type::AcceptType,
accounts::Account,
context::Context,
context::{api_spawn_blocking, Context},
failpoint::fail_point_poem,
page::Page,
response::{
Expand All @@ -27,6 +27,7 @@ use poem_openapi::{
};
use std::sync::Arc;

#[derive(Clone)]
pub struct EventsApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -75,14 +76,18 @@ impl EventsApi {
);

// Ensure that account exists
let account = Account::new(self.context.clone(), address.0, None, None, None)?;
account.verify_account_or_object_resource()?;
self.list(
account.latest_ledger_info,
accept_type,
page,
EventKey::new(creation_number.0 .0, address.0.into()),
)
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
accept_type,
page,
EventKey::new(creation_number.0 .0, address.0.into()),
)
})
.await
}

/// Get events by event handle
Expand Down Expand Up @@ -137,9 +142,14 @@ impl EventsApi {
limit.0,
self.context.max_events_page_size(),
);
let account = Account::new(self.context.clone(), address.0, None, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
self.list(account.latest_ledger_info, accept_type, page, key)

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
.await
}
}

Expand Down
7 changes: 4 additions & 3 deletions api/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
accept_type::AcceptType,
context::Context,
context::{api_spawn_blocking, Context},
response::{BasicResponse, BasicResponseStatus, BasicResult},
ApiTags,
};
Expand Down Expand Up @@ -36,7 +36,7 @@ impl IndexApi {

let node_role = self.context.node_role();

match accept_type {
api_spawn_blocking(move || match accept_type {
AcceptType::Json => {
let index_response = IndexResponse::new(
ledger_info.clone(),
Expand All @@ -53,6 +53,7 @@ impl IndexApi {
let index_response = IndexResponseBcs::new(ledger_info.clone(), node_role);
BasicResponse::try_from_bcs((index_response, &ledger_info, BasicResponseStatus::Ok))
},
}
})
.await
}
}
60 changes: 40 additions & 20 deletions api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{
accept_type::AcceptType,
context::api_spawn_blocking,
failpoint::fail_point_poem,
response::{
api_forbidden, build_not_found, module_not_found, resource_not_found, table_item_not_found,
Expand Down Expand Up @@ -35,6 +36,7 @@ use poem_openapi::{
use std::{convert::TryInto, sync::Arc};

/// API for retrieving individual state
#[derive(Clone)]
pub struct StateApi {
pub context: Arc<Context>,
}
Expand Down Expand Up @@ -76,12 +78,17 @@ impl StateApi {
fail_point_poem("endpoint_get_account_resource")?;
self.context
.check_api_output_enabled("Get account resource", &accept_type)?;
self.resource(
&accept_type,
address.0,
resource_type.0,
ledger_version.0.map(|inner| inner.0),
)

let api = self.clone();
api_spawn_blocking(move || {
api.resource(
&accept_type,
address.0,
resource_type.0,
ledger_version.0.map(|inner| inner.0),
)
})
.await
}

/// Get account module
Expand Down Expand Up @@ -117,7 +124,11 @@ impl StateApi {
fail_point_poem("endpoint_get_account_module")?;
self.context
.check_api_output_enabled("Get account module", &accept_type)?;
self.module(&accept_type, address.0, module_name.0, ledger_version.0)
let api = self.clone();
api_spawn_blocking(move || {
api.module(&accept_type, address.0, module_name.0, ledger_version.0)
})
.await
}

/// Get table item
Expand Down Expand Up @@ -160,12 +171,16 @@ impl StateApi {
fail_point_poem("endpoint_get_table_item")?;
self.context
.check_api_output_enabled("Get table item", &accept_type)?;
self.table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
let api = self.clone();
api_spawn_blocking(move || {
api.table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
})
.await
}

/// Get raw table item
Expand Down Expand Up @@ -207,12 +222,16 @@ impl StateApi {
self.context
.check_api_output_enabled("Get raw table item", &accept_type)?;

self.raw_table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
let api = self.clone();
api_spawn_blocking(move || {
api.raw_table_item(
&accept_type,
table_handle.0,
table_item_request.0,
ledger_version.0,
)
})
.await
}

/// Get raw state value.
Expand Down Expand Up @@ -250,7 +269,8 @@ impl StateApi {
self.context
.check_api_output_enabled("Get raw state value", &accept_type)?;

self.raw_value(&accept_type, request.0, ledger_version.0)
let api = self.clone();
api_spawn_blocking(move || api.raw_value(&accept_type, request.0, ledger_version.0)).await
}
}

Expand Down
Loading

0 comments on commit 8f56310

Please sign in to comment.