Skip to content

Commit

Permalink
[fix](auto-partition) fix auto partition load lost data in multi send…
Browse files Browse the repository at this point in the history
…er (#35287)

Change the `use_cnt` mechanism for incremental (auto partition) channels and
streams: it is now counted dynamically.
Use `close_wait()` of regular partitions as a synchronization point to make
sure all sinks are in the close phase before closing any incremental (auto
partition) channels and streams.
Add a dummy (fake) partition and tablet if there is no regular partition
in the auto partition table.

Replace #34740

Co-authored-by: zhaochangle <[email protected]>
  • Loading branch information
kaijchen and zclllyybb committed May 30, 2024
1 parent 300582f commit 6573dc1
Show file tree
Hide file tree
Showing 28 changed files with 457 additions and 142 deletions.
17 changes: 10 additions & 7 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() {
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;

// initial partitions
// initial partitions. if we meet dummy partitions (which exist only to open BE nodes), do not generate keys for them, so they are never found by lookup
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);

if (!_t_param.partitions_is_fake) {
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
} else {
_partitions_map->emplace(
std::tuple {part->end_key.first, part->end_key.second, false}, part);
}
} else {
_partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
part);
}
}

Expand Down
23 changes: 21 additions & 2 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace doris {
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");

LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile)
std::string sender_ip, int64_t backend_id, bool enable_profile)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_sender_ip(std::move(sender_ip)),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
Expand Down Expand Up @@ -161,6 +161,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
}

// 3. handle eos
// if channel is incremental, maybe hang on close until all close request arrived.
if (request.has_eos() && request.eos()) {
st = _handle_eos(channel.get(), request, response);
_report_profile(response);
Expand All @@ -182,6 +183,23 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
auto index_id = request.index_id();

RETURN_IF_ERROR(channel->close(this, request, response, &finished));

// for init node, we hold (hang on) every close request and let them all return together.
if (request.has_hang_wait() && request.hang_wait()) {
DCHECK(!channel->is_incremental_channel());
VLOG_TRACE << "reciever close waiting!" << request.sender_id();
int count = 0;
while (!channel->is_finished()) {
bthread_usleep(1000);
count++;
}
// now maybe finished or cancelled.
VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec
return Status::InternalError("Tablets channel didn't wait all close");
}
}

if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
Expand All @@ -191,6 +209,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
Expand Down
9 changes: 1 addition & 8 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,19 @@

#pragma once

#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"

namespace doris {
Expand All @@ -52,7 +45,7 @@ class BaseTabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile);
std::string sender_ip, int64_t backend_id, bool enable_profile);
~LoadChannel();

// open a new load channel if not exist
Expand Down
8 changes: 0 additions & 8 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,17 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <queue>
#include <string>
#include <tuple>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/thread.h"

