diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index b0d8c2060e0..9a6b13ed477 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -51,6 +51,8 @@ static constexpr UInt64 DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC = 60; // It is now a short period to avoid long stale snapshots causing system // instable. static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 5 * 60; +// Timeout for FetchDisaggPages in the TiFlash compute node. +static constexpr UInt64 DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC = 30; #define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L #define DEFAULT_BATCH_SEND_MIN_LIMIT (-1) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 9ca07deca56..fd92cb3aea7 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -54,6 +54,7 @@ struct Settings M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \ M(SettingUInt64, disagg_build_task_timeout, DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC, "disagg task establish timeout, unit is second.") \ M(SettingUInt64, disagg_task_snapshot_timeout, DEFAULT_DISAGG_TASK_TIMEOUT_SEC, "disagg task snapshot max endurable time, unit is second.") \ + M(SettingUInt64, disagg_fetch_pages_timeout, DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC, "fetch disagg pages timeout for one segment, unit is second.") \ M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \ "and no less than the volume of data for one mark.") \ diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index de2befb029c..4294804edd9 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -551,6 +551,8 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest cluster->rpc_client, extra_remote_info->store_address); grpc::ClientContext client_context; + // set timeout for the streaming call to avoid inf wait before `Finish()` + rpc.setClientContext(client_context, dm_context->global_context.getSettingsRef().disagg_fetch_pages_timeout); auto stream_resp = rpc.call(&client_context, request); RUNTIME_CHECK(stream_resp != nullptr); SCOPE_EXIT({