From c1a9215d649d0d2bd6596560b659baaa2dd8c0f0 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 26 Mar 2024 14:09:46 +0800 Subject: [PATCH] This is an automated cherry-pick of #8853 Signed-off-by: ti-chi-bot --- contrib/tiflash-proxy-cmake/CMakeLists.txt | 29 + dbms/src/Common/TiFlashMetrics.h | 793 ++++++++++++++++++ dbms/src/Debug/DBGInvoker.cpp | 10 + dbms/src/Debug/{ => MockKVStore}/MockTiKV.h | 0 dbms/src/Debug/MockTiDB.cpp | 2 +- dbms/src/Debug/dbgFuncInvestigator.cpp | 308 +++++++ dbms/src/Debug/dbgFuncInvestigator.h | 26 + dbms/src/Debug/dbgFuncMockRaftCommand.cpp | 6 + dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp | 8 + dbms/src/Debug/dbgFuncRegion.cpp | 6 + dbms/src/Debug/dbgNaturalDag.cpp | 2 +- dbms/src/Debug/dbgTools.cpp | 5 + .../KVStore/Read/LearnerReadWorker.cpp | 470 +++++++++++ .../KVStore/tests/gtest_async_tasks.cpp | 319 +++++++ 14 files changed, 1982 insertions(+), 2 deletions(-) rename dbms/src/Debug/{ => MockKVStore}/MockTiKV.h (100%) create mode 100644 dbms/src/Debug/dbgFuncInvestigator.cpp create mode 100644 dbms/src/Debug/dbgFuncInvestigator.h create mode 100644 dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp create mode 100644 dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp diff --git a/contrib/tiflash-proxy-cmake/CMakeLists.txt b/contrib/tiflash-proxy-cmake/CMakeLists.txt index 67ce514997b..8a1e25781a4 100644 --- a/contrib/tiflash-proxy-cmake/CMakeLists.txt +++ b/contrib/tiflash-proxy-cmake/CMakeLists.txt @@ -14,10 +14,39 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG) set(_TIFLASH_PROXY_BUILD_PROFILE "debug") +<<<<<<< HEAD set(_TIFLASH_PROXY_MAKE_COMMAND make debug) else() set(_TIFLASH_PROXY_BUILD_PROFILE "release") set(_TIFLASH_PROXY_MAKE_COMMAND make release) +======= + if (ENABLE_JEMALLOC) + if (APPLE) + message(STATUS "proxy's jemalloc is disabled (AppleOS)") + set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + else() + message(STATUS "proxy's jemalloc is enabled") + set(_TIFLASH_PROXY_MAKE_COMMAND 
ENABLE_FEATURES="external-jemalloc" make debug) + endif() + else() + message(STATUS "proxy's jemalloc is disabled") + set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + endif() +else() + set(_TIFLASH_PROXY_BUILD_PROFILE "release") + if (ENABLE_JEMALLOC) + if (APPLE) + message(STATUS "proxy's jemalloc is disabled (AppleOS)") + set(_TIFLASH_PROXY_MAKE_COMMAND make release) + else() + message(STATUS "proxy's jemalloc is enabled") + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make release) + endif() + else() + message(STATUS "proxy's jemalloc is disabled") + set(_TIFLASH_PROXY_MAKE_COMMAND make release) + endif() +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) endif() set(_TIFLASH_PROXY_SOURCE_DIR "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy") diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 87d41844a38..9c1193b49f1 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -41,6 +41,7 @@ namespace DB /// 2. Keep metrics with same prefix next to each other. /// 3. Add metrics of new subsystems at tail. /// 4. Keep it proper formatted using clang-format. 
+<<<<<<< HEAD // clang-format off #define APPLY_FOR_METRICS(M, F) \ M(tiflash_coprocessor_request_count, "Total number of request", Counter, F(type_cop, {"type", "cop"}), \ @@ -289,6 +290,798 @@ namespace DB F(type_bg_read, {"type", "bg_read"}), \ F(type_fg_write, {"type", "fg_write"}), \ F(type_bg_write, {"type", "bg_write"})) +======= +#define APPLY_FOR_METRICS(M, F) \ + M(tiflash_coprocessor_request_count, \ + "Total number of request", \ + Counter, \ + F(type_cop, {"type", "cop"}), \ + F(type_cop_executing, {"type", "cop_executing"}), \ + F(type_cop_stream, {"type", "cop_stream"}), \ + F(type_cop_stream_executing, {"type", "cop_stream_executing"}), \ + F(type_batch, {"type", "batch"}), \ + F(type_batch_executing, {"type", "batch_executing"}), \ + F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), \ + F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \ + F(type_cancel_mpp_task, {"type", "cancel_mpp_task"}), \ + F(type_run_mpp_task, {"type", "run_mpp_task"}), \ + F(type_remote_read, {"type", "remote_read"}), \ + F(type_remote_read_constructed, {"type", "remote_read_constructed"}), \ + F(type_remote_read_sent, {"type", "remote_read_sent"}), \ + F(type_disagg_establish_task, {"type", "disagg_establish_task"}), \ + F(type_disagg_fetch_pages, {"type", "disagg_fetch_pages"})) \ + M(tiflash_coprocessor_handling_request_count, \ + "Number of handling request", \ + Gauge, \ + F(type_cop, {"type", "cop"}), \ + F(type_cop_executing, {"type", "cop_executing"}), \ + F(type_cop_stream, {"type", "cop_stream"}), \ + F(type_cop_stream_executing, {"type", "cop_stream_executing"}), \ + F(type_batch, {"type", "batch"}), \ + F(type_batch_executing, {"type", "batch_executing"}), \ + F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), \ + F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \ + F(type_cancel_mpp_task, {"type", "cancel_mpp_task"}), \ + F(type_run_mpp_task, {"type", "run_mpp_task"}), \ + F(type_remote_read, {"type", "remote_read"}), \ + 
F(type_remote_read_executing, {"type", "remote_read_executing"}), \ + F(type_disagg_establish_task, {"type", "disagg_establish_task"}), \ + F(type_disagg_fetch_pages, {"type", "disagg_fetch_pages"})) \ + M(tiflash_coprocessor_executor_count, \ + "Total number of each executor", \ + Counter, \ + F(type_ts, {"type", "table_scan"}), \ + F(type_sel, {"type", "selection"}), \ + F(type_agg, {"type", "aggregation"}), \ + F(type_topn, {"type", "top_n"}), \ + F(type_limit, {"type", "limit"}), \ + F(type_join, {"type", "join"}), \ + F(type_exchange_sender, {"type", "exchange_sender"}), \ + F(type_exchange_receiver, {"type", "exchange_receiver"}), \ + F(type_projection, {"type", "projection"}), \ + F(type_partition_ts, {"type", "partition_table_scan"}), \ + F(type_window, {"type", "window"}), \ + F(type_window_sort, {"type", "window_sort"}), \ + F(type_expand, {"type", "expand"})) \ + M(tiflash_memory_exceed_quota_count, "Total number of cases where memory exceeds quota", Counter) \ + M(tiflash_coprocessor_request_duration_seconds, \ + "Bucketed histogram of request duration", \ + Histogram, \ + F(type_cop, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_cop_stream, {{"type", "cop_stream"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_batch, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_disagg_establish_task, {{"type", "disagg_establish_task"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_disagg_fetch_pages, {{"type", "type_disagg_fetch_pages"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_coprocessor_request_memory_usage, \ + "Bucketed histogram of request memory usage", \ + Histogram, \ + F(type_cop, {{"type", "cop"}}, 
ExpBuckets{1024 * 1024, 2, 16}), \ + F(type_cop_stream, {{"type", "cop_stream"}}, ExpBuckets{1024 * 1024, 2, 16}), \ + F(type_batch, {{"type", "batch"}}, ExpBuckets{1024 * 1024, 2, 20}), \ + F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20}), \ + F(type_run_mpp_query, {{"type", "run_mpp_query"}}, ExpBuckets{1024 * 1024, 2, 20})) \ + M(tiflash_coprocessor_request_error, \ + "Total number of request error", \ + Counter, \ + F(reason_meet_lock, {"reason", "meet_lock"}), \ + F(reason_region_not_found, {"reason", "region_not_found"}), \ + F(reason_epoch_not_match, {"reason", "epoch_not_match"}), \ + F(reason_kv_client_error, {"reason", "kv_client_error"}), \ + F(reason_internal_error, {"reason", "internal_error"}), \ + F(reason_other_error, {"reason", "other_error"})) \ + M(tiflash_coprocessor_request_handle_seconds, \ + "Bucketed histogram of request handle duration", \ + Histogram, \ + F(type_cop, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_cop_stream, {{"type", "cop_stream"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_batch, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_coprocessor_response_bytes, \ + "Total bytes of response body", \ + Counter, \ + F(type_cop, {{"type", "cop"}}), \ + F(type_cop_stream, {{"type", "cop_stream"}}), \ + F(type_batch_cop, {{"type", "batch_cop"}}), \ + F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}), \ + F(type_mpp_establish_conn, {{"type", "mpp_tunnel"}}), \ + F(type_mpp_establish_conn_local, {{"type", "mpp_tunnel_local"}}), \ + F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}), \ + F(type_disagg_establish_task, {{"type", "type_disagg_establish_task"}})) \ + M(tiflash_exchange_data_bytes, \ + "Total bytes sent by exchange operators", \ + Counter, \ + F(type_hash_original, {"type", "hash_original"}), \ + F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), \ + F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), \ + 
F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), \ + F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), \ + F(type_broadcast_original, {"type", "broadcast_original"}), \ + F(type_broadcast_none_compression_local, {"type", "broadcast_none_compression_local"}), \ + F(type_broadcast_none_compression_remote, {"type", "broadcast_none_compression_remote"}), \ + F(type_broadcast_lz4_compression, {"type", "broadcast_lz4_compression"}), \ + F(type_broadcast_zstd_compression, {"type", "broadcast_zstd_compression"}), \ + F(type_passthrough_original, {"type", "passthrough_original"}), \ + F(type_passthrough_none_compression_local, {"type", "passthrough_none_compression_local"}), \ + F(type_passthrough_none_compression_remote, {"type", "passthrough_none_compression_remote"}), \ + F(type_passthrough_lz4_compression, {"type", "passthrough_lz4_compression"}), \ + F(type_passthrough_zstd_compression, {"type", "passthrough_zstd_compression"})) \ + M(tiflash_sync_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \ + M(tiflash_schema_trigger_count, \ + "Total number of each kinds of schema sync trigger", \ + Counter, \ + F(type_timer, {"type", "timer"}), \ + F(type_raft_decode, {"type", "raft_decode"}), \ + F(type_cop_read, {"type", "cop_read"}), \ + F(type_sync_table_schema, {"type", "sync_table_schema"})) \ + M(tiflash_schema_internal_ddl_count, \ + "Total number of each kinds of internal ddl operations", \ + Counter, \ + F(type_create_table, {"type", "create_table"}), \ + F(type_create_db, {"type", "create_db"}), \ + F(type_drop_table, {"type", "drop_table"}), \ + F(type_drop_db, {"type", "drop_db"}), \ + F(type_rename_table, {"type", "rename_table"}), \ + F(type_modify_column, {"type", "modify_column"}), \ + F(type_apply_partition, {"type", "apply_partition"}), \ + F(type_exchange_partition, {"type", "exchange_partition"})) \ + M(tiflash_schema_apply_duration_seconds, \ + "Bucketed histogram of ddl apply duration", \ + 
Histogram, \ + F(type_sync_schema_apply_duration, {{"type", "sync_schema_duration"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_sync_table_schema_apply_duration, {{"type", "sync_table_schema_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_stale_read_count, "Total number of stale read", Counter) \ + M(tiflash_raft_read_index_duration_seconds, \ + "Bucketed histogram of raft read index duration", \ + Histogram, \ + F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_raft_wait_index_duration_seconds, \ + "Bucketed histogram of raft wait index duration", \ + Histogram, \ + F(type_raft_wait_index_duration, {{"type", "tmt_raft_wait_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_raft_eager_gc_duration_seconds, \ + "Bucketed histogram of RaftLog eager", \ + Histogram, \ + F(type_run, {{"type", "run"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_raft_eager_gc_count, \ + "Total number processed in RaftLog eager GC", \ + Counter, \ + F(type_num_raft_logs, {"type", "num_raft_logs"}), \ + F(type_num_skip_regions, {"type", "num_skip_regions"}), \ + F(type_num_process_regions, {"type", "num_process_regions"})) \ + M(tiflash_syncing_data_freshness, \ + "The freshness of tiflash data with tikv data", \ + Histogram, \ + F(type_syncing_data_freshness, {{"type", "data_freshness"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \ + M(tiflash_storage_command_count, \ + "Total number of storage's command, such as delete range / shutdown /startup", \ + Counter, \ + F(type_delete_range, {"type", "delete_range"}), \ + F(type_ingest, {"type", "ingest"}), \ + F(type_ingest_checkpoint, {"type", "ingest_check_point"})) \ + M(tiflash_storage_subtask_count, \ + "Total number of storage's sub task", \ + Counter, \ + F(type_delta_merge_bg, {"type", 
"delta_merge_bg"}), \ + F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \ + F(type_delta_merge_fg, {"type", "delta_merge_fg"}), \ + F(type_delta_merge_manual, {"type", "delta_merge_manual"}), \ + F(type_delta_compact, {"type", "delta_compact"}), \ + F(type_delta_flush, {"type", "delta_flush"}), \ + F(type_seg_split_bg, {"type", "seg_split_bg"}), \ + F(type_seg_split_fg, {"type", "seg_split_fg"}), \ + F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \ + F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \ + F(type_place_index_update, {"type", "place_index_update"})) \ + M(tiflash_storage_subtask_duration_seconds, \ + "Bucketed histogram of storage's sub task duration", \ + Histogram, \ + F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_manual, {{"type", "delta_merge_manual"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_seg_split_bg, {{"type", "seg_split_bg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_storage_subtask_throughput_bytes, \ + "Calculate the throughput of (maybe foreground) tasks of storage in bytes", \ + Counter, /**/ \ + F(type_delta_flush, {"type", "delta_flush"}), /**/ \ + F(type_delta_compact, {"type", "delta_compact"}), /**/ \ + F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \ + F(type_write_to_disk, 
{"type", "write_to_disk"})) /**/ \ + M(tiflash_storage_subtask_throughput_rows, \ + "Calculate the throughput of (maybe foreground) tasks of storage in rows", \ + Counter, /**/ \ + F(type_delta_flush, {"type", "delta_flush"}), /**/ \ + F(type_delta_compact, {"type", "delta_compact"}), /**/ \ + F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \ + F(type_write_to_disk, {"type", "write_to_disk"})) /**/ \ + M(tiflash_storage_throughput_bytes, \ + "Calculate the throughput of tasks of storage in bytes", \ + Gauge, /**/ \ + F(type_write, {"type", "write"}), /**/ \ + F(type_ingest, {"type", "ingest"}), /**/ \ + F(type_delta_merge, {"type", "delta_merge"}), /**/ \ + F(type_split, {"type", "split"}), /**/ \ + F(type_merge, {"type", "merge"})) /**/ \ + M(tiflash_storage_throughput_rows, \ + "Calculate the throughput of tasks of storage in rows", \ + Gauge, /**/ \ + F(type_write, {"type", "write"}), /**/ \ + F(type_ingest, {"type", "ingest"}), /**/ \ + F(type_delta_merge, {"type", "delta_merge"}), /**/ \ + F(type_split, {"type", "split"}), /**/ \ + F(type_merge, {"type", "merge"})) /**/ \ + M(tiflash_storage_write_stall_duration_seconds, \ + "The write stall duration of storage, in seconds", \ + Histogram, \ + F(type_write, {{"type", "write"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_by_write, {{"type", "delta_merge_by_write"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_by_delete_range, {{"type", "delta_merge_by_delete_range"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_flush, {{"type", "flush"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_split, {{"type", "split"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_storage_page_gc_count, \ + "Total number of page's gc execution.", \ + Counter, \ + F(type_v2, {"type", "v2"}), \ + F(type_v2_low, {"type", "v2_low"}), \ + F(type_v3, {"type", "v3"}), \ + F(type_v3_mvcc_dumped, {"type", "v3_mvcc_dumped"}), \ + F(type_v3_bs_full_gc, {"type", "v3_bs_full_gc"})) \ + M(tiflash_storage_page_gc_duration_seconds, \ + "Bucketed 
histogram of page's gc task duration", \ + Histogram, \ + F(type_v2, {{"type", "v2"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_v2_data_compact, {{"type", "v2_data_compact"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_v2_ver_compact, \ + {{"type", "v2_ver_compact"}}, \ + ExpBuckets{0.0005, 2, 20}), /* Below are metrics for PageStorage V3 */ \ + F(type_compact_wal, {{"type", "compact_wal"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_compact_directory, {{"type", "compact_directory"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_compact_spacemap, {{"type", "compact_spacemap"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_fullgc_rewrite, {{"type", "fullgc_rewrite"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_fullgc_commit, {{"type", "fullgc_commit"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_storage_page_command_count, \ + "Total number of PageStorage's command, such as write / read / scan / snapshot", \ + Counter, \ + F(type_write, {"type", "write"}), \ + F(type_read, {"type", "read"}), \ + F(type_read_page_dir, {"type", "read_page_dir"}), \ + F(type_read_blob, {"type", "read_blob"}), \ + F(type_scan, {"type", "scan"}), \ + F(type_snapshot, {"type", "snapshot"})) \ + M(tiflash_storage_page_write_batch_size, \ + "The size of each write batch in bytes", \ + Histogram, \ + F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \ + M(tiflash_storage_page_write_duration_seconds, \ + "The duration of each write batch", \ + Histogram, \ + F(type_total, \ + {{"type", "total"}}, \ + ExpBuckets{0.0001, 2, 20}), /* the bucket range for apply in memory is 50us ~ 120s */ \ + F(type_choose_stat, {{"type", "choose_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_search_pos, {{"type", "search_pos"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_blob_write, {{"type", "blob_write"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_latch, {{"type", "latch"}}, 
ExpBuckets{0.00005, 1.8, 26}), \ + F(type_wait_in_group, {{"type", "wait_in_group"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_wal, {{"type", "wal"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_commit, {{"type", "commit"}}, ExpBuckets{0.00005, 1.8, 26})) \ + M(tiflash_storage_logical_throughput_bytes, \ + "The logical throughput of read tasks of storage in bytes", \ + Histogram, \ + F(type_read, {{"type", "read"}}, EqualWidthBuckets{1 * 1024 * 1024, 60, 50 * 1024 * 1024})) \ + M(tiflash_storage_io_limiter, \ + "Storage I/O limiter metrics", \ + Counter, \ + F(type_fg_read_req_bytes, {"type", "fg_read_req_bytes"}), \ + F(type_fg_read_alloc_bytes, {"type", "fg_read_alloc_bytes"}), \ + F(type_bg_read_req_bytes, {"type", "bg_read_req_bytes"}), \ + F(type_bg_read_alloc_bytes, {"type", "bg_read_alloc_bytes"}), \ + F(type_fg_write_req_bytes, {"type", "fg_write_req_bytes"}), \ + F(type_fg_write_alloc_bytes, {"type", "fg_write_alloc_bytes"}), \ + F(type_bg_write_req_bytes, {"type", "bg_write_req_bytes"}), \ + F(type_bg_write_alloc_bytes, {"type", "bg_write_alloc_bytes"})) \ + M(tiflash_storage_rough_set_filter_rate, \ + "Bucketed histogram of rough set filter rate", \ + Histogram, \ + F(type_dtfile_pack, {{"type", "dtfile_pack"}}, EqualWidthBuckets{0, 6, 20})) \ + M(tiflash_disaggregated_object_lock_request_count, \ + "Total number of S3 object lock/delete request", \ + Counter, \ + F(type_lock, {"type", "lock"}), \ + F(type_delete, {"type", "delete"}), \ + F(type_owner_changed, {"type", "owner_changed"}), \ + F(type_error, {"type", "error"}), \ + F(type_lock_conflict, {"type", "lock_conflict"}), \ + F(type_delete_conflict, {"type", "delete_conflict"}), \ + F(type_delete_risk, {"type", "delete_risk"})) \ + M(tiflash_disaggregated_object_lock_request_duration_seconds, \ + "Bucketed histogram of S3 object lock/delete request duration", \ + Histogram, \ + F(type_lock, {{"type", "lock"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delete, {{"type", "delete"}}, ExpBuckets{0.001, 2, 
20})) \ + M(tiflash_disaggregated_read_tasks_count, "Total number of storage engine disaggregated read tasks", Counter) \ + M(tiflash_disaggregated_breakdown_duration_seconds, \ + "", \ + Histogram, \ + F(type_rpc_establish, {{"type", "rpc_establish"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_total_establish_backoff, {{"type", "total_establish_backoff"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_resolve_lock, {{"type", "resolve_lock"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_rpc_fetch_page, {{"type", "rpc_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_write_page_cache, {{"type", "write_page_cache"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_cache_occupy, {{"type", "cache_occupy"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_worker_fetch_page, {{"type", "worker_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_worker_prepare_stream, {{"type", "worker_prepare_stream"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_stream_wait_next_task, {{"type", "stream_wait_next_task"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_deserialize_page, {{"type", "deserialize_page"}}, ExpBuckets{0.01, 2, 20})) \ + M(tiflash_disaggregated_details, \ + "", \ + Counter, \ + F(type_cftiny_read, {{"type", "cftiny_read"}}), \ + F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \ + M(tiflash_fap_task_result, \ + "", \ + Counter, \ + F(type_total, {{"type", "total"}}), \ + F(type_success_transform, {{"type", "success_transform"}}), \ + F(type_failed_other, {{"type", "failed_other"}}), \ + F(type_failed_cancel, {{"type", "failed_cancel"}}), \ + F(type_failed_no_suitable, {{"type", "failed_no_suitable"}}), \ + F(type_failed_timeout, {{"type", "failed_timeout"}}), \ + F(type_failed_baddata, {{"type", "failed_baddata"}}), \ + F(type_failed_repeated, {{"type", "failed_repeated"}}), \ + F(type_restore, {{"type", "restore"}}), \ + F(type_succeed, {{"type", "succeed"}})) \ + M(tiflash_fap_task_state, \ + "", \ + Gauge, \ + F(type_ongoing, {{"type", 
"ongoing"}}), \ + F(type_ingesting_stage, {{"type", "ingesting_stage"}}), \ + F(type_writing_stage, {{"type", "writing_stage"}}), \ + F(type_queueing_stage, {{"type", "queueing_stage"}}), \ + F(type_blocking_cancel_stage, {{"type", "blocking_cancel_stage"}}), \ + F(type_selecting_stage, {{"type", "selecting_stage"}})) \ + M(tiflash_fap_nomatch_reason, \ + "", \ + Counter, \ + F(type_conf, {{"type", "conf"}}), \ + F(type_region_state, {{"type", "region_state"}}), \ + F(type_no_meta, {{"type", "no_meta"}})) \ + M(tiflash_fap_task_duration_seconds, \ + "", \ + Histogram, \ + F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.1, 2, 60}), \ + F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.05, 2, 60}), \ + F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBucketsWithRange{0.05, 2, 30}), \ + F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.1, 2, 300}), \ + F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.1, 2, 300}), \ + F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 2, 80})) \ + M(tiflash_raft_command_duration_seconds, \ + "Bucketed histogram of some raft command: apply snapshot and ingest SST", \ + Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \ + F(type_remove_peer, {{"type", "remove_peer"}}, ExpBuckets{0.05, 2, 10}), \ + F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \ + F(type_ingest_sst_sst2dt, {{"type", "ingest_sst_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \ + F(type_ingest_sst_upload, {{"type", "ingest_sst_upload"}}, ExpBuckets{0.05, 2, 10}), \ + F(type_apply_snapshot_predecode, {{"type", "snapshot_predecode"}}, ExpBuckets{0.05, 2, 15}), \ + F(type_apply_snapshot_total, {{"type", "snapshot_total"}}, ExpBucketsWithRange{0.1, 2, 600}), \ + F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 15}), \ + F(type_apply_snapshot_predecode_parallel_wait, \ + 
{{"type", "snapshot_predecode_parallel_wait"}}, \ + ExpBuckets{0.1, 2, 10}), \ + F(type_apply_snapshot_predecode_upload, {{"type", "snapshot_predecode_upload"}}, ExpBuckets{0.05, 2, 10}), \ + F(type_apply_snapshot_flush, {{"type", "snapshot_flush"}}, ExpBuckets{0.05, 2, 10})) \ + M(tiflash_raft_process_keys, \ + "Total number of keys processed in some types of Raft commands", \ + Counter, \ + F(type_write_put, {"type", "write_put"}), \ + F(type_lock_put, {"type", "lock_put"}), \ + F(type_default_put, {"type", "default_put"}), \ + F(type_write_del, {"type", "write_del"}), \ + F(type_lock_del, {"type", "lock_del"}), \ + F(type_default_del, {"type", "default_del"}), \ + F(type_apply_snapshot, {"type", "apply_snapshot"}), \ + F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \ + F(type_apply_snapshot_write, {"type", "apply_snapshot_write"}), \ + F(type_large_txn_lock_put, {"type", "large_txn_lock_put"}), \ + F(type_large_txn_lock_del, {"type", "large_txn_lock_del"}), \ + F(type_ingest_sst, {"type", "ingest_sst"})) \ + M(tiflash_raft_apply_write_command_duration_seconds, \ + "Bucketed histogram of applying write command Raft logs", \ + Histogram, \ + F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_admin, {{"type", "admin"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_admin_batch_split, {{"type", "admin_batch_split"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_admin_prepare_merge, {{"type", "admin_prepare_merge"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_admin_commit_merge, {{"type", "admin_commit_merge"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_admin_change_peer, {{"type", "admin_change_peer"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_flush_region, {{"type", "flush_region"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_raft_upstream_latency, \ + "The latency that tikv sends raft log to tiflash.", \ + Histogram, \ + F(type_write, {{"type", "write"}}, ExpBuckets{0.001, 2, 30})) \ + M(tiflash_raft_write_data_to_storage_duration_seconds, \ + 
"Bucketed histogram of writting region into storage layer", \ + Histogram, \ + F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_raft_raft_log_gap_count, \ + "Bucketed histogram raft index gap between applied and truncated index", \ + Histogram, \ + F(type_applied_index, {{"type", "applied_index"}}, EqualWidthBuckets{0, 100, 15}), \ + F(type_eager_gc_applied_index, {{"type", "eager_gc_applied_index"}}, EqualWidthBuckets{0, 100, 10}), \ + F(type_unflushed_applied_index, {{"type", "unflushed_applied_index"}}, EqualWidthBuckets{0, 100, 15})) \ + M(tiflash_raft_raft_events_count, \ + "Raft event counter", \ + Counter, \ + F(type_pre_exec_compact, {{"type", "pre_exec_compact"}}), \ + F(type_flush_apply_snapshot, {{"type", "flush_apply_snapshot"}}), \ + F(type_flush_ingest_sst, {{"type", "flush_ingest_sst"}}), \ + F(type_flush_useless_admin, {{"type", "flush_useless_admin"}}), \ + F(type_flush_useful_admin, {{"type", "flush_useful_admin"}}), \ + F(type_flush_passive, {{"type", "flush_passive"}}), \ + F(type_flush_proactive, {{"type", "flush_proactive"}}), \ + F(type_flush_log_gap, {{"type", "flush_log_gap"}}), \ + F(type_flush_size, {{"type", "flush_size"}}), \ + F(type_flush_rowcount, {{"type", "flush_rowcount"}}), \ + F(type_flush_eager_gc, {{"type", "flush_eager_gc"}})) \ + M(tiflash_raft_raft_frequent_events_count, \ + "Raft frequent event counter", \ + Counter, \ + F(type_write_commit, {{"type", "write_commit"}}), \ + F(type_write, {{"type", "write"}})) \ + M(tiflash_raft_region_flush_bytes, \ + "Bucketed histogram of region flushed bytes", \ + Histogram, \ + F(type_flushed, {{"type", "flushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024}), \ + F(type_unflushed, {{"type", "unflushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024})) \ + M(tiflash_raft_entry_size, \ + "Bucketed histogram entry size", \ + Histogram, \ + F(type_normal, {{"type", "normal"}}, ExpBuckets{1, 
2, 13})) \ + M(tiflash_raft_ongoing_snapshot_total_bytes, \ + "Ongoing snapshot total size", \ + Gauge, \ + F(type_raft_snapshot, {{"type", "raft_snapshot"}}), \ + F(type_dt_on_disk, {{"type", "dt_on_disk"}}), \ + F(type_dt_total, {{"type", "dt_total"}})) \ + M(tiflash_raft_throughput_bytes, \ + "Raft handled bytes in global", \ + Counter, \ + F(type_write, {{"type", "write"}}), \ + F(type_snapshot_committed, {{"type", "snapshot_committed"}}), \ + F(type_write_committed, {{"type", "write_committed"}})) \ + M(tiflash_raft_write_flow_bytes, \ + "Bucketed histogram of bytes for each write", \ + Histogram, \ + F(type_ingest_uncommitted, {{"type", "ingest_uncommitted"}}, ExpBucketsWithRange{16, 4, 64 * 1024}), \ + F(type_snapshot_uncommitted, {{"type", "snapshot_uncommitted"}}, ExpBucketsWithRange{16, 4, 1024 * 1024}), \ + F(type_write_committed, {{"type", "write_committed"}}, ExpBucketsWithRange{16, 2, 1024 * 1024}), \ + F(type_big_write_to_region, \ + {{"type", "big_write_to_region"}}, \ + ExpBucketsWithRange{RAFT_REGION_BIG_WRITE_THRES, 4, RAFT_REGION_BIG_WRITE_MAX})) \ + M(tiflash_raft_snapshot_total_bytes, \ + "Bucketed snapshot total size", \ + Histogram, \ + F(type_approx_raft_snapshot, {{"type", "approx_raft_snapshot"}}, ExpBuckets{1024, 2, 24})) /* 16G */ \ + M(tiflash_raft_learner_read_failures_count, \ + "Raft learner read failure reason counter", \ + Counter, \ + F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \ + F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \ + F(type_not_leader, {{"type", "not_leader"}}), \ + F(type_not_found_tikv, {{"type", "not_found_tikv"}}), \ + F(type_bucket_epoch_not_match, {{"type", "bucket_epoch_not_match"}}), \ + F(type_flashback, {{"type", "flashback"}}), \ + F(type_key_not_in_region, {{"type", "key_not_in_region"}}), \ + F(type_tikv_server_issue, {{"type", "tikv_server_issue"}}), \ + F(type_tikv_lock, {{"type", "tikv_lock"}}), \ + F(type_other, {{"type", "other"}})) \ + /* required by DBaaS */ \ + 
M(tiflash_server_info, \ + "Indicate the tiflash server info, and the value is the start timestamp (s).", \ + Gauge, \ + F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \ + M(tiflash_object_count, \ + "Number of objects", \ + Gauge, \ + F(type_count_of_establish_calldata, {"type", "count_of_establish_calldata"}), \ + F(type_count_of_mpptunnel, {"type", "count_of_mpptunnel"})) \ + M(tiflash_thread_count, \ + "Number of threads", \ + Gauge, \ + F(type_max_threads_of_thdpool, {"type", "thread_pool_total_max"}), \ + F(type_active_threads_of_thdpool, {"type", "thread_pool_active"}), \ + F(type_max_active_threads_of_thdpool, {"type", "thread_pool_active_max"}), \ + F(type_total_threads_of_thdpool, {"type", "thread_pool_total"}), \ + F(type_max_threads_of_raw, {"type", "total_max"}), \ + F(type_total_threads_of_raw, {"type", "total"}), \ + F(type_threads_of_client_cq_pool, {"type", "rpc_client_cq_pool"}), \ + F(type_threads_of_receiver_read_loop, {"type", "rpc_receiver_read_loop"}), \ + F(type_threads_of_receiver_reactor, {"type", "rpc_receiver_reactor"}), \ + F(type_max_threads_of_establish_mpp, {"type", "rpc_establish_mpp_max"}), \ + F(type_active_threads_of_establish_mpp, {"type", "rpc_establish_mpp"}), \ + F(type_max_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp_max"}), \ + F(type_active_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp"}), \ + F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \ + F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \ + M(tiflash_task_scheduler, \ + "Min-tso task scheduler", \ + Gauge, \ + F(type_min_tso, {"type", "min_tso"}), \ + F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \ + F(type_active_queries_count, {"type", "active_queries_count"}), \ + F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \ + F(type_active_tasks_count, {"type", "active_tasks_count"}), \ + F(type_global_estimated_thread_usage, 
{"type", "global_estimated_thread_usage"}), \ + F(type_estimated_thread_usage, {"type", "estimated_thread_usage"}), \ + F(type_thread_soft_limit, {"type", "thread_soft_limit"}), \ + F(type_thread_hard_limit, {"type", "thread_hard_limit"}), \ + F(type_hard_limit_exceeded_count, {"type", "hard_limit_exceeded_count"}), \ + F(type_group_entry_count, {"type", "group_entry_count"})) \ + M(tiflash_task_scheduler_waiting_duration_seconds, \ + "Bucketed histogram of task waiting for scheduling duration", \ + Histogram, \ + F(type_task_scheduler_waiting_duration, {{"type", "task_waiting_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_storage_read_thread_counter, \ + "The counter of storage read thread", \ + Counter, \ + F(type_sche_no_pool, {"type", "sche_no_pool"}), \ + F(type_sche_no_slot, {"type", "sche_no_slot"}), \ + F(type_sche_no_ru, {"type", "sche_no_ru"}), \ + F(type_sche_no_segment, {"type", "sche_no_segment"}), \ + F(type_sche_active_segment_limit, {"type", "sche_active_segment_limit"}), \ + F(type_sche_from_cache, {"type", "sche_from_cache"}), \ + F(type_sche_new_task, {"type", "sche_new_task"}), \ + F(type_ru_exhausted, {"type", "ru_exhausted"}), \ + F(type_push_block_bytes, {"type", "push_block_bytes"}), \ + F(type_add_cache_succ, {"type", "add_cache_succ"}), \ + F(type_add_cache_stale, {"type", "add_cache_stale"}), \ + F(type_add_cache_reach_count_limit, {"type", "add_cache_reach_count_limit"}), \ + F(type_add_cache_total_bytes_limit, {"type", "add_cache_total_bytes_limit"}), \ + F(type_get_cache_miss, {"type", "get_cache_miss"}), \ + F(type_get_cache_part, {"type", "get_cache_part"}), \ + F(type_get_cache_hit, {"type", "get_cache_hit"}), \ + F(type_get_cache_copy, {"type", "get_cache_copy"})) \ + M(tiflash_storage_read_thread_gauge, \ + "The gauge of storage read thread", \ + Gauge, \ + F(type_merged_task, {"type", "merged_task"})) \ + M(tiflash_storage_read_thread_seconds, \ + "Bucketed histogram of read thread", \ + Histogram, \ + F(type_merged_task, 
{{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_mpp_task_manager, \ + "The gauge of mpp task manager", \ + Gauge, \ + F(type_mpp_query_count, {"type", "mpp_query_count"})) \ + M(tiflash_mpp_task_monitor, \ + "Monitor the lifecycle of MPP Task", \ + Gauge, \ + F(type_longest_live_time, {"type", "longest_live_time"}), ) \ + M(tiflash_exchange_queueing_data_bytes, \ + "Total bytes of data contained in the queue", \ + Gauge, \ + F(type_send, {{"type", "send_queue"}}), \ + F(type_receive, {{"type", "recv_queue"}})) \ + M(tiflash_compute_request_unit, \ + "Request Unit used by tiflash compute", \ + Counter, \ + F(type_mpp, \ + {{"type", "mpp"}, \ + ComputeLabelHolder::instance().getClusterIdLabel(), \ + ComputeLabelHolder::instance().getProcessIdLabel()}), \ + F(type_cop, \ + {{"type", "cop"}, \ + ComputeLabelHolder::instance().getClusterIdLabel(), \ + ComputeLabelHolder::instance().getProcessIdLabel()}), \ + F(type_cop_stream, \ + {{"type", "cop_stream"}, \ + ComputeLabelHolder::instance().getClusterIdLabel(), \ + ComputeLabelHolder::instance().getProcessIdLabel()}), \ + F(type_batch, \ + {{"type", "batch"}, \ + ComputeLabelHolder::instance().getClusterIdLabel(), \ + ComputeLabelHolder::instance().getProcessIdLabel()})) \ + M(tiflash_shared_block_schemas, \ + "statistics about shared block schemas of ColumnFiles", \ + Gauge, \ + F(type_current_size, {{"type", "current_size"}}), \ + F(type_still_used_when_evict, {{"type", "still_used_when_evict"}}), \ + F(type_miss_count, {{"type", "miss_count"}}), \ + F(type_hit_count, {{"type", "hit_count"}})) \ + M(tiflash_storage_remote_stats, \ + "The file stats on remote store", \ + Gauge, \ + F(type_total_size, {"type", "total_size"}), \ + F(type_valid_size, {"type", "valid_size"}), \ + F(type_num_files, {"type", "num_files"})) \ + M(tiflash_storage_checkpoint_seconds, \ + "PageStorage checkpoint elapsed time", \ + Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \ 
+ F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.05, 2, 20}), \ + F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.05, 2, 20}), \ + F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.05, 2, 20}), \ + F(type_copy_checkpoint_info, {{"type", "copy_checkpoint_info"}}, ExpBuckets{0.05, 2, 20})) \ + M(tiflash_storage_checkpoint_flow, \ + "The bytes flow cause by remote checkpoint", \ + Counter, \ + F(type_incremental, {"type", "incremental"}), \ + F(type_compaction, {"type", "compaction"})) \ + M(tiflash_storage_checkpoint_keys_by_types, \ + "The keys flow cause by remote checkpoint", \ + Counter, \ + F(type_raftengine, {"type", "raftengine"}), \ + F(type_kvengine, {"type", "kvengine"}), \ + F(type_kvstore, {"type", "kvstore"}), \ + F(type_data, {"type", "data"}), \ + F(type_log, {"type", "log"}), \ + F(type_meta, {"type", "kvstore"}), \ + F(type_localkv, {"type", "localkv"}), \ + F(type_unknown, {"type", "unknown"})) \ + M(tiflash_storage_checkpoint_flow_by_types, \ + "The bytes flow cause by remote checkpoint", \ + Counter, \ + F(type_raftengine, {"type", "raftengine"}), \ + F(type_kvengine, {"type", "kvengine"}), \ + F(type_kvstore, {"type", "kvstore"}), \ + F(type_data, {"type", "data"}), \ + F(type_log, {"type", "log"}), \ + F(type_meta, {"type", "kvstore"}), \ + F(type_localkv, {"type", "localkv"}), \ + F(type_unknown, {"type", "unknown"})) \ + M(tiflash_storage_page_data_by_types, \ + "The existing bytes stored in UniPageStorage", \ + Gauge, \ + F(type_raftengine, {"type", "raftengine"}), \ + F(type_kvengine, {"type", "kvengine"}), \ + F(type_kvstore, {"type", "kvstore"}), \ + F(type_data, {"type", "data"}), \ + F(type_log, {"type", "log"}), \ + F(type_meta, {"type", "kvstore"}), \ + F(type_localkv, {"type", "localkv"}), \ + F(type_unknown, {"type", "unknown"})) \ + M(tiflash_storage_s3_request_seconds, \ + "S3 request duration in seconds", \ + Histogram, \ + 
F(type_put_object, {{"type", "put_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_put_dmfile, {{"type", "put_dmfile"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_copy_object, {{"type", "copy_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_get_object, {{"type", "get_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_create_multi_part_upload, {{"type", "create_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_upload_part, {{"type", "upload_part"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_read_stream, {{"type", "read_stream"}}, ExpBuckets{0.0001, 2, 20})) \ + M(tiflash_storage_s3_http_request_seconds, \ + "S3 request duration breakdown in seconds", \ + Histogram, \ + F(type_dns, {{"type", "dns"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_connect, {{"type", "connect"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_request, {{"type", "request"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_response, {{"type", "response"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_pipeline_scheduler, \ + "pipeline scheduler", \ + Gauge, \ + F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \ + F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \ + F(type_cpu_executing_tasks_count, {"type", "cpu_executing_tasks_count"}), \ + F(type_io_pending_tasks_count, {"type", "io_pending_tasks_count"}), \ + F(type_io_executing_tasks_count, {"type", "io_executing_tasks_count"}), \ + F(type_cpu_task_thread_pool_size, {"type", "cpu_task_thread_pool_size"}), \ + F(type_io_task_thread_pool_size, {"type", "io_task_thread_pool_size"})) \ + M(tiflash_pipeline_task_duration_seconds, \ + "Bucketed histogram of pipeline task duration in seconds", \ + 
Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \ + F(type_cpu_execute, {{"type", "cpu_execute"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_io_execute, {{"type", "io_execute"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_cpu_queue, {{"type", "cpu_queue"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_io_queue, {{"type", "io_queue"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20})) \ + M(tiflash_pipeline_task_execute_max_time_seconds_per_round, \ + "Bucketed histogram of pipeline task execute max time per round in seconds", \ + Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \ + F(type_cpu, {{"type", "cpu"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_io, {{"type", "io"}}, ExpBuckets{0.005, 2, 20})) \ + M(tiflash_pipeline_task_change_to_status, \ + "pipeline task change to status", \ + Counter, \ + F(type_to_waiting, {"type", "to_waiting"}), \ + F(type_to_running, {"type", "to_running"}), \ + F(type_to_io, {"type", "to_io"}), \ + F(type_to_finished, {"type", "to_finished"}), \ + F(type_to_error, {"type", "to_error"}), \ + F(type_to_cancelled, {"type", "to_cancelled"})) \ + M(tiflash_storage_s3_gc_status, \ + "S3 GC status", \ + Gauge, \ + F(type_lifecycle_added, {{"type", "lifecycle_added"}}), \ + F(type_lifecycle_failed, {{"type", "lifecycle_failed"}}), \ + F(type_owner, {{"type", "owner"}}), \ + F(type_running, {{"type", "running"}})) \ + M(tiflash_storage_s3_gc_seconds, \ + "S3 GC subprocess duration in seconds", \ + Histogram, /* these command usually cost several seconds, increase the start bucket to 500ms */ \ + F(type_total, {{"type", "total"}}, ExpBuckets{0.5, 2, 20}), \ + F(type_one_store, {{"type", "one_store"}}, ExpBuckets{0.5, 2, 20}), \ + F(type_read_locks, {{"type", "read_locks"}}, ExpBuckets{0.5, 2, 20}), \ + F(type_clean_locks, {{"type", "clean_locks"}}, ExpBuckets{0.5, 2, 
20}), \ + F(type_clean_manifests, {{"type", "clean_manifests"}}, ExpBuckets{0.5, 2, 20}), \ + F(type_scan_then_clean_data_files, {{"type", "scan_then_clean_data_files"}}, ExpBuckets{0.5, 2, 20}), \ + F(type_clean_one_lock, {{"type", "clean_one_lock"}}, ExpBuckets{0.5, 2, 20})) \ + M(tiflash_storage_remote_cache, \ + "Operations of remote cache", \ + Counter, \ + F(type_dtfile_hit, {"type", "dtfile_hit"}), \ + F(type_dtfile_miss, {"type", "dtfile_miss"}), \ + F(type_dtfile_evict, {"type", "dtfile_evict"}), \ + F(type_dtfile_full, {"type", "dtfile_full"}), \ + F(type_dtfile_download, {"type", "dtfile_download"}), \ + F(type_dtfile_download_failed, {"type", "dtfile_download_failed"}), \ + F(type_page_hit, {"type", "page_hit"}), \ + F(type_page_miss, {"type", "page_miss"}), \ + F(type_page_evict, {"type", "page_evict"}), \ + F(type_page_full, {"type", "page_full"}), \ + F(type_page_download, {"type", "page_download"})) \ + M(tiflash_storage_remote_cache_bytes, \ + "Flow of remote cache", \ + Counter, \ + F(type_dtfile_evict_bytes, {"type", "dtfile_evict_bytes"}), \ + F(type_dtfile_download_bytes, {"type", "dtfile_download_bytes"}), \ + F(type_dtfile_read_bytes, {"type", "dtfile_read_bytes"}), \ + F(type_page_evict_bytes, {"type", "page_evict_bytes"}), \ + F(type_page_download_bytes, {"type", "page_download_bytes"}), \ + F(type_page_read_bytes, {"type", "page_read_bytes"})) \ + M(tiflash_storage_io_limiter_pending_seconds, \ + "I/O limiter pending duration in seconds", \ + Histogram, \ + F(type_fg_read, {{"type", "fg_read"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_bg_read, {{"type", "bg_read"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_fg_write, {{"type", "fg_write"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_bg_write, {{"type", "bg_write"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_system_seconds, \ + "system calls duration in seconds", \ + Histogram, \ + F(type_fsync, {{"type", "fsync"}}, ExpBuckets{0.0001, 2, 20})) \ + M(tiflash_storage_delta_index_cache, "", Counter, 
F(type_hit, {"type", "hit"}), F(type_miss, {"type", "miss"})) \ + M(tiflash_resource_group, \ + "meta info of resource group", \ + Gauge, \ + F(type_remaining_tokens, {"type", "remaining_tokens"}), \ + F(type_avg_speed, {"type", "avg_speed"}), \ + F(type_total_consumption, {"type", "total_consumption"}), \ + F(type_bucket_fill_rate, {"type", "bucket_fill_rate"}), \ + F(type_bucket_capacity, {"type", "bucket_capacity"}), \ + F(type_compute_ru_consumption, {"type", "compute_ru_consumption"}), \ + F(type_storage_ru_consumption, {"type", "storage_ru_consumption"}), \ + F(type_compute_ru_exhausted, {"type", "compute_ru_exhausted"}), \ + F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \ + F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \ + F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \ + F(type_gac_resp_capacity, {"type", "gac_resp_capacity"})) \ + M(tiflash_storage_io_limiter_pending_count, \ + "I/O limiter pending count", \ + Counter, \ + F(type_fg_read, {"type", "fg_read"}), \ + F(type_bg_read, {"type", "bg_read"}), \ + F(type_fg_write, {"type", "fg_write"}), \ + F(type_bg_write, {"type", "bg_write"})) +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) // clang-format on diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 021d5457dac..e3a78910a55 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -25,6 +25,12 @@ #include #include #include +<<<<<<< HEAD +======= +#include +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) #include #include @@ -104,6 +110,10 @@ DBGInvoker::DBGInvoker() regSchemalessFunc("region_snapshot_pre_handle_file_pks", MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles); regSchemalessFunc("region_snapshot_apply_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles); regSchemalessFunc("region_ingest_sst", MockRaftCommand::dbgFuncIngestSST); + // Test whether a 
PK exists in KVStore. + regSchemalessFunc("find_key_kvstore", dbgFuncFindKey); + // Test whether a PK exists in DT. + regSchemafulFunc("find_key_dt", dbgFuncFindKeyDt); regSchemalessFunc("init_fail_point", DbgFailPointFunc::dbgInitFailPoint); regSchemalessFunc("enable_fail_point", DbgFailPointFunc::dbgEnableFailPoint); diff --git a/dbms/src/Debug/MockTiKV.h b/dbms/src/Debug/MockKVStore/MockTiKV.h similarity index 100% rename from dbms/src/Debug/MockTiKV.h rename to dbms/src/Debug/MockKVStore/MockTiKV.h diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 3b7ec9f70d4..925913ef6eb 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -642,7 +642,7 @@ TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const St auto it = tables_by_name.find(qualified_name); if (it == tables_by_name.end()) { - throw Exception("Mock TiDB table " + qualified_name + " does not exists", ErrorCodes::UNKNOWN_TABLE); + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Mock TiDB table {} does not exists", qualified_name); } return it->second; diff --git a/dbms/src/Debug/dbgFuncInvestigator.cpp b/dbms/src/Debug/dbgFuncInvestigator.cpp new file mode 100644 index 00000000000..88dd9e828d5 --- /dev/null +++ b/dbms/src/Debug/dbgFuncInvestigator.cpp @@ -0,0 +1,308 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +struct MatchResult +{ + String mapped_database_name; + String table_name; + std::vector> in_default; + std::vector> in_write; + std::vector in_lock; + std::unordered_map regions; + + String toString() const + { + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("default_cf "); + fmt_buf.joinStr( + in_default.begin(), + in_default.end(), + [](const auto & a, const auto &) { return fmt::format("{}-{}", a.first, a.second); }, + ":"); + fmt_buf.fmtAppend("; write_cf "); + fmt_buf.joinStr( + in_write.begin(), + in_write.end(), + [](const auto & a, const auto &) { return fmt::format("{}-{}", a.first, a.second); }, + ":"); + fmt_buf.fmtAppend("; lock_cf "); + fmt_buf.joinStr(in_lock.begin(), in_lock.end(), ":"); + for (const auto & [region_id, region] : regions) + { + fmt_buf.fmtAppend( + "; region {} {}, tikv_range: {}; ", + region_id, + region->getDebugString(), + region->getRange()->toDebugString()); + } + return fmt_buf.toString(); + } +}; + +/// 1. If the arg is [start1, end1], find all key-value pairs in this range; +/// 2. If the arg is [start1], make sure it is not a common handle, and we will only check the key-value pair by start1; +/// 3. If the arg is [start1, end1, start2, end2, ...], it must be a common handle, return all key-value pairs within the range. 
+void dbgFuncFindKey(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.size() < 3) + throw Exception( + "Args not matched, should be: database-name, table-name, start1 [, start2, ..., end1, end2, ...]", + ErrorCodes::BAD_ARGUMENTS); + + auto & tmt = context.getTMTContext(); + auto & kvstore = *tmt.getKVStore(); + MatchResult result; + const String & database_name_raw = typeid_cast(*args[0]).name; + const String & table_name = typeid_cast(*args[1]).name; + + auto maybe_database_name = mappedDatabaseWithOptional(context, database_name_raw); + if (maybe_database_name == std::nullopt) + { + output(fmt::format("Database {} not found.", database_name_raw)); + return; + } + result.mapped_database_name = maybe_database_name.value(); + auto & mapped_database_name = result.mapped_database_name; + result.table_name = table_name; + + auto schema_syncer = tmt.getSchemaSyncerManager(); + auto storage = tmt.getStorages().getByName(mapped_database_name, table_name, false); + if (storage == nullptr) + { + output(fmt::format("can't find table {} {}", mapped_database_name, table_name)); + return; + } + + auto table_info = storage->getTableInfo(); + schema_syncer->syncTableSchema(context, table_info.keyspace_id, table_info.id); + if (table_info.partition.num > 0) + { + for (const auto & def : table_info.partition.definitions) + { + schema_syncer->syncTableSchema(context, table_info.keyspace_id, def.id); + } + } + + auto table_id = table_info.id; + + // tablePrefix_rowPrefix_tableID_rowID + TiKVKey start_key, end_key; + HandleID start_handle, end_handle; + + constexpr static size_t OFFSET = 2; + if (table_info.is_common_handle) + { + size_t arg_size = args.size() - OFFSET; + // The `start` and `end` argments should be provided in pair. Therefore, the number of arguments must be even. 
+ if ((arg_size & 1) != 0) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Args not matched for common handle table, arg_size={}, should be: database-name, table-name, " + "start_col, [, start_col2, ..., end_col1, end_col2, ...]", + arg_size); + } + size_t handle_column_size = table_info.is_common_handle ? table_info.getPrimaryIndexInfo().idx_cols.size() : 1; + auto key_size = arg_size / 2; + std::vector start_field; + start_field.reserve(key_size); + std::vector end_field; + end_field.reserve(key_size); + + for (size_t i = 0; i < handle_column_size; i++) + { + auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset]; + auto start_datum = TiDB::DatumBumpy( + RegionBench::convertField(column_info, typeid_cast(*args[OFFSET + i]).value), + column_info.tp); + start_field.emplace_back(start_datum.field()); + auto end_datum = TiDB::DatumBumpy( + RegionBench::convertField( + column_info, + typeid_cast(*args[OFFSET + key_size + i]).value), + column_info.tp); + end_field.emplace_back(end_datum.field()); + } + + start_key = RecordKVFormat::genKey(table_info, start_field); + end_key = RecordKVFormat::genKey(table_info, end_field); + } + else + { + start_handle = static_cast(safeGet(typeid_cast(*args[OFFSET]).value)); + start_key = RecordKVFormat::genKey(table_id, start_handle); + if (args.size() == 3) + { + end_handle = start_handle + 1; + } + else + { + end_handle + = static_cast(safeGet(typeid_cast(*args[OFFSET + 1]).value)); + } + end_key = RecordKVFormat::genKey(table_id, end_handle); + } + + auto range = RegionRangeKeys(TiKVKey::copyFrom(start_key), std::move(end_key)); + auto regions = kvstore.getRegionsByRangeOverlap(range.comparableKeys()); + + for (const auto & [region_id, region] : regions) + { + auto r = RegionBench::DebugRegion(region); + const auto & data = r.debugData(); + + for (const auto & [k, v] : data.defaultCF().getData()) + { + if (k.first == start_handle) + { + result.in_default.emplace_back(region_id, 
k.second); + } + } + for (const auto & [k, v] : data.writeCF().getData()) + { + if (k.first == start_handle) + { + result.in_write.emplace_back(region_id, k.second); + } + } + + auto lock_key = std::make_shared(TiKVKey::copyFrom(start_key)); + if (data.lockCF().getData().contains( + RegionLockCFDataTrait::Key{lock_key, std::string_view(lock_key->data(), lock_key->dataSize())})) + { + result.in_lock.emplace_back(region_id); + } + } + result.regions = regions; + output(fmt::format("find key result {}", result.toString())); +} + +BlockInputStreamPtr dbgFuncFindKeyDt(Context & context, const ASTs & args) +{ + if (args.size() < 4) + throw Exception( + "Args not matched, should be: database-name, table-name, key, value, [key2, value2, ...]", + ErrorCodes::BAD_ARGUMENTS); + + auto & tmt = context.getTMTContext(); + const String & database_name_raw = typeid_cast(*args[0]).name; + const String & table_name_raw = typeid_cast(*args[1]).name; + + auto maybe_database_name = mappedDatabaseWithOptional(context, database_name_raw); + if (maybe_database_name == std::nullopt) + { + LOG_INFO(DB::Logger::get(), "Can't find database {}", database_name_raw); + return std::make_shared("Error"); + } + auto mapped_database_name = maybe_database_name.value(); + auto mapped_qualified_table_name = mappedTable(context, database_name_raw, table_name_raw); + auto mapped_table_name = mapped_qualified_table_name.second; + + auto schema_syncer = tmt.getSchemaSyncerManager(); + auto storage = tmt.getStorages().getByName(mapped_database_name, table_name_raw, false); + if (storage == nullptr) + { + LOG_INFO(DB::Logger::get(), "Can't find database and table {}.{}", mapped_database_name, mapped_table_name); + return std::make_shared("Error"); + } + + auto table_info = storage->getTableInfo(); + schema_syncer->syncTableSchema(context, table_info.keyspace_id, table_info.id); + if (table_info.partition.num > 0) + { + for (const auto & def : table_info.partition.definitions) + { + 
schema_syncer->syncTableSchema(context, table_info.keyspace_id, def.id); + } + } + + auto key = safeGet(typeid_cast(*args[2]).value); + auto value = safeGet(typeid_cast(*args[3]).value); + + constexpr static size_t OFFSET = 4; + FmtBuffer fmt_buf; + auto key_size = args.size() - OFFSET; + if (key_size & 1) + { + LOG_INFO(DB::Logger::get(), "Key-values should be in pair {}", database_name_raw, table_name_raw); + return std::make_shared("Error"); + } + for (size_t i = 0; i != key_size / 2; i++) + { + auto k = safeGet(typeid_cast(*args[OFFSET + 2 * i]).value); + auto v = safeGet(typeid_cast(*args[OFFSET + 2 * i + 1]).value); + fmt_buf.fmtAppend(" and {} = {}", k, v); + } + String query; + if (table_info.is_common_handle && !table_info.pk_is_handle) + { + query = fmt::format( + "selraw *,_INTERNAL_VERSION,_INTERNAL_DELMARK,_tidb_rowid from {}.{} where {} = {}{}", + mapped_database_name, + mapped_table_name, + key, + value, + fmt_buf.toString()); + } + else + { + query = fmt::format( + "selraw *,_INTERNAL_VERSION,_INTERNAL_DELMARK from {}.{} where {} = {}{}", + mapped_database_name, + mapped_table_name, + key, + value, + fmt_buf.toString()); + } + LOG_INFO(DB::Logger::get(), "The query is {}", query); + ParserSelectQuery parser; + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "dbgFuncFindKeyDt", 0); + + InterpreterSelectQuery interpreter(ast, context); + auto res = interpreter.execute(); + return res.in; +} + + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/dbgFuncInvestigator.h b/dbms/src/Debug/dbgFuncInvestigator.h new file mode 100644 index 00000000000..12671186cb7 --- /dev/null +++ b/dbms/src/Debug/dbgFuncInvestigator.h @@ -0,0 +1,26 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ + +void dbgFuncFindKey(Context & context, const ASTs & args, DBGInvoker::Printer output); +BlockInputStreamPtr dbgFuncFindKeyDt(Context & context, const ASTs & args); + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp index b3ed7763a71..f140fa32750 100644 --- a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp @@ -14,9 +14,15 @@ #include #include +#include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncMockRaftCommand.cpp #include #include +======= +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncMockRaftCommand.cpp #include #include #include diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index b113cdabe73..e24b7e7455e 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -15,11 +15,19 @@ #include #include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp #include #include #include #include #include +======= +#include +#include +#include +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp #include #include #include diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 9735c56e6cf..4b756bbad9d 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ 
b/dbms/src/Debug/dbgFuncRegion.cpp @@ -14,9 +14,15 @@ #include #include +#include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncRegion.cpp #include #include +======= +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncRegion.cpp #include #include #include diff --git a/dbms/src/Debug/dbgNaturalDag.cpp b/dbms/src/Debug/dbgNaturalDag.cpp index 592131e1f18..adafb7aa833 100644 --- a/dbms/src/Debug/dbgNaturalDag.cpp +++ b/dbms/src/Debug/dbgNaturalDag.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include -#include #include #include #include diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 21f4dd4e1ca..8fbfe44b964 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -13,8 +13,13 @@ // limitations under the License. #include +#include #include +<<<<<<< HEAD #include +======= +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) #include #include #include diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp new file mode 100644 index 00000000000..c32d09a7b4e --- /dev/null +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -0,0 +1,470 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +void UnavailableRegions::tryThrowRegionException() +{ + // For batch-cop request (not handled by disagg write node), all unavailable regions, include the ones with lock exception, should be collected and retry next round. + // For normal cop request, which only contains one region, LockException should be thrown directly and let upper layer (like client-c, tidb, tispark) handle it. + // For batch-cop request (handled by disagg write node), LockException should be thrown directly and let upper layer (disagg read node) handle it. + + if (is_wn_disagg_read && !region_locks.empty()) + throw LockException(std::move(region_locks)); + + if (!batch_cop && !region_locks.empty()) + throw LockException(std::move(region_locks)); + + if (!ids.empty()) + throw RegionException(std::move(ids), status, extra_msg.c_str()); +} + +void UnavailableRegions::addRegionWaitIndexTimeout( + const RegionID region_id, + UInt64 index_to_wait, + UInt64 current_applied_index) +{ + if (!batch_cop) + { + // If server is being terminated / time-out, add the region_id into `unavailable_regions` to other store. + addStatus(region_id, RegionException::RegionReadStatus::NOT_FOUND, ""); + return; + } + + // When wait index timeout happens, we return a `TiFlashException` instead of `RegionException` to break + // the read request from retrying. 
+ // TODO: later maybe we can return SERVER_IS_BUSY to the client + throw TiFlashException( + Errors::Coprocessor::RegionError, + "Region unavailable, region_id={} wait_index={} applied_index={}", + region_id, + index_to_wait, + current_applied_index); +} + +/// LearnerReadWorker /// + +/// Stores references to the query's MVCC info and the TMT context, takes the KVStore handle from +/// `tmt`, and forwards `for_batch_cop` / `is_wn_disagg_read` to `unavailable_regions`. +/// `stats.num_regions` is initialised to the number of regions in the query. +LearnerReadWorker::LearnerReadWorker( + MvccQueryInfo & mvcc_query_info_, + TMTContext & tmt_, + bool for_batch_cop, + bool is_wn_disagg_read, + const LoggerPtr & log_) + : mvcc_query_info(mvcc_query_info_) + , tmt(tmt_) + , kvstore(tmt.getKVStore()) + , log(log_) + , unavailable_regions(for_batch_cop, is_wn_disagg_read) +{ + assert(log != nullptr); + stats.num_regions = mvcc_query_info.regions_query_info.size(); +} + +/// Looks up every queried region in KVStore and returns them keyed by region id. +/// Throws `RegionException` (NOT_FOUND) when a region is missing from KVStore and +/// `TiFlashException` (BadRequest) when the request holds duplicate region ids. +LearnerReadSnapshot LearnerReadWorker::buildRegionsSnapshot() +{ + LearnerReadSnapshot regions_snapshot; + // check region is not null and store region map. + const auto & regions_info = mvcc_query_info.regions_query_info; + for (const auto & info : regions_info) + { + auto region = kvstore->getRegion(info.region_id); + if (region == nullptr) + { + LOG_WARNING(log, "region not found in KVStore, region_id={}", info.region_id); + throw RegionException({info.region_id}, RegionException::RegionReadStatus::NOT_FOUND, nullptr); + } + regions_snapshot.emplace(info.region_id, std::move(region)); + } + // make sure regions are not duplicated. 
+ if (unlikely(regions_snapshot.size() != regions_info.size())) + throw TiFlashException( + Errors::Coprocessor::BadRequest, + "Duplicate region id, n_request={} n_actual={}", + regions_info.size(), + regions_snapshot.size()); + return regions_snapshot; +} + +std::vector LearnerReadWorker::buildBatchReadIndexReq( + const RegionTable & region_table, + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & batch_read_index_result) +{ + const auto & regions_info = mvcc_query_info.regions_query_info; + std::vector batch_read_index_req; + batch_read_index_req.reserve(regions_info.size()); + + // Each queried region is served in one of three ways: a stale read (empty response stored in + // `batch_read_index_result`), a cached read index (response stored in `batch_read_index_result`), + // or a new read-index request appended to the returned vector. `stats` counts each path. + + // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. + auto read_index_tso + = mvcc_query_info.read_tso == std::numeric_limits::max() ? 0 : mvcc_query_info.read_tso; + for (const auto & region_to_query : regions_info) + { + const RegionID region_id = region_to_query.region_id; + // don't stale read in test scenarios. 
+ bool can_stale_read = mvcc_query_info.read_tso != std::numeric_limits::max() + && read_index_tso <= region_table.getSelfSafeTS(region_id); + if (can_stale_read) + { + batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); + ++stats.num_stale_read; + continue; + } + + if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + { + // the read index result from cache + auto resp = kvrpcpb::ReadIndexResponse(); + resp.set_read_index(ori_read_index); + batch_read_index_result.emplace(region_id, std::move(resp)); + ++stats.num_cached_read_index; + } + else + { + // generate request for read index + const auto & region = regions_snapshot.find(region_id)->second; + batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + ++stats.num_read_index_request; + } + } + assert(stats.num_regions == stats.num_stale_read + stats.num_cached_read_index + stats.num_read_index_request); + return batch_read_index_req; +} + +/// Executes `batch_read_index_req` through `kvstore->batchReadIndex` and stores each response into +/// `batch_read_index_result`. When `tmt.checkRunning()` is false every request gets a response +/// carrying a region-not-found error; when KVStore has no proxy helper every request gets a default +/// response. In both of these cases `batchReadIndex` is not called. +void LearnerReadWorker::doBatchReadIndex( + const std::vector & batch_read_index_req, + const UInt64 timeout_ms, + RegionsReadIndexResult & batch_read_index_result) +{ + // Fills the result with one default response per request, optionally marked as region-not-found. + const auto & make_default_batch_read_index_result = [&](bool with_region_error) { + for (const auto & req : batch_read_index_req) + { + auto resp = kvrpcpb::ReadIndexResponse(); + if (with_region_error) + resp.mutable_region_error()->mutable_region_not_found(); + batch_read_index_result.emplace(req.context().region_id(), std::move(resp)); + } + }; + kvstore->addReadIndexEvent(1); + SCOPE_EXIT({ kvstore->addReadIndexEvent(-1); }); + if (!tmt.checkRunning()) + { + make_default_batch_read_index_result(true); + return; + } + + if (!kvstore->getProxyHelper()) + { + // Only in mock test, `proxy_helper` will be `nullptr`. Set `read_index` to 0 and skip waiting. + make_default_batch_read_index_result(false); + return; + } + + /// Blocking learner read. 
Note that learner read must be performed ahead of data read, + /// otherwise the desired index will be blocked by the lock of data read. + auto res = kvstore->batchReadIndex(batch_read_index_req, timeout_ms); + for (auto && [resp, region_id] : res) + { + batch_read_index_result.emplace(region_id, std::move(resp)); + } +} + +/// Classifies every read-index response: a region error is mapped to a `RegionReadStatus`, counted in +/// `tiflash_raft_learner_read_failures_count` and recorded in `unavailable_regions`; a lock is recorded +/// via `addRegionLock`; otherwise a non-zero read index is cached in `mvcc_query_info`. +void LearnerReadWorker::recordReadIndexError( + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & read_index_result) +{ + // if size of batch_read_index_result is not equal with batch_read_index_req, there must be region_error/lock, find and return directly. + for (auto & [region_id, resp] : read_index_result) + { + std::string extra_msg; + if (resp.has_region_error()) + { + const auto & region_error = resp.region_error(); + auto region_status = RegionException::RegionReadStatus::OTHER; + if (region_error.has_epoch_not_match()) + { + auto snapshot_region_iter = regions_snapshot.find(region_id); + if (snapshot_region_iter != regions_snapshot.end()) + { + extra_msg = fmt::format( + "read_index_resp error, region_id={} version={} conf_version={}", + region_id, + snapshot_region_iter->second.create_time_version, + snapshot_region_iter->second.create_time_conf_ver); + } + else + { + extra_msg = fmt::format("read_index_resp error, region_id={} not found in snapshot", region_id); + } + GET_METRIC(tiflash_raft_learner_read_failures_count, type_epoch_not_match).Increment(); + region_status = RegionException::RegionReadStatus::EPOCH_NOT_MATCH; + } + else if (region_error.has_not_leader()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_not_leader).Increment(); + region_status = RegionException::RegionReadStatus::NOT_LEADER; + } + else if (region_error.has_region_not_found()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_not_found_tikv).Increment(); + region_status = RegionException::RegionReadStatus::NOT_FOUND_TIKV; + } + // Below errors seldom happen in raftstore-v1, however, we are not sure if 
they will happen in v2. + else if (region_error.has_flashbackinprogress() || region_error.has_flashbacknotprepared()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_flashback).Increment(); + region_status = RegionException::RegionReadStatus::FLASHBACK; + } + else if (region_error.has_bucket_version_not_match()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_bucket_epoch_not_match).Increment(); + LOG_DEBUG( + log, + "meet abnormal region error {}, [region_id={}]", + resp.region_error().DebugString(), + region_id); + region_status = RegionException::RegionReadStatus::BUCKET_EPOCH_NOT_MATCH; + } + else if (region_error.has_key_not_in_region()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_key_not_in_region).Increment(); + LOG_DEBUG( + log, + "meet abnormal region error {}, [region_id={}]", + resp.region_error().DebugString(), + region_id); + region_status = RegionException::RegionReadStatus::KEY_NOT_IN_REGION; + } + else if ( + region_error.has_server_is_busy() || region_error.has_raft_entry_too_large() + || region_error.has_region_not_initialized() || region_error.has_disk_full() + || region_error.has_read_index_not_ready() || region_error.has_proposal_in_merging_mode()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_tikv_server_issue).Increment(); + LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id); + region_status = RegionException::RegionReadStatus::TIKV_SERVER_ISSUE; + } + else + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_other).Increment(); + LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id); + } + unavailable_regions.addStatus(region_id, region_status, std::move(extra_msg)); + } + else if (resp.has_locked()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_tikv_lock).Increment(); + unavailable_regions.addRegionLock(region_id, LockInfoPtr(resp.release_locked())); 
+ } + else + { + // cache read-index to avoid useless overhead about retry. + // resp.read_index() is 0 when stale read, skip it to avoid overwriting read_index res from the last retry. + if (resp.read_index() != 0) + { + mvcc_query_info.addReadIndexResToCache(region_id, resp.read_index()); + } + } + } +} + +/// Read-index phase: builds the requests, runs them, stores the elapsed time read from `watch` in +/// `stats.read_index_elapsed_ms`, records errors/locks, logs a summary and returns the per-region +/// responses. The summary is logged at INFO level when any region is unavailable, DEBUG otherwise. +RegionsReadIndexResult LearnerReadWorker::readIndex( + const LearnerReadSnapshot & regions_snapshot, + UInt64 timeout_ms, + Stopwatch & watch) +{ + RegionsReadIndexResult batch_read_index_result; + const auto batch_read_index_req + = buildBatchReadIndexReq(tmt.getRegionTable(), regions_snapshot, batch_read_index_result); + + GET_METRIC(tiflash_stale_read_count).Increment(stats.num_stale_read); + GET_METRIC(tiflash_raft_read_index_count).Increment(batch_read_index_req.size()); + + doBatchReadIndex(batch_read_index_req, timeout_ms, batch_read_index_result); + + stats.read_index_elapsed_ms = watch.elapsedMilliseconds(); + GET_METRIC(tiflash_raft_read_index_duration_seconds).Observe(stats.read_index_elapsed_ms / 1000.0); + recordReadIndexError(regions_snapshot, batch_read_index_result); + + const auto log_lvl = unavailable_regions.empty() ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION; + LOG_IMPL( + log, + log_lvl, + "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} " + "num_unavailable={} " + "cost={}ms", + stats.num_regions, + stats.num_read_index_request, + stats.num_stale_read, + stats.num_cached_read_index, + unavailable_regions.size(), + stats.read_index_elapsed_ms); + + return batch_read_index_result; +} + +/// Wait-index phase: for each queried region not yet marked unavailable, waits until the region has +/// applied the read index from `batch_read_index_result`, then, if `mvcc_query_info.resolve_locks` is set, +/// calls `RegionTable::resolveLocksAndWriteRegion`. Failures are recorded in `unavailable_regions`. +/// Once `watch` exceeds a non-zero `timeout_ms`, remaining regions are only checked, not waited on. +void LearnerReadWorker::waitIndex( + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & batch_read_index_result, + const UInt64 timeout_ms, + Stopwatch & watch) +{ + const auto & regions_info = mvcc_query_info.regions_query_info; + for (const auto & region_to_query : regions_info) + { + // if region is unavailable, skip wait index. 
+ if (unavailable_regions.contains(region_to_query.region_id)) + continue; + + const auto & region = regions_snapshot.find(region_to_query.region_id)->second; + + const auto total_wait_index_elapsed_ms = watch.elapsedMilliseconds(); + const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index(); + if (timeout_ms != 0 && total_wait_index_elapsed_ms > timeout_ms) + { + // Wait index timeout is enabled && timeout happens, simply check all Regions' applied index + // instead of wait index for Regions one by one. + if (!region->checkIndex(index_to_wait)) + { + auto current = region->appliedIndex(); + unavailable_regions.addRegionWaitIndexTimeout(region_to_query.region_id, index_to_wait, current); + } + continue; // timeout happens, check next region quickly + } + + // Wait index timeout is disabled; or timeout is enabled but has not happened yet, wait index for + // a specific Region. + const auto [wait_res, time_cost] = region->waitIndex( + index_to_wait, + timeout_ms, + [this]() { return tmt.checkRunning(); }, + log); + if (wait_res != WaitIndexStatus::Finished) + { + auto current = region->appliedIndex(); + unavailable_regions.addRegionWaitIndexTimeout(region_to_query.region_id, index_to_wait, current); + continue; // error or timeout happens, check next region quickly + } + + if (time_cost > 0) + { + // Only record information if wait-index does happen + GET_METRIC(tiflash_raft_wait_index_duration_seconds).Observe(time_cost); + } + + if (unlikely(!mvcc_query_info.resolve_locks)) + { + continue; + } + + // Try to resolve locks and flush data into storage layer + const auto & physical_table_id = region_to_query.physical_table_id; + auto res = RegionTable::resolveLocksAndWriteRegion( + tmt, + physical_table_id, + region, + mvcc_query_info.read_tso, + region_to_query.bypass_lock_ts, + region_to_query.version, + region_to_query.conf_version, + log); + + std::visit( + variant_op::overloaded{ + [&](LockInfoPtr & lock) { 
unavailable_regions.addRegionLock(region->id(), std::move(lock)); }, + [&](RegionException::RegionReadStatus & status) { + if (status != RegionException::RegionReadStatus::OK) + { + LOG_WARNING( + log, + "Check memory cache, region_id={} version={} handle_range={} status={}", + region_to_query.region_id, + region_to_query.version, + RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_to_query.range_in_table), + magic_enum::enum_name(status)); + unavailable_regions.addStatus(region->id(), status, "resolveLock"); + } + }, + }, + res); + } // wait index for next region + + stats.wait_index_elapsed_ms = watch.elapsedMilliseconds(); + const auto log_lvl = unavailable_regions.empty() ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION; + LOG_IMPL( + log, + log_lvl, + "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}", + stats.wait_index_elapsed_ms, + stats.num_regions, + unavailable_regions.size()); +} + +/// Runs `readIndex` followed by `waitIndex`, observes the total elapsed time in +/// `tiflash_syncing_data_freshness`, calls `unavailable_regions.tryThrowRegionException()`, logs a summary +/// and returns the start and end time points of the whole procedure. +std::tuple // +LearnerReadWorker::waitUntilDataAvailable( + const LearnerReadSnapshot & regions_snapshot, + UInt64 read_index_timeout_ms, + UInt64 wait_index_timeout_ms) +{ + const auto start_time = Clock::now(); + + Stopwatch watch; + RegionsReadIndexResult batch_read_index_result = readIndex(regions_snapshot, read_index_timeout_ms, watch); + watch.restart(); // restart to count the elapsed of wait index + waitIndex(regions_snapshot, batch_read_index_result, wait_index_timeout_ms, watch); + + const auto end_time = Clock::now(); + const auto time_elapsed_ms = std::chrono::duration_cast(end_time - start_time).count(); + GET_METRIC(tiflash_syncing_data_freshness).Observe(time_elapsed_ms / 1000.0); // For DBaaS SLI + + // TODO should we try throw immediately after readIndex? 
+ // Throw Region exception if there are any unavailable regions, the exception will be handled in the + // following methods + // - `CoprocessorHandler::execute` + // - `FlashService::EstablishDisaggTask` + // - `DAGDriver::execute` + // - `DAGStorageInterpreter::doBatchCopLearnerRead` + // - `DAGStorageInterpreter::buildLocalStreamsForPhysicalTable` + // - `DAGStorageInterpreter::buildLocalExecForPhysicalTable` + unavailable_regions.tryThrowRegionException(); + + // Use info level if read wait index run slow or any unavailable region exists + const auto log_lvl = (time_elapsed_ms > 1000 || !unavailable_regions.empty()) // + ? Poco::Message::PRIO_INFORMATION + : Poco::Message::PRIO_DEBUG; + LOG_IMPL( + log, + log_lvl, + "[Learner Read] batch read index | wait index" + " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}", + time_elapsed_ms, + stats.read_index_elapsed_ms, + stats.wait_index_elapsed_ms, + stats.num_regions, + stats.num_stale_read, + unavailable_regions.size()); + return {start_time, end_time}; +} + +} // namespace DB diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp new file mode 100644 index 00000000000..1f57eed56cb --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp @@ -0,0 +1,319 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +namespace DB +{ +namespace tests +{ +// Exercises AsyncTasks with void-returning tasks: cancel followed by re-adding the same key, +// task state while being scheduled, cancelling a queued task, blocking cancellation of running +// tasks, and cancelling a mix of running and queued tasks. +TEST(AsyncTasksTest, AsyncTasksNormal) +{ + using namespace std::chrono_literals; + using TestAsyncTasks = AsyncTasks, void>; + + auto log = DB::Logger::get(); + LOG_INFO(log, "Cancel and addTask"); + // Cancel and addTask + { + auto async_tasks = std::make_unique(1, 1, 2); + auto m = std::make_shared(); + auto m2 = std::make_shared(); + int flag = 0; + std::unique_lock cl(*m); + async_tasks->addTask(1, [m, &flag, &async_tasks, &m2]() { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1); + std::scoped_lock rl2(*m2); + std::scoped_lock rl(*m); + if (cancel_handle->isCanceled()) + { + return; + } + flag = 1; + }); + async_tasks->asyncCancelTask(1); + ASSERT_FALSE(async_tasks->isScheduled(1)); + async_tasks->addTask(1, [&flag]() { flag = 2; }); + cl.unlock(); + std::scoped_lock rl2(*m2); + ASSERT_NO_THROW(async_tasks->fetchResult(1)); + ASSERT_EQ(flag, 2); + } + + // Lifetime of tasks + LOG_INFO(log, "Lifetime of tasks"); + { + auto async_tasks = std::make_unique(1, 1, 1); + auto sp_after_sched = SyncPointCtl::enableInScope("after_AsyncTasks::addTask_scheduled"); + auto sp_before_quit = SyncPointCtl::enableInScope("before_AsyncTasks::addTask_quit"); + std::thread t1([&]() { + sp_after_sched.waitAndPause(); + ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::NotScheduled); + sp_after_sched.next(); + sp_after_sched.disable(); + }); + std::thread t2([&]() { + sp_before_quit.waitAndPause(); + ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::InQueue); + sp_before_quit.next(); + sp_before_quit.disable(); + std::this_thread::sleep_for(50ms); + ASSERT_TRUE(async_tasks->isReady(1)); + }); + auto res = async_tasks->addTask(1, []() {}); + ASSERT_TRUE(res); + t1.join(); + t2.join(); + } + + // Cancel in queue + LOG_INFO(log, "Cancel in queue"); + { + auto async_tasks = std::make_unique(1, 1, 2); + // Written by the task / cancel callbacks and polled by this thread, hence atomic. + std::atomic_bool finished = false; + std::atomic_bool canceled = false; + 
std::mutex mtx; + std::unique_lock cl(mtx); + + auto res1 = async_tasks->addTask(1, [&]() { + std::scoped_lock rl(mtx); + UNUSED(rl); + }); + ASSERT_TRUE(res1); + + auto res2 = async_tasks->addTaskWithCancel( + 2, + [&]() { finished = true; }, + [&]() { canceled = true; }); + ASSERT_TRUE(res2); + + async_tasks->asyncCancelTask(2); + cl.unlock(); + + int elapsed = 0; + // Bounded wait: stop after 10 polls so that a missing cancel callback fails the assertion + // below instead of hanging the test. + while (elapsed < 10) + { + if (canceled) + { + break; + } + ++elapsed; + std::this_thread::sleep_for(50ms); + } + ASSERT_TRUE(elapsed < 10); + ASSERT_FALSE(finished); + } + + // Block cancel + LOG_INFO(log, "Block cancel"); + { + auto async_tasks = std::make_unique(2, 2, 10); + int total = 9; + // Incremented by tasks running on the pool and by this thread, hence atomic. + std::atomic_int finished = 0; + std::vector f(total, false); + for (int i = 0; i < total; i++) + { + auto res = async_tasks->addTask(i, [i, &async_tasks, &finished]() { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + while (true) + { + std::this_thread::sleep_for(100ms); + if (cancel_handle->isCanceled()) + { + break; + } + } + finished += 1; + }); + // Ensure task 0 is scheduled first + if (i == 0) + std::this_thread::sleep_for(10ms); + ASSERT_TRUE(res); + } + + while (finished < total) + { + std::this_thread::sleep_for(100ms); + for (int i = 0; i < total; i++) + { + if (f[i]) + continue; + if (async_tasks->blockedCancelRunningTask(i) == AsyncTaskHelper::TaskState::InQueue) + { + // Cancel in queue, should manually add `finished`. 
+ finished += 1; + } + f[i] = true; + break; + } + } + + for (int i = 0; i < total; i++) + { + ASSERT_TRUE(f[i]); + } + ASSERT_EQ(async_tasks->count(), 0); + } + + // Cancel tasks in queue + LOG_INFO(log, "Cancel tasks in queue"); + { + auto async_tasks = std::make_unique(1, 1, 100); + + int total = 7; + std::atomic_int finished = 0; + for (int i = 0; i < total; i++) + { + auto res = async_tasks->addTaskWithCancel( + i, + [i, &async_tasks, &finished]() { + while (true) + { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + // Busy loop to take over cpu + if (cancel_handle->isCanceled()) + { + break; + } + } + finished.fetch_add(1); + }, + [&]() { finished.fetch_add(1); }); + // Ensure task 0 is the first to be handled + if (i == 0) + std::this_thread::sleep_for(10ms); + ASSERT_TRUE(res); + } + + for (int i = 0; i < total; i++) + { + std::this_thread::sleep_for(100ms); + async_tasks->asyncCancelTask(i); + // Throw on double cancel + EXPECT_THROW(async_tasks->asyncCancelTask(i), Exception); + } + + int elapsed = 0; + // Bounded wait: stop after 50 polls so that a task which never finishes fails the + // assertion below instead of hanging the test. + while (elapsed < 50) + { + if (finished >= total) + { + break; + } + ++elapsed; + std::this_thread::sleep_for(100ms); + } + ASSERT_TRUE(elapsed < 50); + ASSERT_EQ(async_tasks->count(), 0); + } +} + +// Repeatedly adds int-returning tasks, fetches the results of ready ones and cancels the last task +// (`total - 1`) once it has been accepted, until every task is marked finished in `f`. +TEST(AsyncTasksTest, AsyncTasksCommon) +try +{ + using namespace std::chrono_literals; + + using TestAsyncTasks = AsyncTasks, int>; + auto async_tasks = std::make_unique(1, 1, 2); + + int total = 5; + int max_steps = 10; + int current_step = 0; + std::vector f(total, false); + std::vector s(total, false); + bool initial_loop = true; + while (true) + { + ASSERT(current_step < max_steps); + auto count = std::accumulate(f.begin(), f.end(), 0, [&](int a, bool b) -> int { return a + int(b); }); + if (count >= total) + { + break; + } + + auto to_be_canceled = total - 1; + + if (s[to_be_canceled] && !f[to_be_canceled]) + { + auto state = async_tasks->queryState(to_be_canceled); + RUNTIME_CHECK(state == TestAsyncTasks::TaskState::InQueue || state == 
TestAsyncTasks::TaskState::Running); + async_tasks->asyncCancelTask( + to_be_canceled, + []() {}, + true); + f[to_be_canceled] = true; + ASSERT_EQ(async_tasks->queryState(to_be_canceled), TestAsyncTasks::TaskState::NotScheduled); + ASSERT_EQ(f[to_be_canceled], true); + ASSERT_EQ(s[to_be_canceled], true); + } + + // Add tasks + for (int i = 0; i < total; ++i) + { + if (!s[i]) + { + auto res = async_tasks->addTask(i, [i, &async_tasks, to_be_canceled, &f]() { + if (i == to_be_canceled) + { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + while (true) + { + if (cancel_handle->blockedWaitFor(100ms)) + { + // NOTE(review): `f` is also read and written by the test thread; confirm + // this write cannot run concurrently with those accesses. + f[to_be_canceled] = true; + break; + } + } + } + else + { + std::this_thread::sleep_for(100ms); + } + return 1; + }); + if (res) + { + s[i] = true; + } + // In the first loop, only the first two tasks (i <= 1) are expected to be accepted. + if (initial_loop) + ASSERT_EQ(res, i <= 1); + } + } + + // Fetch result + for (int i = 0; i < total; ++i) + { + if (!f[i]) + { + if (async_tasks->isReady(i)) + { + [[maybe_unused]] auto r = async_tasks->fetchResult(i); + f[i] = true; + } + } + } + initial_loop = false; + std::this_thread::sleep_for(100ms); + current_step++; + } + + ASSERT_EQ(async_tasks->count(), 0); +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file