Skip to content

Commit

Permalink
fix schema change test
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 10, 2022
1 parent db42352 commit 47c163e
Show file tree
Hide file tree
Showing 16 changed files with 650 additions and 47 deletions.
8 changes: 4 additions & 4 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
const TabletSchema* cur_tablet_schema = &_tablet->tablet_schema();
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema));
RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -157,10 +157,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
Status res;

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
context.tablet_schema_hash = org_rowset_meta->tablet_schema_hash();
context.rowset_type = org_rowset_meta->rowset_type();
context.tablet_path = new_tablet_path;
context.tablet_schema = &tablet_schema;
context.tablet_schema =
org_rowset_meta->tablet_schema() ? org_rowset_meta->tablet_schema() : &tablet_schema;
context.rowset_state = org_rowset_meta->rowset_state();
context.version = org_rowset_meta->version();
context.oldest_write_timestamp = org_rowset_meta->oldest_write_timestamp();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test_partition_schema_change --
0

-- !test_partition_schema_change_2 --
0

-- !test_partition_schema_change_3 --
1

-- !test_partition_schema_change_4 --
1

-- !test_partition_schema_change_5 --
1

-- !test_partition_schema_change_6 --
1 2017-01-02 Beijing 10 1 2017-01-02T00:00 1 30 20
1 2017-02-02 Beijing 10 1 2017-02-02T00:00 1 30 20
1 2017-03-02 Beijing 10 1 2017-03-02T00:00 1 30 20

-- !test_partition_schema_change_7 --
0

-- !test_partition_schema_change_8 --
1

-- !test_partition_schema_change_9 --
1 2017-01-02 Beijing 10 1 2017-01-02T00:00 1 30 20 1
1 2017-02-02 Beijing 10 1 2017-02-02T00:00 1 30 20 1
2 2017-02-03 Beijing 10 1 2017-02-02T00:00 1 30 20 2
1 2017-03-02 Beijing 10 1 2017-03-02T00:00 1 30 20 1

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,42 @@ suite ("test_agg_keys_schema_change") {
def tableName = "schema_change_agg_keys_regression_test"

try {

String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
for (String[] backend in backends) {
backendId_to_backendIP.put(backend[0], backend[2])
backendId_to_backendHttpPort.put(backend[0], backend[5])
}

backend_id = backendId_to_backendIP.keySet()[0]
StringBuilder showConfigCommand = new StringBuilder();
showConfigCommand.append("curl -X GET http://")
showConfigCommand.append(backendId_to_backendIP.get(backend_id))
showConfigCommand.append(":")
showConfigCommand.append(backendId_to_backendHttpPort.get(backend_id))
showConfigCommand.append("/api/show_config")
logger.info(showConfigCommand.toString())
def process = showConfigCommand.toString().execute()
int code = process.waitFor()
String err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
String out = process.getText()
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -151,10 +187,13 @@ suite ("test_agg_keys_schema_change") {
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
Expand All @@ -173,9 +212,12 @@ suite ("test_agg_keys_schema_change") {
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)

Expand Down Expand Up @@ -203,9 +245,12 @@ suite ("test_agg_keys_schema_change") {
int rowCount = 0
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/show?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,41 @@ suite ("test_agg_mv_schema_change") {
def tableName = "schema_change_agg_mv_regression_test"

try {
String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
for (String[] backend in backends) {
backendId_to_backendIP.put(backend[0], backend[2])
backendId_to_backendHttpPort.put(backend[0], backend[5])
}

backend_id = backendId_to_backendIP.keySet()[0]
StringBuilder showConfigCommand = new StringBuilder();
showConfigCommand.append("curl -X GET http://")
showConfigCommand.append(backendId_to_backendIP.get(backend_id))
showConfigCommand.append(":")
showConfigCommand.append(backendId_to_backendHttpPort.get(backend_id))
showConfigCommand.append("/api/show_config")
logger.info(showConfigCommand.toString())
def process = showConfigCommand.toString().execute()
int code = process.waitFor()
String err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
String out = process.getText()
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -131,10 +166,13 @@ suite ("test_agg_mv_schema_change") {
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
Expand All @@ -154,9 +192,12 @@ suite ("test_agg_mv_schema_change") {
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)

Expand Down Expand Up @@ -184,9 +225,12 @@ suite ("test_agg_mv_schema_change") {
int rowCount = 0
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/show?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,42 @@ suite ("test_agg_rollup_schema_change") {
def tableName = "schema_change_agg_rollup_regression_test"

try {

String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
for (String[] backend in backends) {
backendId_to_backendIP.put(backend[0], backend[2])
backendId_to_backendHttpPort.put(backend[0], backend[5])
}

backend_id = backendId_to_backendIP.keySet()[0]
StringBuilder showConfigCommand = new StringBuilder();
showConfigCommand.append("curl -X GET http://")
showConfigCommand.append(backendId_to_backendIP.get(backend_id))
showConfigCommand.append(":")
showConfigCommand.append(backendId_to_backendHttpPort.get(backend_id))
showConfigCommand.append("/api/show_config")
logger.info(showConfigCommand.toString())
def process = showConfigCommand.toString().execute()
int code = process.waitFor()
String err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
String out = process.getText()
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -131,10 +167,13 @@ suite ("test_agg_rollup_schema_change") {
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
Expand All @@ -154,9 +193,12 @@ suite ("test_agg_rollup_schema_change") {
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)

Expand Down Expand Up @@ -184,9 +226,12 @@ suite ("test_agg_rollup_schema_change") {
int rowCount = 0
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(context.config.beHttpAddress)
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/show?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
Expand Down
Loading

0 comments on commit 47c163e

Please sign in to comment.