Skip to content

Commit

Permalink
Merge branch 'release-5.0' into cherry-pick-2808-to-release-5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Sep 2, 2021
2 parents 329114a + 9d95db0 commit f4fdab7
Show file tree
Hide file tree
Showing 43 changed files with 1,194 additions and 422 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
*\#
.tramp_history

# vscode clangd cache
.cache

# JSON Compilation Database Format Specification
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

# vim cache files
*.swp

Expand Down
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ if (ENABLE_TESTS)
${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/UserConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/Server/RaftConfigParser.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
${ClickHouse_SOURCE_DIR}/dbms/src/AggregateFunctions/AggregateFunctionSum.cpp
)
target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
Expand Down
18 changes: 14 additions & 4 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join)
#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
M(minimum_block_size_for_cross_join) \
M(random_exception_after_dt_write_done) \
M(random_slow_page_storage_write) \
M(random_exception_after_page_storage_sequence_acquired) \
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch) \
M(force_set_dtfile_exist_when_acquire_id)


#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand All @@ -66,7 +75,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store)
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired)

namespace FailPoints
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace DB
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \
F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"})) \
M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 30}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \
M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \
Expand All @@ -51,9 +51,9 @@ namespace DB
F(reason_kv_client_error, {"reason", "kv_client_error"}), F(reason_internal_error, {"reason", "internal_error"}), \
F(reason_other_error, {"reason", "other_error"})) \
M(tiflash_coprocessor_request_handle_seconds, "Bucketed histogram of request handle duration", Histogram, \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 30}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \
M(tiflash_coprocessor_response_bytes, "Total bytes of response body", Counter) \
Expand Down
45 changes: 24 additions & 21 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#pragma once

#include <thread>

#include <Common/ConcurrentBoundedQueue.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>

#include <DataStreams/IProfilingBlockInputStream.h>

#include <thread>

namespace DB
{

Expand Down Expand Up @@ -37,15 +36,9 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
}

String getName() const override
{
return "SharedQuery";
}
String getName() const override { return "SharedQuery"; }

Block getHeader() const override
{
return children.back()->getHeader();
}
Block getHeader() const override { return children.back()->getHeader(); }

void readPrefix() override
{
Expand All @@ -69,8 +62,8 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream

if (thread.joinable())
thread.join();
if (exception)
std::rethrow_exception(exception);
if (!exception_msg.empty())
throw Exception(exception_msg);
}

protected:
Expand All @@ -84,8 +77,10 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
Block block;
do
{
if (exception)
std::rethrow_exception(exception);
if (!exception_msg.empty())
{
throw Exception(exception_msg);
}
if (isCancelled() || read_suffixed)
return {};
} while (!queue.tryPop(block, try_action_millisecionds));
Expand All @@ -100,7 +95,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
in->readPrefix();
while (!isCancelled())
{
Block block;
Block block = in->read();
do
{
if (isCancelled() || read_suffixed)
Expand All @@ -109,16 +104,24 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
queue.tryEmplace(0);
break;
}
} while (!queue.tryPush(block = in->read(), try_action_millisecionds));
} while (!queue.tryPush(block, try_action_millisecionds));

if (!block)
break;
}
in->readSuffix();
}
catch (Exception & e)
{
exception_msg = e.message();
}
catch (std::exception & e)
{
exception_msg = e.what();
}
catch (...)
{
exception = std::current_exception();
exception_msg = "other error";
}
}

Expand All @@ -131,11 +134,11 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
bool read_suffixed = false;

std::thread thread;
std::mutex mutex;
std::mutex mutex;

std::exception_ptr exception;
std::string exception_msg;

Logger * log;
BlockInputStreamPtr in;
};
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace FailPoints
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_after_copr_streams_acquired[];
extern const char minimum_block_size_for_cross_join[];
} // namespace FailPoints

Expand Down Expand Up @@ -383,6 +384,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, DAGPipeline
}
});
}
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
}