namespace doris {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = request->total_streams();
DCHECK(_total_streams > 0) << "total streams should be greator than 0";
_is_incremental = (_total_streams == 0);

_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class LoadStream : public brpc::StreamInputHandler {
// Records one more opened stream coming from source `src_id`.
// When this load stream is incremental (it was initialized with total_streams == 0),
// the expected total is not known up front, so it grows by one per added source.
void add_source(int64_t src_id) {
    std::lock_guard guard(_lock);
    ++_open_streams[src_id];
    if (!_is_incremental) {
        return;
    }
    // incremental: the total is counted dynamically
    ++_total_streams;
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
Expand Down Expand Up @@ -167,6 +170,7 @@ class LoadStream : public brpc::StreamInputHandler {
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
QueryThreadContext _query_thread_context;
bool _is_incremental = false;
};

using LoadStreamPtr = std::unique_ptr<LoadStream>;
Expand Down
37 changes: 33 additions & 4 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,29 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();

_num_remaining_senders = request.num_senders();
_next_seqs.resize(_num_remaining_senders, 0);
_closed_senders.Reset(_num_remaining_senders);
int max_sender = request.num_senders();
/*
 * a tablets channel in the receiver corresponds to a group of VNodeChannels on the sender side: each instance has one or none.
 * there are two possibilities:
 *  1. the partitions were originally broadcast by FE, so every sender (instance) knows them from the start, and open() is
 *     called directly, not via incremental_open(). once _state has changed to kOpened, _open_by_incremental will never
 *     become true. in this case _num_remaining_senders stays equal to the number of senders, and the tablets channel
 *     closes when all senders have sent their close rpc. for an auto partition table, the closing of these channels hangs
 *     on the receiver and they return together, to avoid the close-then-incremental-open problem.
 *  2. this tablets channel is opened by incremental_open() of a sender's sink node, so only that sender knows this partition
 *     (this TabletsChannel) at that time, and we cannot be sure how many senders will know it in the end; it depends on the
 *     data distribution. in this situation open() is first called by incremental_open(), so _open_by_incremental is true.
 *     _num_remaining_senders is then not set here but incremented every time incremental_open() is called. so it is dynamic:
 *     the same number of senders' close requests is still needed to close the channel, but closing will not hang.
 */
if (_open_by_incremental) {
DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
} else {
_num_remaining_senders = max_sender;
}
// always size by max_sender, incremental or not, because we don't know how many senders will open.
_next_seqs.resize(max_sender, 0);
_closed_senders.Reset(max_sender);

RETURN_IF_ERROR(_open_all_writers(request));

Expand All @@ -152,10 +172,19 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {

Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
SCOPED_TIMER(_incremental_open_timer);
if (_state == kInitialized) { // haven't opened

// current node first opened by incremental open
if (_state == kInitialized) {
_open_by_incremental = true;
RETURN_IF_ERROR(open(params));
}

std::lock_guard<std::mutex> l(_lock);

if (_open_by_incremental) {
_num_remaining_senders++;
}

std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <ostream>
#include <shared_mutex>
Expand Down Expand Up @@ -113,6 +111,11 @@ class BaseTabletsChannel {

size_t num_rows_filtered() const { return _num_rows_filtered; }

// Returns true if this tablets channel on this BE was first opened through
// incremental_open() (which sets _open_by_incremental) rather than by a direct open().
bool is_incremental_channel() const { return _open_by_incremental; }

// Returns true when the channel's state is kFinished.
// NOTE(review): reads _state without taking a lock here; confirm this is safe for pollers.
bool is_finished() const { return _state == kFinished; }

protected:
Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);

Expand Down Expand Up @@ -151,8 +154,8 @@ class BaseTabletsChannel {
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::shared_ptr<OlapTableSchemaParam> _schema;

TupleDescriptor* _tuple_desc = nullptr;
bool _open_by_incremental = false;

// next sequence we expect
int _num_remaining_senders = 0;
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}

std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
Expand All @@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index));
_enable_unique_mow_for_index, incremental));
}
_streams_for_node[dst_id] = streams;
return streams;
Expand Down Expand Up @@ -101,10 +101,13 @@ bool LoadStreamMap::release() {
return false;
}

Status LoadStreamMap::close_load() {
return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
Status LoadStreamMap::close_load(bool incremental) {
return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
RETURN_IF_ERROR(stream->close_load(tablets));
}
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class LoadStreamMap {
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id);
std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false);

std::shared_ptr<Streams> at(int64_t dst_id);

Expand All @@ -95,7 +95,7 @@ class LoadStreamMap {

// send CLOSE_LOAD to all streams, return ERROR if any.
// only call this method after release() returns true.
Status close_load();
Status close_load(bool incremental);

private:
const UniqueId _load_id;
Expand Down
13 changes: 10 additions & 3 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler

LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map)
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(schema_map),
_enable_unique_mow_for_index(mow_map) {};
_enable_unique_mow_for_index(mow_map),
_is_incremental(incremental) {};

LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
Expand Down Expand Up @@ -168,7 +169,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
request.set_total_streams(total_streams);
if (_is_incremental) {
request.set_total_streams(0);
} else if (total_streams > 0) {
request.set_total_streams(total_streams);
} else {
return Status::InternalError("total_streams should be greator than 0");
}
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map);
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);

LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map)
: LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {};
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
: LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};

// for mock this class in UT
#ifdef BE_TEST
Expand Down Expand Up @@ -195,6 +195,8 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {

// Returns the stored destination id (_dst_id); presumably the target backend's id, verify against open().
int64_t dst_id() const { return _dst_id; }

// Returns the `incremental` flag this stub was constructed with.
// An incremental stub sends total_streams = 0 in its open request.
bool is_incremental() const { return _is_incremental; }

friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);

std::string to_string();
Expand Down Expand Up @@ -255,6 +257,8 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
bthread::Mutex _failed_tablets_mutex;
std::vector<int64_t> _success_tablets;
std::unordered_map<int64_t, Status> _failed_tablets;

bool _is_incremental = false;
};

} // namespace doris
Loading

0 comments on commit 6573dc1

Please sign in to comment.