From 65ae146ba1653ba96593821faf3b237766abdf8e Mon Sep 17 00:00:00 2001
From: Lchangliang <915311741@qq.com>
Date: Thu, 1 Aug 2024 15:14:52 +0800
Subject: [PATCH] Handle alter version for compaction during schema change in
 cloud mode

---
 be/src/cloud/cloud_base_compaction.cpp        |   4 +-
 be/src/cloud/cloud_cumulative_compaction.cpp  |   5 +-
 cloud/src/meta-service/meta_service_job.cpp   |  43 +--
 cloud/test/meta_service_job_test.cpp          |  10 +-
 .../apache/doris/alter/CloudRollupJobV2.java  |   6 +-
 .../doris/alter/CloudSchemaChangeJobV2.java   |   6 +-
 .../datasource/CloudInternalCatalog.java      |   4 +-
 gensrc/proto/cloud.proto                      |   2 +-
 .../pipeline/cloud_p0/conf/be_custom.conf     |   1 +
 ...est_schema_change_with_compaction10.groovy |   1 +
 ...est_schema_change_with_compaction11.groovy | 280 ++++++++++++++++++
 ...test_schema_change_with_compaction5.groovy |   1 +
 ...test_schema_change_with_compaction6.groovy |   1 +
 ...test_schema_change_with_compaction9.groovy |   1 +
 .../schema_change/ddl/date_create.sql         |   5 +-
 .../schema_change/ddl/date_unique_create.sql  |   5 +-
 16 files changed, 339 insertions(+), 36 deletions(-)
 create mode 100644 regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy

diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index 044a64ff95786f..23d4850d0002e8 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -92,6 +92,9 @@ Status CloudBaseCompaction::prepare_compact() {
     compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
     cloud::StartTabletJobResponse resp;
     auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+    if (resp.has_alter_version()) {
+        (static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
+    }
     if (!st.ok()) {
         if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
             // set last_sync_time to 0 to force sync tablet next time
@@ -113,7 +116,6 @@ Status CloudBaseCompaction::prepare_compact() {
            << " schema_change_alter_version=" << resp.alter_version();
         std::string msg = ss.str();
         LOG(WARNING) << msg;
-        cloud_tablet->set_alter_version(resp.alter_version());
         return Status::InternalError(msg);
     }
     return st;
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index 0d01a4e5f58b48..ea6062309f28c7 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -270,11 +270,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
 
     cloud::FinishTabletJobResponse resp;
     auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
+    if (resp.has_alter_version()) {
+        (static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
+    }
     if (!st.ok()) {
         if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
             cloud_tablet()->clear_cache();
         } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
-            (dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
             std::stringstream ss;
             ss << "failed to prepare cumu compaction. Check compaction input versions "
                   "failed in schema change. "
@@ -288,6 +290,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         }
         return st;
     }
+
     auto& stats = resp.stats();
     LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
     {
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index ba56d5c5a0b93e..b2b9ec2531b3fb 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -919,7 +919,10 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
     txn->put(job_key, job_val);
     INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
                        << " key=" << hex(job_key);
-
+    response->set_alter_version(recorded_job.has_schema_change() &&
+                                                recorded_job.schema_change().has_alter_version()
+                                        ? recorded_job.schema_change().alter_version()
+                                        : -1);
     need_commit = true;
 }
 
@@ -1007,9 +1010,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
     }
 
     // MUST check initiator to let the retried BE commit this schema_change job.
-    if (request->action() == FinishTabletJobRequest::COMMIT &&
-        (schema_change.id() != recorded_schema_change.id() ||
-         schema_change.initiator() != recorded_schema_change.initiator())) {
+    if (schema_change.id() != recorded_schema_change.id() ||
+        schema_change.initiator() != recorded_schema_change.initiator()) {
         SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
            << " given_id=" << schema_change.id()
            << " recorded_job=" << proto_to_json(recorded_schema_change)
@@ -1031,21 +1033,22 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
             {instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
     std::string new_tablet_job_val;
+    TabletJobInfoPB new_recorded_job;
     err = txn->get(new_tablet_job_key, &new_tablet_job_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        SS << "internal error,"
            << " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
            << " job=" << proto_to_json(request->job()) << " err=" << err;
         msg = ss.str();
         code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
                                                       : cast_as<ErrCategory::READ>(err);
         return;
-    }
-    TabletJobInfoPB new_recorded_job;
-    if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
-        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-        msg = "malformed new tablet recorded job";
-        return;
+    } else if (err == TxnErrorCode::TXN_OK) {
+        if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = "malformed new tablet recorded job";
+            return;
+        }
     }
     //==========================================================================
@@ -1058,11 +1061,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
             recorded_schema_change.new_tablet_idx().tablet_id()) {
         // remove schema change
         recorded_job.clear_schema_change();
-        new_recorded_job.clear_schema_change();
         auto job_val = recorded_job.SerializeAsString();
-        new_tablet_job_val = new_recorded_job.SerializeAsString();
         txn->put(job_key, job_val);
-        txn->put(new_tablet_job_key, new_tablet_job_val);
+        if (!new_tablet_job_val.empty()) {
+            new_recorded_job.clear_schema_change();
+            new_tablet_job_val = new_recorded_job.SerializeAsString();
+            txn->put(new_tablet_job_key, new_tablet_job_val);
+        }
         INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                            << " key=" << hex(job_key);
 
@@ -1226,11 +1231,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
     // remove schema_change job
     //==========================================================================
     recorded_job.clear_schema_change();
-    new_recorded_job.clear_schema_change();
    auto job_val = recorded_job.SerializeAsString();
     txn->put(job_key, job_val);
-    new_tablet_job_val = new_recorded_job.SerializeAsString();
-    txn->put(new_tablet_job_key, new_tablet_job_val);
+    if (!new_tablet_job_val.empty()) {
+        new_recorded_job.clear_schema_change();
+        new_tablet_job_val = new_recorded_job.SerializeAsString();
+        txn->put(new_tablet_job_key, new_tablet_job_val);
+    }
     INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
                        << " key=" << hex(job_key);
 
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index def9fb11ed8fec..f0323eebb790be 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -687,8 +687,11 @@ TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) {
     recorded_sc->set_id("sc1");
     recorded_sc->set_initiator("BE1");
     job_val = recorded_job.SerializeAsString();
+    auto new_job_key =
+            job_tablet_key({instance_id, table_id, new_index_id, partition_id, new_tablet_id});
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->put(job_key, job_val);
+    txn->put(new_job_key, job_val);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
     ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg();
@@ -2342,12 +2345,12 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) {
     StartTabletJobResponse res;
     start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
                          TabletCompactionJobPB::CUMULATIVE, res, {7, 10});
-    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
     res.Clear();
     start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
                          TabletCompactionJobPB::BASE, res, {0, 10});
-    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
     res.Clear();
     start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
@@ -2499,7 +2502,8 @@ TEST(MetaServiceJobTest, CancelSC) {
         FinishTabletJobResponse finish_res;
         finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job_sc", "BE1", {},
                                  finish_res, FinishTabletJobRequest::ABORT);
-        ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
+        ASSERT_NE(finish_res.status().msg().find("unmatched job id or initiator"),
+                  std::string::npos);
     }
     {
         std::unique_ptr<Transaction> txn;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index f36d9b5f370006..5c5dd1972ed323 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -131,11 +131,11 @@ protected void onCancel() {
                 Long rollupTabletId = tabletEntry.getKey();
                 Long baseTabletId = tabletEntry.getValue();
                 ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                        .removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
+                        .removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
                                 partitionId, baseTabletId, rollupTabletId);
             }
-            LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms." +
-                    "dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
+            LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
+                    + "dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
                     dbId, tableId, rollupIndexId, partitionId, rollupTabletIdToBaseTabletId.size());
         }
         break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index ac80812e5b8420..3d79863addb8ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -134,11 +134,11 @@ protected void onCancel() {
                 Long shadowTabletId = entry.getKey();
                 Long originTabletId = entry.getValue();
                 ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                        .removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
+                        .removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
                                 partitionId, originTabletId, shadowTabletId);
             }