void DAGQueryBlockInterpreter::readFromLocalStorage( //
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ void ExchangeReceiver::setUpConnection()
{
auto & meta = pb_exchange_receiver.encoded_task_meta(index);
std::thread t(&ExchangeReceiver::ReadLoop, this, std::ref(meta), index);
live_connections++;
workers.push_back(std::move(t));
}
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ExchangeReceiver
source_num(pb_exchange_receiver.encoded_task_meta_size()),
task_meta(meta),
max_buffer_size(max_buffer_size_),
live_connections(0),
live_connections(pb_exchange_receiver.encoded_task_meta_size()),
state(NORMAL),
log(&Logger::get("exchange_receiver"))
{
Expand Down
41 changes: 19 additions & 22 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <Databases/IDatabase.h>
#include <IO/UncompressedCache.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageMergeTree.h>
#include <chrono>

Expand Down Expand Up @@ -130,16 +132,10 @@ void AsynchronousMetrics::update()
{
auto databases = context.getDatabases();

size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
size_t max_merges_in_queue = 0;

size_t sum_queue_size = 0;
size_t sum_inserts_in_queue = 0;
size_t sum_merges_in_queue = 0;

size_t max_absolute_delay = 0;
size_t max_relative_delay = 0;
double max_dt_stable_oldest_snapshot_lifetime = 0.0;
double max_dt_delta_oldest_snapshot_lifetime = 0.0;
double max_dt_meta_oldest_snapshot_lifetime = 0.0;
size_t max_dt_background_tasks_length = 0;

size_t max_part_count_for_partition = 0;

Expand All @@ -149,24 +145,25 @@ void AsynchronousMetrics::update()
{
auto & table = iterator->table();

if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
if (auto dt_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(table); dt_storage)
{
auto stat = dt_storage->getStore()->getStat();
calculateMax(max_dt_stable_oldest_snapshot_lifetime, stat.storage_stable_oldest_snapshot_lifetime);
calculateMax(max_dt_delta_oldest_snapshot_lifetime, stat.storage_delta_oldest_snapshot_lifetime);
calculateMax(max_dt_meta_oldest_snapshot_lifetime, stat.storage_meta_oldest_snapshot_lifetime);
calculateMax(max_dt_background_tasks_length, stat.background_tasks_length);
}
else if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForPartition());
}
}
}

set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);

set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);

set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);

set("MaxDTStableOldestSnapshotLifetime", max_dt_stable_oldest_snapshot_lifetime);
set("MaxDTDeltaOldestSnapshotLifetime", max_dt_delta_oldest_snapshot_lifetime);
set("MaxDTMetaOldestSnapshotLifetime", max_dt_meta_oldest_snapshot_lifetime);
set("MaxDTBackgroundTasksLength", max_dt_background_tasks_length);
set("MaxPartCountForPartition", max_part_count_for_partition);
}

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct DMContext : private boost::noncopyable
const bool enable_relevant_place;
const bool enable_skippable_place;

const String query_id;

public:
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
Expand All @@ -78,7 +80,8 @@ struct DMContext : private boost::noncopyable
const NotCompress & not_compress_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const String & query_id_ = "")
: db_context(db_context_),
metrics(db_context.getTiFlashMetrics()),
path_pool(path_pool_),
Expand All @@ -100,9 +103,11 @@ struct DMContext : private boost::noncopyable
read_delta_only(settings.dt_read_delta_only),
read_stable_only(settings.dt_read_stable_only),
enable_relevant_place(settings.dt_enable_relevant_place),
enable_skippable_place(settings.dt_enable_skippable_place)
enable_skippable_place(settings.dt_enable_skippable_place),
query_id(query_id_)
{
}

};

} // namespace DM
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
else
{
after_segment_read(dm_context, cur_segment);
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment->segmentId()) + "]");
LOG_TRACE(log, "Finish reading segment [" << cur_segment->segmentId() << "]");
cur_segment = {};
cur_stream = {};
}
Expand Down
17 changes: 11 additions & 6 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_)
bool is_common_handle_,
const String & query_id_ = "")
: version_limit(version_limit_),
is_common_handle(is_common_handle_),
header(toEmptyBlock(read_columns)),
query_id(query_id_),
log(&Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
{
children.push_back(input);
Expand All @@ -48,7 +50,9 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
"Total rows: " << total_rows << ", pass: " << DB::toString((Float64)passed_rows * 100 / total_rows, 2)
<< "%, complete pass: " << DB::toString((Float64)complete_passed * 100 / total_blocks, 2)
<< "%, complete not pass: " << DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2)
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) << "%");
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) //
<< "%, read tso: " << version_limit
<< ", query id: " << (query_id.empty() ? String("<non-query>") : query_id));
}

String getName() const override { return "DeltaMergeVersionFilter"; }
Expand Down Expand Up @@ -113,9 +117,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}

private:
UInt64 version_limit;
bool is_common_handle;
Block header;
const UInt64 version_limit;
const bool is_common_handle;
const Block header;
const String query_id;

size_t handle_col_pos;
size_t version_col_pos;
Expand All @@ -139,7 +144,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t complete_not_passed = 0;
size_t not_clean_rows = 0;

Logger * log;
Poco::Logger * const log;
};
} // namespace DM
} // namespace DB
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ DeltaPacks DeltaValueSpace::checkHeadAndCloneTail(DMContext & context,
}
else if (auto f = pack->tryToFile(); f)
{
auto new_ref_id = context.storage_pool.newDataPageId();
auto file_id = f->getFile()->fileId();
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path);

auto new_pack = f->cloneWith(context, new_file, target_range);
Expand Down
Loading

0 comments on commit f4fdab7

Please sign in to comment.