Skip to content

Commit

Permalink
disagg: Set client RPC timeout for FetchDisaggPages (#8821)
Browse files Browse the repository at this point in the history
close #8806
  • Loading branch information
JinheLin authored Mar 5, 2024
1 parent 0510585 commit def7eaa
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ static constexpr UInt64 DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC = 60;
// It is now a short period to prevent long-lived stale snapshots from making
// the system unstable.
static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 5 * 60;
// Timeout for FetchDisaggPages in the TiFlash compute node.
static constexpr UInt64 DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC = 30;

#define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L
#define DEFAULT_BATCH_SEND_MIN_LIMIT (-1)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct Settings
M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \
M(SettingUInt64, disagg_build_task_timeout, DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC, "disagg task establish timeout, unit is second.") \
M(SettingUInt64, disagg_task_snapshot_timeout, DEFAULT_DISAGG_TASK_TIMEOUT_SEC, "disagg task snapshot max endurable time, unit is second.") \
M(SettingUInt64, disagg_fetch_pages_timeout, DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC, "fetch disagg pages timeout for one segment, unit is second.") \
M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \
"and no less than the volume of data for one mark.") \
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ void RNWorkerFetchPages::doFetchPages(
seg_task->meta.store_address);

grpc::ClientContext client_context;
// Set a timeout for the streaming call to avoid waiting indefinitely before `Finish()`.
rpc.setClientContext(
client_context,
seg_task->meta.dm_context->db_context.getSettingsRef().disagg_fetch_pages_timeout);
auto stream_resp = rpc.call(&client_context, request);

SCOPE_EXIT({
Expand Down Expand Up @@ -317,9 +321,10 @@ void RNWorkerFetchPages::doFetchPages(
// Verify all pending pages are now received.
RUNTIME_CHECK_MSG(
remaining_pages_to_fetch.empty(),
"Failed to fetch all pages (from {}), remaining_pages_to_fetch={}",
"Failed to fetch all pages for {}, remaining_pages_to_fetch={}, wn_address={}",
seg_task->info(),
remaining_pages_to_fetch);
remaining_pages_to_fetch,
seg_task->meta.store_address);

LOG_DEBUG(
log,
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ namespace ErrorCodes
{
extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context, unsigned num_streams)
Expand Down Expand Up @@ -227,7 +228,12 @@ void StorageDisaggregated::buildReadTaskForWriteNode(
req->address(),
log->identifier());
else if (!status.ok())
throw Exception(rpc.errMsg(status));
throw Exception(
ErrorCodes::UNKNOWN_EXCEPTION,
"EstablishDisaggTask failed, wn_address={}, errmsg={}, {}",
req->address(),
rpc.errMsg(status),
log->identifier());

const DM::DisaggTaskId snapshot_id(resp.snapshot_id());
LOG_DEBUG(
Expand Down

0 comments on commit def7eaa

Please sign in to comment.