-            LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms." +
-                    "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
+            LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
+                    + "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
                     dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size());
         }
         break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index c3243630376b4c..f0c9278562d437 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -874,8 +874,8 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long ne
         newtabletIndexPBBuilder.setTabletId(newTabletId);
         final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
         schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
-        final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
-            schemaChangeJobPBBuilder.build();
+        final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
+                schemaChangeJobPBBuilder.build();
         tabletJobInfoPBBuilder.setSchemaChange(tabletSchemaChangeJobPb);
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 06850f7db3aeaf..f8acd97d05fb38 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1340,7 +1340,7 @@ enum MetaServiceCode {
     JOB_ALREADY_SUCCESS = 5002;
     ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
     ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
-    JOB_CHECK_ALTER_VERSION_FAIL = 5005;
+    JOB_CHECK_ALTER_VERSION = 5005;
 
     // Rate limit
     MAX_QPS_LIMIT = 6001;
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index 9f2967b1972c11..1da9c9992d5f93 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -33,3 +33,4 @@ save_load_error_log_to_s3 = true
 enable_stream_load_record = true
 stream_load_record_batch_size = 500
 webserver_num_workers = 128
+enable_new_tablet_do_compaction = true
diff --git a/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy
index 6fc8003527dc02..b393979d44218a 100644
--- a/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy
+++ b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy
@@ -25,6 +25,7 @@ suite('test_schema_change_with_compaction10') {
     options.cloudMode = true
     options.enableDebugPoints()
     options.beConfigs += [ "enable_java_support=false" ]
+    options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
     options.beConfigs += [ "disable_auto_compaction=true" ]
     options.beNum = 1
     docker(options) {
diff --git a/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy b/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy
new file mode 100644
index 00000000000000..fd257fcb7ea950
--- /dev/null
+++ b/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_with_compaction11') {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()
+    options.beConfigs += [ "enable_java_support=false" ]
+    options.beConfigs += [ "enable_new_tablet_do_compaction=false" ]
+    options.beConfigs += [ "disable_auto_compaction=true" ]
+    options.beNum = 1
+    docker(options) {
+        def getJobState = { tableName ->
+            def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+            return jobStateResult[0][9]
+        }
+
+        def s3BucketName = getS3BucketName()
+        def s3WithProperties = """WITH S3 (
+            |"AWS_ACCESS_KEY" = "${getS3AK()}",
+            |"AWS_SECRET_KEY" = "${getS3SK()}",
+            |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+            |"AWS_REGION" = "${getS3Region()}",
+            |"provider" = "${getS3Provider()}")
+            |PROPERTIES(
+            |"exec_mem_limit" = "8589934592",
+            |"load_parallelism" = "3")""".stripMargin()
+
+        // set fe configuration
+        sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
+        sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text
+        def load_date_once = { String table ->
+            def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+            def loadLabel = table + "_" + uniqueID
+            // load data from cos
+            def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
+            loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
+            sql loadSql
+
+            // check load state
+            while (true) {
+                def stateResult = sql "show load where Label = '${loadLabel}'"
+                def loadState = stateResult[stateResult.size() - 1][2].toString()
+                if ("CANCELLED".equalsIgnoreCase(loadState)) {
+                    throw new IllegalStateException("load ${loadLabel} failed.")
+                } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+                    break
+                }
+                sleep(5000)
+            }
+        }
+
+        sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text
+        def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep'
+        def injectBe = null
+        def backends = sql_return_maparray('show backends')
+        def array = sql_return_maparray("SHOW TABLETS FROM date")
+        def injectBeId = array[0].BackendId
+        def originTabletId = array[0].TabletId
+        injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null)
+        assertNotNull(injectBe)
+
+        def load_delete_compaction = {
+            load_date_once("date");
+            sql "delete from date where d_datekey < 19900000"
+            sql "select count(*) from date"
+            // cu compaction
+            logger.info("run compaction:" + originTabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            boolean running = true
+            do {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        try {
+            load_delete_compaction()
+            load_delete_compaction()
+            load_delete_compaction()
+
+            load_date_once("date");
+
+            sleep(1000)
+            GetDebugPoint().enableDebugPointForAllBEs(injectName)
+            sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)"
+            sleep(5000)
+            array = sql_return_maparray("SHOW TABLETS FROM date")
+
+            for (int i = 0; i < 5; i++) {
+                load_date_once("date");
+            }
+
+            // base compaction
+            logger.info("run compaction:" + originTabletId)
+            (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+
+            // wait for all compactions done
+            boolean running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+            def newTabletId = array[1].TabletId
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("invalid tablet state."))
+
+
+            // cu compaction
+            tabletId = array[0].TabletId
+            logger.info("run compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            running = true
+            do {
+                Thread.sleep(100)
+                tabletId = array[0].TabletId
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+
+            // new tablet cannot do cu compaction
+            tabletId = array[1].TabletId
+            logger.info("run compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("invalid tablet state."))
+
+        } finally {
+            if (injectBe != null) {
+                GetDebugPoint().disableDebugPointForAllBEs(injectName)
+            }
+            int max_try_time = 3000
+            while (max_try_time--){
+                result = getJobState("date")
+                if (result == "FINISHED" || result == "CANCELLED") {
+                    sleep(3000)
+                    break
+                } else {
+                    sleep(100)
+                    if (max_try_time < 1){
+                        assertEquals(1,2)
+                    }
+                }
+            }
+            assertEquals(result, "FINISHED");
+            def count = sql """ select count(*) from date; """
+            assertEquals(count[0][0], 2556);
+            // check rowsets
+            logger.info("run show:" + originTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-8]"))
+            assertTrue(out.contains("[9-13]"))
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[7-7]"))
+            assertTrue(out.contains("[8-8]"))
+            assertTrue(out.contains("[9-9]"))
+            assertTrue(out.contains("[13-13]"))
+
+            // base compaction
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+
+            // wait for all compactions done
+            boolean running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+
+            // cu compaction
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+
+            // wait for all compactions done
+            running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-13]"))
+
+            for (int i = 0; i < 4; i++) {
+                load_date_once("date");
+            }
+
+            sql """ select count(*) from date """
+
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            // wait for all compactions done
+            running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-17]"))
+        }
+    }
+}
diff --git a/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy b/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy
index c338dac907b245..f5028ff9e818c3 100644
--- a/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy
+++ b/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy
@@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction5', 'nonConcurrent') {
     options.enableDebugPoints()
     options.beConfigs += [ "enable_java_support=false" ]
     options.beConfigs += [ "disable_auto_compaction=true" ]
+    options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
     options.beNum = 1
     docker(options) {
         def getJobState = { tableName ->
diff --git a/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy b/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy
index 245dbe46b714c6..951535433d1362 100644
--- a/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy
+++ b/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy
@@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction6', 'nonConcurrent') {
     options.enableDebugPoints()
     options.beConfigs += [ "enable_java_support=false" ]
     options.beConfigs += [ "disable_auto_compaction=true" ]
+    options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
     options.beNum = 1
     docker(options) {
         def getJobState = { tableName ->
diff --git a/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy
index 6cb47e01f4b62c..83c549eefc5abd 100644
--- a/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy
+++ b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy
@@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction9') {
     options.enableDebugPoints()
     options.beConfigs += [ "enable_java_support=false" ]
     options.beConfigs += [ "disable_auto_compaction=true" ]
+    options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
     options.beNum = 1
     docker(options) {
         def getJobState = { tableName ->
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql
index 8486d7178bbe5d..99c85399c123b2 100644
--- a/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql
@@ -19,5 +19,6 @@ CREATE TABLE IF NOT EXISTS `date` (
 )
 DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1
 PROPERTIES (
-"replication_num" = "1"
-);
\ No newline at end of file
+"replication_num" = "1",
+"disable_auto_compaction" = "true"
+);
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql
index 0c3005c6e03f77..6138cb213a2063 100644
--- a/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql
@@ -22,5 +22,6 @@ DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "1",
 "enable_unique_key_merge_on_write" = "true",
-"enable_mow_light_delete" = "true" -); \ No newline at end of file +"enable_mow_light_delete" = "true", +"disable_auto_compaction" = "true" +);