[fix](cluster key) fix cluster key duplicated key
mymeiyi committed Dec 2, 2024
1 parent 70b0a08 commit dc2ea14
Showing 8 changed files with 198 additions and 7 deletions.
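In short: on a merge-on-write table with cluster keys, compaction could run while the tablet was still in schema change (TABLET_NOTREADY) and merge the input rowsets without first bringing the delete bitmap up to date, so two rows with the same unique key could survive the merge. A minimal sketch of the scenario, adapted from the regression test added in this commit (the sql and GetDebugPoint helpers and the debug point name come from that test; the table name t_ck is illustrative):

// Sketch only: assumes the Doris regression-test framework used in the new test below.
sql """ DROP TABLE IF EXISTS t_ck force """
sql """
    CREATE TABLE t_ck ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP
    unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1
    PROPERTIES ( "replication_num" = "1" );
"""
sql """ insert into t_ck values(10, 20, 30, 40); """

// Hold the new tablet in TABLET_NOTREADY by blocking historical rowset
// conversion (cloud mode uses the CloudSchemaChangeJob variant), then keep
// overwriting the same unique key (10, 20).
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
sql """ alter table t_ck order by(k1, k2, v2, v1); """
sql """ insert into t_ck values(10, 20, 31, 40); """
sql """ insert into t_ck values(10, 20, 32, 40); """

// Before this fix, triggering cumulative compaction here could merge the
// input rowsets without refreshing the delete bitmap, leaving both versions
// of key (10, 20), i.e. the duplicated key in the commit title.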
2 changes: 0 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,11 +375,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
 Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
     // aggregate the old-version delete bitmaps of previous rowsets
     std::vector<RowsetSharedPtr> pre_rowsets {};
-    std::vector<std::string> pre_rowset_ids {};
     for (const auto& it : cloud_tablet()->rowset_map()) {
         if (it.first.second < _input_rowsets.front()->start_version()) {
             pre_rowsets.emplace_back(it.second);
-            pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
         }
     }
     std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         }
     }
     auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
-    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+    LOG(INFO) << "finish calculate delete bitmap on tablet"
               << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
               << ", tablet_id=" << tablet->tablet_id()
               << ", get_tablet_time_us=" << get_tablet_time_us
38 changes: 38 additions & 0 deletions be/src/olap/compaction.cpp
@@ -37,6 +37,7 @@

 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/status.h"
 #include "cpp/sync_point.h"
@@ -191,6 +192,9 @@ Status Compaction::merge_input_rowsets() {
     SCOPED_TIMER(_merge_rowsets_latency_timer);
     // 1. Merge segment files and write bkd inverted index
     if (_is_vertical) {
+        if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) {
+            RETURN_IF_ERROR(_update_delete_bitmap());
+        }
         res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
                                              input_rs_readers, _output_rs_writer.get(),
                                              get_avg_segment_rows(), way_num, &_stats);
@@ -872,6 +876,40 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
     }
 }

+Status Compaction::_update_delete_bitmap() {
+    // For MoW tables with cluster keys, compaction reads data with the delete bitmap.
+    // If the tablet is not ready (e.g. during schema change), the delete bitmap must
+    // be updated before merging.
+    {
+        std::shared_lock meta_rlock(_tablet->get_header_lock());
+        if (_tablet->tablet_state() != TABLET_NOTREADY) {
+            return Status::OK();
+        }
+    }
+    OlapStopWatch watch;
+    std::vector<RowsetSharedPtr> rowsets;
+    for (const auto& rowset : _input_rowsets) {
+        Status st;
+        if (config::is_cloud_mode()) {
+            st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        } else {
+            std::lock_guard rwlock(
+                    (std::dynamic_pointer_cast<Tablet>(_tablet)->get_rowset_update_lock()));
+            std::shared_lock rlock(_tablet->get_header_lock());
+            st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        }
+        if (!st.ok()) {
+            LOG(INFO) << "failed to update_delete_bitmap_without_lock for tablet_id="
+                      << _tablet->tablet_id() << ", st=" << st.to_string();
+            return st;
+        }
+        rowsets.push_back(rowset);
+    }
+    LOG(INFO) << "finish updating delete bitmap for tablet: " << _tablet->tablet_id()
+              << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us()
+              << "(us)";
+    return Status::OK();
+}
+
 Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
     // only do index compaction for dup_keys and unique_keys with mow enabled
     if (config::inverted_index_compaction_enable &&
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
@@ -84,6 +84,8 @@ class Compaction {

     int64_t merge_way_num();

+    Status _update_delete_bitmap();
+
     // the root tracker for this compaction
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;

2 changes: 1 addition & 1 deletion be/src/olap/cumulative_compaction.cpp
@@ -145,7 +145,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
         DCHECK(missing_versions.size() % 2 == 0);
         LOG(WARNING) << "There are missed versions among rowsets. "
                      << "total missed version size: " << missing_versions.size() / 2
-                     << " first missed version prev rowset version=" << missing_versions[0]
+                     << ", first missed version prev rowset version=" << missing_versions[0]
                      << ", first missed version next rowset version=" << missing_versions[1]
                      << ", tablet=" << _tablet->tablet_id();
     }
@@ -681,9 +681,9 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request)
         CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task;
         if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
             calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(),
-                    "backend: " + task.getBackendId() + ", error_tablet_size: "
-                            + request.getErrorTabletIdsSize() + ", err_msg: "
-                            + request.getTaskStatus().getErrorMsgs().toString());
+                    "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize()
+                            + ", error_tablets: " + request.getErrorTabletIds()
+                            + ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString());
         } else if (request.isSetRespPartitions()
                 && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
             LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's"
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
10 20 35 40

-- !select2 --
10 20 40 37
11 20 40 37

@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// The cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_schema_change_and_compaction", "nonConcurrent") {
def tableName = "test_schema_change_and_compaction"

def getAlterTableState = { job_state ->
def retry = 0
def last_state = ""
while (true) {
sleep(2000)
def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1"
logger.info("alter table state: ${state}")
last_state = state[0][9]
if (state.size() > 0 && state[0][9] == job_state) {
return
}
retry++
if (retry >= 10) {
break
}
}
assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
}

    def block_convert_historical_rowsets = {
        if (isCloudMode()) {
            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
        } else {
            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
        }
    }

    def unblock = {
        if (isCloudMode()) {
            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
        } else {
            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
        }
    }

    onFinish {
        unblock()
    }

sql """ DROP TABLE IF EXISTS ${tableName} force """
sql """
CREATE TABLE ${tableName} ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP
unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES ( "replication_num" = "1" );
"""
sql """ insert into ${tableName} values(10, 20, 30, 40); """

// alter table
block_convert_historical_rowsets()
sql """ alter table ${tableName} order by(k1, k2, v2, v1); """
getAlterTableState("RUNNING")

def tablets = sql_return_maparray """ show tablets from ${tableName}; """
logger.info("tablets: ${tablets}")
assertEquals(2, tablets.size())
String alterTabletId = ""
String alterTabletBackendId = ""
String alterTabletCompactionUrl = ""
for (Map<String, String> tablet : tablets) {
if (tablet["State"] == "ALTER") {
alterTabletId = tablet["TabletId"].toLong()
alterTabletBackendId = tablet["BackendId"]
alterTabletCompactionUrl = tablet["CompactionStatus"]
}
}
logger.info("alterTabletId: ${alterTabletId}, alterTabletBackendId: ${alterTabletBackendId}, alterTabletCompactionUrl: ${alterTabletCompactionUrl}")
assertTrue(!alterTabletId.isEmpty())

// write some data
sql """ insert into ${tableName} values(10, 20, 31, 40); """
sql """ insert into ${tableName} values(10, 20, 32, 40); """
sql """ insert into ${tableName} values(10, 20, 33, 40); """
sql """ insert into ${tableName} values(10, 20, 34, 40); """
sql """ insert into ${tableName} values(10, 20, 35, 40); """
order_qt_select1 """ select * from ${tableName}; """

    // trigger compaction
    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
    logger.info("ip: " + backendId_to_backendIP.get(alterTabletBackendId) + ", port: " + backendId_to_backendHttpPort.get(alterTabletBackendId))
    def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(alterTabletBackendId), backendId_to_backendHttpPort.get(alterTabletBackendId), alterTabletId)
    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)

    // wait for compaction done
    def enable_new_tablet_do_compaction = get_be_param.call("enable_new_tablet_do_compaction")
    logger.info("enable_new_tablet_do_compaction: " + enable_new_tablet_do_compaction)
    boolean enable = enable_new_tablet_do_compaction.get(alterTabletBackendId).toBoolean()
    logger.info("enable: " + enable)
    for (int i = 0; i < 10; i++) {
        (code, out, err) = curl("GET", alterTabletCompactionUrl)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
        if (isCloudMode()) {
            if (enable) {
                if (tabletJson.rowsets.size() < 5) {
                    break
                }
            } else {
                // "msg": "invalid tablet state. tablet_id="
                break
            }
        } else {
            if (tabletJson.rowsets.size() < 5) {
                break
            }
        }
        sleep(2000)
    }

    // unblock schema change
    unblock()
    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 36, 40), (11, 20, 36, 40); """
    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 37, 40), (11, 20, 37, 40); """
    getAlterTableState("FINISHED")
    order_qt_select2 """ select * from ${tableName}; """
}
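As a possible follow-up check (hypothetical, not part of this commit), the invariant the fix restores can be asserted directly inside the suite above: once compaction and the schema change finish, a merge-on-write table must hold at most one row per unique key, so a grouped count should find no duplicates.

    // Hypothetical extra assertion for the suite above: verify the MoW
    // invariant that each unique key (k1, k2) maps to exactly one row.
    def dups = sql """ select k1, k2, count(*) from ${tableName} group by k1, k2 having count(*) > 1 """
    assertTrue(dups.isEmpty(), "duplicated keys found: ${dups}")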
