Skip to content

Commit

Permalink
[fix](cluster key) fix cluster key in topn
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Dec 2, 2024
1 parent d77bfa0 commit 3b1bcaf
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 24 deletions.
33 changes: 26 additions & 7 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,32 @@ Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) {
// UNIQUE_KEYS will compare all keys as before
if (_tablet_schema->keys_type() == DUP_KEYS || (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
// find index in vector _return_columns
// for the read_orderby_key_num_prefix_columns orderby keys
for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
if (_return_columns[idx] == i) {
_orderby_key_columns.push_back(idx);
break;
if (!_tablet_schema->cluster_key_idxes().empty()) {
for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
auto cid = _tablet_schema->cluster_key_idxes()[i];
auto index = _tablet_schema->field_index(cid);
if (index < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"could not find cluster key column with unique_id=" +
std::to_string(cid) + " in tablet schema, tablet_id=" +
std::to_string(_tablet->tablet_id()));
}
for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
if (_return_columns[idx] == index) {
_orderby_key_columns.push_back(idx);
break;
}
}
}
} else {
// find index in vector _return_columns
// for the read_orderby_key_num_prefix_columns orderby keys
for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
if (_return_columns[idx] == i) {
_orderby_key_columns.push_back(idx);
break;
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions regression-test/data/compaction/test_full_compaction.out
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@
2 200
3 0

-- !select_final2 --
1 100
2 200

47 changes: 47 additions & 0 deletions regression-test/data/compaction/test_full_compaction_ck.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
1 1
2 2

-- !2 --
1 10
2 20

-- !3 --
1 100
2 200

-- !4 --
1 100
2 200
3 300

-- !5 --
1 100
2 200
3 100

-- !6 --
1 100
2 200

-- !skip_delete --
1 1
1 10
1 100
2 2
2 20
2 200
3 100
3 100
3 300

-- !select_final --
1 100
2 200
3 100

-- !select_final2 --
1 100
2 200

Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ c double No false \N NONE,STORED GENERATED
b int Yes false \N NONE
d int Yes false \N NONE,STORED GENERATED

-- !test_update --
1

-- !test_update_generated_column --
1 20 21

-- !gen_col_unique_key --
0

Expand Down Expand Up @@ -221,3 +215,9 @@ d int Yes false \N NONE,STORED GENERATED
-- !agg_replace_null --
1 2 3 4 13

-- !test_update --
1

-- !test_update_generated_column --
1 20 21

9 changes: 9 additions & 0 deletions regression-test/data/unique_with_mow_c_p0/test_select.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql0 --
0
can

-- !sql1 --
0
can

Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,15 @@ class Config {
if (isCKEnabled) {
excludeDirectorySet.add("unique_with_mow_p0/partial_update")
excludeDirectorySet.add("unique_with_mow_p0/flexible")
excludeDirectorySet.add("schema_change_p0/unique_ck")
excludeDirectorySet.add("doc")
List<String> excludeCases = ["test_table_properties", "test_default_hll", "test_default_pi", "test_full_compaction", "test_full_compaction_by_table_id", "test_create_table", "txn_insert", "test_update_mow", "test_new_update", "test_update_unique", "test_partial_update_generated_column", "nereids_partial_update_native_insert_stmt", "partial_update", "nereids_update_on_current_timestamp", "update_on_current_timestamp", "test_default_bitmap_empty", "nereids_delete_mow_partial_update", "delete_mow_partial_update", "partial_update_seq_col", "nereids_partial_update_native_insert_stmt_complex", "regression_test_variant_delete_and_update", "test_unique_table_auto_inc_partial_update_correct_stream_load", "test_unique_table_auto_inc", "test_unique_table_auto_inc_partial_update_correct_insert", "test_update_schema_change"]
List<String> excludeCases = ["test_table_properties", "test_create_table"
, "test_default_hll", "test_default_pi", "test_default_bitmap_empty"
// partial update
, "txn_insert", "test_update_schema_change", "test_generated_column_update", "test_nested_type_with_rowstore", "test_partial_update_generated_column", "nereids_partial_update_native_insert_stmt"
, "partial_update", "nereids_update_on_current_timestamp", "update_on_current_timestamp", "nereids_delete_mow_partial_update", "delete_mow_partial_update", "test_unique_table_auto_inc"
, "test_unique_table_auto_inc_partial_update_correct_insert", "partial_update_seq_col", "nereids_partial_update_native_insert_stmt_complex", "regression_test_variant_delete_and_update"
, "test_unique_table_auto_inc_partial_update_correct_stream_load", "test_update_mow", "test_new_update", "test_update_unique", "nereids_partial_update_native_insert_seq_col"]
for (def excludeCase in excludeCases) {
excludeSuiteWildcard.add(excludeCase)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ suite("test_full_compaction") {
// make sure all hidden data has been deleted
// (1,100)(2,200)
qt_select_final """select * from ${tableName} order by user_id"""

sql "SET skip_delete_predicate = false"
sql "SET skip_delete_sign = false"
sql "SET skip_delete_bitmap = false"
qt_select_final2 """select * from ${tableName} order by user_id"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
// try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
189 changes: 189 additions & 0 deletions regression-test/suites/compaction/test_full_compaction_ck.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test: full compaction on a merge-on-write UNIQUE KEY table that
// declares a CLUSTER BY key. Verifies that (a) all historical/hidden versions
// are visible when delete predicates/signs/bitmaps are skipped, (b) full
// compaction collapses the 7 rowsets into 1 (2 in cloud mode), and (c) the
// visible data is identical before and after compaction.
suite("test_full_compaction_ck") {
    def tableName = "test_full_compaction_ck"

    try {
        String backend_id;

        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

        // Any backend will do for reading the BE config; compaction itself is
        // triggered per-tablet on the tablet's own backend below.
        backend_id = backendId_to_backendIP.keySet()[0]
        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(0, code)
        def configList = parseJson(out.trim())
        assert configList instanceof List

        // Remember whether the BE has auto compaction disabled: if it does,
        // our manual full-compaction trigger must succeed; if not, a "fail"
        // status can mean compaction already ran automatically.
        boolean disableAutoCompaction = true
        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            if (((List<String>) ele)[0] == "disable_auto_compaction") {
                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
            }
        }

        sql """ DROP TABLE IF EXISTS ${tableName} """
        // Single-bucket MoW unique-key table with a cluster key, auto
        // compaction disabled so every load produces a distinct rowset.
        sql """
            CREATE TABLE ${tableName} (
            `user_id` INT NOT NULL, `value` INT NOT NULL)
            UNIQUE KEY(`user_id`)
            CLUSTER BY(`value`)
            DISTRIBUTED BY HASH(`user_id`)
            BUCKETS 1
            PROPERTIES ("replication_allocation" = "tag.location.default: 1",
            "disable_auto_compaction" = "true",
            "enable_mow_light_delete" = "false",
            "enable_unique_key_merge_on_write" = "true");"""

        // version1 (1,1)(2,2)
        sql """ INSERT INTO ${tableName} VALUES
            (1,1),(2,2)
            """
        qt_1 """select * from ${tableName} order by user_id"""


        // version2 (1,10)(2,20)
        sql """ INSERT INTO ${tableName} VALUES
            (1,10),(2,20)
            """
        qt_2 """select * from ${tableName} order by user_id"""


        // version3 (1,100)(2,200)
        sql """ INSERT INTO ${tableName} VALUES
            (1,100),(2,200)
            """
        qt_3 """select * from ${tableName} order by user_id"""


        // version4 (1,100)(2,200)(3,300)
        sql """ INSERT INTO ${tableName} VALUES
            (3,300)
            """
        qt_4 """select * from ${tableName} order by user_id"""


        // version5 (1,100)(2,200)(3,100)
        sql """update ${tableName} set value = 100 where user_id = 3"""
        qt_5 """select * from ${tableName} order by user_id"""


        // version6 (1,100)(2,200)
        sql """delete from ${tableName} where user_id = 3"""
        qt_6 """select * from ${tableName} order by user_id"""

        sql "SET skip_delete_predicate = true"
        sql "SET skip_delete_sign = true"
        sql "SET skip_delete_bitmap = true"
        // show all hidden data
        // (1,1)(1,10)(1,100)(2,2)(2,20)(2,200)(3,100)(3,100)(3,300)
        qt_skip_delete """select * from ${tableName} order by user_id, value"""

        //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
        def tablets = sql_return_maparray """ show tablets from ${tableName}; """

        def replicaNum = get_table_replica_num(tableName)
        logger.info("get table replica num: " + replicaNum)
        // before full compaction, there are 7 rowsets
        // (1 initial empty rowset + 6 load versions).
        int rowsetCount = 0
        for (def tablet in tablets) {
            (code, out, err) = curl("GET", tablet.CompactionStatus)
            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(0, code)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            rowsetCount +=((List<String>) tabletJson.rowsets).size()
        }
        assert (rowsetCount == 7 * replicaNum)

        // trigger full compactions for all tablets in ${tableName};
        // retry for up to 10 attempts because the BE may momentarily
        // refuse the request (e.g. another compaction in flight).
        for (def tablet in tablets) {
            String tablet_id = tablet.TabletId
            backend_id = tablet.BackendId
            def times = 1

            do{
                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
                ++times
                sleep(2000)
            } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)

            def compactJson = parseJson(out.trim())
            if (compactJson.status.toLowerCase() == "fail") {
                // A "fail" is only acceptable when auto compaction is on
                // and already merged the rowsets for us.
                assertEquals(disableAutoCompaction, false)
                logger.info("Compaction was done automatically!")
            }
            if (disableAutoCompaction) {
                assertEquals("success", compactJson.status.toLowerCase())
            }
        }

        // wait for full compaction done (poll run_status until it clears)
        for (def tablet in tablets) {
            boolean running = true
            do {
                Thread.sleep(1000)
                String tablet_id = tablet.TabletId
                backend_id = tablet.BackendId
                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(0, code)
                def compactionStatus = parseJson(out.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }

        // after full compaction, there is only 1 rowset per replica
        // (cloud mode keeps an extra empty rowset, hence 2).
        rowsetCount = 0
        for (def tablet in tablets) {
            (code, out, err) = curl("GET", tablet.CompactionStatus)
            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(0, code)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            rowsetCount +=((List<String>) tabletJson.rowsets).size()
        }
        def cloudMode = isCloudMode()
        if (cloudMode) {
            assert (rowsetCount == 2)
        } else {
            assert (rowsetCount == 1 * replicaNum)
        }

        // make sure all hidden data has been deleted
        // (1,100)(2,200)
        qt_select_final """select * from ${tableName} order by user_id"""

        // restore the session flags so the final check sees normal visibility
        sql "SET skip_delete_predicate = false"
        sql "SET skip_delete_sign = false"
        sql "SET skip_delete_bitmap = false"
        qt_select_final2 """select * from ${tableName} order by user_id"""
    } finally {
        try_sql("DROP TABLE IF EXISTS ${tableName}")
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,6 @@ suite("test_generated_column") {

qt_describe "describe gencol_refer_gencol"

//test update
sql "drop table if exists test_gen_col_update"
sql """create table test_gen_col_update (a int, b int, c int as (a+b))
unique key(a)
distributed by hash(a) properties("replication_num"="1")"""
sql "insert into test_gen_col_update values(1,3,default)"
qt_test_update "update test_gen_col_update set b=20"
qt_test_update_generated_column "select * from test_gen_col_update"

// test unique table, generated column is not key
sql "drop table if exists test_gen_col_unique_key"
qt_gen_col_unique_key """create table test_gen_col_unique_key(a int,b int,c int generated always as (abs(a+b)) not null)
Expand Down Expand Up @@ -233,4 +224,17 @@ suite("test_generated_column") {
PROPERTIES("replication_num" = "1");"""
exception "The generated columns can be key columns, or value columns of replace and replace_if_not_null aggregation type."
}

//test update
sql "drop table if exists test_gen_col_update"
sql """create table test_gen_col_update (a int, b int, c int as (a+b))
unique key(a)
distributed by hash(a) properties("replication_num"="1")"""
sql "insert into test_gen_col_update values(1,3,default)"
if (!isClusterKeyEnabled()) {
qt_test_update "update test_gen_col_update set b=20"
qt_test_update_generated_column "select * from test_gen_col_update"
} else {
// errCode = 2, detailMessage = The value specified for generated column 'c' in table 'test_gen_col_update' is not allowed
}
}
Loading

0 comments on commit 3b1bcaf

Please sign in to comment.