Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 22, 2024
1 parent 9e3fc58 commit b98e8ed
Show file tree
Hide file tree
Showing 10 changed files with 1,386 additions and 4 deletions.
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
<< ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id;
sc_job->set_alter_version(start_resp.alter_version());

DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.sleep",
{ std::this_thread::sleep_for(std::chrono::seconds(120)); });
// FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.

// Create a new tablet schema, should merge with dropped columns in light weight schema change
Expand Down Expand Up @@ -351,6 +349,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
_output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);

DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", DBUG_BLOCK);
// process delete bitmap if the table is MOW
if (_new_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
}

bool abort_compaction = false;
if (recorded_job.has_schema_change() &&
if (recorded_job.has_schema_change() && request->action() == FinishTabletJobRequest::COMMIT &&
!check_compaction_input_verions(compaction, recorded_job)) {
SS << "Check compaction input versions failed in schema change. input_version_start="
<< compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,6 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long pa

if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("dropIndex response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
}

Expand Down
4 changes: 4 additions & 0 deletions regression-test/plugins/plugin_curl_requester.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ Suite.metaClass.be_get_overall_compaction_status{ String ip, String port /* par
return curl("GET", String.format("http://%s:%s/api/compaction/run_status", ip, port))
}

// Fetch the compaction/tablet status page for one tablet from a BE node.
// Returns the raw curl result for http://<ip>:<port>/api/compaction/show?tablet_id=<id>.
Suite.metaClass.be_show_tablet_status{ String ip, String port, String tablet_id /* param */->
    def url = "http://${ip}:${port}/api/compaction/show?tablet_id=${tablet_id}".toString()
    return curl("GET", url)
}

logger.info("Added 'be_get_compaction_status' function to Suite")

Suite.metaClass._be_run_compaction = { String ip, String port, String tablet_id, String compact_type ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Most of the cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

// Note: To filter out tables from sql files, use the following one-liner command
// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq

import org.apache.doris.regression.util.DebugPoint

import org.apache.doris.regression.util.NodeType

// Regression test: schema change running concurrently with compaction on a
// cloud tablet. The BE debug point
// 'CloudSchemaChangeJob.process_alter_tablet.sleep' blocks the schema-change
// job mid-flight so base/cumulative compactions can be exercised against both
// the origin tablet and the new (alter-target) tablet, then the job is
// released and the resulting rowset version ranges are verified.
suite('test_schema_change_with_compaction1', 'nonConcurrent') {
// Returns the state of the most recent ALTER TABLE COLUMN job for the table.
// NOTE(review): assumes column index 9 of SHOW ALTER TABLE COLUMN is the
// job State — confirm against the FE result-set layout.
def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}

def s3BucketName = getS3BucketName()
// WITH S3 clause appended to every generated LOAD statement below.
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}",
|"provider" = "${getS3Provider()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()

// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text
// Broker-loads the table from S3 using a unique label, then polls
// `show load` until the load FINISHES (or throws if CANCELLED).
def load_date_once = { String table ->
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def loadLabel = table + "_" + uniqueID
// load data from cos
def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql

// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState)) {
break
}
sleep(5000)
}
}

sql new File("""${context.file.parent}/../ddl/date_create.sql""").text
// Debug point that makes the BE schema-change job block (see DBUG_EXECUTE_IF
// in cloud_schema_change_job.cpp).
def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep'
def injectBe = null
def backends = sql_return_maparray('show backends')
def array = sql_return_maparray("SHOW TABLETS FROM date")
// Inject into the BE hosting the first tablet of table `date`.
def injectBeId = array[0].BackendId
def originTabletId = array[0].TabletId
injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null)
assertNotNull(injectBe)

