From 171097ee5924dd2e61b17a2e6d5eab5288a6764a Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 11 Dec 2023 10:22:47 +0800 Subject: [PATCH 1/7] Support only null type for json_extract (#8489) close pingcap/tiflash#8486 --- dbms/src/Functions/FunctionsJson.h | 24 ++++++++++++------- .../Functions/tests/gtest_json_extract.cpp | 15 ++++++++++++ tests/fullstack-test/expr/json_extract.test | 14 +++++++++++ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/dbms/src/Functions/FunctionsJson.h b/dbms/src/Functions/FunctionsJson.h index 67f4fe48c95..755bdcac2f3 100644 --- a/dbms/src/Functions/FunctionsJson.h +++ b/dbms/src/Functions/FunctionsJson.h @@ -90,19 +90,14 @@ class FunctionJsonExtract : public IFunction { for (const auto & arg : arguments) { - if (const auto * nested_type = checkAndGetDataType(arg.get())) + if (!arg->onlyNull()) { - if unlikely (!nested_type->getNestedType()->isStringOrFixedString()) + const auto * nested_arg_type = removeNullable(arg).get(); + if unlikely (!nested_arg_type->isStringOrFixedString()) throw Exception( "Illegal type " + arg->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } - else if unlikely (!arg->isStringOrFixedString()) - { - throw Exception( - "Illegal type " + arg->getName() + " of argument of function " + getName(), - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } } return makeNullable(std::make_shared()); } @@ -116,7 +111,12 @@ class FunctionJsonExtract : public IFunction bool const_json = false; const ColumnString * source_data_column_ptr; const ColumnNullable * source_nullable_column_ptr = nullptr; - if (const auto * const_nullable_col = checkAndGetColumnConst(json_column)) + if unlikely (json_column->onlyNull()) + { + block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(rows, Null()); + return; + } + else if (const auto * const_nullable_col = checkAndGetColumnConst(json_column)) { const_json = true; json_column = 
const_nullable_col->getDataColumnPtr().get(); @@ -175,6 +175,12 @@ class FunctionJsonExtract : public IFunction for (size_t i = 1; i < arguments_size; ++i) { const ColumnPtr column = block.getByPosition(arguments[i]).column; + if (column->onlyNull()) + { + block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(rows, Null()); + return; + } + const auto * nested_column = static_cast(column.get())->getDataColumnPtr().get(); StringRef path_str; if (const auto * nullable_string_path_col = checkAndGetColumn(nested_column)) diff --git a/dbms/src/Functions/tests/gtest_json_extract.cpp b/dbms/src/Functions/tests/gtest_json_extract.cpp index fe4ba52920a..f0374a80e78 100644 --- a/dbms/src/Functions/tests/gtest_json_extract.cpp +++ b/dbms/src/Functions/tests/gtest_json_extract.cpp @@ -137,6 +137,14 @@ try expect_null_vec = {1, 1, 1}; checkResult(res.column, expect_null_vec, expect_string_vec); + /// JsonBinary only null + auto const_null_only_col = createOnlyNullColumnConst(3); + res = executeFunction(func_name, {const_null_only_col, path_col}); + ASSERT_TRUE(res.column->size() == 3); + expect_string_vec = {"", "", ""}; + expect_null_vec = {1, 1, 1}; + checkResult(res.column, expect_null_vec, expect_string_vec); + /// ColumnVector(non-null) auto non_null_str_col = ColumnString::create(); non_null_str_col->insertData(reinterpret_cast(bj2), sizeof(bj2) / sizeof(UInt8)); @@ -150,6 +158,13 @@ try expect_null_vec = {0, 0, 0}; checkResult(res.column, expect_null_vec, expect_string_vec); + /// One of Paths is only null + res = executeFunction(func_name, {non_null_input_col, non_null_path_col, const_null_only_col}); + ASSERT_TRUE(res.column->size() == 3); + expect_string_vec = {"", "", ""}; + expect_null_vec = {1, 1, 1}; + checkResult(res.column, expect_null_vec, expect_string_vec); + /// ColumnConst(non-null) non_null_str_col = ColumnString::create(); non_null_str_col->insertData(reinterpret_cast(bj2), sizeof(bj2) / sizeof(UInt8)); diff --git 
a/tests/fullstack-test/expr/json_extract.test b/tests/fullstack-test/expr/json_extract.test index 1514718273e..683546bf7d9 100644 --- a/tests/fullstack-test/expr/json_extract.test +++ b/tests/fullstack-test/expr/json_extract.test @@ -47,4 +47,18 @@ mysql> set tidb_enforce_mpp=1; select json_extract(d, '\$[0]', '\$[1]', '\$[2].a | [1, 2, "b"] | +-------------+ +mysql> set tidb_enforce_mpp=1; select json_extract(NULL, '\$[0]', '\$[1]', '\$[2].a') as col_a from test.t #NO_UNESCAPE ++-------+ +| col_a | ++-------+ +| NULL | ++-------+ + +mysql> set tidb_enforce_mpp=1; select json_extract(d, '\$[0]', NULL, '\$[2].a') as col_a from test.t #NO_UNESCAPE ++-------+ +| col_a | ++-------+ +| NULL | ++-------+ + mysql> drop table if exists test.t From 3a72d53dcb3a282baabfea60ac212917a56670d3 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 11 Dec 2023 18:30:18 +0800 Subject: [PATCH 2/7] Fix the issue that functions that rely on `tipb::FieldType` may produce incorrect results (#8483) close pingcap/tiflash#8482 --- .../DAGExpressionAnalyzerHelper.cpp | 4 +- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 14 +++++- dbms/src/Flash/Coprocessor/DAGUtils.h | 6 ++- tests/fullstack-test/expr/issue_8482.test | 44 +++++++++++++++++++ 4 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 tests/fullstack-test/expr/issue_8482.test diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp index 10f2b24c9fa..14f5e55c4bb 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp @@ -245,7 +245,7 @@ String DAGExpressionAnalyzerHelper::buildCastFunctionInternal( { static const String tidb_cast_name = "tidb_cast"; - String result_name = genFuncString(tidb_cast_name, argument_names, {nullptr}); + String result_name = genFuncString(tidb_cast_name, argument_names, {nullptr}, {&field_type}); if (actions->getSampleBlock().has(result_name)) 
return result_name; @@ -299,7 +299,7 @@ String DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions( const auto & input_expr = expr.children(0); String arg = analyzer->getActions(input_expr, actions); const auto & collator = getCollatorFromExpr(expr); - String result_name = genFuncString(func_name, {arg}, {collator}); + String result_name = genFuncString(func_name, {arg}, {collator}, {&input_expr.field_type(), &expr.field_type()}); if (actions->getSampleBlock().has(result_name)) return result_name; diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 9ffcde1a696..372883f129f 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -1405,7 +1405,11 @@ SortDescription getSortDescription( return order_descr; } -String genFuncString(const String & func_name, const Names & argument_names, const TiDB::TiDBCollators & collators) +String genFuncString( + const String & func_name, + const Names & argument_names, + const TiDB::TiDBCollators & collators, + const std::vector & field_types) { FmtBuffer buf; buf.fmtAppend("{}({})_collator", func_name, fmt::join(argument_names.begin(), argument_names.end(), ", ")); @@ -1417,6 +1421,14 @@ String genFuncString(const String & func_name, const Names & argument_names, con buf.append("_0"); } buf.append(" "); + buf.joinStr( + field_types.begin(), + field_types.end(), + [](const auto & field_type, FmtBuffer & buffer) { + if likely (field_type) + buffer.fmtAppend("{}|{}", field_type->flag(), field_type->flen()); + }, + ", "); return buf.toString(); } diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.h b/dbms/src/Flash/Coprocessor/DAGUtils.h index 98b55269b22..9ab68493ad7 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.h +++ b/dbms/src/Flash/Coprocessor/DAGUtils.h @@ -65,7 +65,11 @@ DataTypePtr inferDataType4Literal(const tipb::Expr & expr); SortDescription getSortDescription( const std::vector & order_columns, const 
google::protobuf::RepeatedPtrField & by_items); -String genFuncString(const String & func_name, const Names & argument_names, const TiDB::TiDBCollators & collators); +String genFuncString( + const String & func_name, + const Names & argument_names, + const TiDB::TiDBCollators & collators, + const std::vector & field_types = {}); extern const Int8 VAR_SIZE; diff --git a/tests/fullstack-test/expr/issue_8482.test b/tests/fullstack-test/expr/issue_8482.test new file mode 100644 index 00000000000..360f9c0e1ec --- /dev/null +++ b/tests/fullstack-test/expr/issue_8482.test @@ -0,0 +1,44 @@ +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table test.t(b json); +mysql> alter table test.t set tiflash replica 1; +mysql> insert into test.t values (true); + +func> wait_table test t + +mysql> set tidb_allow_mpp=1;set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select b = true from test.t; ++----------+ +| b = true | ++----------+ +| 0 | ++----------+ + +mysql> set tidb_allow_mpp=1;set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select b = 1 from test.t; ++-------+ +| b = 1 | ++-------+ +| 1 | ++-------+ + +mysql> set tidb_allow_mpp=1;set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select b = true, b = 1 from test.t; ++----------+-------+ +| b = true | b = 1 | ++----------+-------+ +| 0 | 1 | ++----------+-------+ + +# Clean up. 
+mysql> drop table if exists test.t; From bd24daf4aae2b25f349335549eb05bc7437e64fe Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Tue, 12 Dec 2023 15:27:17 +0800 Subject: [PATCH 3/7] *: let version follows TiDB convention (#8485) ref pingcap/tiflash#6233 --- dbms/cmake/version.cmake | 51 +++++-------------- dbms/src/Client/Connection.cpp | 45 +++++++--------- dbms/src/Client/Connection.h | 4 +- .../src/Client/ConnectionPoolWithFailover.cpp | 18 ++++--- dbms/src/Common/ClickHouseRevision.cpp | 23 --------- dbms/src/Common/ClickHouseRevision.h | 23 --------- dbms/src/Common/ErrorCodes.cpp | 2 +- dbms/src/Common/TiFlashBuildInfo.cpp | 35 ++++++++----- dbms/src/Common/TiFlashBuildInfo.h | 22 ++++---- dbms/src/Common/config_version.h.in | 29 ++++++----- dbms/src/Core/Defines.h | 11 ---- dbms/src/DataStreams/MarkInCompressedFile.h | 4 +- .../MergeSortingBlockInputStream.h | 1 - dbms/src/DataStreams/NativeBlockInputStream.h | 2 +- .../DataStreams/NativeBlockOutputStream.cpp | 7 --- dbms/src/Functions/FunctionsMiscellaneous.cpp | 23 +++++---- dbms/src/IO/Progress.cpp | 12 ++--- dbms/src/IO/Progress.h | 8 +-- dbms/src/Interpreters/ClientInfo.cpp | 40 +++++---------- dbms/src/Interpreters/ClientInfo.h | 7 ++- dbms/src/Interpreters/QueryLog.cpp | 41 +++++++-------- dbms/src/Interpreters/TablesStatus.cpp | 28 ++-------- dbms/src/Interpreters/TablesStatus.h | 8 +-- dbms/src/Server/Client.cpp | 14 ++--- dbms/src/Server/Server.cpp | 1 - dbms/src/Server/StatusFile.cpp | 4 +- dbms/src/Server/TCPHandler.cpp | 43 ++++++---------- dbms/src/Server/TCPHandler.h | 14 ++--- dbms/src/Server/main.cpp | 1 - .../System/StorageSystemProcesses.cpp | 25 ++++----- libs/libdaemon/include/daemon/BaseDaemon.h | 4 +- libs/libdaemon/src/BaseDaemon.cpp | 11 ++-- 32 files changed, 210 insertions(+), 351 deletions(-) delete mode 100644 dbms/src/Common/ClickHouseRevision.cpp delete mode 100644 dbms/src/Common/ClickHouseRevision.h diff --git 
a/dbms/cmake/version.cmake b/dbms/cmake/version.cmake index ab4cece20c9..5039409c0c5 100644 --- a/dbms/cmake/version.cmake +++ b/dbms/cmake/version.cmake @@ -12,45 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -# This strings autochanged from release_lib.sh: -set(VERSION_DESCRIBE v1.1.54381-testing) -set(VERSION_REVISION 54381) -set(VERSION_GITHASH af82c78a45b6a6f136e10bb2e7ca9b936d09a46c) -# end of autochange - -set (VERSION_MAJOR 1) -set (VERSION_MINOR 1) -set (VERSION_PATCH ${VERSION_REVISION}) -set (VERSION_EXTRA "") -set (VERSION_TWEAK "") - -set (VERSION_STRING "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}") -if (VERSION_TWEAK) - set(VERSION_STRING "${VERSION_STRING}.${VERSION_TWEAK}") -endif () -if (VERSION_EXTRA) - set(VERSION_STRING "${VERSION_STRING}${VERSION_EXTRA}") -endif () - -set (VERSION_FULL "${PROJECT_NAME} ${VERSION_STRING}") - -if (OS_DARWIN) - # dirty hack: ld: malformed 64-bit a.b.c.d.e version number: 1.1.54160 - math (EXPR VERSION_SO1 "${VERSION_REVISION}/255") - math (EXPR VERSION_SO2 "${VERSION_REVISION}%255") - set (VERSION_SO "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_SO1}.${VERSION_SO2}") -else () - set (VERSION_SO "${VERSION_STRING}") -endif () - set (TIFLASH_NAME "TiFlash") -# Semantic version. -set (TIFLASH_VERSION_MAJOR 4) -set (TIFLASH_VERSION_MINOR 1) -set (TIFLASH_VERSION_REVISION 0) -set (TIFLASH_VERSION "${TIFLASH_VERSION_MAJOR}.${TIFLASH_VERSION_MINOR}.${TIFLASH_VERSION_REVISION}") - # Release version that follows PD/TiKV/TiDB convention. # Variables bellow are important, use `COMMAND_ERROR_IS_FATAL ANY`(since cmake 3.19) to confirm that there is output. 
@@ -63,6 +26,20 @@ execute_process( COMMAND_ERROR_IS_FATAL ANY ) +# Extract the major version number +string(REGEX REPLACE "^v([0-9]+).*" "\\1" TIFLASH_VERSION_MAJOR ${TIFLASH_RELEASE_VERSION}) +# Extract the minor version number +string(REGEX REPLACE "^v[0-9]+\\.([0-9]+).*" "\\1" TIFLASH_VERSION_MINOR ${TIFLASH_RELEASE_VERSION}) +# Extract the patch version number +string(REGEX REPLACE "^v[0-9]+\\.[0-9]+\\.([0-9]+).*" "\\1" TIFLASH_VERSION_PATCH ${TIFLASH_RELEASE_VERSION}) +# Extract the extra information if it exists +if (${TIFLASH_RELEASE_VERSION} MATCHES "-.*") + string(REGEX REPLACE "^v[0-9]+\\.[0-9]+\\.[0-9]+-(.*)$" "\\1" TIFLASH_VERSION_EXTRA ${TIFLASH_RELEASE_VERSION}) + set(TIFLASH_VERSION "${TIFLASH_VERSION_MAJOR}.${TIFLASH_VERSION_MINOR}.${TIFLASH_VERSION_PATCH}-${TIFLASH_VERSION_EXTRA}") +else () + set(TIFLASH_VERSION "${TIFLASH_VERSION_MAJOR}.${TIFLASH_VERSION_MINOR}.${TIFLASH_VERSION_PATCH}") +endif () + set (TIFLASH_EDITION $ENV{TIFLASH_EDITION}) if (NOT TIFLASH_EDITION) set (TIFLASH_EDITION Community) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index f8aa2dd9082..5121d29fed2 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -14,11 +14,11 @@ #include #include -#include #include #include #include #include +#include #include #include #include @@ -44,7 +44,6 @@ namespace ErrorCodes { extern const int NETWORK_ERROR; extern const int SOCKET_TIMEOUT; -extern const int SERVER_REVISION_IS_TOO_OLD; extern const int UNEXPECTED_PACKET_FROM_SERVER; extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int SUPPORT_IS_DISABLED; @@ -98,7 +97,7 @@ void Connection::connect() server_name, server_version_major, server_version_minor, - server_revision); + server_version_patch); } catch (Poco::Net::NetException & e) { @@ -131,10 +130,10 @@ void Connection::disconnect() void Connection::sendHello() { writeVarUInt(Protocol::Client::Hello, *out); - writeStringBinary((DBMS_NAME " ") + client_name, *out); 
- writeVarUInt(DBMS_VERSION_MAJOR, *out); - writeVarUInt(DBMS_VERSION_MINOR, *out); - writeVarUInt(ClickHouseRevision::get(), *out); + writeStringBinary(fmt::format("{} {}", TiFlashBuildInfo::getName(), client_name), *out); + writeVarUInt(TiFlashBuildInfo::getMajorVersion(), *out); + writeVarUInt(TiFlashBuildInfo::getMinorVersion(), *out); + writeVarUInt(TiFlashBuildInfo::getPatchVersion(), *out); writeStringBinary(default_database, *out); writeStringBinary(user, *out); writeStringBinary(password, *out); @@ -154,15 +153,9 @@ void Connection::receiveHello() readStringBinary(server_name, *in); readVarUInt(server_version_major, *in); readVarUInt(server_version_minor, *in); - readVarUInt(server_revision, *in); - if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) - { - readStringBinary(server_timezone, *in); - } - if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) - { - readStringBinary(server_display_name, *in); - } + readVarUInt(server_version_patch, *in); + readStringBinary(server_timezone, *in); + readStringBinary(server_display_name, *in); } else if (packet_type == Protocol::Server::Exception) receiveException()->rethrow(); @@ -199,7 +192,7 @@ UInt16 Connection::getPort() const return port; } -void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision) +void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch) { if (!connected) connect(); @@ -207,7 +200,7 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 name = server_name; version_major = server_version_major; version_minor = server_version_minor; - revision = server_revision; + version_patch = server_version_patch; } const String & Connection::getServerTimezone() @@ -284,7 +277,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); 
writeVarUInt(Protocol::Client::TablesStatusRequest, *out); - request.write(*out, server_revision); + request.write(*out); out->next(); UInt64 response_type = 0; @@ -296,7 +289,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req throwUnexpectedPacket(response_type, "TablesStatusResponse"); TablesStatusResponse response; - response.read(*in, server_revision); + response.read(*in); return response; } @@ -320,7 +313,6 @@ void Connection::sendQuery( writeStringBinary(query_id, *out); /// Client info. - if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) { ClientInfo client_info_to_send; @@ -329,7 +321,7 @@ void Connection::sendQuery( /// No client info passed - means this query initiated by me. client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; client_info_to_send.fillOSUserHostNameAndVersionInfo(); - client_info_to_send.client_name = (DBMS_NAME " ") + client_name; + client_info_to_send.client_name = fmt::format("{} {}", TiFlashBuildInfo::getName(), client_name); } else { @@ -338,7 +330,7 @@ void Connection::sendQuery( client_info_to_send.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; } - client_info_to_send.write(*out, server_revision); + client_info_to_send.write(*out); } /// Per query settings. 
@@ -382,8 +374,7 @@ void Connection::sendData(const Block & block, const String & name) else maybe_compressed_out = out; - block_out - = std::make_shared(*maybe_compressed_out, server_revision, block.cloneEmpty()); + block_out = std::make_shared(*maybe_compressed_out, 1, block.cloneEmpty()); } writeVarUInt(Protocol::Client::Data, *out); @@ -567,7 +558,7 @@ void Connection::initBlockInput() else maybe_compressed_in = in; - block_in = std::make_shared(*maybe_compressed_in, server_revision); + block_in = std::make_shared(*maybe_compressed_in, 1); } } @@ -593,7 +584,7 @@ std::unique_ptr Connection::receiveException() Progress Connection::receiveProgress() { Progress progress; - progress.read(*in, server_revision); + progress.read(*in); return progress; } diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index 0950fb77d83..7910758d70d 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -150,7 +150,7 @@ class Connection : private boost::noncopyable /// Change default database. Changes will take effect on next reconnect. 
void setDefaultDatabase(const String & database); - void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision); + void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch); const String & getServerTimezone(); const String & getServerDisplayName(); @@ -231,7 +231,7 @@ class Connection : private boost::noncopyable String server_name; UInt64 server_version_major = 0; UInt64 server_version_minor = 0; - UInt64 server_revision = 0; + UInt64 server_version_patch = 0; String server_timezone; String server_display_name; diff --git a/dbms/src/Client/ConnectionPoolWithFailover.cpp b/dbms/src/Client/ConnectionPoolWithFailover.cpp index b7c9936aa2a..9823ad0ce1d 100644 --- a/dbms/src/Client/ConnectionPoolWithFailover.cpp +++ b/dbms/src/Client/ConnectionPoolWithFailover.cpp @@ -61,7 +61,7 @@ IConnectionPool::Entry ConnectionPoolWithFailover::getImpl( }; GetPriorityFunc get_priority; - switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing) + switch (settings ? static_cast(settings->load_balancing) : default_load_balancing) { case LoadBalancing::NEAREST_HOSTNAME: get_priority = [&](size_t i) { @@ -126,7 +126,7 @@ std::vector ConnectionPoolWithFailover::g throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR); GetPriorityFunc get_priority; - switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing) + switch (settings ? 
static_cast(settings->load_balancing) : default_load_balancing) { case LoadBalancing::NEAREST_HOSTNAME: get_priority = [&](size_t i) { @@ -162,11 +162,12 @@ ConnectionPoolWithFailover::TryResult ConnectionPoolWithFailover::tryGetEntry( String server_name; UInt64 server_version_major; UInt64 server_version_minor; - UInt64 server_revision; + UInt64 server_version_patch; if (table_to_check) - result.entry->getServerVersion(server_name, server_version_major, server_version_minor, server_revision); + result.entry + ->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch); - if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) + if (!table_to_check) { result.entry->forceConnected(); result.is_usable = true; @@ -183,8 +184,11 @@ ConnectionPoolWithFailover::TryResult ConnectionPoolWithFailover::tryGetEntry( auto table_status_it = status_response.table_states_by_id.find(*table_to_check); if (table_status_it == status_response.table_states_by_id.end()) { - fail_message = "There is no table " + table_to_check->database + "." + table_to_check->table - + " on server: " + result.entry->getDescription(); + fail_message = fmt::format( + "There is no table {}.{} on server: {}", + table_to_check->database, + table_to_check->table, + result.entry->getDescription()); LOG_WARNING(log, fail_message); return result; diff --git a/dbms/src/Common/ClickHouseRevision.cpp b/dbms/src/Common/ClickHouseRevision.cpp deleted file mode 100644 index 474c471e2b6..00000000000 --- a/dbms/src/Common/ClickHouseRevision.cpp +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -namespace ClickHouseRevision -{ -unsigned get() -{ - return VERSION_REVISION; -} -} // namespace ClickHouseRevision diff --git a/dbms/src/Common/ClickHouseRevision.h b/dbms/src/Common/ClickHouseRevision.h deleted file mode 100644 index cb147702fb6..00000000000 --- a/dbms/src/Common/ClickHouseRevision.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#pragma once - -#include - - -namespace ClickHouseRevision -{ -unsigned get(); -} diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 4b8661c7c90..63cee9a5af9 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -216,7 +216,7 @@ extern const int WRONG_PASSWORD = 193; extern const int REQUIRED_PASSWORD = 194; extern const int IP_ADDRESS_NOT_ALLOWED = 195; extern const int UNKNOWN_ADDRESS_PATTERN_TYPE = 196; -extern const int SERVER_REVISION_IS_TOO_OLD = 197; +// extern const int SERVER_REVISION_IS_TOO_OLD = 197; // Not Used anymore extern const int DNS_ERROR = 198; extern const int UNKNOWN_QUOTA = 199; extern const int QUOTA_DOESNT_ALLOW_KEYS = 200; diff --git a/dbms/src/Common/TiFlashBuildInfo.cpp b/dbms/src/Common/TiFlashBuildInfo.cpp index 5e54e297464..454196a1d23 100644 --- a/dbms/src/Common/TiFlashBuildInfo.cpp +++ b/dbms/src/Common/TiFlashBuildInfo.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -20,44 +21,54 @@ #include #include -#include -#include #include namespace TiFlashBuildInfo { -std::string getName() +String getName() { return TIFLASH_NAME; } -std::string getVersion() +String getVersion() { return TIFLASH_VERSION; } -std::string getReleaseVersion() +String getReleaseVersion() { return TIFLASH_RELEASE_VERSION; } -std::string getEdition() +String getEdition() { return TIFLASH_EDITION; } -std::string getGitHash() +String getGitHash() { return TIFLASH_GIT_HASH; } -std::string getGitBranch() +String getGitBranch() { return TIFLASH_GIT_BRANCH; } -std::string getUTCBuildTime() +String getUTCBuildTime() { return TIFLASH_UTC_BUILD_TIME; } +UInt32 getMajorVersion() +{ + return TIFLASH_VERSION_MAJOR; +} +UInt32 getMinorVersion() +{ + return TIFLASH_VERSION_MINOR; +} +UInt32 getPatchVersion() +{ + return TIFLASH_VERSION_PATCH; +} // clang-format off -std::string getEnabledFeatures() +String getEnabledFeatures() { - std::vector features + std::vector features { // allocator #if USE_JEMALLOC @@ -131,7 +142,7 @@ std::string getEnabledFeatures() return fmt::format("{}", fmt::join(features.begin(), features.end(), " ")); } // clang-format on -std::string getProfile() +String getProfile() { return TIFLASH_PROFILE; } diff --git a/dbms/src/Common/TiFlashBuildInfo.h b/dbms/src/Common/TiFlashBuildInfo.h index 771dc91033c..16486cc41a0 100644 --- a/dbms/src/Common/TiFlashBuildInfo.h +++ b/dbms/src/Common/TiFlashBuildInfo.h @@ -14,21 +14,25 @@ #pragma once +#include + #include -#include namespace TiFlashBuildInfo { -std::string getName(); +String getName(); /// Semantic version. -std::string getVersion(); +String getVersion(); /// Release version that follows PD/TiKV/TiDB convention. 
-std::string getReleaseVersion(); -std::string getEdition(); -std::string getGitHash(); -std::string getGitBranch(); -std::string getUTCBuildTime(); -std::string getProfile(); +String getReleaseVersion(); +String getEdition(); +String getGitHash(); +String getGitBranch(); +String getUTCBuildTime(); +String getProfile(); +UInt32 getMajorVersion(); +UInt32 getMinorVersion(); +UInt32 getPatchVersion(); void outputDetail(std::ostream & os); } // namespace TiFlashBuildInfo diff --git a/dbms/src/Common/config_version.h.in b/dbms/src/Common/config_version.h.in index 6ce7b0157d7..c3ec1487d09 100644 --- a/dbms/src/Common/config_version.h.in +++ b/dbms/src/Common/config_version.h.in @@ -2,21 +2,22 @@ // .h autogenerated by cmake! -#cmakedefine01 USE_DBMS_TCP_PROTOCOL_VERSION - -#if USE_DBMS_TCP_PROTOCOL_VERSION - #include "Core/Defines.h" - #ifndef VERSION_REVISION - #define VERSION_REVISION DBMS_TCP_PROTOCOL_VERSION - #endif -#else - #cmakedefine VERSION_REVISION @VERSION_REVISION@ +#cmakedefine TIFLASH_VERSION_MAJOR @TIFLASH_VERSION_MAJOR@ +#ifndef TIFLASH_VERSION_MAJOR +#define TIFLASH_VERSION_MAJOR 0 +#endif +#cmakedefine TIFLASH_VERSION_MINOR @TIFLASH_VERSION_MINOR@ +#ifndef TIFLASH_VERSION_MINOR +#define TIFLASH_VERSION_MINOR 0 +#endif +#cmakedefine TIFLASH_VERSION_PATCH @TIFLASH_VERSION_PATCH@ +#ifndef TIFLASH_VERSION_PATCH +#define TIFLASH_VERSION_PATCH 0 +#endif +#cmakedefine TIFLASH_VERSION_EXTRA "@TIFLASH_VERSION_EXTRA@" +#ifndef TIFLASH_VERSION_EXTRA +#define TIFLASH_VERSION_EXTRA "" #endif - -#cmakedefine VERSION_STRING "@VERSION_STRING@" -#cmakedefine VERSION_FULL "@VERSION_FULL@" -#cmakedefine VERSION_DESCRIBE "@VERSION_DESCRIBE@" -#cmakedefine VERSION_GITHASH "@VERSION_GITHASH@" #cmakedefine TIFLASH_NAME "@TIFLASH_NAME@" #cmakedefine TIFLASH_VERSION "@TIFLASH_VERSION@" diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 3b015407a03..b0d8c2060e0 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -17,10 +17,6 @@ #include 
#include -#define DBMS_NAME "TiFlash" -#define DBMS_VERSION_MAJOR 1 -#define DBMS_VERSION_MINOR 1 - #define DBMS_DEFAULT_HOST "localhost" #define DBMS_DEFAULT_PORT 9000 #define DBMS_DEFAULT_SECURE_PORT 9440 @@ -90,13 +86,6 @@ constexpr size_t DEFAULT_BLOCK_BYTES = DEFAULT_BLOCK_SIZE * 256; // 256B per row #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue. #define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16 -#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 -#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 -#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060 -#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226 -#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337 -#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372 - /// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change. #define DBMS_TCP_PROTOCOL_VERSION 54226 diff --git a/dbms/src/DataStreams/MarkInCompressedFile.h b/dbms/src/DataStreams/MarkInCompressedFile.h index 2e832bd8068..ec19e34077c 100644 --- a/dbms/src/DataStreams/MarkInCompressedFile.h +++ b/dbms/src/DataStreams/MarkInCompressedFile.h @@ -29,8 +29,8 @@ namespace DB */ struct MarkInCompressedFile { - size_t offset_in_compressed_file; - size_t offset_in_decompressed_block; + size_t offset_in_compressed_file = 0; + size_t offset_in_decompressed_block = 0; bool operator==(const MarkInCompressedFile & rhs) const { diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 8a7be109ab9..e62bc43aebb 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h index 93a4320ce1b..67d66f6c6a1 100644 --- 
a/dbms/src/DataStreams/NativeBlockInputStream.h +++ b/dbms/src/DataStreams/NativeBlockInputStream.h @@ -51,7 +51,7 @@ struct IndexForNativeFormat using Blocks = std::vector; Blocks blocks; - IndexForNativeFormat() {} + IndexForNativeFormat() = default; IndexForNativeFormat(ReadBuffer & istr, const NameSet & required_columns) { read(istr, required_columns); } diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index 1ee069e5e04..ca25279ed13 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -125,13 +125,6 @@ void NativeBlockOutputStream::write(const Block & block) /// Type String type_name = column.type->getName(); - - /// For compatibility, we will not send explicit timezone parameter in DateTime data type - /// to older clients, that cannot understand it. - if (client_revision < DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE - && startsWith(type_name, "DateTime(")) - type_name = "DateTime"; - writeStringBinary(type_name, ostr); /// Data diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp index f1929189387..4322bdf99a1 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.cpp +++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -236,10 +235,10 @@ class FunctionGetSizeOfEnumType : public IFunction { if (const auto * type = checkAndGetDataType(block.getByPosition(arguments[0]).type.get())) block.getByPosition(result).column - = DataTypeUInt8().createColumnConst(block.rows(), UInt64(type->getValues().size())); + = DataTypeUInt8().createColumnConst(block.rows(), static_cast(type->getValues().size())); else if (const auto * type = checkAndGetDataType(block.getByPosition(arguments[0]).type.get())) block.getByPosition(result).column - = DataTypeUInt16().createColumnConst(block.rows(), 
UInt64(type->getValues().size())); + = DataTypeUInt16().createColumnConst(block.rows(), static_cast(type->getValues().size())); else throw Exception( "The argument for function " + getName() + " must be Enum", @@ -527,8 +526,9 @@ class FunctionSleep : public IFunction } /// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each block. - block.getByPosition(result).column - = block.getByPosition(result).type->createColumnConst(size, UInt64(0))->convertToFullColumnIfConst(); + block.getByPosition(result).column = block.getByPosition(result) + .type->createColumnConst(size, static_cast(0)) + ->convertToFullColumnIfConst(); } }; @@ -761,7 +761,7 @@ class FunctionIgnore : public IFunction void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) const override { - block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), UInt64(0)); + block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), 0UL); } }; @@ -797,7 +797,7 @@ class FunctionIndexHint : public IFunction void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) const override { - block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), UInt64(1)); + block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), 1UL); } }; @@ -1258,8 +1258,8 @@ class FunctionRunningAccumulate : public IFunction free(ptr); // NOLINT(cppcoreguidelines-no-malloc) }; std::unique_ptr place{ - reinterpret_cast(malloc(agg_func.sizeOfData())), - deleter}; // NOLINT(cppcoreguidelines-no-malloc) + reinterpret_cast(malloc(agg_func.sizeOfData())), // NOLINT(cppcoreguidelines-no-malloc) + deleter}; agg_func.create( place.get()); /// Not much exception-safe. If an exception is thrown out, destroy will be called in vain. 
@@ -1531,7 +1531,8 @@ void FunctionHasColumnInTable::executeImpl(Block & block, const ColumnNumbers & has_column = table->hasColumn(column_name); } - block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), UInt64(has_column)); + block.getByPosition(result).column + = DataTypeUInt8().createColumnConst(block.rows(), static_cast(has_column)); } @@ -1596,7 +1597,7 @@ class FunctionThrowIf : public IFunction std::string FunctionVersion::getVersion() { std::ostringstream os; - os << TiFlashBuildInfo::getVersion(); + os << TiFlashBuildInfo::getReleaseVersion(); return os.str(); } diff --git a/dbms/src/IO/Progress.cpp b/dbms/src/IO/Progress.cpp index 82300ee8ce0..df0a4fb3318 100644 --- a/dbms/src/IO/Progress.cpp +++ b/dbms/src/IO/Progress.cpp @@ -22,7 +22,7 @@ namespace DB { -void ProgressValues::read(ReadBuffer & in, UInt64 /*server_revision*/) +void ProgressValues::read(ReadBuffer & in) { size_t new_rows = 0; size_t new_bytes = 0; @@ -38,7 +38,7 @@ void ProgressValues::read(ReadBuffer & in, UInt64 /*server_revision*/) } -void ProgressValues::write(WriteBuffer & out, UInt64 /*client_revision*/) const +void ProgressValues::write(WriteBuffer & out) const { writeVarUInt(rows, out); writeVarUInt(bytes, out); @@ -46,10 +46,10 @@ void ProgressValues::write(WriteBuffer & out, UInt64 /*client_revision*/) const } -void Progress::read(ReadBuffer & in, UInt64 server_revision) +void Progress::read(ReadBuffer & in) { ProgressValues values{}; - values.read(in, server_revision); + values.read(in); rows.store(values.rows, std::memory_order_relaxed); bytes.store(values.bytes, std::memory_order_relaxed); @@ -57,9 +57,9 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision) } -void Progress::write(WriteBuffer & out, UInt64 client_revision) const +void Progress::write(WriteBuffer & out) const { - getValues().write(out, client_revision); + getValues().write(out); } } // namespace DB diff --git a/dbms/src/IO/Progress.h b/dbms/src/IO/Progress.h index 
53ff56b229b..456d813039c 100644 --- a/dbms/src/IO/Progress.h +++ b/dbms/src/IO/Progress.h @@ -33,8 +33,8 @@ struct ProgressValues size_t bytes; size_t total_rows; - void read(ReadBuffer & in, UInt64 server_revision); - void write(WriteBuffer & out, UInt64 client_revision) const; + void read(ReadBuffer & in); + void write(WriteBuffer & out) const; }; @@ -60,8 +60,8 @@ struct Progress , total_rows(total_rows_) {} - void read(ReadBuffer & in, UInt64 server_revision); - void write(WriteBuffer & out, UInt64 client_revision) const; + void read(ReadBuffer & in); + void write(WriteBuffer & out) const; /// Each value separately is changed atomically (but not whole object). void incrementPiecewiseAtomically(const Progress & rhs) diff --git a/dbms/src/Interpreters/ClientInfo.cpp b/dbms/src/Interpreters/ClientInfo.cpp index c7862e82734..e3dcc683a6a 100644 --- a/dbms/src/Interpreters/ClientInfo.cpp +++ b/dbms/src/Interpreters/ClientInfo.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include +#include #include #include #include @@ -31,14 +31,9 @@ extern const int LOGICAL_ERROR; } -void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) const +void ClientInfo::write(WriteBuffer & out) const { - if (server_protocol_revision < DBMS_MIN_REVISION_WITH_CLIENT_INFO) - throw Exception( - "Logical error: method ClientInfo::write is called for unsupported server revision", - ErrorCodes::LOGICAL_ERROR); - - writeBinary(UInt8(query_kind), out); + writeBinary(static_cast(query_kind), out); if (empty()) return; @@ -46,7 +41,7 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) writeBinary(initial_query_id, out); writeBinary(initial_address.toString(), out); - writeBinary(UInt8(interface), out); + writeBinary(static_cast(interface), out); if (interface == Interface::TCP) { @@ -55,24 +50,18 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) writeBinary(client_name, out); writeVarUInt(client_version_major, out); writeVarUInt(client_version_minor, out); - writeVarUInt(client_revision, out); + writeVarUInt(client_version_patch, out); } - if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) - writeBinary(quota_key, out); + writeBinary(quota_key, out); } -void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) +void ClientInfo::read(ReadBuffer & in) { - if (client_protocol_revision < DBMS_MIN_REVISION_WITH_CLIENT_INFO) - throw Exception( - "Logical error: method ClientInfo::read is called for unsupported client revision", - ErrorCodes::LOGICAL_ERROR); - UInt8 read_query_kind = 0; readBinary(read_query_kind, in); - query_kind = QueryKind(read_query_kind); + query_kind = static_cast(read_query_kind); if (empty()) return; @@ -85,7 +74,7 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) UInt8 read_interface = 0; readBinary(read_interface, in); - interface = Interface(read_interface); + interface = 
static_cast(read_interface); if (interface == Interface::TCP) { @@ -94,11 +83,10 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) readBinary(client_name, in); readVarUInt(client_version_major, in); readVarUInt(client_version_minor, in); - readVarUInt(client_revision, in); + readVarUInt(client_version_patch, in); } - if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) - readBinary(quota_key, in); + readBinary(quota_key, in); } @@ -112,9 +100,9 @@ void ClientInfo::fillOSUserHostNameAndVersionInfo() client_hostname = getFQDNOrHostName(); - client_version_major = DBMS_VERSION_MAJOR; - client_version_minor = DBMS_VERSION_MINOR; - client_revision = ClickHouseRevision::get(); + client_version_major = TiFlashBuildInfo::getMajorVersion(); + client_version_minor = TiFlashBuildInfo::getMinorVersion(); + client_version_patch = TiFlashBuildInfo::getPatchVersion(); } diff --git a/dbms/src/Interpreters/ClientInfo.h b/dbms/src/Interpreters/ClientInfo.h index a56944766f8..837388c3ca2 100644 --- a/dbms/src/Interpreters/ClientInfo.h +++ b/dbms/src/Interpreters/ClientInfo.h @@ -71,7 +71,7 @@ class ClientInfo String client_name; UInt64 client_version_major = 0; UInt64 client_version_minor = 0; - unsigned client_revision = 0; + UInt64 client_version_patch = 0; /// Common String quota_key; @@ -80,10 +80,9 @@ class ClientInfo /** Serialization and deserialization. * Only values that are not calculated automatically or passed separately are serialized. - * Revisions are passed to use format that server will understand or client was used. 
*/ - void write(WriteBuffer & out, UInt64 server_protocol_revision) const; - void read(ReadBuffer & in, UInt64 client_protocol_revision); + void write(WriteBuffer & out) const; + void read(ReadBuffer & in); void fillOSUserHostNameAndVersionInfo(); }; diff --git a/dbms/src/Interpreters/QueryLog.cpp b/dbms/src/Interpreters/QueryLog.cpp index 3dcdb116669..f82eca4c971 100644 --- a/dbms/src/Interpreters/QueryLog.cpp +++ b/dbms/src/Interpreters/QueryLog.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -70,14 +70,11 @@ Block QueryLogElement::createBlock() {ColumnString::create(), std::make_shared(), "os_user"}, {ColumnString::create(), std::make_shared(), "client_hostname"}, {ColumnString::create(), std::make_shared(), "client_name"}, - {ColumnUInt32::create(), std::make_shared(), "client_revision"}, {ColumnUInt8::create(), std::make_shared(), "http_method"}, {ColumnString::create(), std::make_shared(), "http_user_agent"}, {ColumnString::create(), std::make_shared(), "quota_key"}, - - {ColumnUInt32::create(), std::make_shared(), "revision"}, }; } @@ -111,50 +108,46 @@ void QueryLogElement::appendToBlock(Block & block) const size_t i = 0; - columns[i++]->insert(UInt64(type)); - columns[i++]->insert(UInt64(DateLUT::instance().toDayNum(event_time))); - columns[i++]->insert(UInt64(event_time)); - columns[i++]->insert(UInt64(query_start_time)); - columns[i++]->insert(UInt64(query_duration_ms)); + columns[i++]->insert(static_cast(type)); + columns[i++]->insert(static_cast(DateLUT::instance().toDayNum(event_time))); + columns[i++]->insert(static_cast(event_time)); + columns[i++]->insert(static_cast(query_start_time)); + columns[i++]->insert((query_duration_ms)); - columns[i++]->insert(UInt64(read_rows)); - columns[i++]->insert(UInt64(read_bytes)); + columns[i++]->insert((read_rows)); + columns[i++]->insert((read_bytes)); - columns[i++]->insert(UInt64(written_rows)); - columns[i++]->insert(UInt64(written_bytes)); + 
columns[i++]->insert((written_rows)); + columns[i++]->insert((written_bytes)); - columns[i++]->insert(UInt64(result_rows)); - columns[i++]->insert(UInt64(result_bytes)); + columns[i++]->insert((result_rows)); + columns[i++]->insert((result_bytes)); - columns[i++]->insert(UInt64(memory_usage)); + columns[i++]->insert((memory_usage)); columns[i++]->insertData(query.data(), query.size()); columns[i++]->insertData(exception.data(), exception.size()); columns[i++]->insertData(stack_trace.data(), stack_trace.size()); - columns[i++]->insert(UInt64(client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)); + columns[i++]->insert(static_cast(client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)); columns[i++]->insert(client_info.current_user); columns[i++]->insert(client_info.current_query_id); columns[i++]->insertData(IPv6ToBinary(client_info.current_address.host()).data(), 16); - columns[i++]->insert(UInt64(client_info.current_address.port())); + columns[i++]->insert(static_cast(client_info.current_address.port())); columns[i++]->insert(client_info.initial_user); columns[i++]->insert(client_info.initial_query_id); columns[i++]->insertData(IPv6ToBinary(client_info.initial_address.host()).data(), 16); - columns[i++]->insert(UInt64(client_info.initial_address.port())); + columns[i++]->insert(static_cast(client_info.initial_address.port())); - columns[i++]->insert(UInt64(client_info.interface)); + columns[i++]->insert(static_cast(client_info.interface)); columns[i++]->insert(client_info.os_user); columns[i++]->insert(client_info.client_hostname); columns[i++]->insert(client_info.client_name); - columns[i++]->insert(UInt64(client_info.client_revision)); columns[i++]->insert(client_info.quota_key); - - columns[i++]->insert(UInt64(ClickHouseRevision::get())); - block.setColumns(std::move(columns)); } diff --git a/dbms/src/Interpreters/TablesStatus.cpp b/dbms/src/Interpreters/TablesStatus.cpp index a7ad557f818..cc06e5c388b 100644 --- 
a/dbms/src/Interpreters/TablesStatus.cpp +++ b/dbms/src/Interpreters/TablesStatus.cpp @@ -39,13 +39,8 @@ void TableStatus::read(ReadBuffer & in) } } -void TablesStatusRequest::write(WriteBuffer & out, UInt64 server_protocol_revision) const +void TablesStatusRequest::write(WriteBuffer & out) const { - if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) - throw Exception( - "Logical error: method TablesStatusRequest::write is called for unsupported server revision", - ErrorCodes::LOGICAL_ERROR); - writeVarUInt(tables.size(), out); for (const auto & table_name : tables) { @@ -54,13 +49,8 @@ void TablesStatusRequest::write(WriteBuffer & out, UInt64 server_protocol_revisi } } -void TablesStatusRequest::read(ReadBuffer & in, UInt64 client_protocol_revision) +void TablesStatusRequest::read(ReadBuffer & in) { - if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) - throw Exception( - "method TablesStatusRequest::read is called for unsupported client revision", - ErrorCodes::LOGICAL_ERROR); - size_t size = 0; readVarUInt(size, in); @@ -76,13 +66,8 @@ void TablesStatusRequest::read(ReadBuffer & in, UInt64 client_protocol_revision) } } -void TablesStatusResponse::write(WriteBuffer & out, UInt64 client_protocol_revision) const +void TablesStatusResponse::write(WriteBuffer & out) const { - if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) - throw Exception( - "method TablesStatusResponse::write is called for unsupported client revision", - ErrorCodes::LOGICAL_ERROR); - writeVarUInt(table_states_by_id.size(), out); for (const auto & kv : table_states_by_id) { @@ -95,13 +80,8 @@ void TablesStatusResponse::write(WriteBuffer & out, UInt64 client_protocol_revis } } -void TablesStatusResponse::read(ReadBuffer & in, UInt64 server_protocol_revision) +void TablesStatusResponse::read(ReadBuffer & in) { - if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) - throw Exception( - "method TablesStatusResponse::read is called 
for unsupported server revision", - ErrorCodes::LOGICAL_ERROR); - size_t size = 0; readVarUInt(size, in); diff --git a/dbms/src/Interpreters/TablesStatus.h b/dbms/src/Interpreters/TablesStatus.h index 7e18f7fcbfa..e82ec110959 100644 --- a/dbms/src/Interpreters/TablesStatus.h +++ b/dbms/src/Interpreters/TablesStatus.h @@ -52,16 +52,16 @@ struct TablesStatusRequest { std::unordered_set tables; - void write(WriteBuffer & out, UInt64 server_protocol_revision) const; - void read(ReadBuffer & in, UInt64 client_protocol_revision); + void write(WriteBuffer & out) const; + void read(ReadBuffer & in); }; struct TablesStatusResponse { std::unordered_map table_states_by_id; - void write(WriteBuffer & out, UInt64 client_protocol_revision) const; - void read(ReadBuffer & in, UInt64 server_protocol_revision); + void write(WriteBuffer & out) const; + void read(ReadBuffer & in); }; } // namespace DB diff --git a/dbms/src/Server/Client.cpp b/dbms/src/Server/Client.cpp index 298eef35bec..5a213da6d89 100644 --- a/dbms/src/Server/Client.cpp +++ b/dbms/src/Server/Client.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -23,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -500,18 +500,15 @@ class Client : public Poco::Util::Application String server_name; UInt64 server_version_major = 0; UInt64 server_version_minor = 0; - UInt64 server_revision = 0; - + UInt64 server_version_patch = 0; if (max_client_network_bandwidth) { ThrottlerPtr throttler = std::make_shared(max_client_network_bandwidth, 0, ""); connection->setThrottler(throttler); } - connection->getServerVersion(server_name, server_version_major, server_version_minor, server_revision); - - server_version - = toString(server_version_major) + "." + toString(server_version_minor) + "." 
+ toString(server_revision); + connection->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch); + server_version = fmt::format("{}.{}.{}", server_version_major, server_version_minor, server_version_patch); if (server_display_name = connection->getServerDisplayName(); server_display_name.length() == 0) { @@ -1304,8 +1301,7 @@ class Client : public Poco::Util::Application static void showClientVersion() { - std::cout << "ClickHouse client version " << DBMS_VERSION_MAJOR << "." << DBMS_VERSION_MINOR << "." - << ClickHouseRevision::get() << "." << std::endl; + std::cout << "TiFlash client version " << TiFlashBuildInfo::getReleaseVersion() << "." << std::endl; } public: diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index d6fbae985ed..d52f9a64efa 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include diff --git a/dbms/src/Server/StatusFile.cpp b/dbms/src/Server/StatusFile.cpp index 3034bdf54ee..33f19f36ee0 100644 --- a/dbms/src/Server/StatusFile.cpp +++ b/dbms/src/Server/StatusFile.cpp @@ -14,7 +14,7 @@ #include "StatusFile.h" -#include +#include #include #include #include @@ -84,7 +84,7 @@ StatusFile::StatusFile(const std::string & path_) WriteBufferFromFileDescriptor out(fd, 1024); out << "PID: " << getpid() << "\n" << "Started at: " << LocalDateTime(time(nullptr)) << "\n" - << "Revision: " << ClickHouseRevision::get() << "\n"; + << "Version: " << TiFlashBuildInfo::getReleaseVersion() << "\n"; } } catch (...) 
diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 6ee4cd31d64..87de58f123b 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -14,7 +14,6 @@ #include "TCPHandler.h" -#include #include #include #include @@ -444,7 +443,7 @@ void TCPHandler::processOrdinaryQuery() void TCPHandler::processTablesStatusRequest() { TablesStatusRequest request; - request.read(*in, client_revision); + request.read(*in); TablesStatusResponse response; for (const QualifiedTableName & table_name : request.tables) @@ -460,7 +459,7 @@ void TCPHandler::processTablesStatusRequest() } writeVarUInt(Protocol::Server::TablesStatusResponse, *out); - response.write(*out, client_revision); + response.write(*out); } @@ -528,7 +527,7 @@ void TCPHandler::receiveHello() readStringBinary(client_name, *in); readVarUInt(client_version_major, *in); readVarUInt(client_version_minor, *in); - readVarUInt(client_revision, *in); + readVarUInt(client_version_patch, *in); readStringBinary(default_database, *in); readStringBinary(user, *in); readStringBinary(password, *in); @@ -539,7 +538,7 @@ void TCPHandler::receiveHello() client_name, client_version_major, client_version_minor, - client_revision); + client_version_patch); if (!default_database.empty()) fmt_buf.fmtAppend(", database: {}", default_database); if (!user.empty()) @@ -554,18 +553,12 @@ void TCPHandler::receiveHello() void TCPHandler::sendHello() { writeVarUInt(Protocol::Server::Hello, *out); - writeStringBinary(DBMS_NAME, *out); - writeVarUInt(DBMS_VERSION_MAJOR, *out); - writeVarUInt(DBMS_VERSION_MINOR, *out); - writeVarUInt(ClickHouseRevision::get(), *out); - if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) - { - writeStringBinary(DateLUT::instance().getTimeZone(), *out); - } - if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) - { - writeStringBinary(server_display_name, *out); - } + writeStringBinary(fmt::format("{} {}", TiFlashBuildInfo::getName(), 
client_name), *out); + writeVarUInt(TiFlashBuildInfo::getMajorVersion(), *out); + writeVarUInt(TiFlashBuildInfo::getMinorVersion(), *out); + writeVarUInt(TiFlashBuildInfo::getPatchVersion(), *out); + writeStringBinary(DateLUT::instance().getTimeZone(), *out); + writeStringBinary(server_display_name, *out); out->next(); } @@ -637,8 +630,7 @@ void TCPHandler::receiveQuery() /// Client info { ClientInfo & client_info = query_context.getClientInfo(); - if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) - client_info.read(*in, client_revision); + client_info.read(*in); /// For better support of old clients, that does not send ClientInfo. if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY) @@ -647,7 +639,7 @@ void TCPHandler::receiveQuery() client_info.client_name = client_name; client_info.client_version_major = client_version_major; client_info.client_version_minor = client_version_minor; - client_info.client_revision = client_revision; + client_info.client_version_patch = client_version_patch; } /// Set fields, that are known apriori. 
@@ -673,7 +665,7 @@ void TCPHandler::receiveQuery() state.timeout_setter = std::make_unique(socket(), settings.receive_timeout, settings.send_timeout); readVarUInt(stage, *in); - state.stage = QueryProcessingStage::Enum(stage); + state.stage = static_cast(stage); readVarUInt(compression, *in); state.compression = static_cast(compression); @@ -731,7 +723,7 @@ void TCPHandler::initBlockInput() else state.maybe_compressed_in = in; - state.block_in = std::make_shared(*state.maybe_compressed_in, client_revision); + state.block_in = std::make_shared(*state.maybe_compressed_in, 1); } } @@ -746,10 +738,7 @@ void TCPHandler::initBlockOutput(const Block & block) else state.maybe_compressed_out = out; - state.block_out = std::make_shared( - *state.maybe_compressed_out, - client_revision, - block.cloneEmpty()); + state.block_out = std::make_shared(*state.maybe_compressed_out, 1, block.cloneEmpty()); } } @@ -851,7 +840,7 @@ void TCPHandler::sendProgress() { writeVarUInt(Protocol::Server::Progress, *out); auto increment = state.progress.fetchAndResetPiecewiseAtomically(); - increment.write(*out, client_revision); + increment.write(*out); out->next(); } diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 986f7477722..8d748c8b441 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -30,10 +30,6 @@ #include "IServer.h" -namespace Poco -{ -class Logger; -} namespace DB { @@ -77,7 +73,7 @@ struct QueryState void reset() { *this = QueryState(); } - bool empty() { return is_empty; } + bool empty() const { return is_empty; } }; @@ -87,23 +83,23 @@ class TCPHandler : public Poco::Net::TCPServerConnection TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_) : Poco::Net::TCPServerConnection(socket_) , server(server_) - , log(&Poco::Logger::get("TCPHandler")) + , log(Logger::get("TCPHandler")) , connection_context(server.context()) , query_context(server.context()) { server_display_name = 
server.config().getString("display_name", "TiFlash"); } - void run(); + void run() override; private: IServer & server; - Poco::Logger * log; + LoggerPtr log; String client_name; UInt64 client_version_major = 0; UInt64 client_version_minor = 0; - UInt64 client_revision = 0; + UInt64 client_version_patch = 0; Context connection_context; Context query_context; diff --git a/dbms/src/Server/main.cpp b/dbms/src/Server/main.cpp index 7c818cdcc67..aa33635140e 100644 --- a/dbms/src/Server/main.cpp +++ b/dbms/src/Server/main.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include diff --git a/dbms/src/Storages/System/StorageSystemProcesses.cpp b/dbms/src/Storages/System/StorageSystemProcesses.cpp index 259df05a735..6918fcdd4a3 100644 --- a/dbms/src/Storages/System/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/System/StorageSystemProcesses.cpp @@ -48,7 +48,7 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_) {"client_name", std::make_shared()}, {"client_version_major", std::make_shared()}, {"client_version_minor", std::make_shared()}, - {"client_revision", std::make_shared()}, + {"client_version_patch", std::make_shared()}, {"quota_key", std::make_shared()}, @@ -84,30 +84,31 @@ BlockInputStreams StorageSystemProcesses::read( for (const auto & process : info) { size_t i = 0; - res_columns[i++]->insert(UInt64(process.client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)); + res_columns[i++]->insert( + static_cast(process.client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)); res_columns[i++]->insert(process.client_info.current_user); res_columns[i++]->insert(process.client_info.current_query_id); res_columns[i++]->insert(process.client_info.current_address.host().toString()); - res_columns[i++]->insert(UInt64(process.client_info.current_address.port())); + 
res_columns[i++]->insert(static_cast(process.client_info.current_address.port())); res_columns[i++]->insert(process.client_info.initial_user); res_columns[i++]->insert(process.client_info.initial_query_id); res_columns[i++]->insert(process.client_info.initial_address.host().toString()); - res_columns[i++]->insert(UInt64(process.client_info.initial_address.port())); - res_columns[i++]->insert(UInt64(process.client_info.interface)); + res_columns[i++]->insert(static_cast(process.client_info.initial_address.port())); + res_columns[i++]->insert(static_cast(process.client_info.interface)); res_columns[i++]->insert(process.client_info.os_user); res_columns[i++]->insert(process.client_info.client_hostname); res_columns[i++]->insert(process.client_info.client_name); res_columns[i++]->insert(process.client_info.client_version_major); res_columns[i++]->insert(process.client_info.client_version_minor); - res_columns[i++]->insert(UInt64(process.client_info.client_revision)); + res_columns[i++]->insert(process.client_info.client_version_patch); res_columns[i++]->insert(process.client_info.quota_key); res_columns[i++]->insert(process.elapsed_seconds); - res_columns[i++]->insert(UInt64(process.is_cancelled)); - res_columns[i++]->insert(UInt64(process.read_rows)); - res_columns[i++]->insert(UInt64(process.read_bytes)); - res_columns[i++]->insert(UInt64(process.total_rows)); - res_columns[i++]->insert(UInt64(process.written_rows)); - res_columns[i++]->insert(UInt64(process.written_bytes)); + res_columns[i++]->insert(static_cast(process.is_cancelled)); + res_columns[i++]->insert(static_cast(process.read_rows)); + res_columns[i++]->insert(static_cast(process.read_bytes)); + res_columns[i++]->insert(static_cast(process.total_rows)); + res_columns[i++]->insert(static_cast(process.written_rows)); + res_columns[i++]->insert(static_cast(process.written_bytes)); res_columns[i++]->insert(process.memory_usage); res_columns[i++]->insert(process.peak_memory_usage); 
res_columns[i++]->insert(process.query); diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 8aa20198f66..1246908fd55 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -62,7 +62,7 @@ class BaseDaemon : public Poco::Util::ServerApplication static constexpr char DEFAULT_GRAPHITE_CONFIG_NAME[] = "graphite"; BaseDaemon(); - ~BaseDaemon(); + ~BaseDaemon() override; /// Load configuration, prepare loggers, etc. void initialize(Poco::Util::Application &) override; @@ -148,7 +148,7 @@ class BaseDaemon : public Poco::Util::ServerApplication std::optional getLayer() const { return layer; } protected: - virtual void logRevision() const; + virtual void logVersion() const; /// Used when exitOnTaskError() void handleNotification(Poco::TaskFailedNotification *); diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 6db7effb0fd..295fcb0077d 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -42,7 +42,6 @@ // ucontext is not available without _XOPEN_SOURCE #define _XOPEN_SOURCE #endif -#include #include #include #include @@ -97,11 +96,9 @@ extern "C" int __llvm_profile_write_file(void); using Poco::AutoPtr; using Poco::ConsoleChannel; -using Poco::FileChannel; using Poco::FormattingChannel; using Poco::Logger; using Poco::Message; -using Poco::Path; using Poco::Util::AbstractConfiguration; constexpr char BaseDaemon::DEFAULT_GRAPHITE_CONFIG_NAME[]; @@ -622,8 +619,7 @@ static void terminate_handler() log << "what(): " << e.what() << std::endl; } catch (...) 
- { - } + {} log << "Stack trace:\n\n" << StackTrace().toString() << std::endl; } @@ -1162,7 +1158,7 @@ void BaseDaemon::initialize(Application & self) static KillingErrorHandler killing_error_handler; Poco::ErrorHandler::set(&killing_error_handler); - logRevision(); + logVersion(); signal_listener = std::make_unique(*this); signal_listener_thread.start(*signal_listener); @@ -1173,11 +1169,10 @@ void BaseDaemon::initialize(Application & self) } } -void BaseDaemon::logRevision() const +void BaseDaemon::logVersion() const { auto * log = &Logger::root(); LOG_INFO(log, "Welcome to TiFlash"); - LOG_INFO(log, "Starting daemon with revision " + Poco::NumberFormatter::format(ClickHouseRevision::get())); std::stringstream ss; TiFlashBuildInfo::outputDetail(ss); LOG_INFO(log, "TiFlash build info: {}", ss.str()); From cf1082fbf9ba46a6d61317130aa81e47ee8dabdc Mon Sep 17 00:00:00 2001 From: jinhelin Date: Tue, 12 Dec 2023 18:54:19 +0800 Subject: [PATCH 4/7] Disagg: Fetch MemTableSet by streaming (#8430) close pingcap/tiflash#8429 --- dbms/src/Common/FailPoint.cpp | 3 +- .../WNEstablishDisaggTaskHandler.cpp | 3 +- .../WNFetchPagesStreamWriter.cpp | 123 +++-- .../Disaggregated/WNFetchPagesStreamWriter.h | 40 +- dbms/src/Flash/FlashService.cpp | 10 +- dbms/src/Interpreters/Settings.h | 3 +- .../ColumnFile/ColumnFileInMemory.h | 5 + .../DeltaMerge/Delta/DeltaValueSpace.h | 2 + .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 + .../DeltaMerge/ReadThread/MergedTask.h | 19 + .../DeltaMerge/ReadThread/SegmentReader.cpp | 6 +- .../Storages/DeltaMerge/Remote/Serializer.cpp | 79 +-- .../Storages/DeltaMerge/Remote/Serializer.h | 31 +- .../Storages/DeltaMerge/SegmentReadTask.cpp | 150 ++++-- .../src/Storages/DeltaMerge/SegmentReadTask.h | 5 + .../tests/gtest_segment_read_task.cpp | 451 +++++++++++++++++- 16 files changed, 780 insertions(+), 152 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 41ead893640..afee3ff95f2 100644 --- 
a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -107,7 +107,8 @@ namespace DB M(force_set_parallel_prehandle_threshold) \ M(force_raise_prehandle_exception) \ M(force_agg_on_partial_block) \ - M(delta_tree_create_node_fail) + M(delta_tree_create_node_fail) \ + M(disable_flush_cache) #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \ M(pause_with_alter_locks_acquired) \ diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index 3fa5214d03a..ac29e8f69fe 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -100,9 +100,10 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes } using DM::Remote::Serializer; + bool need_mem_data = !context->getSettingsRef().dt_enable_fetch_memtableset; snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) { response->add_tables( - Serializer::serializePhysicalTable(snap, task_id, mem_tracker_wrapper).SerializeAsString()); + Serializer::serializePhysicalTable(snap, task_id, mem_tracker_wrapper, need_mem_data).SerializeAsString()); }); } diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp index e21390ba6b3..2700aa9f58b 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -30,20 +31,15 @@ #include #include +#include #include +#include + +using namespace DB::DM::Remote; namespace DB { -WNFetchPagesStreamWriterPtr WNFetchPagesStreamWriter::build( - const DM::Remote::SegmentPagesFetchTask & task, - const PageIdU64s & read_page_ids, - UInt64 packet_limit_size) -{ - return std::unique_ptr( - new WNFetchPagesStreamWriter(task.seg_task, 
task.column_defines, read_page_ids, packet_limit_size)); -} - -std::pair WNFetchPagesStreamWriter::getPersistedRemotePage(UInt64 page_id) +std::tuple WNFetchPagesStreamWriter::getPersistedRemotePage(UInt64 page_id) { auto page = seg_task->read_snapshot->delta->getPersistedFileSetSnapshot()->getDataProvider()->readTinyData(page_id); DM::RemotePb::RemotePage remote_page; @@ -57,57 +53,100 @@ std::pair WNFetchPagesStreamWriter::getPersist return {remote_page, page.data.size()}; } -void WNFetchPagesStreamWriter::pipeTo(SyncPagePacketWriter * sync_writer) +std::tuple WNFetchPagesStreamWriter::sendMemTableSet() +{ + const auto & memtableset_snap = seg_task->read_snapshot->delta->getMemTableSetSnapshot(); + const auto & cfs = memtableset_snap->getColumnFiles(); + disaggregated::PagesPacket packet; + UInt64 total_chunks_size = 0; + UInt64 pending_chunks_size = 0; + UInt64 packet_count = 0; + for (const auto & cf : cfs) + { + packet.mutable_chunks()->Add( + Serializer::serializeCF(cf, memtableset_snap->getDataProvider(), /*need_mem_data*/ true) + .SerializeAsString()); + auto sz = packet.chunks().rbegin()->size(); + total_chunks_size += sz; + pending_chunks_size += sz; + mem_tracker_wrapper.alloc(sz); + if (pending_chunks_size > packet_limit_size) + { + ++packet_count; + sync_write(packet); + packet.clear_chunks(); // Only set chunks field before. 
+ mem_tracker_wrapper.free(pending_chunks_size); + pending_chunks_size = 0; + } + } + if (packet.chunks_size() > 0) + { + ++packet_count; + sync_write(packet); + mem_tracker_wrapper.free(pending_chunks_size); + } + return std::make_tuple(cfs.size(), total_chunks_size, packet_count); +} + +std::tuple WNFetchPagesStreamWriter::sendPages() { disaggregated::PagesPacket packet; - MemTrackerWrapper packet_mem_tracker_wrapper(fetch_pages_mem_tracker.get()); - UInt64 total_pages_data_size = 0; + UInt64 total_pages_size = 0; + UInt64 pending_pages_size = 0; UInt64 packet_count = 0; - UInt64 pending_pages_data_size = 0; - UInt64 read_page_ns = 0; - UInt64 send_page_ns = 0; for (const auto page_id : read_page_ids) { - Stopwatch sw_packet; auto [remote_page, page_size] = getPersistedRemotePage(page_id); - total_pages_data_size += page_size; - pending_pages_data_size += page_size; + total_pages_size += page_size; + pending_pages_size += page_size; packet.mutable_pages()->Add(remote_page.SerializeAsString()); - packet_mem_tracker_wrapper.alloc(page_size); - read_page_ns += sw_packet.elapsedFromLastTime(); - - if (pending_pages_data_size > packet_limit_size) + mem_tracker_wrapper.alloc(page_size); + if (pending_pages_size > packet_limit_size) { ++packet_count; - sync_writer->Write(packet); - send_page_ns += sw_packet.elapsedFromLastTime(); - pending_pages_data_size = 0; + sync_write(packet); packet.clear_pages(); // Only set pages field before. 
- packet_mem_tracker_wrapper.freeAll(); + mem_tracker_wrapper.free(pending_pages_size); + pending_pages_size = 0; } } - if (packet.pages_size() > 0) { - Stopwatch sw; ++packet_count; - sync_writer->Write(packet); - send_page_ns += sw.elapsedFromLastTime(); + sync_write(packet); + mem_tracker_wrapper.free(pending_pages_size); + } + return std::make_tuple(read_page_ids.size(), total_pages_size, packet_count); +} + +void WNFetchPagesStreamWriter::syncWrite() +{ + Stopwatch sw; + UInt64 mem_cf_count = 0; + UInt64 mem_cf_size = 0; + UInt64 mem_packet_count = 0; + if (enable_fetch_memtableset) + { + std::tie(mem_cf_count, mem_cf_size, mem_packet_count) = sendMemTableSet(); } + auto send_mem_ns = sw.elapsedFromLastTime(); - // TODO: Currently the memtable data is responded in the Establish stage, instead of in the FetchPages stage. - // We could improve it to respond in the FetchPages stage, so that the parallel FetchPages could start - // as soon as possible. + auto [page_count, page_size, page_packet_count] = sendPages(); + auto send_pages_ns = sw.elapsedFromLastTime(); LOG_DEBUG( - log, - "Send FetchPagesStream, pages={} pages_size={} blocks={} packets={} read_page_ms={} send_page_ms={}", - read_page_ids.size(), - total_pages_data_size, - packet.chunks_size(), - packet_count, - read_page_ns / 1000000, - send_page_ns / 1000000); + seg_task->read_snapshot->log, + "enable_fetch_memtableset={} mem_cf_count={} mem_cf_size={} mem_packet_count={} send_mem_ms={} page_count={} " + "page_size={} page_packet_count={} send_pages_ms={}", + enable_fetch_memtableset, + mem_cf_count, + mem_cf_size, + mem_packet_count, + send_mem_ns / 1000000, + page_count, + page_size, + page_packet_count, + send_pages_ns / 1000000); } diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h index 5383c585288..8871a475e80 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h +++ 
b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h @@ -44,39 +44,33 @@ using WNFetchPagesStreamWriterPtr = std::unique_ptr; class WNFetchPagesStreamWriter { public: - static WNFetchPagesStreamWriterPtr build( - const DM::Remote::SegmentPagesFetchTask & task, - const PageIdU64s & read_page_ids, - UInt64 packet_limit_size); - - void pipeTo(SyncPagePacketWriter * sync_writer); - -private: WNFetchPagesStreamWriter( + std::function && sync_write_, DM::SegmentReadTaskPtr seg_task_, - DM::ColumnDefinesPtr column_defines_, - PageIdU64s read_page_ids, - UInt64 packet_limit_size_) - : seg_task(std::move(seg_task_)) - , column_defines(column_defines_) - , read_page_ids(std::move(read_page_ids)) - , packet_limit_size(packet_limit_size_) - , log(Logger::get()) + PageIdU64s read_page_ids_, + const Settings & settings_) + : sync_write(std::move(sync_write_)) + , seg_task(std::move(seg_task_)) + , read_page_ids(std::move(read_page_ids_)) + , packet_limit_size(settings_.dt_fetch_pages_packet_limit_size) + , enable_fetch_memtableset(settings_.dt_enable_fetch_memtableset) + , mem_tracker_wrapper(fetch_pages_mem_tracker.get()) {} - /// Returns the next packet that could write to the response sink. 
- disaggregated::PagesPacket nextPacket(); + void syncWrite(); - std::pair getPersistedRemotePage(UInt64 page_id); +private: + [[nodiscard]] std::tuple getPersistedRemotePage(UInt64 page_id); + [[nodiscard]] std::tuple sendMemTableSet(); + [[nodiscard]] std::tuple sendPages(); private: - const DM::DisaggTaskId task_id; + std::function sync_write; DM::SegmentReadTaskPtr seg_task; - DM::ColumnDefinesPtr column_defines; PageIdU64s read_page_ids; UInt64 packet_limit_size; - - LoggerPtr log; + bool enable_fetch_memtableset; + MemTrackerWrapper mem_tracker_wrapper; }; } // namespace DB diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 23ddd334c33..21f5f4876f3 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -1109,12 +1109,12 @@ grpc::Status FlashService::FetchDisaggPages( for (auto page_id : request->page_ids()) read_ids.emplace_back(page_id); - auto stream_writer = WNFetchPagesStreamWriter::build( - task, + auto stream_writer = std::make_unique( + [sync_writer](const disaggregated::PagesPacket & packet) { sync_writer->Write(packet); }, + task.seg_task, read_ids, - context->getSettingsRef().dt_fetch_pages_packet_limit_size); - stream_writer->pipeTo(sync_writer); - stream_writer.reset(); + context->getSettingsRef()); + stream_writer->syncWrite(); LOG_INFO(logger, "FetchDisaggPages respond finished, task_id={}", task_id); return grpc::Status::OK; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 5a22ee2c407..6153cb59049 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -225,12 +225,13 @@ struct Settings M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \ M(SettingUInt64, dt_small_file_size_threshold, 128 * 1024, "for dmfile, when the file size less than dt_small_file_size_threshold, it will be 
merged. If dt_small_file_size_threshold = 0, dmfile will just do as v2") \ M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \ - M(SettingUInt64, dt_fetch_pages_packet_limit_size, 0, "Response packet size limit of FetchDisaggPages, 0 means one page per packet") \ + M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \ M(SettingUInt64, dt_write_page_cache_limit_size, 2 * 1024 * 1024, "Limit size per write batch when compute node writing to PageStorage cache") \ M(SettingDouble, io_thread_count_scale, 5.0, "Number of thread of IOThreadPool = number of logical cpu cores * io_thread_count_scale. Only has meaning at server startup.") \ M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \ M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \ M(SettingBool, dt_enable_delta_index_error_fallback, true, "Whether fallback to an empty delta index if a delta index error is detected") \ + M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \ M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata") \ M(SettingDouble, cpu_thread_count_scale, 1.0, "Number of thread of computation-intensive thread pool = number of logical cpu cores * cpu_thread_count_scale. 
Only takes effects at server startup.") \ \ diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h index 81dd9a3365f..ec427033578 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h @@ -57,6 +57,11 @@ class ColumnFileInMemory : public ColumnFile bytes = cache->block.bytes(); } + // For deserializing a ColumnFileInMemory object without schema and data in deserializeCFInMemory. + explicit ColumnFileInMemory(UInt64 rows_) + : rows(rows_) + {} + Type getType() const override { return Type::INMEMORY_FILE; } size_t getRows() const override { return rows; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 70c720ba2cf..357b9c1f1bd 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -403,6 +403,8 @@ class DeltaValueSnapshot size_t getDeltaIndexEpoch() const { return delta_index_epoch; } bool isForUpdate() const { return is_update; } + + void setMemTableSetSnapshot(const ColumnFileSetSnapshotPtr & mem_table_snap_) { mem_table_snap = mem_table_snap_; } }; class DeltaValueReader diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 269d1fbdbab..1295be1e074 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -101,6 +101,7 @@ extern const char force_slow_page_storage_snapshot_release[]; extern const char exception_before_drop_segment[]; extern const char exception_after_drop_segment[]; extern const char proactive_flush_force_set_type[]; +extern const char disable_flush_cache[]; } // namespace FailPoints namespace DM @@ -733,6 +734,7 @@ bool DeltaMergeStore::flushCache(const Context & context, const RowKeyRange & ra bool 
DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed) { + fiu_do_on(FailPoints::disable_flush_cache, { return false; }); size_t sleep_ms = 5; RowKeyRange cur_range = range; diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h index 1b9396db18a..5c3b63a7593 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h @@ -93,6 +93,25 @@ class MergedTask } void setException(const DB::Exception & e); + const LoggerPtr getCurrentLogger() const + { + // `std::cmp_*` is safe for comparing negative signed integers with unsigned integers. + if (likely( + std::cmp_less_equal(0, cur_idx) && std::cmp_less(cur_idx, units.size()) + && units[cur_idx].task != nullptr)) + { + return units[cur_idx].task->read_snapshot->log; + } + else if (!units.empty() && units.front().task != nullptr) + { + return units.front().task->read_snapshot->log; + } + else + { + return Logger::get(); + } + } + private: void initOnce(); int readOneBlock(); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp index d736d35d349..a1e714507a0 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp @@ -116,7 +116,11 @@ class SegmentReader } catch (DB::Exception & e) { - LOG_ERROR(log, "ErrMsg: {} StackTrace {}", e.message(), e.getStackTrace().toString()); + LOG_ERROR( + merged_task != nullptr ?
merged_task->getCurrentLogger() : log, + "ErrMsg: {} StackTrace {}", + e.message(), + e.getStackTrace().toString()); if (merged_task != nullptr) { merged_task->setException(e); diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 4321f826952..2066cd59a9e 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -40,13 +40,18 @@ namespace CurrentMetrics extern const Metric DT_SnapshotOfDisaggReadNodeRead; } +namespace DB::ErrorCodes +{ +extern const int LOGICAL_ERROR; +} // namespace DB::ErrorCodes namespace DB::DM::Remote { RemotePb::RemotePhysicalTable Serializer::serializePhysicalTable( const DisaggPhysicalTableReadSnapshotPtr & snap, const DisaggTaskId & task_id, - MemTrackerWrapper & mem_tracker_wrapper) + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data) { std::shared_lock read_lock(snap->mtx); RemotePb::RemotePhysicalTable remote_table; @@ -61,7 +66,8 @@ RemotePb::RemotePhysicalTable Serializer::serializePhysicalTable( seg_task->segment->segmentEpoch(), seg_task->segment->getRowKeyRange(), /*read_ranges*/ seg_task->ranges, - mem_tracker_wrapper); + mem_tracker_wrapper, + need_mem_data); remote_table.mutable_segments()->Add(std::move(remote_seg)); } return remote_table; @@ -73,7 +79,8 @@ RemotePb::RemoteSegment Serializer::serializeSegment( UInt64 segment_epoch, const RowKeyRange & segment_range, const RowKeyRanges & read_ranges, - MemTrackerWrapper & mem_tracker_wrapper) + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data) { RemotePb::RemoteSegment remote; remote.set_segment_id(segment_id); @@ -99,9 +106,9 @@ RemotePb::RemoteSegment Serializer::serializeSegment( checkpoint_info->set_data_file_id(dt_file->path()); // It should be a key to remote path } remote.mutable_column_files_memtable()->CopyFrom( - serializeColumnFileSet(snap->delta->getMemTableSetSnapshot(), mem_tracker_wrapper)); + 
serializeColumnFileSet(snap->delta->getMemTableSetSnapshot(), mem_tracker_wrapper, need_mem_data)); remote.mutable_column_files_persisted()->CopyFrom( - serializeColumnFileSet(snap->delta->getPersistedFileSetSnapshot(), mem_tracker_wrapper)); + serializeColumnFileSet(snap->delta->getPersistedFileSetSnapshot(), mem_tracker_wrapper, true)); // serialize the read ranges to read node for (const auto & read_range : read_ranges) @@ -179,37 +186,43 @@ SegmentSnapshotPtr Serializer::deserializeSegment( RepeatedPtrField Serializer::serializeColumnFileSet( const ColumnFileSetSnapshotPtr & snap, - MemTrackerWrapper & mem_tracker_wrapper) + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data) { RepeatedPtrField ret; ret.Reserve(snap->column_files.size()); for (const auto & file : snap->column_files) { - if (auto * cf_in_mem = file->tryToInMemoryFile(); cf_in_mem) - { - ret.Add(serializeCFInMemory(*cf_in_mem)); - } - else if (auto * cf_tiny = file->tryToTinyFile(); cf_tiny) - { - ret.Add(serializeCFTiny(*cf_tiny, snap->getDataProvider())); - } - else if (auto * cf_delete_range = file->tryToDeleteRange(); cf_delete_range) - { - ret.Add(serializeCFDeleteRange(*cf_delete_range)); - } - else if (auto * cf_big = file->tryToBigFile(); cf_big) - { - ret.Add(serializeCFBig(*cf_big)); - } - else - { - RUNTIME_CHECK_MSG(false, "Unknown ColumnFile, type={}", magic_enum::enum_name(file->getType())); - } + ret.Add(serializeCF(file, snap->getDataProvider(), need_mem_data)); mem_tracker_wrapper.alloc(ret.rbegin()->SpaceUsedLong()); } return ret; } +RemotePb::ColumnFileRemote Serializer::serializeCF( + const ColumnFilePtr & cf, + const IColumnFileDataProviderPtr & data_provider, + bool need_mem_data) +{ + if (auto * cf_in_mem = cf->tryToInMemoryFile(); cf_in_mem) + { + return serializeCFInMemory(*cf_in_mem, need_mem_data); + } + else if (auto * cf_tiny = cf->tryToTinyFile(); cf_tiny) + { + return serializeCFTiny(*cf_tiny, data_provider); + } + else if (auto * cf_delete_range = 
cf->tryToDeleteRange(); cf_delete_range) + { + return serializeCFDeleteRange(*cf_delete_range); + } + else if (auto * cf_big = cf->tryToBigFile(); cf_big) + { + return serializeCFBig(*cf_big); + } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ColumnFile, type={}", static_cast(cf->getType())); +} + ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet( const RepeatedPtrField & proto, const Remote::IDataStorePtr & data_store, @@ -253,10 +266,17 @@ ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet( return ret; } -RemotePb::ColumnFileRemote Serializer::serializeCFInMemory(const ColumnFileInMemory & cf_in_mem) +RemotePb::ColumnFileRemote Serializer::serializeCFInMemory(const ColumnFileInMemory & cf_in_mem, bool need_mem_data) { RemotePb::ColumnFileRemote ret; auto * remote_in_memory = ret.mutable_in_memory(); + if (!need_mem_data) + { + std::scoped_lock lock(cf_in_mem.cache->mutex); + remote_in_memory->set_rows(cf_in_mem.cache->block.rows()); + return ret; + } + { auto wb = WriteBufferFromString(*remote_in_memory->mutable_schema()); serializeSchema(wb, cf_in_mem.getSchema()->getSchema()); @@ -289,6 +309,11 @@ ColumnFileInMemoryPtr Serializer::deserializeCFInMemory(const RemotePb::ColumnFi { LOG_DEBUG(Logger::get(), "Rebuild local ColumnFileInMemory from remote, rows={}", proto.rows()); + if (proto.block_columns().empty()) + { + return std::make_shared(proto.rows()); + } + BlockPtr block_schema; { auto read_buf = ReadBufferFromString(proto.schema()); diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h index de24ccc226c..dc22109dbc8 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h @@ -40,6 +40,8 @@ struct SegmentSnapshot; using SegmentSnapshotPtr = std::shared_ptr; class ColumnFileSetSnapshot; using ColumnFileSetSnapshotPtr = std::shared_ptr; +class ColumnFile; +using ColumnFilePtr = std::shared_ptr; class 
ColumnFileTiny; using ColumnFileTinyPtr = std::shared_ptr; class ColumnFileInMemory; @@ -60,7 +62,8 @@ struct Serializer static RemotePb::RemotePhysicalTable serializePhysicalTable( const DisaggPhysicalTableReadSnapshotPtr & snap, const DisaggTaskId & task_id, - MemTrackerWrapper & mem_tracker_wrapper); + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data); static SegmentSnapshotPtr deserializeSegment( const DMContext & dm_context, @@ -69,6 +72,18 @@ struct Serializer TableID table_id, const RemotePb::RemoteSegment & proto); + /// Note: This function always build a snapshot over nop data provider. In order to read from this snapshot, + /// you must explicitly assign a proper data provider. + static ColumnFileSetSnapshotPtr deserializeColumnFileSet( + const google::protobuf::RepeatedPtrField & proto, + const Remote::IDataStorePtr & data_store, + const RowKeyRange & segment_range); + + static RemotePb::ColumnFileRemote serializeCF( + const ColumnFilePtr & cf, + const IColumnFileDataProviderPtr & data_provider, + bool need_mem_data); + private: static RemotePb::RemoteSegment serializeSegment( const SegmentSnapshotPtr & snap, @@ -76,19 +91,15 @@ struct Serializer UInt64 segment_epoch, const RowKeyRange & segment_range, const RowKeyRanges & read_ranges, - MemTrackerWrapper & mem_tracker_wrapper); + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data); static google::protobuf::RepeatedPtrField serializeColumnFileSet( const ColumnFileSetSnapshotPtr & snap, - MemTrackerWrapper & mem_tracker_wrapper); - /// Note: This function always build a snapshot over nop data provider. In order to read from this snapshot, - /// you must explicitly assign a proper data provider. 
- static ColumnFileSetSnapshotPtr deserializeColumnFileSet( - const google::protobuf::RepeatedPtrField & proto, - const Remote::IDataStorePtr & data_store, - const RowKeyRange & segment_range); + MemTrackerWrapper & mem_tracker_wrapper, + bool need_mem_data); - static RemotePb::ColumnFileRemote serializeCFInMemory(const ColumnFileInMemory & cf_in_mem); + static RemotePb::ColumnFileRemote serializeCFInMemory(const ColumnFileInMemory & cf_in_mem, bool need_mem_data); static ColumnFileInMemoryPtr deserializeCFInMemory(const RemotePb::ColumnFileInMemory & proto); static RemotePb::ColumnFileRemote serializeCFTiny( diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index e72725cec52..68a0ec7d832 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -27,6 +27,7 @@ #include using namespace std::chrono_literals; +using namespace DB::DM::Remote; namespace CurrentMetrics { @@ -100,8 +101,7 @@ SegmentReadTask::SegmentReadTask( nullptr, nullptr); - read_snapshot - = Remote::Serializer::deserializeSegment(*dm_context, store_id, keyspace_id, physical_table_id, proto); + read_snapshot = Serializer::deserializeSegment(*dm_context, store_id, keyspace_id, physical_table_id, proto); ranges.reserve(proto.read_key_ranges_size()); for (const auto & read_key_range : proto.read_key_ranges()) @@ -324,10 +324,16 @@ void SegmentReadTask::doInitInputStream( void SegmentReadTask::fetchPages() { - if (!extra_remote_info.has_value() || extra_remote_info->remote_page_ids.empty()) + // Not remote segment. 
+ if (!extra_remote_info.has_value()) { return; } + if (extra_remote_info->remote_page_ids.empty() && !needFetchMemTableSet()) + { + LOG_DEBUG(read_snapshot->log, "Neither ColumnFileTiny or ColumnFileInMemory need to be fetched from WN."); + return; + } MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; @@ -362,7 +368,6 @@ void SegmentReadTask::fetchPages() { doFetchPages(req); initColumnFileDataProvider(occupy_result.pages_guard); - // We finished fetch all pages for this seg task, just return it for downstream // workers. If we have met any errors, page guard will not be persisted. return; @@ -375,7 +380,7 @@ void SegmentReadTask::fetchPages() "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", e.displayText(), *this); - std::this_thread::sleep_for(1s); + std::this_thread::sleep_for(1s); // FIXME: yield instead of sleep. } catch (...) { @@ -452,7 +457,9 @@ struct WritePageTask { explicit WritePageTask(Remote::RNLocalPageCache * page_cache_) : page_cache(page_cache_) - {} + { + RUNTIME_CHECK(page_cache != nullptr); + } Remote::RNLocalPageCache * page_cache; UniversalWriteBatch wb; std::forward_list remote_pages; // Hold the data of wb. 
@@ -460,36 +467,103 @@ struct WritePageTask }; using WritePageTaskPtr = std::unique_ptr; +void SegmentReadTask::checkMemTableSet(const ColumnFileSetSnapshotPtr & mem_table_snap) const +{ + const auto & old_mem_table_snap = read_snapshot->delta->getMemTableSetSnapshot(); + + RUNTIME_CHECK_MSG( + mem_table_snap->getColumnFileCount() == old_mem_table_snap->getColumnFileCount(), + "log_id={}, new_cf_count={}, old_cf_count={}", + read_snapshot->log->identifier(), + mem_table_snap->getColumnFileCount(), + old_mem_table_snap->getColumnFileCount()); + + const auto & column_files = mem_table_snap->getColumnFiles(); + const auto & old_column_files = old_mem_table_snap->getColumnFiles(); + auto check_rows = [](UInt64 rows, UInt64 old_rows, bool last_cf) { + // Only the last ColumnFileInMemory is appendable. + return last_cf ? rows >= old_rows : rows == old_rows; + }; + for (size_t i = 0; i < column_files.size(); ++i) + { + const auto & cf = column_files[i]; + const auto & old_cf = old_column_files[i]; + RUNTIME_CHECK_MSG( + cf->getType() == old_cf->getType() + && check_rows(cf->getRows(), old_cf->getRows(), i == column_files.size() - 1), + "log_id={}, new_type={}, old_type={}, new_rows={}, old_rows={}, cf_count={}, cf_index={}", + read_snapshot->log->identifier(), + magic_enum::enum_name(cf->getType()), + magic_enum::enum_name(old_cf->getType()), + cf->getRows(), + old_cf->getRows(), + column_files.size(), + i); + } +} + +bool SegmentReadTask::needFetchMemTableSet() const +{ + // Check if any object of ColumnFileInMemory does not contain data. 
+ for (const auto & cf : read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles()) + { + if (auto * cf_in_mem = cf->tryToInMemoryFile(); cf_in_mem) + { + if (cf_in_mem->getCache() == nullptr) + { + return true; + } + } + } + return false; +} + +static void checkPageID( + UInt64 page_id, + std::vector & received_page_ids, + std::unordered_set & remaining_pages_to_fetch) +{ + RUNTIME_CHECK(remaining_pages_to_fetch.contains(page_id), remaining_pages_to_fetch, page_id); + + received_page_ids.emplace_back(page_id); + remaining_pages_to_fetch.erase(page_id); +} + void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest & request) { - // No page need to be fetched. - if (request.page_ids_size() == 0) + // No page and memtable need to be fetched. + if (request.page_ids_size() == 0 && !needFetchMemTableSet()) return; - UInt64 read_page_ns = 0; - UInt64 deserialize_page_ns = 0; - UInt64 wait_write_page_ns = 0; - - Stopwatch sw_total; const auto * cluster = dm_context->global_context.getTMTContext().getKVCluster(); pingcap::kv::RpcCall rpc( cluster->rpc_client, extra_remote_info->store_address); grpc::ClientContext client_context; - Stopwatch sw_rpc_call; auto stream_resp = rpc.call(&client_context, request); - read_page_ns += sw_rpc_call.elapsed(); + RUNTIME_CHECK(stream_resp != nullptr); SCOPE_EXIT({ // TODO: Not sure whether we really need this. Maybe RAII is already there? stream_resp->Finish(); }); - // Used to verify all pages are fetched. 
- std::unordered_set remaining_pages_to_fetch(request.page_ids().begin(), request.page_ids().end()); + doFetchPagesImpl( + [&stream_resp](disaggregated::PagesPacket & packet) { return stream_resp->Read(&packet); }, + std::unordered_set(request.page_ids().begin(), request.page_ids().end())); +} +void SegmentReadTask::doFetchPagesImpl( + std::function && read_packet, + std::unordered_set remaining_pages_to_fetch) +{ + UInt64 read_page_ns = 0; + UInt64 deserialize_page_ns = 0; + UInt64 wait_write_page_ns = 0; + + Stopwatch sw_total; UInt64 packet_count = 0; UInt64 write_page_task_count = 0; - const UInt64 page_count = request.page_ids_size(); + const UInt64 page_count = remaining_pages_to_fetch.size(); auto schedule_write_page_task = [&write_page_task_count, &wait_write_page_ns](WritePageTaskPtr && write_page_task) { write_page_task_count += 1; @@ -505,23 +579,32 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest WritePageTaskPtr write_page_task; std::vector> write_page_results; + google::protobuf::RepeatedPtrField memtableset_cfs; + // Keep reading packets. while (true) { Stopwatch sw_read_packet; - auto packet = std::make_shared(); - if (!stream_resp->Read(packet.get())) + disaggregated::PagesPacket packet; + if (!read_packet(packet)) break; - if (packet->has_error()) - throw Exception(ErrorCodes::FETCH_PAGES_ERROR, "{} (from {})", packet->error().msg(), *this); + if (packet.has_error()) + throw Exception(ErrorCodes::FETCH_PAGES_ERROR, "{} (from {})", packet.error().msg(), *this); read_page_ns = sw_read_packet.elapsed(); packet_count += 1; - MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get()); + MemTrackerWrapper packet_mem_tracker_wrapper(packet.SpaceUsedLong(), fetch_pages_mem_tracker.get()); + // Handle `chunks`. + for (const auto & s : packet.chunks()) + { + RUNTIME_CHECK(memtableset_cfs.Add()->ParseFromString(s), read_snapshot->log->identifier()); + } + + // Handle `pages`. 
std::vector received_page_ids; - received_page_ids.reserve(packet->pages_size()); - for (const auto & page : packet->pages()) + received_page_ids.reserve(packet.pages_size()); + for (const auto & page : packet.pages()) { Stopwatch sw; if (write_page_task == nullptr) @@ -536,13 +619,7 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest remote_page.SpaceUsedLong(), fetch_pages_mem_tracker.get()); - RUNTIME_CHECK( - remaining_pages_to_fetch.contains(remote_page.page_id()), - remaining_pages_to_fetch, - remote_page.page_id()); - - received_page_ids.emplace_back(remote_page.page_id()); - remaining_pages_to_fetch.erase(remote_page.page_id()); + checkPageID(remote_page.page_id(), received_page_ids, remaining_pages_to_fetch); // Write page into LocalPageCache. Note that the page must be occupied. auto oid = Remote::PageOID{ @@ -577,6 +654,15 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest write_page_results.push_back(schedule_write_page_task(std::move(write_page_task))); } + if (!memtableset_cfs.empty()) + { + const auto & data_store = dm_context->global_context.getSharedContextDisagg()->remote_data_store; + auto mem_table_snap + = Serializer::deserializeColumnFileSet(memtableset_cfs, data_store, segment->getRowKeyRange()); + checkMemTableSet(mem_table_snap); + read_snapshot->delta->setMemTableSetSnapshot(mem_table_snap); + } + Stopwatch sw_wait_write_page_finished; for (auto & f : write_page_results) { diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index f971a5a3fc9..1a55ae1e059 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -128,6 +128,11 @@ struct SegmentReadTask const std::vector & pages_not_in_cache) const; void doFetchPages(const disaggregated::FetchDisaggPagesRequest & request); + void doFetchPagesImpl( + std::function && read_packet, + std::unordered_set 
remaining_pages_to_fetch); + void checkMemTableSet(const ColumnFileSetSnapshotPtr & mem_table_snap) const; + bool needFetchMemTableSet() const; void initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp index a229ee107a2..9ebd619e780 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include #include #include @@ -25,6 +27,7 @@ #include #include #include + using namespace DB::tests; namespace DB::ErrorCodes @@ -33,6 +36,12 @@ extern const int DT_DELTA_INDEX_ERROR; extern const int FETCH_PAGES_ERROR; } // namespace DB::ErrorCodes +namespace DB::FailPoints +{ +extern const char skip_check_segment_update[]; +extern const char disable_flush_cache[]; +} // namespace DB::FailPoints + namespace DB::DM::tests { @@ -127,8 +136,142 @@ try } CATCH +class DMStoreForSegmentReadTaskTest : public DeltaMergeStoreTest +{ +public: + void SetUp() override + { + DeltaMergeStoreTest::SetUp(); + initReadNodePageCacheIfUninitialized(); + } -TEST_F(DeltaMergeStoreTest, DisaggReadSnapshot) + auto getWNReadSnapshot() + { + auto scan_context = std::make_shared(); + auto snap = store->writeNodeBuildRemoteReadSnapshot( + *db_context, + db_context->getSettingsRef(), + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + 1, + "req_id", + {}, + scan_context); + snap->column_defines = std::make_shared(store->getTableColumns()); + return snap; + } + + [[nodiscard]] auto setMockRemoteDataStore() + { + db_context->getSharedContextDisagg()->remote_data_store + = std::make_shared(db_context->getFileProvider()); + return ext::make_scope_guard([&]() { db_context->getSharedContextDisagg()->remote_data_store = nullptr; }); + } + + [[nodiscard]] static auto 
disableFlushCache() + { + DB::FailPointHelper::enableFailPoint(DB::FailPoints::skip_check_segment_update); + DB::FailPointHelper::enableFailPoint(DB::FailPoints::disable_flush_cache); + return ext::make_scope_guard([]() { + DB::FailPointHelper::disableFailPoint(DB::FailPoints::skip_check_segment_update); + DB::FailPointHelper::disableFailPoint(DB::FailPoints::disable_flush_cache); + }); + } + + auto getRemoteSegment(const Remote::DisaggPhysicalTableReadSnapshotPtr & snap, bool need_mem_data, int idx) + { + MemTrackerWrapper mem_tracker_wrapper(nullptr); + auto remote_table_pb = Remote::Serializer::serializePhysicalTable( + snap, + /*task_id*/ {}, + mem_tracker_wrapper, + need_mem_data); + RUNTIME_CHECK(remote_table_pb.segments_size() > idx, remote_table_pb.segments_size(), idx); + auto remote_seg = remote_table_pb.segments(idx); + auto guard = setMockRemoteDataStore(); + auto scan_context = std::make_shared(); + return std::make_shared( + Logger::get(), + *db_context, + scan_context, + remote_seg, + DisaggTaskId{}, + /*store_id*/ 1, + /*store_address*/ "127.0.0.1", + store->keyspace_id, + store->physical_table_id); + }; + + void initReadNodePageCacheIfUninitialized() + { + if (db_context->getSharedContextDisagg()->rn_page_cache == nullptr) + { + db_context->getSharedContextDisagg()->initReadNodePageCache( + db_context->getPathPool(), + getTemporaryPath(), + 1 * 1024 * 1024 * 1024); + } + } + + std::tuple getRemoteAndLocalSegmentReadTasks(bool need_mem_data, int idx) + { + auto snap = getWNReadSnapshot(); + auto remote_seg = getRemoteSegment(snap, need_mem_data, idx); + auto local_seg = snap->popTask(remote_seg->segment->segmentId()); + RUNTIME_CHECK(remote_seg->extra_remote_info.has_value()); + RUNTIME_CHECK(!local_seg->extra_remote_info.has_value()); + return std::make_tuple(remote_seg, local_seg); + } + + void fetchPages(const SegmentReadTaskPtr & remote_seg, const SegmentReadTaskPtr & local_seg) + { + auto table_column_defines = DMTestEnv::getDefaultColumns(); + 
WorkQueue q; + + // Send pages. + WNFetchPagesStreamWriter writer( + [&q](const disaggregated::PagesPacket & packet) { q.push(packet, nullptr); }, + local_seg, + remote_seg->extra_remote_info->remote_page_ids, + db_context->getSettingsRef()); + writer.syncWrite(); + q.finish(); + + // Receive pages. + auto ds_guard = setMockRemoteDataStore(); + auto occupied_result = remote_seg->blockingOccupySpaceForTask(); + remote_seg->doFetchPagesImpl( + [&q](disaggregated::PagesPacket & packet) { return q.pop(packet); }, + std::unordered_set( + remote_seg->extra_remote_info->remote_page_ids.begin(), + remote_seg->extra_remote_info->remote_page_ids.end())); + ASSERT_EQ(q.size(), 0); + + remote_seg->initColumnFileDataProvider(occupied_result.pages_guard); + remote_seg->initInputStream(*table_column_defines, 0, nullptr, ReadMode::Bitmap, DEFAULT_BLOCK_SIZE, false); + auto remote_stream = remote_seg->getInputStream(); + + Blocks remote_blks; + while (true) + { + auto b = remote_stream->read(); + if (b) + { + remote_blks.push_back(std::move(b)); + } + else + { + break; + } + } + + local_seg->initInputStream(*table_column_defines, 0, nullptr, ReadMode::Bitmap, DEFAULT_BLOCK_SIZE, false); + auto local_stream = local_seg->getInputStream(); + + ASSERT_INPUTSTREAM_BLOCKS(local_stream, remote_blks); + } +}; + +TEST_F(DMStoreForSegmentReadTaskTest, DisaggReadSnapshot) try { auto table_column_defines = DMTestEnv::getDefaultColumns(); @@ -186,13 +329,12 @@ try snap->column_defines = std::make_shared(store->getTableColumns()); MemTrackerWrapper mem_tracker_wrapper(nullptr); - auto remote_table_pb = Remote::Serializer::serializePhysicalTable(snap, /*task_id*/ {}, mem_tracker_wrapper); + auto remote_table_pb + = Remote::Serializer::serializePhysicalTable(snap, /*task_id*/ {}, mem_tracker_wrapper, /*need_mem_data*/ true); ASSERT_GT(remote_table_pb.segments_size(), 0); - db_context->getSharedContextDisagg()->remote_data_store - = std::make_shared(db_context->getFileProvider()); - + auto guard = 
setMockRemoteDataStore(); for (const auto & remote_seg : remote_table_pb.segments()) { auto seg_task = std::make_shared( @@ -373,7 +515,7 @@ try } CATCH -TEST_F(DeltaMergeStoreTest, MemTableSetWithCFTiny) +TEST_F(DMStoreForSegmentReadTaskTest, MemTableSetWithCFTiny) try { auto table_column_defines = DMTestEnv::getDefaultColumns(); @@ -400,11 +542,11 @@ try snap->column_defines = std::make_shared(store->getTableColumns()); MemTrackerWrapper mem_tracker_wrapper(nullptr); - auto remote_table_pb = Remote::Serializer::serializePhysicalTable(snap, /*task_id*/ {}, mem_tracker_wrapper); + auto remote_table_pb + = Remote::Serializer::serializePhysicalTable(snap, /*task_id*/ {}, mem_tracker_wrapper, /*need_mem_data*/ true); ASSERT_EQ(remote_table_pb.segments_size(), 1); - db_context->getSharedContextDisagg()->remote_data_store - = std::make_shared(db_context->getFileProvider()); + auto guard = setMockRemoteDataStore(); const auto & remote_seg = remote_table_pb.segments(0); auto seg_task = std::make_shared( Logger::get(), @@ -423,4 +565,295 @@ try ASSERT_EQ(seg_task->extra_remote_info->remote_page_ids.size(), 1); } CATCH + +TEST_F(DMStoreForSegmentReadTaskTest, BasicMemTableSet) +try +{ + auto fp_guard = disableFlushCache(); + + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + auto get_remote_segment = [&](bool need_mem_data) { + auto snap = getWNReadSnapshot(); + return getRemoteSegment(snap, need_mem_data, 0); + }; + + // cf delete range + { + HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + auto check = [&](bool need_mem_data) { + auto seg_task = get_remote_segment(need_mem_data); + ASSERT_NE(seg_task, nullptr); + ASSERT_FALSE(seg_task->needFetchMemTableSet()); + auto mem_snap = seg_task->read_snapshot->delta->getMemTableSetSnapshot(); + ASSERT_EQ(mem_snap->getColumnFileCount(), 1); + ASSERT_EQ(mem_snap->getColumnFiles()[0]->getType(), 
ColumnFile::Type::DELETE_RANGE); + }; + check(true); + check(false); + } + + // cf tiny in memtableset + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + auto check = [&](bool need_mem_data) { + auto seg_task = get_remote_segment(need_mem_data); + ASSERT_NE(seg_task, nullptr); + ASSERT_FALSE(seg_task->needFetchMemTableSet()); + auto mem_snap = seg_task->read_snapshot->delta->getMemTableSetSnapshot(); + ASSERT_EQ(mem_snap->getColumnFileCount(), 2); + ASSERT_EQ(mem_snap->getColumnFiles()[0]->getType(), ColumnFile::Type::DELETE_RANGE); + ASSERT_EQ(mem_snap->getColumnFiles()[1]->getType(), ColumnFile::Type::TINY_FILE); + }; + check(true); + check(false); + } + + // cf mem + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + store->write(*db_context, db_context->getSettingsRef(), block); + auto check = [&](bool need_mem_data) { + auto seg_task = get_remote_segment(need_mem_data); + ASSERT_NE(seg_task, nullptr); + ASSERT_NE(seg_task->needFetchMemTableSet(), need_mem_data); + auto mem_snap = seg_task->read_snapshot->delta->getMemTableSetSnapshot(); + ASSERT_EQ(mem_snap->getColumnFileCount(), 3); + ASSERT_EQ(mem_snap->getColumnFiles()[0]->getType(), ColumnFile::Type::DELETE_RANGE); + ASSERT_EQ(mem_snap->getColumnFiles()[1]->getType(), ColumnFile::Type::TINY_FILE); + ASSERT_EQ(mem_snap->getColumnFiles()[2]->getType(), ColumnFile::Type::INMEMORY_FILE); + }; + check(true); + check(false); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + auto check = [&](bool need_mem_data) { + auto seg_task = get_remote_segment(need_mem_data); + ASSERT_NE(seg_task, nullptr); + 
ASSERT_NE(seg_task->needFetchMemTableSet(), need_mem_data); + auto mem_snap = seg_task->read_snapshot->delta->getMemTableSetSnapshot(); + ASSERT_EQ(mem_snap->getColumnFileCount(), 4); + ASSERT_EQ(mem_snap->getColumnFiles()[0]->getType(), ColumnFile::Type::DELETE_RANGE); + ASSERT_EQ(mem_snap->getColumnFiles()[1]->getType(), ColumnFile::Type::TINY_FILE); + ASSERT_EQ(mem_snap->getColumnFiles()[2]->getType(), ColumnFile::Type::INMEMORY_FILE); + ASSERT_EQ(mem_snap->getColumnFiles()[3]->getType(), ColumnFile::Type::BIG_FILE); + }; + check(true); + check(false); + } +} +CATCH + +TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_NoInMem) +try +{ + auto fp_guard = disableFlushCache(); + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + // stable + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 4096, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->mergeDeltaAll(*db_context); + } + + // cf delete range + { + HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + } + + // cf tiny + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + auto check = [&](bool need_mem_data) { + auto [remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(need_mem_data, 0); + ASSERT_FALSE(remote_seg->extra_remote_info->remote_page_ids.empty()); + ASSERT_FALSE(remote_seg->needFetchMemTableSet()); + }; + check(true); + check(false); +} +CATCH + +TEST_F(DMStoreForSegmentReadTaskTest, 
FetchPages_Tiny_NoInMem) +try +{ + auto fp_guard = disableFlushCache(); + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + // stable + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 4096, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->mergeDeltaAll(*db_context); + } + + // cf delete range + { + HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + } + + // cf tiny + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + auto [remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(true, 0); + ASSERT_FALSE(remote_seg->needFetchMemTableSet()); + fetchPages(remote_seg, local_seg); +} +CATCH + +TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_InMem) +try +{ + auto fp_guard = disableFlushCache(); + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + // stable + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 4096, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->mergeDeltaAll(*db_context); + } + + // cf delete range + { + HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = 
genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + } + + // cf in mem + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows / 5, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + { + auto [remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(true, 0); + ASSERT_FALSE(remote_seg->needFetchMemTableSet()); + fetchPages(remote_seg, local_seg); + } + + { + auto [remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(false, 0); + ASSERT_TRUE(remote_seg->needFetchMemTableSet()); + fetchPages(remote_seg, local_seg); + } +} +CATCH + +TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_Tiny_InMem) +try +{ + auto fp_guard = disableFlushCache(); + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + // stable + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 4096, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->mergeDeltaAll(*db_context); + } + + // cf delete range + { + HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + } + + // cf tiny + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + // cf in mem + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows / 5, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + { + auto 
[remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(true, 0); + ASSERT_FALSE(remote_seg->needFetchMemTableSet()); + fetchPages(remote_seg, local_seg); + } + + { + auto [remote_seg, local_seg] = getRemoteAndLocalSegmentReadTasks(false, 0); + ASSERT_TRUE(remote_seg->needFetchMemTableSet()); + fetchPages(remote_seg, local_seg); + } +} +CATCH } // namespace DB::DM::tests From 384a449d3b26ce4272f699824fb53d6646e87fde Mon Sep 17 00:00:00 2001 From: JaySon Date: Tue, 12 Dec 2023 22:46:48 +0800 Subject: [PATCH 5/7] *: Fix compile error under clang 17 (#8503) ref pingcap/tiflash#7193 --- contrib/poco | 2 +- .../DeltaMerge/tests/gtest_dm_ingest.cpp | 2 +- dbms/src/Storages/KVStore/Utils.h | 9 +++-- dbms/src/Storages/Page/V3/Blob/BlobStat.cpp | 2 +- dbms/src/Storages/Page/V3/PageDirectory.h | 2 +- release-centos7-llvm/env/prepare-sysroot.sh | 34 ++++++++++--------- 6 files changed, 28 insertions(+), 23 deletions(-) diff --git a/contrib/poco b/contrib/poco index 170872b2a15..a6fa1412ae6 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit 170872b2a15ea042d99f8168a01aec896eef0840 +Subproject commit a6fa1412ae68503905ab6c23849c0207dcda4fce diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp index 6c8825eaafc..dcd3b410836 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp @@ -220,7 +220,7 @@ try auto pool = std::make_shared(4); for (const auto & op : ops) { - pool->scheduleOrThrowOnError([=, &log] { + pool->scheduleOrThrowOnError([=, this, &log] { try { LOG_INFO(log, "{} to [{}, {})", op.use_write ? 
"write" : "ingest", op.start_key, op.end_key); diff --git a/dbms/src/Storages/KVStore/Utils.h b/dbms/src/Storages/KVStore/Utils.h index 104e102ff87..ba79d98d3ca 100644 --- a/dbms/src/Storages/KVStore/Utils.h +++ b/dbms/src/Storages/KVStore/Utils.h @@ -50,11 +50,14 @@ class MutexLockWrap public: using Mutex = std::mutex; - std::lock_guard genLockGuard() const { return std::lock_guard(*mutex); } + std::lock_guard genLockGuard() const NO_THREAD_SAFETY_ANALYSIS { return std::lock_guard(*mutex); } - std::unique_lock tryToLock() const { return std::unique_lock(*mutex, std::try_to_lock); } + std::unique_lock tryToLock() const NO_THREAD_SAFETY_ANALYSIS + { + return std::unique_lock(*mutex, std::try_to_lock); + } - std::unique_lock genUniqueLock() const { return std::unique_lock(*mutex); } + std::unique_lock genUniqueLock() const NO_THREAD_SAFETY_ANALYSIS { return std::unique_lock(*mutex); } private: mutable AlignedStruct mutex; diff --git a/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp b/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp index c6ab00d8404..0e584e82fbb 100644 --- a/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp +++ b/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp @@ -102,7 +102,7 @@ void BlobStats::restore() } } -std::lock_guard BlobStats::lock() const +std::lock_guard BlobStats::lock() const NO_THREAD_SAFETY_ANALYSIS { return std::lock_guard(lock_stats); } diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 0698aed3e59..0671b7c8ffc 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -273,7 +273,7 @@ class VersionedPageEntries bool isExternalPage() const { return type == EditRecordType::VAR_EXTERNAL; } - [[nodiscard]] PageLock acquireLock() const { return std::lock_guard(m); } // NOLINT + [[nodiscard]] PageLock acquireLock() const NO_THREAD_SAFETY_ANALYSIS { return std::lock_guard(m); } void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); diff 
--git a/release-centos7-llvm/env/prepare-sysroot.sh b/release-centos7-llvm/env/prepare-sysroot.sh index 076f018a970..e4b7b580128 100755 --- a/release-centos7-llvm/env/prepare-sysroot.sh +++ b/release-centos7-llvm/env/prepare-sysroot.sh @@ -16,24 +16,24 @@ set -ueox pipefail -CMAKE_VERSION=3.22.1 +CMAKE_VERSION="3.22.1" GO_VERSION="1.20" ARCH=$(uname -m) -GO_ARCH=$([[ "$ARCH" == "aarch64" ]] && echo "arm64" || echo "amd64") +GO_ARCH=$([[ "${ARCH}" == "aarch64" ]] && echo "arm64" || echo "amd64") LLVM_VERSION="13.0.0" CCACHE_VERSION="4.5.1" SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" -SYSROOT="$SCRIPTPATH/sysroot" +SYSROOT="${SCRIPTPATH}/sysroot" OPENSSL_VERSION="1_1_1l" function install_cmake() { - wget https://github.com/Kitware/CMake/releases/download/v$CMAKE_VERSION/cmake-$CMAKE_VERSION-linux-$ARCH.sh - sh cmake-$CMAKE_VERSION-linux-$ARCH.sh --prefix="$SYSROOT" --skip-license --exclude-subdir - rm -rf cmake-$CMAKE_VERSION-linux-$ARCH.sh + wget "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-${ARCH}.sh" + sh cmake-${CMAKE_VERSION}-linux-${ARCH}.sh --prefix="${SYSROOT}" --skip-license --exclude-subdir + rm -rf cmake-${CMAKE_VERSION}-linux-${ARCH}.sh } function install_llvm() { - git clone https://github.com/llvm/llvm-project --depth=1 -b llvmorg-$LLVM_VERSION + git clone https://github.com/llvm/llvm-project --depth=1 -b "llvmorg-${LLVM_VERSION}" mkdir -p llvm-project/build cd llvm-project/build @@ -53,7 +53,7 @@ function install_llvm() { -DLLVM_ENABLE_LIBCXX=ON \ -DLLVM_ENABLE_LLD=ON \ -DLIBOMP_LIBFLAGS="-lm" \ - -DCMAKE_INSTALL_PREFIX="$SYSROOT" \ + -DCMAKE_INSTALL_PREFIX="${SYSROOT}" \ -DCMAKE_INSTALL_RPATH="\$ORIGIN/../lib/;\$ORIGIN/../lib/$(uname -m)-unknown-linux-gnu/" \ ../llvm @@ -64,7 +64,7 @@ function install_llvm() { } function install_openssl() { - wget https://github.com/openssl/openssl/archive/refs/tags/OpenSSL_${OPENSSL_VERSION}.tar.gz + wget 
"https://github.com/openssl/openssl/archive/refs/tags/OpenSSL_${OPENSSL_VERSION}.tar.gz" tar xvf OpenSSL_${OPENSSL_VERSION}.tar.gz cd openssl-OpenSSL_${OPENSSL_VERSION} @@ -72,8 +72,8 @@ function install_openssl() { -fPIC \ no-shared \ no-afalgeng \ - --prefix="$SYSROOT" \ - --openssldir="$SYSROOT" \ + --prefix="${SYSROOT}" \ + --openssldir="${SYSROOT}" \ -static NPROC=${NPROC:-$(nproc || grep -c ^processor /proc/cpuinfo)} @@ -86,8 +86,10 @@ function install_openssl() { } function install_go() { - wget https://dl.google.com/go/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz - tar -C "$SYSROOT" -xzvf go${GO_VERSION}.linux-${GO_ARCH}.tar.gz + wget "https://dl.google.com/go/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz" + tar -C "${SYSROOT}" -xzvf go${GO_VERSION}.linux-${GO_ARCH}.tar.gz + mv "${SYSROOT}/go" "${SYSROOT}/go${GO_VERSION}" && \ + pushd "${SYSROOT}" && ln -sv "go${GO_VERSION}" "go" && popd rm -rf go${GO_VERSION}.linux-${GO_ARCH}.tar.gz } @@ -101,14 +103,14 @@ function install_ccache() { -DHIREDIS_FROM_INTERNET=ON \ -DENABLE_TESTING=OFF \ -DCMAKE_INSTALL_RPATH="\$ORIGIN/../lib/;\$ORIGIN/../lib/$(uname -m)-unknown-linux-gnu/" \ - -DCMAKE_INSTALL_PREFIX="$SYSROOT" \ + -DCMAKE_INSTALL_PREFIX="${SYSROOT}" \ -GNinja ninja && ninja install cd ../.. 
rm -rf "ccache-$CCACHE_VERSION" } -mkdir -p $SYSROOT +mkdir -p ${SYSROOT} install_cmake install_llvm @@ -118,5 +120,5 @@ install_ccache # some extra steps if [[ -e /usr/lib64/libtinfo.so.5 ]]; then - cp /usr/lib64/libtinfo.so.5 "$SYSROOT/lib" + cp /usr/lib64/libtinfo.so.5 "${SYSROOT}/lib" fi From a252e882f7c63224b3ed8088ef1e0bba158deb92 Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Wed, 13 Dec 2023 12:25:48 +0800 Subject: [PATCH 6/7] Fix compile error on mac (#8509) close pingcap/tiflash#8508 --- dbms/src/Functions/FunctionsMiscellaneous.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp index 4322bdf99a1..89135b1fd37 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.cpp +++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp @@ -761,7 +761,7 @@ class FunctionIgnore : public IFunction void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) const override { - block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), 0UL); + block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), static_cast(0)); } }; @@ -797,7 +797,7 @@ class FunctionIndexHint : public IFunction void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) const override { - block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), 1UL); + block.getByPosition(result).column = DataTypeUInt8().createColumnConst(block.rows(), static_cast(1)); } }; From 5417d84de73ba05288de672b58ab60c85c89bd25 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 13 Dec 2023 13:13:48 +0800 Subject: [PATCH 7/7] Add protobuf::internal::AssignDescriptors into tsan.suppression to ignore false positive warning (#8499) ref pingcap/tiflash#8285 --- tests/sanitize/tsan.suppression | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/sanitize/tsan.suppression 
b/tests/sanitize/tsan.suppression index bcdf61be0d4..29ac8020569 100644 --- a/tests/sanitize/tsan.suppression +++ b/tests/sanitize/tsan.suppression @@ -1,5 +1,6 @@ race:dbms/src/Common/TiFlashMetrics.h race:DB::Context::setCancelTest race:DB::getCurrentExceptionMessage +race:google::protobuf::internal::AssignDescriptors race:fiu_fail race:dbms/src/DataStreams/BlockStreamProfileInfo.h