[Cloud](Variant) support DESC merging variants' schema for cloud mode … #38143

Merged 1 commit on Jul 19, 2024
16 changes: 16 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
@@ -28,6 +28,7 @@
#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
@@ -43,8 +44,10 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_policy.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "vec/common/schema_util.h"

namespace doris {
using namespace ErrorCode;
@@ -132,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
std::vector<TabletSchemaSPtr> schemas;
for (const auto& [_, rowset] : _rs_version_map) {
schemas.push_back(rowset->tablet_schema());
}
// Merge all rowset schemas into one least-common (extended) schema
static_cast<void>(
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
return target_schema;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
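For readers unfamiliar with the merge step above: each rowset can carry a different set of dynamically created variant subcolumns, and the least-common-schema call folds them into one schema so DESC can show every subcolumn ever written. The real merge is done by vectorized::schema_util::get_least_common_schema as shown in the diff; the sketch below is only a toy, self-contained illustration of the union-with-type-promotion idea, where ToySchema, promote and merge_schemas are made-up names and the promotion rule is deliberately simplified.

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy stand-in for a rowset schema: variant subcolumn name -> type name.
using ToySchema = std::map<std::string, std::string>;

// Simplified promotion rule: identical types stay, conflicts fall back to a
// JSONB-like common type. The actual rule in Doris is richer than this.
static std::string promote(const std::string& a, const std::string& b) {
    return a == b ? a : std::string("JSONB");
}

// Union all per-rowset schemas into one merged schema, promoting on conflict.
static ToySchema merge_schemas(const std::vector<ToySchema>& schemas) {
    ToySchema merged;
    for (const auto& schema : schemas) {
        for (const auto& [name, type] : schema) {
            auto it = merged.find(name);
            if (it == merged.end()) {
                merged.emplace(name, type);
            } else {
                it->second = promote(it->second, type);
            }
        }
    }
    return merged;
}

int main() {
    // Two rowsets that saw different variant subcolumns.
    std::vector<ToySchema> schemas = {
            {{"v.a", "INT"}, {"v.b", "STRING"}},
            {{"v.a", "DOUBLE"}, {"v.c", "INT"}},
    };
    for (const auto& [name, type] : merge_schemas(schemas)) {
        std::cout << name << " : " << type << '\n';
    }
    // Prints: v.a : JSONB, v.b : STRING, v.c : INT
    return 0;
}

The point is only that every subcolumn survives the merge, so DESC can report the full extended schema even when no single rowset contains all of them.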
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
@@ -206,6 +206,9 @@ class CloudTablet final : public BaseTablet {
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;

// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.h
@@ -28,6 +28,7 @@
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/version_graph.h"
#include "util/metrics.h"

@@ -252,6 +253,9 @@ class BaseTablet {
const std::vector<RowsetSharedPtr>& candidate_rowsets,
int limit);

// Return the merged schema of all rowsets
virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; }

protected:
// Find the missed versions until the spec_version.
//
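The virtual hook added here is the whole cloud/local split: BaseTablet keeps the cheap default of returning the cached max-version schema, while CloudTablet overrides it with the rowset-merging version shown earlier. A minimal sketch of that shape follows; BaseTabletSketch, CloudTabletSketch and the string-based SchemaPtr are placeholders for illustration, not the actual Doris types.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Placeholder for TabletSchemaSPtr in this sketch.
using SchemaPtr = std::shared_ptr<std::string>;

class BaseTabletSketch {
public:
    virtual ~BaseTabletSketch() = default;
    // Local mode: the cached max-version schema is already the answer.
    virtual SchemaPtr merged_tablet_schema() const { return _max_version_schema; }

protected:
    SchemaPtr _max_version_schema = std::make_shared<std::string>("max-version schema");
};

class CloudTabletSketch final : public BaseTabletSketch {
public:
    // Cloud mode: fold every rowset's schema into one extended schema.
    SchemaPtr merged_tablet_schema() const override {
        std::string merged;
        for (const auto& s : _rowset_schemas) {
            merged += (merged.empty() ? "" : " + ") + *s;
        }
        return std::make_shared<std::string>(merged);
    }

private:
    std::vector<SchemaPtr> _rowset_schemas = {
            std::make_shared<std::string>("rowset-1 schema"),
            std::make_shared<std::string>("rowset-2 schema")};
};

int main() {
    std::unique_ptr<BaseTabletSketch> local = std::make_unique<BaseTabletSketch>();
    std::unique_ptr<BaseTabletSketch> cloud = std::make_unique<CloudTabletSketch>();
    std::cout << *local->merged_tablet_schema() << '\n';  // max-version schema
    std::cout << *cloud->merged_tablet_schema() << '\n';  // rowset-1 schema + rowset-2 schema
    return 0;
}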
16 changes: 8 additions & 8 deletions be/src/service/internal_service.cpp
@@ -1044,11 +1044,11 @@ struct AsyncRPCContext {
brpc::CallId cid;
};

void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
const PFetchRemoteSchemaRequest* request,
PFetchRemoteSchemaResponse* response,
google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
Contributor commented:

warning: function 'fetch_remote_tablet_schema' exceeds recommended size/complexity thresholds [readability-function-size]

void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
                       ^
Additional context

be/src/service/internal_service.cpp:1046: 96 lines including whitespace and comments (threshold 80)

void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
                       ^

const PFetchRemoteSchemaRequest* request,
PFetchRemoteSchemaResponse* response,
google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([request, response, done]() {
brpc::ClosureGuard closure_guard(done);
Status st = Status::OK();
if (request->is_coordinator()) {
@@ -1120,13 +1120,13 @@ void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr
if (!target_tablets.empty()) {
std::vector<TabletSchemaSPtr> tablet_schemas;
for (int64_t tablet_id : target_tablets) {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id, false);
if (tablet == nullptr) {
auto res = ExecEnv::get_tablet(tablet_id);
if (!res.has_value()) {
// just ignore
LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
continue;
}
tablet_schemas.push_back(tablet->tablet_schema());
tablet_schemas.push_back(res.value()->merged_tablet_schema());
}
if (!tablet_schemas.empty()) {
// merge all
10 changes: 5 additions & 5 deletions be/src/service/internal_service.h
@@ -225,6 +225,11 @@ class PInternalService : public PBackendService {
PJdbcTestConnectionResult* result,
google::protobuf::Closure* done) override;

void fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
const PFetchRemoteSchemaRequest* request,
PFetchRemoteSchemaResponse* response,
google::protobuf::Closure* done) override;

private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,
@@ -287,11 +292,6 @@ class PInternalServiceImpl final : public PInternalService {
PGetTabletVersionsResponse* response,
google::protobuf::Closure* done) override;

void fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
const PFetchRemoteSchemaRequest* request,
PFetchRemoteSchemaResponse* response,
google::protobuf::Closure* done) override;

private:
void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port,
int64_t txn_id, int64_t tablet_id, int64_t node_id,
20 changes: 17 additions & 3 deletions regression-test/suites/variant_p0/desc.groovy
@@ -16,9 +16,9 @@
// under the License.

suite("regression_test_variant_desc", "nonConcurrent"){
if (isCloudMode()) {
return
}
// if (isCloudMode()) {
// return
// }

def load_json_data = {table_name, file_name ->
// load the json data
@@ -101,10 +101,13 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """set describe_extend_variant_column = true"""
sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
// select to sync rowsets before desc
sql "select * from sparse_columns limit 1"
qt_sql_1 """desc ${table_name}"""
sql "truncate table sparse_columns"
sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql "select * from sparse_columns limit 1"
qt_sql_2 """desc ${table_name}"""
sql "truncate table sparse_columns"

@@ -115,6 +118,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1.0")
sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql "select * from no_sparse_columns limit 1"
qt_sql_3 """desc ${table_name}"""
sql "truncate table ${table_name}"

@@ -128,6 +132,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into ${table_name} select 45000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 45000, '{"a": 1123}' as json_str union all select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
sql "select * from partition_data limit 1"
qt_sql_6_1 """desc ${table_name} partition p1"""
qt_sql_6_2 """desc ${table_name} partition p2"""
qt_sql_6_3 """desc ${table_name} partition p3"""
@@ -145,6 +150,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
// drop p1
sql """alter table ${table_name} drop partition p1"""
sql "select * from drop_partition limit 1"
qt_sql_7 """desc ${table_name}"""
qt_sql_7_1 """desc ${table_name} partition p2"""
qt_sql_7_2 """desc ${table_name} partition p3"""
@@ -165,6 +171,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
sql "select * from ${table_name} limit 1"
qt_sql_8 """desc ${table_name}"""
sql "truncate table ${table_name}"

@@ -181,6 +188,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
sql "select * from ${table_name} limit 1"
qt_sql_9 """desc ${table_name}"""
sql """set describe_extend_variant_column = true"""
qt_sql_9_1 """desc ${table_name}"""
@@ -191,12 +199,14 @@ suite("regression_test_variant_desc", "nonConcurrent"){
create_table.call(table_name, "5")
// add, drop columns
sql """INSERT INTO ${table_name} values(0, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')"""
sql "select * from ${table_name} limit 1"
sql """set describe_extend_variant_column = true"""
qt_sql_10 """desc ${table_name}"""
// add column
sql "alter table ${table_name} add column v2 variant default null"
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
'{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
sql "select * from ${table_name} limit 1"
qt_sql_10_1 """desc ${table_name}"""
// drop column
sql "alter table ${table_name} drop column v2"
@@ -205,6 +215,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql "alter table ${table_name} add column v3 variant default null"
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
'{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
sql "select * from ${table_name} limit 1"
qt_sql_10_3 """desc ${table_name}"""
//sql "truncate table ${table_name}"

@@ -221,6 +232,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
"""
sql """ insert into ${table_name} values (0, '{"名字" : "jack", "!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587": "unicode"}}')"""
sql """set describe_extend_variant_column = true"""
sql "select * from ${table_name} limit 1"
qt_sql_11 """desc ${table_name}"""

// variant subcolumn: empty
@@ -237,6 +249,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """ insert into ${table_name} values (0, '{}')"""
sql """ insert into ${table_name} values (0, '100')"""
sql """set describe_extend_variant_column = true"""
sql "select * from ${table_name} limit 1"
qt_sql_12 """desc ${table_name}"""


@@ -247,6 +260,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into large_tablets values (3001, '{"b" : 10}')"""
sql """insert into large_tablets values (50001, '{"c" : 10}')"""
sql """insert into large_tablets values (99999, '{"d" : 10}')"""
sql "select * from ${table_name} limit 1"
sql """set max_fetch_remote_schema_tablet_count = 2"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 128"""