diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 97e7d0a9f5db50..9c5b4b1464764b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -37,17 +37,13 @@ #include "gutil/strings/numbers.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/memtable_flush_executor.h" -#include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/rowset_meta.h" -#include "olap/rowset/rowset_writer.h" -#include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" -#include "olap/tablet_meta.h" #include "olap/txn_manager.h" #include "runtime/exec_env.h" #include "service/backend_options.h" @@ -55,7 +51,6 @@ #include "util/mem_info.h" #include "util/ref_count_closure.h" #include "util/stopwatch.hpp" -#include "util/time.h" #include "vec/core/block.h" namespace doris { @@ -70,27 +65,19 @@ Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, RuntimeProfile DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, const UniqueId& load_id) : _req(*req), - _tablet(nullptr), - _cur_rowset(nullptr), - _rowset_writer(nullptr), + _rowset_builder(*req, storage_engine, profile), _memtable_writer(*req, profile), - _tablet_schema(new TabletSchema), - _delta_written_success(false), - _storage_engine(storage_engine), - _load_id(load_id) { + _storage_engine(storage_engine) { _init_profile(profile); } void DeltaWriter::_init_profile(RuntimeProfile* profile) { _profile = profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true); - _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime"); + _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime"); } DeltaWriter::~DeltaWriter() { - if (_is_init && !_delta_written_success) { - _garbage_collection(); - } - if (!_is_init) { return; } @@ -98,110 +85,17 @@ DeltaWriter::~DeltaWriter() { // cancel and wait all memtables in flush queue to be finished _memtable_writer.cancel(); - if (_tablet != nullptr) { + if (_rowset_builder.tablet() != nullptr) { const FlushStatistic& stat = _memtable_writer.get_flush_token_stats(); - _tablet->flush_bytes->increment(stat.flush_size_bytes); - _tablet->flush_finish_count->increment(stat.flush_finish_count); - } - - if (_calc_delete_bitmap_token != nullptr) { - _calc_delete_bitmap_token->cancel(); - } - - if (_tablet != nullptr) { - _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + - _rowset_writer->rowset_id().to_string()); - } -} - -void DeltaWriter::_garbage_collection() { - Status rollback_status = Status::OK(); - TxnManager* txn_mgr = _storage_engine->txn_manager(); - if (_tablet != nullptr) { - rollback_status = txn_mgr->rollback_txn(_req.partition_id, _tablet, _req.txn_id); - } - // has to check rollback status, because the rowset maybe committed in this thread and - // published in another thread, then rollback will failed. - // when rollback failed should not delete rowset - if (rollback_status.ok()) { - _storage_engine->add_unused_rowset(_cur_rowset); + _rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes); + _rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count); } } Status DeltaWriter::init() { - TabletManager* tablet_mgr = _storage_engine->tablet_manager(); - _tablet = tablet_mgr->get_tablet(_req.tablet_id); - if (_tablet == nullptr) { - return Status::Error("fail to find tablet. tablet_id={}, schema_hash={}", - _req.tablet_id, _req.schema_hash); - } - - // get rowset ids snapshot - if (_tablet->enable_unique_key_merge_on_write()) { - std::lock_guard lck(_tablet->get_header_lock()); - _cur_max_version = _tablet->max_version_unlocked().second; - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' - if (_req.table_schema_param->is_partial_update()) { - return Status::InternalError( - "Unable to do 'partial_update' when " - "the tablet is undergoing a 'schema changing process'"); - } - _rowset_ids.clear(); - } else { - _rowset_ids = _tablet->all_rs_id(_cur_max_version); - } - } - - // check tablet version number - if (!config::disable_auto_compaction && - _tablet->exceed_version_limit(config::max_tablet_version_num - 100) && - !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { - //trigger compaction - StorageEngine::instance()->submit_compaction_task( - _tablet, CompactionType::CUMULATIVE_COMPACTION, true); - if (_tablet->version_count() > config::max_tablet_version_num) { - return Status::Error( - "failed to init delta writer. version count: {}, exceed limit: {}, tablet: {}", - _tablet->version_count(), config::max_tablet_version_num, _tablet->full_name()); - } - } - - { - std::shared_lock base_migration_rlock(_tablet->get_migration_lock(), std::try_to_lock); - if (!base_migration_rlock.owns_lock()) { - return Status::Error("get lock failed"); - } - std::lock_guard push_lock(_tablet->get_push_lock()); - RETURN_IF_ERROR(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet, - _req.txn_id, _req.load_id)); - } - if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) { - _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id())); - } - // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema()); - RowsetWriterContext context; - context.txn_id = _req.txn_id; - context.load_id = _req.load_id; - context.rowset_state = PREPARED; - context.segments_overlap = OVERLAPPING; - context.tablet_schema = _tablet_schema; - context.newest_write_timestamp = UnixSeconds(); - context.tablet_id = _tablet->tablet_id(); - context.tablet = _tablet; - context.write_type = DataWriteType::TYPE_DIRECT; - context.mow_context = std::make_shared(_cur_max_version, _req.txn_id, _rowset_ids, - _delete_bitmap); - std::unique_ptr rowset_writer; - RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer)); - _rowset_writer = std::move(rowset_writer); - _memtable_writer.init(_rowset_writer, _tablet_schema, - _tablet->enable_unique_key_merge_on_write()); - _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token(); - + _rowset_builder.init(); + _memtable_writer.init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), + _rowset_builder.tablet()->enable_unique_key_merge_on_write()); _is_init = true; return Status::OK(); } @@ -247,90 +141,24 @@ Status DeltaWriter::build_rowset() { DCHECK(_is_init) << "delta writer is supposed be to initialized before build_rowset() being called"; + SCOPED_TIMER(_close_wait_timer); RETURN_IF_ERROR(_memtable_writer.close_wait()); - - // use rowset meta manager to save meta - _cur_rowset = _rowset_writer->build(); - if (_cur_rowset == nullptr) { - return Status::Error("fail to build rowset"); - } - return Status::OK(); + return _rowset_builder.build_rowset(); } Status DeltaWriter::submit_calc_delete_bitmap_task() { - if (!_tablet->enable_unique_key_merge_on_write()) { - return Status::OK(); - } - - std::lock_guard l(_lock); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " - "tablet_id: " - << _tablet->tablet_id() << " txn_id: " << _req.txn_id; - return Status::OK(); - } - auto beta_rowset = reinterpret_cast(_cur_rowset.get()); - std::vector segments; - RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - return Status::OK(); - } - if (segments.size() > 1) { - // calculate delete bitmap between segments - RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, - _delete_bitmap)); - } - - // For partial update, we need to fill in the entire row of data, during the calculation - // of the delete bitmap. This operation is resource-intensive, and we need to minimize - // the number of times it occurs. Therefore, we skip this operation here. - if (_cur_rowset->tablet_schema()->is_partial_update()) { - return Status::OK(); - } - - LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id() - << ", txn_id: " << _req.txn_id; - return _tablet->commit_phase_update_delete_bitmap(_cur_rowset, _rowset_ids, _delete_bitmap, - segments, _req.txn_id, - _calc_delete_bitmap_token.get(), nullptr); + return _rowset_builder.submit_calc_delete_bitmap_task(); } Status DeltaWriter::wait_calc_delete_bitmap() { - if (!_tablet->enable_unique_key_merge_on_write() || - _cur_rowset->tablet_schema()->is_partial_update()) { - return Status::OK(); - } - std::lock_guard l(_lock); - RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); - RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap)); - LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: " - << _tablet->tablet_id() << ", txn_id: " << _req.txn_id; - return Status::OK(); + return _rowset_builder.wait_calc_delete_bitmap(); } Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool write_single_replica) { std::lock_guard l(_lock); - SCOPED_TIMER(_close_wait_timer); - Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, - _req.load_id, _cur_rowset, false); - - if (!res && !res.is()) { - LOG(WARNING) << "Failed to commit txn: " << _req.txn_id - << " for rowset: " << _cur_rowset->rowset_id(); - return res; - } - if (_tablet->enable_unique_key_merge_on_write()) { - _storage_engine->txn_manager()->set_txn_related_delete_bitmap( - _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(), - _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids); - } - - _delta_written_success = true; + SCOPED_TIMER(_commit_txn_timer); + _rowset_builder.commit_txn(); if (write_single_replica) { for (auto node_info : slave_tablet_nodes.slave_nodes()) { @@ -344,7 +172,7 @@ bool DeltaWriter::check_slave_replicas_done( google::protobuf::Map* success_slave_tablet_node_ids) { std::lock_guard lock(_slave_node_lock); if (_unfinished_slave_node.empty()) { - success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids}); + success_slave_tablet_node_ids->insert({_req.tablet_id, _success_slave_node_ids}); return true; } return false; @@ -353,7 +181,7 @@ bool DeltaWriter::check_slave_replicas_done( void DeltaWriter::add_finished_slave_replicas( google::protobuf::Map* success_slave_tablet_node_ids) { std::lock_guard lock(_slave_node_lock); - success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids}); + success_slave_tablet_node_ids->insert({_req.tablet_id, _success_slave_node_ids}); } Status DeltaWriter::cancel() { @@ -366,12 +194,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) { return Status::OK(); } RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st)); - if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) { - _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */ - } - if (_calc_delete_bitmap_token != nullptr) { - _calc_delete_bitmap_token->cancel(); - } _is_cancelled = true; return Status::OK(); } @@ -380,35 +202,6 @@ int64_t DeltaWriter::mem_consumption(MemType mem) { return _memtable_writer.mem_consumption(mem); } -void DeltaWriter::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { - _tablet_schema->copy_from(ori_tablet_schema); - // find the right index id - int i = 0; - auto indexes = table_schema_param->indexes(); - for (; i < indexes.size(); i++) { - if (indexes[i]->index_id == index_id) { - break; - } - } - - if (indexes.size() > 0 && indexes[i]->columns.size() != 0 && - indexes[i]->columns[0]->unique_id() >= 0) { - _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), - indexes[i], ori_tablet_schema); - } - if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) { - _tablet->update_max_version_schema(_tablet_schema); - } - - _tablet_schema->set_table_id(table_schema_param->table_id()); - // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); - _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); -} - void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { std::shared_ptr stub = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( @@ -417,19 +210,19 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { LOG(WARNING) << "failed to send pull rowset request to slave replica. get rpc stub failed, " "slave host=" << node_info.host() << ", port=" << node_info.async_internal_port() - << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id; + << ", tablet_id=" << _req.tablet_id << ", txn_id=" << _req.txn_id; return; } - _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, _tablet->tablet_id(), - this); + _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, _req.tablet_id, this); { std::lock_guard lock(_slave_node_lock); _unfinished_slave_node.insert(node_info.id()); } std::vector indices_ids; - auto tablet_schema = _cur_rowset->rowset_meta()->tablet_schema(); + auto cur_rowset = _rowset_builder.rowset(); + auto tablet_schema = cur_rowset->rowset_meta()->tablet_schema(); if (!tablet_schema->skip_write_index_on_load()) { for (auto& column : tablet_schema->columns()) { const TabletIndex* index_meta = tablet_schema->get_inverted_index(column.unique_id()); @@ -440,19 +233,18 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { } PTabletWriteSlaveRequest request; - RowsetMetaPB rowset_meta_pb = _cur_rowset->rowset_meta()->get_rowset_pb(); + RowsetMetaPB rowset_meta_pb = cur_rowset->rowset_meta()->get_rowset_pb(); request.set_allocated_rowset_meta(&rowset_meta_pb); request.set_host(BackendOptions::get_localhost()); request.set_http_port(config::webserver_port); - string tablet_path = _tablet->tablet_path(); + string tablet_path = _rowset_builder.tablet()->tablet_path(); request.set_rowset_path(tablet_path); request.set_token(ExecEnv::GetInstance()->token()); request.set_brpc_port(config::brpc_port); request.set_node_id(node_info.id()); - for (int segment_id = 0; segment_id < _cur_rowset->rowset_meta()->num_segments(); - segment_id++) { + for (int segment_id = 0; segment_id < cur_rowset->rowset_meta()->num_segments(); segment_id++) { std::stringstream segment_name; - segment_name << _cur_rowset->rowset_id() << "_" << segment_id << ".dat"; + segment_name << cur_rowset->rowset_id() << "_" << segment_id << ".dat"; int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + segment_name.str()); request.mutable_segments_size()->insert({segment_id, segment_size}); @@ -491,8 +283,8 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { LOG(WARNING) << "failed to send pull rowset request to slave replica, error=" << berror(closure->cntl.ErrorCode()) << ", error_text=" << closure->cntl.ErrorText() - << ". slave host: " << node_info.host() - << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id; + << ". slave host: " << node_info.host() << ", tablet_id=" << _req.tablet_id + << ", txn_id=" << _req.txn_id; std::lock_guard lock(_slave_node_lock); _unfinished_slave_node.erase(node_info.id()); } @@ -508,13 +300,14 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succe if (is_succeed) { _success_slave_node_ids.add_slave_node_ids(node_id); VLOG_CRITICAL << "record successful slave replica for txn [" << _req.txn_id - << "], tablet_id=" << _tablet->tablet_id() << ", node_id=" << node_id; + << "], tablet_id=" << _req.tablet_id << ", node_id=" << node_id; } _unfinished_slave_node.erase(node_id); } int64_t DeltaWriter::num_rows_filtered() const { - return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered(); + auto rowset_writer = _rowset_builder.rowset_writer(); + return rowset_writer == nullptr ? 0 : rowset_writer->num_rows_filtered(); } } // namespace doris diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index c0d98b3b28e8a9..4c9f0fc35a17a0 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -30,9 +30,11 @@ #include #include "common/status.h" +#include "olap/delta_writer_context.h" #include "olap/memtable_writer.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" +#include "olap/rowset_builder.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" @@ -54,14 +56,6 @@ namespace vectorized { class Block; } // namespace vectorized -struct WriteRequest : MemTableWriter::WriteRequest { - int32_t schema_hash; - int64_t txn_id; - int64_t partition_id; - int64_t index_id = 0; - OlapTableSchemaParam* table_schema_param; -}; - // Writer for a particular (load, index, tablet). // This class is NOT thread-safe, external synchronization is required. class DeltaWriter { @@ -93,6 +87,8 @@ class DeltaWriter { void add_finished_slave_replicas(google::protobuf::Map* success_slave_tablet_node_ids); + void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); + // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. Status cancel(); @@ -109,14 +105,12 @@ class DeltaWriter { int64_t txn_id() const { return _req.txn_id; } - void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); - int64_t total_received_rows() const { return _total_received_rows; } int64_t num_rows_filtered() const; // For UT - DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + DeleteBitmapPtr get_delete_bitmap() { return _rowset_builder.get_delete_bitmap(); } MemTableWriter* memtable_writer() { return &_memtable_writer; } @@ -124,12 +118,6 @@ class DeltaWriter { DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, const UniqueId& load_id); - void _garbage_collection(); - - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); - void _request_slave_tablet_pull_rowset(PNodeInfo node_info); void _init_profile(RuntimeProfile* profile); @@ -137,15 +125,10 @@ class DeltaWriter { bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; - TabletSharedPtr _tablet; - RowsetSharedPtr _cur_rowset; - std::shared_ptr _rowset_writer; + RowsetBuilder _rowset_builder; MemTableWriter _memtable_writer; - TabletSchemaSPtr _tablet_schema; - bool _delta_written_success; StorageEngine* _storage_engine; - UniqueId _load_id; std::mutex _lock; @@ -153,18 +136,12 @@ class DeltaWriter { PSuccessSlaveTabletNodeIds _success_slave_node_ids; std::shared_mutex _slave_node_lock; - DeleteBitmapPtr _delete_bitmap = nullptr; - std::unique_ptr _calc_delete_bitmap_token; - // current rowset_ids, used to do diff in publish_version - RowsetIdUnorderedSet _rowset_ids; - // current max version, used to calculate delete bitmap - int64_t _cur_max_version; - // total rows num written by DeltaWriter int64_t _total_received_rows = 0; RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _close_wait_timer = nullptr; + RuntimeProfile::Counter* _commit_txn_timer = nullptr; MonotonicStopWatch _lock_watch; }; diff --git a/be/src/olap/delta_writer_context.h b/be/src/olap/delta_writer_context.h new file mode 100644 index 00000000000000..c5c30b5ce7e193 --- /dev/null +++ b/be/src/olap/delta_writer_context.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include + +namespace doris { + +class TupleDescriptor; +class SlotDescriptor; +class OlapTableSchemaParam; + +struct WriteRequest { + int64_t tablet_id; + int32_t schema_hash; + int64_t txn_id; + int64_t partition_id; + PUniqueId load_id; + TupleDescriptor* tuple_desc; + // slots are in order of tablet's schema + const std::vector* slots; + bool is_high_priority = false; + OlapTableSchemaParam* table_schema_param; + int64_t index_id = 0; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index b40d03a8ce66e6..8caddb6d37266c 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -63,7 +63,7 @@ void MemTableWriter::_init_profile(RuntimeProfile* profile) { _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime"); _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime"); _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime"); - _close_wait_timer = ADD_TIMER(_profile, "MemTableWriterCloseWaitTime"); + _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT); _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT); _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index ed09f619337278..457534ce3a84e1 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -30,6 +30,7 @@ #include #include "common/status.h" +#include "olap/delta_writer_context.h" #include "olap/memtable.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" @@ -61,15 +62,6 @@ enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; // This class is NOT thread-safe, external synchronization is required. class MemTableWriter { public: - struct WriteRequest { - int64_t tablet_id; - PUniqueId load_id; - TupleDescriptor* tuple_desc; - // slots are in order of tablet's schema - const std::vector* slots; - bool is_high_priority = false; - }; - MemTableWriter(const WriteRequest& req, RuntimeProfile* profile); ~MemTableWriter(); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp new file mode 100644 index 00000000000000..f92b1c96fb1a78 --- /dev/null +++ b/be/src/olap/rowset_builder.cpp @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset_builder.h" + +#include +#include + +#include +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gutil/strings/numbers.h" +#include "io/fs/file_writer.h" // IWYU pragma: keep +#include "olap/calc_delete_bitmap_executor.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/schema_change.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "olap/tablet_meta.h" +#include "olap/txn_manager.h" +#include "util/brpc_client_cache.h" +#include "util/mem_info.h" +#include "util/ref_count_closure.h" +#include "util/stopwatch.hpp" +#include "util/time.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +RowsetBuilder::RowsetBuilder(const WriteRequest& req, StorageEngine* storage_engine, + RuntimeProfile* profile) + : _req(req), _tablet_schema(new TabletSchema), _storage_engine(storage_engine) { + _init_profile(profile); +} + +void RowsetBuilder::_init_profile(RuntimeProfile* profile) { + _profile = profile->create_child(fmt::format("RowsetBuilder {}", _req.tablet_id), true, true); + _build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime"); + _submit_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapSubmitTime"); + _wait_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapWaitTime"); + _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime"); +} + +RowsetBuilder::~RowsetBuilder() { + if (_is_init && !_is_committed) { + _garbage_collection(); + } + + if (!_is_init) { + return; + } + + if (_calc_delete_bitmap_token != nullptr) { + _calc_delete_bitmap_token->cancel(); + } + + if (_tablet != nullptr) { + _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + _rowset_writer->rowset_id().to_string()); + } +} + +void RowsetBuilder::_garbage_collection() { + Status rollback_status = Status::OK(); + TxnManager* txn_mgr = _storage_engine->txn_manager(); + if (_tablet != nullptr) { + rollback_status = txn_mgr->rollback_txn(_req.partition_id, _tablet, _req.txn_id); + } + // has to check rollback status, because the rowset maybe committed in this thread and + // published in another thread, then rollback will fail. + // when rollback failed should not delete rowset + if (rollback_status.ok()) { + _storage_engine->add_unused_rowset(_rowset); + } +} + +Status RowsetBuilder::init() { + TabletManager* tablet_mgr = _storage_engine->tablet_manager(); + _tablet = tablet_mgr->get_tablet(_req.tablet_id); + if (_tablet == nullptr) { + return Status::Error("fail to find tablet. tablet_id={}, schema_hash={}", + _req.tablet_id, _req.schema_hash); + } + + std::shared_ptr mow_context = nullptr; + // get rowset ids snapshot + if (_tablet->enable_unique_key_merge_on_write()) { + std::lock_guard lck(_tablet->get_header_lock()); + int64_t cur_max_version = _tablet->max_version_unlocked().second; + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' + if (_req.table_schema_param->is_partial_update()) { + return Status::InternalError( + "Unable to do 'partial_update' when " + "the tablet is undergoing a 'schema changing process'"); + } + _rowset_ids.clear(); + } else { + _rowset_ids = _tablet->all_rs_id(cur_max_version); + } + _delete_bitmap = std::make_shared(_tablet->tablet_id()); + mow_context = std::make_shared(cur_max_version, _req.txn_id, _rowset_ids, + _delete_bitmap); + } + + // check tablet version number + if (!config::disable_auto_compaction && + _tablet->exceed_version_limit(config::max_tablet_version_num - 100) && + !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { + //trigger compaction + StorageEngine::instance()->submit_compaction_task( + _tablet, CompactionType::CUMULATIVE_COMPACTION, true); + if (_tablet->version_count() > config::max_tablet_version_num) { + return Status::Error( + "failed to init rowset builder. version count: {}, exceed limit: {}, tablet: " + "{}", + _tablet->version_count(), config::max_tablet_version_num, _tablet->full_name()); + } + } + + { + std::shared_lock base_migration_lock(_tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_lock.owns_lock()) { + return Status::Error("get lock failed"); + } + std::lock_guard push_lock(_tablet->get_push_lock()); + RETURN_IF_ERROR(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet, + _req.txn_id, _req.load_id)); + } + // build tablet schema in request level + _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema()); + RowsetWriterContext context; + context.txn_id = _req.txn_id; + context.load_id = _req.load_id; + context.rowset_state = PREPARED; + context.segments_overlap = OVERLAPPING; + context.tablet_schema = _tablet_schema; + context.newest_write_timestamp = UnixSeconds(); + context.tablet_id = _tablet->tablet_id(); + context.tablet = _tablet; + context.write_type = DataWriteType::TYPE_DIRECT; + context.mow_context = mow_context; + std::unique_ptr rowset_writer; + RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer)); + _rowset_writer = std::move(rowset_writer); + _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token(); + + _is_init = true; + return Status::OK(); +} + +Status RowsetBuilder::build_rowset() { + std::lock_guard l(_lock); + DCHECK(_is_init) + << "rowset builder is supposed be to initialized before build_rowset() being called"; + + SCOPED_TIMER(_build_rowset_timer); + // use rowset meta manager to save meta + _rowset = _rowset_writer->build(); + if (_rowset == nullptr) { + return Status::Error("fail to build rowset"); + } + return Status::OK(); +} + +Status RowsetBuilder::submit_calc_delete_bitmap_task() { + if (!_tablet->enable_unique_key_merge_on_write()) { + return Status::OK(); + } + std::lock_guard l(_lock); + SCOPED_TIMER(_submit_delete_bitmap_timer); + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " + "tablet_id: " + << _tablet->tablet_id() << " txn_id: " << _req.txn_id; + return Status::OK(); + } + auto beta_rowset = reinterpret_cast(_rowset.get()); + std::vector segments; + RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + return Status::OK(); + } + if (segments.size() > 1) { + // calculate delete bitmap between segments + RETURN_IF_ERROR( + _tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap)); + } + + // For partial update, we need to fill in the entire row of data, during the calculation + // of the delete bitmap. This operation is resource-intensive, and we need to minimize + // the number of times it occurs. Therefore, we skip this operation here. + if (_rowset->tablet_schema()->is_partial_update()) { + return Status::OK(); + } + + LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id() + << ", txn_id: " << _req.txn_id; + return _tablet->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, _delete_bitmap, + segments, _req.txn_id, + _calc_delete_bitmap_token.get(), nullptr); +} + +Status RowsetBuilder::wait_calc_delete_bitmap() { + if (!_tablet->enable_unique_key_merge_on_write() || + _rowset->tablet_schema()->is_partial_update()) { + return Status::OK(); + } + std::lock_guard l(_lock); + SCOPED_TIMER(_wait_delete_bitmap_timer); + RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); + RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap)); + LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: " + << _tablet->tablet_id() << ", txn_id: " << _req.txn_id; + return Status::OK(); +} + +Status RowsetBuilder::commit_txn() { + std::lock_guard l(_lock); + SCOPED_TIMER(_commit_txn_timer); + Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, + _req.load_id, _rowset, false); + + if (!res && !res.is()) { + LOG(WARNING) << "Failed to commit txn: " << _req.txn_id + << " for rowset: " << _rowset->rowset_id(); + return res; + } + if (_tablet->enable_unique_key_merge_on_write()) { + _storage_engine->txn_manager()->set_txn_related_delete_bitmap( + _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(), + _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids); + } + + _is_committed = true; + return Status::OK(); +} + +Status RowsetBuilder::cancel() { + std::lock_guard l(_lock); + if (_is_cancelled) { + return Status::OK(); + } + if (_calc_delete_bitmap_token != nullptr) { + _calc_delete_bitmap_token->cancel(); + } + _is_cancelled = true; + return Status::OK(); +} + +void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { + _tablet_schema->copy_from(ori_tablet_schema); + // find the right index id + int i = 0; + auto indexes = table_schema_param->indexes(); + for (; i < indexes.size(); i++) { + if (indexes[i]->index_id == index_id) { + break; + } + } + + if (indexes.size() > 0 && indexes[i]->columns.size() != 0 && + indexes[i]->columns[0]->unique_id() >= 0) { + _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), + indexes[i], ori_tablet_schema); + } + if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) { + _tablet->update_max_version_schema(_tablet_schema); + } + + _tablet_schema->set_table_id(table_schema_param->table_id()); + // set partial update columns info + _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns()); + _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); +} + +} // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h new file mode 100644 index 00000000000000..8bb94c20905651 --- /dev/null +++ b/be/src/olap/rowset_builder.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "olap/delta_writer_context.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "util/spinlock.h" +#include "util/uid_util.h" + +namespace doris { + +class CalcDeleteBitmapToken; +class FlushToken; +class MemTable; +class MemTracker; +class StorageEngine; +class TupleDescriptor; +class SlotDescriptor; +class OlapTableSchemaParam; +class RowsetWriter; + +namespace vectorized { +class Block; +} // namespace vectorized + +// Writer for a particular (load, index, tablet). +// This class is NOT thread-safe, external synchronization is required. +class RowsetBuilder { +public: + RowsetBuilder(const WriteRequest& req, StorageEngine* storage_engine, RuntimeProfile* profile); + + ~RowsetBuilder(); + + Status init(); + + Status build_rowset(); + + Status submit_calc_delete_bitmap_task(); + + Status wait_calc_delete_bitmap(); + + Status commit_txn(); + + Status cancel(); + + std::shared_ptr rowset_writer() const { return _rowset_writer; } + + TabletSharedPtr tablet() const { return _tablet; } + + RowsetSharedPtr rowset() const { return _rowset; } + + TabletSchemaSPtr tablet_schema() const { return _tablet_schema; } + + // For UT + DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + +private: + void _garbage_collection(); + + void _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); + + void _init_profile(RuntimeProfile* profile); + + bool _is_init = false; + bool _is_cancelled = false; + bool _is_committed = false; + WriteRequest _req; + TabletSharedPtr _tablet; + RowsetSharedPtr _rowset; + std::shared_ptr _rowset_writer; + TabletSchemaSPtr _tablet_schema; + + StorageEngine* _storage_engine = nullptr; + + std::mutex _lock; + + DeleteBitmapPtr _delete_bitmap; + std::unique_ptr _calc_delete_bitmap_token; + // current rowset_ids, used to do diff in publish_version + RowsetIdUnorderedSet _rowset_ids; + + RuntimeProfile* _profile = nullptr; + RuntimeProfile::Counter* _build_rowset_timer = nullptr; + RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr; + RuntimeProfile::Counter* _wait_delete_bitmap_timer = nullptr; + RuntimeProfile::Counter* _commit_txn_timer = nullptr; +}; + +} // namespace doris