Skip to content

Commit

Permalink
FAP: Implement two phase fast add peer (#8377)
Browse files Browse the repository at this point in the history
close #8382
  • Loading branch information
CalvinNeo authored Nov 29, 2023
1 parent cf0e9af commit 79a71b3
Show file tree
Hide file tree
Showing 48 changed files with 1,594 additions and 371 deletions.
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 25 files
+1 −0 proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
+8 −0 proxy_components/engine_store_ffi/src/core/forward_raft/command.rs
+11 −2 proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs
+9 −1 proxy_components/engine_store_ffi/src/engine/ffihub_impl.rs
+4 −1 proxy_components/engine_tiflash/src/proxy_utils/hub_impls.rs
+6 −0 proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs
+1 −0 proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
+1 −0 proxy_components/mock-engine-store/src/mock_cluster/mixed_cluster.rs
+27 −3 proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs
+30 −0 proxy_components/mock-engine-store/src/mock_cluster/v1/mod.rs
+7 −0 proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
+7 −0 proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
+27 −3 proxy_components/mock-engine-store/src/mock_store/mock_core.rs
+42 −4 proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
+32 −13 proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs
+6 −1 proxy_components/mock-engine-store/src/mock_store/mock_snapshot_impls.rs
+5 −0 proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
+15 −1 proxy_components/proxy_ffi/src/interfaces.rs
+1 −0 proxy_components/proxy_server/src/run.rs
+1 −2 proxy_tests/proxy/shared/config.rs
+326 −15 proxy_tests/proxy/shared/fast_add_peer.rs
+26 −4 proxy_tests/proxy/utils/common.rs
+1 −1 raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
+7 −0 raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
+10 −1 readme.md
7 changes: 4 additions & 3 deletions dbms/src/Columns/ColumnVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ class ColumnVector final : public COWPtrHelper<ColumnVectorHelper, ColumnVector<
else
{
throw Exception(
"Detected overflow when decoding integer of length " + std::to_string(length)
+ " with column type " + this->getName(),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"Detected overflow when decoding integer of length {} with column type {}",
length,
this->getName());
}
}

Expand Down
95 changes: 86 additions & 9 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/ComputeLabelHolder.h>
#include <Common/Exception.h>
#include <Common/ProcessCollector.h>
#include <Common/TiFlashBuildInfo.h>
#include <Common/nocopyable.h>
Expand All @@ -26,13 +27,11 @@
#include <prometheus/histogram.h>
#include <prometheus/registry.h>

#include <cassert>
#include <ext/scope_guard.h>
#include <mutex>
#include <shared_mutex>

// to make GCC 11 happy
#include <cassert>

