Skip to content

Commit

Permalink
[refactor](load) split rowset builder out of delta writer (#22805)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Aug 14, 2023
1 parent c67d1cc commit 29fbe74
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 278 deletions.
269 changes: 31 additions & 238 deletions be/src/olap/delta_writer.cpp

Large diffs are not rendered by default.

37 changes: 7 additions & 30 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
#include <vector>

#include "common/status.h"
#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset_builder.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
Expand All @@ -54,14 +56,6 @@ namespace vectorized {
class Block;
} // namespace vectorized

struct WriteRequest : MemTableWriter::WriteRequest {
int32_t schema_hash;
int64_t txn_id;
int64_t partition_id;
int64_t index_id = 0;
OlapTableSchemaParam* table_schema_param;
};

// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriter {
Expand Down Expand Up @@ -93,6 +87,8 @@ class DeltaWriter {
void add_finished_slave_replicas(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids);

void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Status cancel();
Expand All @@ -109,62 +105,43 @@ class DeltaWriter {

int64_t txn_id() const { return _req.txn_id; }

void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);

int64_t total_received_rows() const { return _total_received_rows; }

int64_t num_rows_filtered() const;

// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
DeleteBitmapPtr get_delete_bitmap() { return _rowset_builder.get_delete_bitmap(); }

MemTableWriter* memtable_writer() { return &_memtable_writer; }

private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
const UniqueId& load_id);

void _garbage_collection();

void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema);

void _request_slave_tablet_pull_rowset(PNodeInfo node_info);

void _init_profile(RuntimeProfile* profile);

bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
TabletSharedPtr _tablet;
RowsetSharedPtr _cur_rowset;
std::shared_ptr<RowsetWriter> _rowset_writer;
RowsetBuilder _rowset_builder;
MemTableWriter _memtable_writer;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
UniqueId _load_id;

std::mutex _lock;

std::unordered_set<int64_t> _unfinished_slave_node;
PSuccessSlaveTabletNodeIds _success_slave_node_ids;
std::shared_mutex _slave_node_lock;

DeleteBitmapPtr _delete_bitmap = nullptr;
std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;
// current max version, used to calculate delete bitmap
int64_t _cur_max_version;

// total rows num written by DeltaWriter
int64_t _total_received_rows = 0;

RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
RuntimeProfile::Counter* _commit_txn_timer = nullptr;

MonotonicStopWatch _lock_watch;
};
Expand Down
45 changes: 45 additions & 0 deletions be/src/olap/delta_writer_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

#include <vector>

namespace doris {

class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;

struct WriteRequest {
int64_t tablet_id;
int32_t schema_hash;
int64_t txn_id;
int64_t partition_id;
PUniqueId load_id;
TupleDescriptor* tuple_desc;
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots;
bool is_high_priority = false;
OlapTableSchemaParam* table_schema_param;
int64_t index_id = 0;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void MemTableWriter::_init_profile(RuntimeProfile* profile) {
_wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
_put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
_close_wait_timer = ADD_TIMER(_profile, "MemTableWriterCloseWaitTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
_sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
_agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
_segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
Expand Down
10 changes: 1 addition & 9 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <vector>

#include "common/status.h"
#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
Expand Down Expand Up @@ -61,15 +62,6 @@ enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
// This class is NOT thread-safe, external synchronization is required.
class MemTableWriter {
public:
struct WriteRequest {
int64_t tablet_id;
PUniqueId load_id;
TupleDescriptor* tuple_desc;
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots;
bool is_high_priority = false;
};

MemTableWriter(const WriteRequest& req, RuntimeProfile* profile);

~MemTableWriter();
Expand Down
Loading

0 comments on commit 29fbe74

Please sign in to comment.