Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disagg: Set client RPC timeout for FetchDisaggPages (release-7.5) #8821

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ static constexpr UInt64 DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC = 60;
// It is now a short period to avoid long stale snapshots causing system
// instable.
static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 5 * 60;
// Timeout for FetchDisaggPages in the TiFlash compute node.
static constexpr UInt64 DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC = 30;

#define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L
#define DEFAULT_BATCH_SEND_MIN_LIMIT (-1)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct Settings
M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \
M(SettingUInt64, disagg_build_task_timeout, DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC, "disagg task establish timeout, unit is second.") \
M(SettingUInt64, disagg_task_snapshot_timeout, DEFAULT_DISAGG_TASK_TIMEOUT_SEC, "disagg task snapshot max endurable time, unit is second.") \
M(SettingUInt64, disagg_fetch_pages_timeout, DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC, "fetch disagg pages timeout for one segment, unit is second.") \
M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \
"and no less than the volume of data for one mark.") \
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ void RNWorkerFetchPages::doFetchPages(
seg_task->meta.store_address);

grpc::ClientContext client_context;
// set timeout for the streaming call to avoid inf wait before `Finish()`
rpc.setClientContext(
client_context,
seg_task->meta.dm_context->db_context.getSettingsRef().disagg_fetch_pages_timeout);
auto stream_resp = rpc.call(&client_context, request);

SCOPE_EXIT({
Expand Down Expand Up @@ -317,9 +321,10 @@ void RNWorkerFetchPages::doFetchPages(
// Verify all pending pages are now received.
RUNTIME_CHECK_MSG(
remaining_pages_to_fetch.empty(),
"Failed to fetch all pages (from {}), remaining_pages_to_fetch={}",
"Failed to fetch all pages for {}, remaining_pages_to_fetch={}, wn_address={}",
seg_task->info(),
remaining_pages_to_fetch);
remaining_pages_to_fetch,
seg_task->meta.store_address);

LOG_DEBUG(
log,
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ namespace ErrorCodes
{
extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context, unsigned num_streams)
Expand Down Expand Up @@ -227,7 +228,12 @@ void StorageDisaggregated::buildReadTaskForWriteNode(
req->address(),
log->identifier());
else if (!status.ok())
throw Exception(rpc.errMsg(status));
throw Exception(
ErrorCodes::UNKNOWN_EXCEPTION,
"EstablishDisaggTask failed, wn_address={}, errmsg={}, {}",
req->address(),
rpc.errMsg(status),
log->identifier());

const DM::DisaggTaskId snapshot_id(resp.snapshot_id());
LOG_DEBUG(
Expand Down