From efa5e6d612d22c0d74eeb4363d7b6798add359b1 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Fri, 19 Jul 2024 16:42:30 +0800 Subject: [PATCH] [Cloud](Variant) support DESC merge variants's schema for cloud mode (#37955) --- be/src/cloud/cloud_tablet.cpp | 16 +++++++++++++++ be/src/cloud/cloud_tablet.h | 3 +++ be/src/olap/base_tablet.h | 4 ++++ be/src/service/internal_service.cpp | 16 +++++++-------- be/src/service/internal_service.h | 10 +++++----- regression-test/suites/variant_p0/desc.groovy | 20 ++++++++++++++++--- 6 files changed, 53 insertions(+), 16 deletions(-) diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 50c8765a18df0b..17ec1fe22b0d85 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" @@ -43,8 +44,10 @@ #include "olap/rowset/rowset_writer.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/storage_policy.h" +#include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include "util/debug_points.h" +#include "vec/common/schema_util.h" namespace doris { using namespace ErrorCode; @@ -132,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) return st; } +TabletSchemaSPtr CloudTablet::merged_tablet_schema() const { + std::shared_lock rdlock(_meta_lock); + TabletSchemaSPtr target_schema; + std::vector schemas; + for (const auto& [_, rowset] : _rs_version_map) { + schemas.push_back(rowset->tablet_schema()); + } + // get the max version schema and merge all schema + static_cast( + vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema)); + return target_schema; +} + // Sync tablet meta and all rowset meta if not running. // This could happen when BE didn't finish schema change job and another BE committed this schema change job. // It should be a quite rare situation. diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index ca05759cdbf83e..2e6938444d17fc 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -206,6 +206,9 @@ class CloudTablet final : public BaseTablet { int64_t last_cumu_compaction_success_time_ms = 0; int64_t last_cumu_no_suitable_version_ms = 0; + // Return merged extended schema + TabletSchemaSPtr merged_tablet_schema() const override; + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f625ecf4a0a98e..cefb31ccd11e0c 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -28,6 +28,7 @@ #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" #include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" #include "olap/version_graph.h" #include "util/metrics.h" @@ -252,6 +253,9 @@ class BaseTablet { const std::vector& candidate_rowsets, int limit); + // Return the merged schema of all rowsets + virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; } + protected: // Find the missed versions until the spec_version. // diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 04abf2b09ce01d..9611e1a93cb393 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1044,11 +1044,11 @@ struct AsyncRPCContext { brpc::CallId cid; }; -void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller, - const PFetchRemoteSchemaRequest* request, - PFetchRemoteSchemaResponse* response, - google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { +void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, response, done]() { brpc::ClosureGuard closure_guard(done); Status st = Status::OK(); if (request->is_coordinator()) { @@ -1120,13 +1120,13 @@ void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr if (!target_tablets.empty()) { std::vector tablet_schemas; for (int64_t tablet_id : target_tablets) { - TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id, false); - if (tablet == nullptr) { + auto res = ExecEnv::get_tablet(tablet_id); + if (!res.has_value()) { // just ignore LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id; continue; } - tablet_schemas.push_back(tablet->tablet_schema()); + tablet_schemas.push_back(res.value()->merged_tablet_schema()); } if (!tablet_schemas.empty()) { // merge all diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 9cad429107afd7..7f3a2ca6f30a56 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -225,6 +225,11 @@ class PInternalService : public PBackendService { PJdbcTestConnectionResult* result, google::protobuf::Closure* done) override; + void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, @@ -287,11 +292,6 @@ class PInternalServiceImpl final : public PInternalService { PGetTabletVersionsResponse* response, google::protobuf::Closure* done) override; - void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, - const PFetchRemoteSchemaRequest* request, - PFetchRemoteSchemaResponse* response, - google::protobuf::Closure* done) override; - private: void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id, int64_t tablet_id, int64_t node_id, diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy index dfb5b40794e7f0..5efcda3a04308e 100644 --- a/regression-test/suites/variant_p0/desc.groovy +++ b/regression-test/suites/variant_p0/desc.groovy @@ -16,9 +16,9 @@ // under the License. suite("regression_test_variant_desc", "nonConcurrent"){ - if (isCloudMode()) { - return - } + // if (isCloudMode()) { + // return + // } def load_json_data = {table_name, file_name -> // load the json data @@ -101,10 +101,13 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """set describe_extend_variant_column = true""" sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + // select for sync rowsets + sql "select * from sparse_columns limit 1" qt_sql_1 """desc ${table_name}""" sql "truncate table sparse_columns" sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;""" + sql "select * from sparse_columns limit 1" qt_sql_2 """desc ${table_name}""" sql "truncate table sparse_columns" @@ -115,6 +118,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1.0") sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + sql "select * from no_sparse_columns limit 1" qt_sql_3 """desc ${table_name}""" sql "truncate table ${table_name}" @@ -128,6 +132,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """insert into ${table_name} select 45000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str union all select 45000, '{"a": 1123}' as json_str union all select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + sql "select * from partition_data limit 1" qt_sql_6_1 """desc ${table_name} partition p1""" qt_sql_6_2 """desc ${table_name} partition p2""" qt_sql_6_3 """desc ${table_name} partition p3""" @@ -145,6 +150,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" // drop p1 sql """alter table ${table_name} drop partition p1""" + sql "select * from drop_partition limit 1" qt_sql_7 """desc ${table_name}""" qt_sql_7_1 """desc ${table_name} partition p2""" qt_sql_7_2 """desc ${table_name} partition p3""" @@ -165,6 +171,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ properties("replication_num" = "1", "disable_auto_compaction" = "false"); """ sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + sql "select * from ${table_name} limit 1" qt_sql_8 """desc ${table_name}""" sql "truncate table ${table_name}" @@ -181,6 +188,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ properties("replication_num" = "1", "disable_auto_compaction" = "false"); """ sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + sql "select * from ${table_name} limit 1" qt_sql_9 """desc ${table_name}""" sql """set describe_extend_variant_column = true""" qt_sql_9_1 """desc ${table_name}""" @@ -191,12 +199,14 @@ suite("regression_test_variant_desc", "nonConcurrent"){ create_table.call(table_name, "5") // add, drop columns sql """INSERT INTO ${table_name} values(0, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')""" + sql "select * from ${table_name} limit 1" sql """set describe_extend_variant_column = true""" qt_sql_10 """desc ${table_name}""" // add column sql "alter table ${table_name} add column v2 variant default null" sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + sql "select * from ${table_name} limit 1" qt_sql_10_1 """desc ${table_name}""" // drop cloumn sql "alter table ${table_name} drop column v2" @@ -205,6 +215,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql "alter table ${table_name} add column v3 variant default null" sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + sql "select * from ${table_name} limit 1" qt_sql_10_3 """desc ${table_name}""" //sql "truncate table ${table_name}" @@ -221,6 +232,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ """ sql """ insert into ${table_name} values (0, '{"名字" : "jack", "!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587": "unicode"}}')""" sql """set describe_extend_variant_column = true""" + sql "select * from ${table_name} limit 1" qt_sql_11 """desc ${table_name}""" // varaint subcolumn: empty @@ -237,6 +249,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """ insert into ${table_name} values (0, '{}')""" sql """ insert into ${table_name} values (0, '100')""" sql """set describe_extend_variant_column = true""" + sql "select * from ${table_name} limit 1" qt_sql_12 """desc ${table_name}""" @@ -247,6 +260,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """insert into large_tablets values (3001, '{"b" : 10}')""" sql """insert into large_tablets values (50001, '{"c" : 10}')""" sql """insert into large_tablets values (99999, '{"d" : 10}')""" + sql "select * from ${table_name} limit 1" sql """set max_fetch_remote_schema_tablet_count = 2""" sql "desc large_tablets" sql """set max_fetch_remote_schema_tablet_count = 128"""