// One round of: load data, delete a range, then run cumulative compaction
// on the origin tablet and poll until it finishes.
def load_delete_compaction = {
load_date_once("date");
sql "delete from date where d_datekey < 19900000"
sql "select count(*) from date"
// cu compaction
logger.info("run compaction:" + originTabletId)
(code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
boolean running = true
do {
Thread.sleep(100)
(code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}

try {
// Build up rowsets (with deletes) on the origin tablet before the alter.
load_delete_compaction()
load_delete_compaction()
load_delete_compaction()

load_date_once("date");

sleep(1000)

// Block the schema-change job on the BE, then kick off the alter.
DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName)
sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)"
sleep(5000)
// After the alter starts, SHOW TABLETS returns both the origin tablet
// ([0]) and the new alter-target tablet ([1]).
array = sql_return_maparray("SHOW TABLETS FROM date")

// More loads while the schema change is blocked, so both tablets
// accumulate incremental rowsets.
for (int i = 0; i < 5; i++) {
load_date_once("date");
}

// base compaction
logger.info("run compaction:" + originTabletId)
(code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)


// wait for all compactions done
boolean running = true
while (running) {
Thread.sleep(100)
(code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
}
// Base compaction on the NOT_READY alter-target tablet must be rejected.
def newTabletId = array[1].TabletId
logger.info("run compaction:" + newTabletId)
(code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(out.contains("invalid tablet state."))


// cu compaction
// Cumulative compaction is triggered on every tablet (origin and new).
for (int i = 0; i < array.size(); i++) {
tabletId = array[i].TabletId
logger.info("run compaction:" + tabletId)
(code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
}

for (int i = 0; i < array.size(); i++) {
running = true
do {
Thread.sleep(100)
tabletId = array[i].TabletId
(code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
} finally {
// Release the blocked schema-change job so it can complete.
if (injectBe != null) {
DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName)
}
// Poll (up to ~3000 iterations) for the alter job to reach a terminal state.
int max_try_time = 3000
while (max_try_time--){
result = getJobState("date")
if (result == "FINISHED" || result == "CANCELLED") {
sleep(3000)
break
} else {
sleep(100)
if (max_try_time < 1){
// Deliberate failure: job never reached a terminal state.
assertEquals(1,2)
}
}
}
assertEquals(result, "FINISHED");
def count = sql """ select count(*) from date; """
assertEquals(count[0][0], 23004);
// check rowsets
// NOTE(review): the "[a-b]" strings below are the expected rowset version
// ranges after the loads/deletes/compactions above — they depend on the
// exact number of load/compaction operations performed earlier.
logger.info("run show:" + originTabletId)
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId)
logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(out.contains("[0-1]"))
assertTrue(out.contains("[2-7]"))
assertTrue(out.contains("[8-8]"))
assertTrue(out.contains("[9-13]"))

logger.info("run show:" + newTabletId)
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(out.contains("[0-1]"))
assertTrue(out.contains("[2-2]"))
assertTrue(out.contains("[7-7]"))
assertTrue(out.contains("[8-8]"))
assertTrue(out.contains("[9-13]"))

// base compaction
// Now that the schema change finished, base compaction on the new tablet
// should be accepted and merge [2-2]..[7-7] into [2-7].
logger.info("run compaction:" + newTabletId)
(code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)


// wait for all compactions done
boolean running = true
while (running) {
Thread.sleep(100)
(code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
}

logger.info("run show:" + newTabletId)
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(out.contains("[0-1]"))
assertTrue(out.contains("[2-7]"))
assertTrue(out.contains("[8-8]"))
assertTrue(out.contains("[9-13]"))

// Three more loads, then cumulative compaction should fold [9-13] plus the
// new versions into [8-16].
for (int i = 0; i < 3; i++) {
load_date_once("date");
}

sql """ select count(*) from date """

logger.info("run compaction:" + newTabletId)
(code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)

// wait for all compactions done
running = true
while (running) {
Thread.sleep(100)
(code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
}

logger.info("run show:" + newTabletId)
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(out.contains("[0-1]"))
assertTrue(out.contains("[2-7]"))
assertTrue(out.contains("[8-16]"))
}

}
Loading

0 comments on commit b98e8ed

Please sign in to comment.