diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 083b9f06c9491d..ba7abd05cf5c5c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -990,7 +990,7 @@
 DEFINE_Bool(hide_webserver_config_page, "false");
 
 DEFINE_Bool(enable_segcompaction, "true");
 // Max number of segments allowed in a single segcompaction task.
-DEFINE_Int32(segcompaction_batch_size, "10");
+DEFINE_mInt32(segcompaction_batch_size, "10");
 // Max row count allowed in a single source segment, bigger segments will be skipped.
 DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1e3d57ff763417..5d21e5e7115fb4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1040,7 +1040,7 @@
 DECLARE_Bool(hide_webserver_config_page);
 
 DECLARE_Bool(enable_segcompaction);
 // Max number of segments allowed in a single segcompaction task.
-DECLARE_Int32(segcompaction_batch_size);
+DECLARE_mInt32(segcompaction_batch_size);
 // Max row count allowed in a single source segment, bigger segments will be skipped.
 DECLARE_Int32(segcompaction_candidate_max_rows);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index a4720f89d19be6..5f4bfa36c4dd4b 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -364,7 +364,7 @@ void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta)
 }
 
 Status BaseTablet::calc_delete_bitmap_between_segments(
-        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+        RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
         DeleteBitmapPtr delete_bitmap) {
     size_t const num_segments = segments.size();
     if (num_segments < 2) {
@@ -372,7 +372,6 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
     }
 
     OlapStopWatch watch;
-    auto const rowset_id = rowset->rowset_id();
     size_t seq_col_length = 0;
     if (_tablet_meta->tablet_schema()->has_sequence_col()) {
         auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
@@ -1690,7 +1689,8 @@ Status BaseTablet::update_delete_bitmap_without_lock(
     // calculate delete bitmap between segments if necessary.
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
-    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments,
+                                                              delete_bitmap));
 
     // get all base rowsets to calculate on
     std::vector<RowsetSharedPtr> specified_rowsets;
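Why the signature changed: segcompaction runs while the rowset is still being written, so no complete `RowsetSharedPtr` exists at that point, but the writer context already carries the `RowsetId`, which is all this routine actually used. The two call shapes, both taken from hunks in this patch:

```cpp
// Callers that hold a finished rowset simply forward its id:
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments,
                                                          delete_bitmap));

// The new segcompaction caller has only the writer context, which now suffices:
RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments,
                                                            delete_bitmap));
```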
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index c6de447200f87c..9c6b63a00fe5bb 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -181,7 +181,7 @@ class BaseTablet {
                                    DeleteBitmapPtr tablet_delete_bitmap = nullptr);
 
     Status calc_delete_bitmap_between_segments(
-            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
             DeleteBitmapPtr delete_bitmap);
 
     static Status commit_phase_update_delete_bitmap(
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index dc155efe0165bc..dca20b13fe726c 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -583,7 +583,6 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     Status status = Status::OK();
     // if not doing segcompaction, just check segment number
     if (!config::enable_segcompaction || !_context.enable_segcompaction ||
-        !_context.tablet_schema->cluster_key_uids().empty() ||
         _context.tablet_schema->num_variant_columns() > 0) {
         return _check_segment_number_limit(_num_segment);
     }
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 427236a6119673..bf12ce8cbbc366 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -86,7 +86,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
         SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
         std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
         vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
-        std::vector<uint32_t>& return_columns,
+        std::vector<uint32_t>& return_columns, std::vector<uint32_t>& key_group_cluster_key_idxes,
         std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
     const auto& ctx = _writer->_context;
     bool record_rowids = need_convert_delete_bitmap() && is_key;
@@ -95,6 +95,19 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     read_options.use_page_cache = false;
     read_options.tablet_schema = ctx.tablet_schema;
     read_options.record_rowids = record_rowids;
+    if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
+        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
+        RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments,
+                                                                    delete_bitmap));
+        for (auto& seg_ptr : *segments) {
+            auto d = delete_bitmap->get_agg(
+                    {ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON});
+            if (d->isEmpty()) {
+                continue; // Empty delete bitmap for the segment
+            }
+            read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d));
+        }
+    }
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
     std::map<uint32_t, uint64_t> segment_rows;
     for (auto& seg_ptr : *segments) {
@@ -123,6 +136,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     reader_params.is_key_column_group = is_key;
     reader_params.use_page_cache = false;
     reader_params.record_rowids = record_rowids;
+    reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
 
     return (*reader)->init(reader_params, nullptr);
 }
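Background for the new block in `_get_segcompaction_reader`: delete bitmaps are keyed by a `{rowset_id, segment_id, version}` triple, and `get_agg` folds all marks up to the given version into one aggregated bitmap per segment; `TEMP_VERSION_COMMON` is the sentinel version used while a rowset is not yet committed. A sketch of the per-segment lookup (the calls appear in the hunk above; the spelled-out types are assumptions):

```cpp
// One aggregated delete bitmap per source segment of the in-flight rowset.
DeleteBitmap::BitmapKey key {ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON};
std::shared_ptr<roaring::Roaring> d = delete_bitmap->get_agg(key);
if (!d->isEmpty()) {
    // Rows present in `d` are skipped by the segment iterator during the merge
    // read, so rows replaced across segments never reach the segcompaction writer.
    read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d));
}
```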
@@ -190,8 +204,9 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e
 
 Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
                                                Merger::Statistics& merger_stat, uint32_t begin,
-                                               uint32_t end) {
+                                               uint32_t end, bool is_mow_with_cluster_keys) {
     uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
+    uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
     uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
     uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
     uint64_t output_rows = merger_stat.output_rows; /* rows after merge */
@@ -205,11 +220,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
     }
     DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
 
-    if (raw_rows_read != sum_src_row) {
+    uint64_t raw_rows = raw_rows_read;
+    if (is_mow_with_cluster_keys) {
+        raw_rows += rows_del_by_bitmap;
+    }
+    if (raw_rows != sum_src_row) {
         return Status::Error(
                 "segcompaction read row num does not match source. expect read row:{}, actual read "
-                "row:{}",
-                sum_src_row, raw_rows_read);
+                "row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
+                sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
     }
 
     DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
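The `_check_correctness` change encodes the new accounting invariant: rows suppressed by the between-segment delete bitmap never show up in `raw_rows_read`, so for MoW tables with cluster keys they must be added back before comparing against the per-segment totals. Restated as a sketch (not the committed code):

```cpp
// Every source row is either surfaced to the merger (raw_rows_read) or dropped
// by the between-segment delete bitmap (rows_del_by_bitmap); any other outcome
// means the segcompaction read path lost rows.
uint64_t accounted = raw_rows_read;
if (is_mow_with_cluster_keys) {
    accounted += rows_del_by_bitmap;
}
DCHECK_EQ(accounted, sum_src_row);
```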
@@ -281,8 +300,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
     OlapReaderStatistics reader_stats;
     std::unique_ptr<vectorized::VerticalBlockReader> reader;
-    auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
-                                       is_key, column_ids, &reader);
+    auto s =
+            _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
+                                      is_key, column_ids, key_group_cluster_key_idxes, &reader);
     if (UNLIKELY(reader == nullptr || !s.ok())) {
         return Status::Error(
                 "failed to get segcompaction reader. err: {}", s.to_string());
@@ -303,9 +323,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     }
 
     /* check row num after merge/aggregation */
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            _check_correctness(key_reader_stats, key_merger_stats, begin, end),
-            "check correctness failed");
+    bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
+    RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
+                                                      end, is_mow_with_cluster_keys),
+                                   "check correctness failed");
     {
         std::lock_guard lock(_writer->_segid_statistics_map_mutex);
         _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h
index 5ec74c0e660963..0279b5bb653c6e 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -87,12 +87,13 @@ class SegcompactionWorker {
                                        OlapReaderStatistics* stat,
                                        vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
                                        std::vector<uint32_t>& return_columns,
+                                       std::vector<uint32_t>& key_group_cluster_key_idxes,
                                        std::unique_ptr<vectorized::VerticalBlockReader>* reader);
 
     std::unique_ptr<segment_v2::SegmentWriter> _create_segcompaction_writer(uint32_t begin, uint32_t end);
     Status _delete_original_segments(uint32_t begin, uint32_t end);
     Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat,
-                              uint32_t begin, uint32_t end);
+                              uint32_t begin, uint32_t end, bool is_mow_with_cluster_keys);
     Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);
 
 private:
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index ccc006e1f040a6..1bbacc0b1236f6 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -286,8 +286,8 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
-        RETURN_IF_ERROR(
-                _tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
+        RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_rowset->rowset_id(), segments,
+                                                                     _delete_bitmap));
     }
 
     // For partial update, we need to fill in the entire row of data, during the calculation
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index d74a9cd2e0b181..55e18b4deb8544 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -551,8 +551,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
         }
         if (segments.size() > 1) {
             // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
-                                                                       delete_bitmap);
+            status = local_tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
+                                                                       segments, delete_bitmap);
             if (!status) {
                 LOG(WARNING) << "failed to calculate delete bitmap"
                              << ". tablet_id: " << local_tablet->tablet_id()
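One connective detail before the test files: the `DEFINE_Int32` to `DEFINE_mInt32` switch at the top of this patch makes `segcompaction_batch_size` mutable at runtime, which the new `test_compact_seg.groovy` below relies on via `set_be_param('segcompaction_batch_size', 5)`. The practical consequence on the BE side is that the value should be read on each segcompaction trigger rather than cached at startup; a minimal sketch, with the helper name invented for illustration:

```cpp
// Hypothetical accessor: m-prefixed configs may change between invocations
// (e.g. through the BE config-update API), so fetch the current value per task.
static inline uint32_t current_segcompaction_batch_size() {
    return static_cast<uint32_t>(config::segcompaction_batch_size);
}
```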
diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out b/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out
new file mode 100644
index 00000000000000..1d49e6a589aabb
--- /dev/null
+++ b/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee
+
+-- !select_default --
+47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee
diff --git a/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out b/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
new file mode 100644
index 00000000000000..17c30d14aa5b43
--- /dev/null
+++ b/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+12345 23083 30920 40410
+
+-- !select2 --
+17320 24209 30795 40000
+
+-- !select3 --
+59832 36673 30343 40299
+
+-- !select1 --
+12345 23083 30920 40410
+
+-- !select2 --
+17320 24209 30795 40782
+
+-- !select3 --
+59832 36673 30586 40739
+
diff --git a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy
index cc7a40c20aca91..4b6544306fb755 100644
--- a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy
+++ b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy
@@ -92,8 +92,20 @@ suite("test_segcompaction_unique_keys_mow") {
 
         qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """
 
-        String[][] tablets = sql """ show tablets from ${tableName}; """
+        def row_count = sql """ SELECT count(*) FROM ${tableName}; """
+        logger.info("row_count: " + row_count)
+        assertEquals(4999989, row_count[0][0])
+
+        def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """
+        logger.info("duplicated keys: " + result)
+        assertTrue(result.size() == 0, "There are duplicate keys in the table")
+
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+        for (def tablet in tablets) {
+            def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+        }
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
     }
diff --git a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy
new file mode 100644
index 00000000000000..6fa8f6bbcb4d94
--- /dev/null
+++ b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_segcompaction_unique_keys_mow_ck") {
+    for (int i = 0; i < 2; i++) {
+        def tableName = "segcompaction_unique_keys_regression_test_mow_ck_" + i
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                    `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                    `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                    `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                    `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                    `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                    `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                    `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                    `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                    `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+                )
+                UNIQUE KEY(`col_0`) cluster by(`col_1`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+                PROPERTIES (
+                """ + (i == 1 ? "\"function_column.sequence_col\"='col_0', " : "") +
+                """
+                    "replication_num" = "1"
+                );
+            """
+            // "enable_unique_key_merge_on_write" = "true"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            def path = "oss://$bucket/regression/segcompaction_test/segcompaction_test.orc"
+
+            def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
"($columns)" : ""; + + sql """ + LOAD LABEL $uuid ( + DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc") + INTO TABLE $tableName + FORMAT AS "ORC" + $columns_str + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "$endpoint", + "AWS_REGION" = "$region", + "provider" = "${getS3Provider()}" + ) + """ + + def max_try_milli_secs = 3600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + " $uuid") + break; + } + if (result[0][2].equals("CANCELLED")) { + logger.info("Load CANCELLED " + " $uuid") + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $uuid") + } + } + + qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """ + + def row_count = sql """ SELECT count(*) FROM ${tableName}; """ + logger.info("row_count: " + row_count) + assertEquals(4999989, row_count[0][0]) + + def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """ + logger.info("duplicated keys: " + result) + assertTrue(result.size() == 0, "There are duplicate keys in the table") + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + } finally { + // try_sql("DROP TABLE IF EXISTS ${tableName}") + } + } +} diff --git a/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy b/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy new file mode 100644 index 00000000000000..230653bcf3bd57 --- /dev/null +++ b/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+suite("test_compact_seg", "nonConcurrent") {
+    def tableName = "test_compact_seg"
+
+    def getTabletStatus = { rowsetNum, lastRowsetSegmentNum ->
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+        logger.info("tablets: ${tablets}")
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String compactionUrl = tablet["CompactionStatus"]
+        def retry = 15
+        for (int i = 0; i < retry; i++) {
+            def (code, out, err) = curl("GET", compactionUrl)
+            logger.info("Show tablets " + tablet.TabletId + " status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            assertTrue(tabletJson.rowsets.size() >= rowsetNum)
+            def rowset = tabletJson.rowsets.get(rowsetNum - 1)
+            logger.info("rowset: ${rowset}")
+            int start_index = rowset.indexOf("]")
+            int end_index = rowset.indexOf("DATA")
+            def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+            logger.info("segmentNumStr: ${segmentNumStr}")
+            if (Integer.parseInt(segmentNumStr) == lastRowsetSegmentNum) {
+                break
+            }
+            if (i == retry - 1) {
+                // assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+                logger.warn("expected segmentNum: ${lastRowsetSegmentNum}, but got ${segmentNumStr} after ${retry} retries")
+            }
+            sleep(2000)
+        }
+    }
+
+    // batch_size is 4164 in csv_reader.cpp
+    // _batch_size is 8192 in vtablet_writer.cpp
+    def doris_scanner_row_bytes_params = get_be_param("doris_scanner_row_bytes")
+    def segcompaction_batch_size_params = get_be_param("segcompaction_batch_size")
+    onFinish {
+        GetDebugPoint().disableDebugPointForAllBEs("MemTable.need_flush")
+        set_original_be_param("doris_scanner_row_bytes", doris_scanner_row_bytes_params)
+        set_original_be_param('segcompaction_batch_size', segcompaction_batch_size_params)
+    }
+    GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+    set_be_param.call("doris_scanner_row_bytes", "1")
+    set_be_param.call('segcompaction_batch_size', 5)
+
+    for (int j = 0; j < 2; j++) {
+        tableName = "test_compact_seg_" + j
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL,
+                `v3` int(11) NULL,
+                `v4` int(11) NULL
+            ) unique KEY(`k1`, `k2`)
+            cluster by(`v3`, `v4`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            """ + (j == 1 ? "\"function_column.sequence_col\"='v4', " : "") +
+            """
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            );
+        """
"\"function_column.sequence_col\"='v4', " : "") + + """ + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + file 'test_schema_change_add_key_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(8192, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + // check generate 3 segments + getTabletStatus(2, 3) + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + file 'test_schema_change_add_key_column1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(20480, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + // check generate 2 segments(6 -> 2) + getTabletStatus(3, 2) + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + file 'test_schema_change_add_key_column2.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(20480, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + // check generate 2 segments(6 -> 2) + getTabletStatus(4, 2) + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + file 'test_schema_change_add_key_column3.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(20480, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + // check generate 2 segments(6 -> 2) + getTabletStatus(5, 2) + + def rowCount1 = sql """ select count() from ${tableName}; """ + logger.info("rowCount1: ${rowCount1}") + + // get be info + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + + // check generate 1 segments + // getTabletStatus(2, 1) // [2-5] + + // check row count + def rowCount2 = sql """ select count() from ${tableName}; """ + logger.info("rowCount2: ${rowCount2}") + assertEquals(rowCount1[0][0], rowCount2[0][0]) + // check no duplicated key + def result = sql """ select `k1`, `k2`, count(*) a from ${tableName} group by `k1`, `k2` having a > 1; """ + logger.info("result: ${result}") + assertEquals(0, result.size()) + // check one row value + order_qt_select1 """ select * from ${tableName} where `k1` = 12345; """ + order_qt_select2 """ select * from ${tableName} where `k1` = 17320; """ + order_qt_select3 """ select * from ${tableName} where `k1` = 59832 and `k2` = 36673; """ + } +}