From b9c7e1a0100ba7204e9ef2b370d417a19360531f Mon Sep 17 00:00:00 2001 From: jinhelin Date: Tue, 5 Mar 2024 11:28:03 +0800 Subject: [PATCH] disagg: Set client RPC timeout for FetchDisaggPages --- dbms/src/Core/Defines.h | 2 ++ dbms/src/Interpreters/Settings.h | 1 + .../Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp | 9 +++++++-- dbms/src/Storages/StorageDisaggregatedRemote.cpp | 8 +++++++- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 3b015407a03..1144c5f5caa 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -55,6 +55,8 @@ static constexpr UInt64 DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC = 60; // It is now a short period to avoid long stale snapshots causing system // instable. static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 5 * 60; +// Timeout for FetchDisaggPages in the TiFlash compute node. +static constexpr UInt64 DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC = 30; #define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L #define DEFAULT_BATCH_SEND_MIN_LIMIT (-1) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index bd57841d496..0de9d8f4016 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -54,6 +54,7 @@ struct Settings M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \ M(SettingUInt64, disagg_build_task_timeout, DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC, "disagg task establish timeout, unit is second.") \ M(SettingUInt64, disagg_task_snapshot_timeout, DEFAULT_DISAGG_TASK_TIMEOUT_SEC, "disagg task snapshot max endurable time, unit is second.") \ + M(SettingUInt64, disagg_fetch_pages_timeout, DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC, "fetch disagg pages timeout for one segment, unit is second.") \ M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \ "and no less than the volume of data for one mark.") \ diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp index 739f382c16c..ee3641e23af 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -184,6 +184,10 @@ void RNWorkerFetchPages::doFetchPages( seg_task->meta.store_address); grpc::ClientContext client_context; + // set timeout for the streaming call to avoid inf wait before `Finish()` + rpc.setClientContext( + client_context, + seg_task->meta.dm_context->db_context.getSettingsRef().disagg_fetch_pages_timeout); auto stream_resp = rpc.call(&client_context, request); SCOPE_EXIT({ @@ -317,9 +321,10 @@ void RNWorkerFetchPages::doFetchPages( // Verify all pending pages are now received. RUNTIME_CHECK_MSG( remaining_pages_to_fetch.empty(), - "Failed to fetch all pages (from {}), remaining_pages_to_fetch={}", + "Failed to fetch all pages for {}, remaining_pages_to_fetch={}, wn_address={}", seg_task->info(), - remaining_pages_to_fetch); + remaining_pages_to_fetch, + seg_task->meta.store_address); LOG_DEBUG( log, diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 4c7fafaddb5..e554d227751 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -70,6 +70,7 @@ namespace ErrorCodes { extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR; extern const int TIMEOUT_EXCEEDED; +extern const int UNKNOWN_EXCEPTION; } // namespace ErrorCodes BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context, unsigned num_streams) @@ -227,7 +228,12 @@ void StorageDisaggregated::buildReadTaskForWriteNode( req->address(), log->identifier()); else if (!status.ok()) - throw Exception(rpc.errMsg(status)); + throw Exception( + ErrorCodes::UNKNOWN_EXCEPTION, + "EstablishDisaggTask failed, wn_address={}, errmsg={}, {}", + req->address(), + rpc.errMsg(status), + log->identifier()); const DM::DisaggTaskId snapshot_id(resp.snapshot_id()); LOG_DEBUG(