namespace DB
{
/// Central place to define metrics across all subsystems.
Expand Down Expand Up @@ -376,16 +375,22 @@ namespace DB
"", \
Counter, \
F(type_total, {{"type", "total"}}), \
F(type_success_transform, {{"type", "success_transform"}}), \
F(type_failed_other, {{"type", "failed_other"}}), \
F(type_failed_cancel, {{"type", "failed_cancel"}}), \
F(type_failed_no_suitable, {{"type", "failed_no_suitable"}}), \
F(type_failed_timeout, {{"type", "failed_timeout"}}), \
F(type_failed_baddata, {{"type", "failed_baddata"}}), \
F(type_failed_repeated, {{"type", "failed_repeated"}}), \
F(type_succeed, {{"type", "succeed"}})) \
M(tiflash_fap_task_state, \
"", \
Gauge, \
F(type_ongoing, {{"type", "ongoing"}}), \
F(type_ingesting_stage, {{"type", "ingesting_stage"}}), \
F(type_building_stage, {{"type", "building_stage"}})) \
F(type_writing_stage, {{"type", "writing_stage"}}), \
F(type_queueing_stage, {{"type", "queueing_stage"}}), \
F(type_selecting_stage, {{"type", "selecting_stage"}})) \
M(tiflash_fap_nomatch_reason, \
"", \
Counter, \
Expand All @@ -395,10 +400,12 @@ namespace DB
M(tiflash_fap_task_duration_seconds, \
"", \
Histogram, \
F(type_build_stage, {{"type", "build_stage"}}, ExpBuckets{0.05, 2, 60}), \
F(type_success, {{"type", "success"}}, ExpBuckets{0.05, 2, 60}), \
F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBuckets{0.05, 2, 60}), \
F(type_total, {{"type", "total"}}, ExpBuckets{0.05, 2, 60})) \
F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.1, 2, 60}), \
F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.05, 2, 60}), \
F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBucketsWithRange{0.05, 2, 30}), \
F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.1, 2, 300}), \
F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.1, 2, 300}), \
F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 2, 80})) \
M(tiflash_raft_command_duration_seconds, \
"Bucketed histogram of some raft command: apply snapshot and ingest SST", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \
Expand All @@ -407,7 +414,7 @@ namespace DB
F(type_ingest_sst_sst2dt, {{"type", "ingest_sst_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \
F(type_ingest_sst_upload, {{"type", "ingest_sst_upload"}}, ExpBuckets{0.05, 2, 10}), \
F(type_apply_snapshot_predecode, {{"type", "snapshot_predecode"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_total, {{"type", "snapshot_total"}}, ExpBuckets{0.2, 2, 60}), \
F(type_apply_snapshot_total, {{"type", "snapshot_total"}}, ExpBucketsWithRange{0.1, 2, 600}), \
F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_predecode_parallel_wait, \
{{"type", "snapshot_predecode_parallel_wait"}}, \
Expand Down Expand Up @@ -620,6 +627,7 @@ namespace DB
F(type_data, {"type", "data"}), \
F(type_log, {"type", "log"}), \
F(type_meta, {"type", "kvstore"}), \
F(type_localkv, {"type", "localkv"}), \
F(type_unknown, {"type", "unknown"})) \
M(tiflash_storage_checkpoint_flow_by_types, \
"The bytes flow cause by remote checkpoint", \
Expand All @@ -630,6 +638,7 @@ namespace DB
F(type_data, {"type", "data"}), \
F(type_log, {"type", "log"}), \
F(type_meta, {"type", "kvstore"}), \
F(type_localkv, {"type", "localkv"}), \
F(type_unknown, {"type", "unknown"})) \
M(tiflash_storage_page_data_by_types, \
"The existing bytes stored in UniPageStorage", \
Expand All @@ -640,6 +649,7 @@ namespace DB
F(type_data, {"type", "data"}), \
F(type_log, {"type", "log"}), \
F(type_meta, {"type", "kvstore"}), \
F(type_localkv, {"type", "localkv"}), \
F(type_unknown, {"type", "unknown"})) \
M(tiflash_storage_s3_request_seconds, \
"S3 request duration in seconds", \
Expand Down Expand Up @@ -770,6 +780,68 @@ struct ExpBuckets
const double base;
const size_t size;

/// Stores the three parameters of an exponential bucket layout: `start_`, `base_` and `size_`.
/// When NDEBUG is not defined, the arguments are additionally validated with
/// RUNTIME_CHECK_MSG: `base` must be greater than 1.0 and `size` must not exceed 50.
constexpr ExpBuckets(const double start_, const double base_, const size_t size_)
: start(start_)
, base(base_)
, size(size_)
{
#ifndef NDEBUG
// The validation below is compiled only when NDEBUG is not defined.
// The base must be strictly greater than 1.0.
RUNTIME_CHECK_MSG(base > 1.0, "incorrect base for ExpBuckets, start={} base={} size={}", start, base, size);
// Cap the bucket count: more buckets mean more network traffic when metrics are transferred.
RUNTIME_CHECK_MSG(
size <= 50,
"too many metrics buckets, reconsider step/unit, start={} base={} size={}",
start,
base,
size);
#endif
}

/// Materializes the bucket boundaries: `size` values beginning at `start`,
/// each one `base` times the previous one.
/// Rvalue-qualified, so it only applies to temporaries.
// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
    prometheus::Histogram::BucketBoundaries boundaries(size);
    double boundary = start;
    for (auto & slot : boundaries)
    {
        slot = boundary;
        boundary *= base;
    }
    return boundaries;
}
};

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^x],
/// where x is the smallest non-negative integer such that start * base^x >= end.
struct ExpBucketsWithRange
{
/// Returns the number of exponential boundaries l, l*b, l*b^2, ... needed so that
/// the last boundary is the first one that is >= r. The result is always >= 1.
///
/// @param l first (smallest) boundary
/// @param r value the last boundary must reach
/// @param b growth factor between consecutive boundaries
///
/// Boundaries are counted by repeated multiplication instead of evaluating
/// ceil(log(r / l) / log(b)). The log quotient can land just above an exact
/// integer (e.g. l=1, r=125, b=5) and report one bucket too many. For b == 1,
/// l <= 0 or r < l it produces inf/NaN/negative values whose conversion to
/// size_t is undefined behaviour.
/// Arguments that do not describe a growing range (l <= 0, b <= 1, r <= l, or
/// any NaN) yield 1.
static size_t getSize(double l, double r, double b)
{
    // Negated comparisons so that NaN arguments are rejected as well.
    if (!(l > 0.0) || !(b > 1.0) || !(r > l))
        return 1;

    size_t count = 1; // the boundary at `l` itself
    double boundary = l;
    while (boundary < r)
    {
        const double next = boundary * b;
        ++count;
        // Stop if multiplication no longer makes progress (denormal underflow or
        // saturation at infinity); otherwise the loop could never terminate.
        if (!(next > boundary))
            break;
        boundary = next;
    }
    return count;
}

/// Builds an exponential bucket layout from `start_`, `base_` and `end_`.
/// The number of buckets is not passed in; it is derived with
/// getSize(start_, end_, base_).
/// When NDEBUG is not defined, the arguments are validated with RUNTIME_CHECK_MSG:
/// `base` must be greater than 1.0 and `start_` must be less than `end_`.
/// Note that `size` is computed in the initializer list, i.e. before these checks run.
ExpBucketsWithRange(double start_, double base_, double end_)
: start(start_)
, base(base_)
, size(ExpBucketsWithRange::getSize(start_, end_, base_))
{
#ifndef NDEBUG
// The validation below is compiled only when NDEBUG is not defined.
// The base must be strictly greater than 1.0.
RUNTIME_CHECK_MSG(
base > 1.0,
"incorrect base for ExpBucketsWithRange, start={} base={} end={}",
start,
base,
end_);
// The range must be non-empty: start strictly below end.
RUNTIME_CHECK_MSG(
start_ < end_,
"incorrect start/end for ExpBucketsWithRange, start={} base={} end={}",
start,
base,
end_);
#endif
}
// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
Expand All @@ -781,6 +853,11 @@ struct ExpBuckets
});
return buckets;
}

private:
const double start;
const double base;
const size_t size;
};

// Buckets with same width
Expand Down
20 changes: 18 additions & 2 deletions dbms/src/Common/tests/gtest_tiflash_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ namespace tests
M(test_histogram_with_1_label, \
"Test histogram metric with 1 label", \
Histogram, \
F(m1, {{"label1", "value1"}}, ExpBuckets{1.0, 2, 1024})) \
F(m1, {{"label1", "value1"}}, ExpBuckets{1.0, 2, 24})) \
M(test_histogram_with_2_labels, \
"Test histogram metric with 2 labels", \
Histogram, \
F(m1, {{"label1", "value1"}}, ExpBuckets{1.0, 2, 1024}), \
F(m1, {{"label1", "value1"}}, ExpBuckets{1.0, 2, 24}), \
F(m2, {{"label21", "value21"}, {"label22", "value22"}}, {1, 2, 3, 4}))

class TestMetrics : public ext::Singleton<TestMetrics>
Expand Down Expand Up @@ -104,6 +104,22 @@ TEST(TiFlashMetrics, Histogram)
ASSERT_NO_THROW(GET_METRIC(test_histogram_with_2_labels, m2).Observe(3));
}

/// Checks the bucket count computed by ExpBucketsWithRange::getSize and, in
/// builds without NDEBUG, the argument validation of the constructor.
TEST(TiFlashMetrics, ExpBucketsWithRange)
{
    // getSize(start, end, base); the trailing comment lists the expected boundaries.
    ASSERT_EQ(2, ExpBucketsWithRange::getSize(1.0, 2.0, 2)); // 1 2
    ASSERT_EQ(3, ExpBucketsWithRange::getSize(1.0, 3.0, 2)); // 1 2 4
    ASSERT_EQ(2, ExpBucketsWithRange::getSize(2.0, 3.0, 2)); // 2 4
    ASSERT_EQ(2, ExpBucketsWithRange::getSize(2.0, 4.0, 2)); // 2 4
    ASSERT_EQ(3, ExpBucketsWithRange::getSize(2.0, 5.0, 2)); // 2 4 8
    ASSERT_EQ(1, ExpBucketsWithRange::getSize(2.0, 2.0, 2)); // 2
    ASSERT_EQ(3, ExpBucketsWithRange::getSize(2.0, 12.0, 3)); // 2 6 18
    ASSERT_EQ(3, ExpBucketsWithRange::getSize(2.0, 18.0, 3)); // 2 6 18
    ASSERT_EQ(4, ExpBucketsWithRange::getSize(2.0, 19.0, 3)); // 2 6 18 54

#ifndef NDEBUG
    // The constructor's argument checks are wrapped in `#ifndef NDEBUG`, so a throw
    // can only be expected when NDEBUG is not defined. Without this guard the
    // assertions below fail in NDEBUG builds.
    ASSERT_ANY_THROW({ ExpBucketsWithRange(2.0, 1.0, 3); }); // base must be > 1.0
    ASSERT_ANY_THROW({ ExpBucketsWithRange(2.0, 2.0, 2.0); }); // start must be < end
#endif
}

} // namespace tests

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ struct Settings
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
\
M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 60, "The max time wait for a usable checkpoint for FAP. Unit is second.") \
M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 80, "The max time wait for a usable checkpoint for FAP") \
M(SettingUInt64, fap_handle_concurrency, 25, "The number of threads for handling FAP tasks") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, rf_max_in_value_set, 1024, "Maximum size of the set (in number of elements) resulting from the execution of the RF IN Predicate.") \
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/SharedContexts/Disagg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ void SharedContextDisagg::initRemoteDataStore(const FileProviderPtr & file_provi
remote_data_store = std::make_shared<DM::Remote::DataStoreS3>(file_provider);
}

/// Creates the FastAddPeerContext and stores it in `fap_context` as a shared_ptr.
/// In the `fap_concur` variant, `fap_concur` is forwarded to the FastAddPeerContext constructor.
/// NOTE(review): this is a rendered diff with both sides interleaved; presumably the
/// parameterless signature/body lines are the removed version and the `fap_concur`
/// lines the added one. Confirm against the real Disagg.cpp.
void SharedContextDisagg::initFastAddPeerContext()
void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
{
fap_context = std::make_shared<FastAddPeerContext>();
fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/SharedContexts/Disagg.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct SharedContextDisagg : private boost::noncopyable

void initRemoteDataStore(const FileProviderPtr & file_provider, bool s3_enabled);

void initFastAddPeerContext();
void initFastAddPeerContext(UInt64 fap_concur);

bool isDisaggregatedComputeMode() const { return disaggregated_mode == DisaggregatedMode::Compute; }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode())
{
global_context->getSharedContextDisagg()->initWriteNodeSnapManager();
global_context->getSharedContextDisagg()->initFastAddPeerContext();
global_context->getSharedContextDisagg()->initFastAddPeerContext(settings.fap_handle_concurrency);
}

if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
Loading

0 comments on commit 79a71b3

Please sign in to comment.