Skip to content

Commit

Permalink
[fix](move-memtable) check segment num when closing each tablet (#36753)
Browse files Browse the repository at this point in the history
## Proposed changes

Previously, there is chance that sender failed to send some data while
the receiver being unaware of.
This will cause lost data if some segments are skipped.

This PR fixes the problem by including checks in both sender and
receiver.
When sender failed to send rpc, LoadStreamStub will mark the involved
tablets failed.
Each sender will send segment num for each tablet in CLOSE_LOAD,
and receivers (LoadStream) will sum up and check total segment nums.
  • Loading branch information
kaijchen authored and dataroaring committed Jul 17, 2024
1 parent 9b565c6 commit 8648e47
Show file tree
Hide file tree
Showing 16 changed files with 151 additions and 47 deletions.
53 changes: 26 additions & 27 deletions be/src/io/fs/stream_sink_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
<< ", data_length: " << bytes_req << "file_type" << _file_type;

std::span<const Slice> slices {data, data_cnt};
size_t stream_index = 0;
size_t fault_injection_skipped_streams = 0;
bool ok = false;
bool skip_stream = false;
Status st;
for (auto& stream : _streams) {
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (stream_index >= 2) {
skip_stream = true;
if (fault_injection_skipped_streams < 1) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (stream_index >= 1) {
skip_stream = true;
if (fault_injection_skipped_streams < 2) {
fault_injection_skipped_streams++;
continue;
}
});
if (!skip_stream) {
st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
_bytes_appended, slices, false, _file_type);
}
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (stream_index >= 2) {
st = Status::InternalError("stream sink file writer append data failed");
}
stream_index++;
skip_stream = false;
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (stream_index >= 1) {
st = Status::InternalError("stream sink file writer append data failed");
}
stream_index++;
skip_stream = false;
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", {
st = Status::InternalError("stream sink file writer append data failed");
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{ continue; });
st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended,
slices, false, _file_type);
ok = ok || st.ok();
if (!st.ok()) {
LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id()
Expand Down Expand Up @@ -139,8 +123,23 @@ Status StreamSinkFileWriter::_finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
size_t fault_injection_skipped_streams = 0;
bool ok = false;
for (auto& stream : _streams) {
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (fault_injection_skipped_streams < 1) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (fault_injection_skipped_streams < 2) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{ continue; });
auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
_bytes_appended, {}, true, _file_type);
ok = ok || st.ok();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ Status DeltaWriterV2::close() {
return _memtable_writer->close();
}

Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) {
SCOPED_RAW_TIMER(&_close_wait_time);
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
Expand All @@ -191,6 +191,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
_update_profile(profile);
}
RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
num_segments = _rowset_writer->next_segment_id();

_delta_written_success = true;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DeltaWriterV2 {
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait(RuntimeProfile* profile = nullptr);
Status close_wait(int32_t& num_segments, RuntimeProfile* profile = nullptr);

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };

int32_t next_segment_id() { return _segment_creator.next_segment_id(); };

int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }

int64_t segment_writer_ns() override { return _segment_writer_ns; }
Expand Down
13 changes: 12 additions & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ Status TabletStream::close() {
if (!_failed_st->ok()) {
return *_failed_st;
}
if (_next_segid.load() != _num_segments) {
return Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
}

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
Expand Down Expand Up @@ -315,11 +320,17 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
for (const auto& tablet : tablets_to_commit) {
if (_id != tablet.index_id()) {
continue;
}
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TabletStream {

Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
Status close();
int64_t id() const { return _id; }

Expand All @@ -63,6 +64,7 @@ class TabletStream {
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
PUniqueId _load_id;
Expand Down
9 changes: 6 additions & 3 deletions be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
return writer;
}

Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
RuntimeProfile* profile) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
Expand All @@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
for (auto& [_, writer] : _map) {
RETURN_IF_ERROR(writer->close_wait(profile));
for (auto& [tablet_id, writer] : _map) {
int32_t num_segments;
RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
segments_for_tablet[tablet_id] = num_segments;
}
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/delta_writer_v2_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class DeltaWriterV2Map {
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator);

// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile = nullptr);
Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
RuntimeProfile* profile = nullptr);

// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
Expand Down
18 changes: 16 additions & 2 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
const std::vector<PTabletID>& tablets_to_commit) {
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
auto& tablets = _tablets_to_commit[dst_id];
tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end());
for (const auto& tablet : tablets_to_commit) {
tablets.emplace(tablet.tablet_id(), tablet);
}
}

bool LoadStreamMap::release() {
Expand All @@ -103,12 +105,24 @@ bool LoadStreamMap::release() {

Status LoadStreamMap::close_load(bool incremental) {
return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
tablets_to_commit.reserve(tablets.size());
for (const auto& [tablet_id, tablet] : tablets) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
RETURN_IF_ERROR(stream->close_load(tablets));
if (first) {
RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
first = false;
} else {
RETURN_IF_ERROR(stream->close_load({}));
}
}
return Status::OK();
});
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class LoadStreamMap {

void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);

void save_segments_for_tablet(const std::unordered_map<int64_t, int32_t>& segments_for_tablet) {
_segments_for_tablet.insert(segments_for_tablet.cbegin(), segments_for_tablet.cend());
}

// Return true if the last instance is just released.
bool release();

Expand All @@ -108,7 +112,8 @@ class LoadStreamMap {
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;

std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_to_commit;
std::unordered_map<int64_t, int32_t> _segments_for_tablet;
};

class LoadStreamMapPool {
Expand Down
58 changes: 57 additions & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
bool segment_eos, FileType file_type) {
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
return Status::OK();
}
});
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
Expand All @@ -225,6 +230,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, const SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema) {
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
return Status::OK();
}
});
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
Expand Down Expand Up @@ -369,7 +379,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
buffer_lock.unlock();
VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
return _send_with_retry(output);
auto st = _send_with_retry(output);
if (!st.ok()) {
_handle_failure(output, st);
}
return st;
}

void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
while (buf.size() > 0) {
// step 1: parse header
size_t hdr_len = 0;
buf.cutn((void*)&hdr_len, sizeof(size_t));
butil::IOBuf hdr_buf;
PStreamHeader hdr;
buf.cutn(&hdr_buf, hdr_len);
butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
hdr.ParseFromZeroCopyStream(&wrapper);

// step 2: cut data
size_t data_len = 0;
buf.cutn((void*)&data_len, sizeof(size_t));
butil::IOBuf data_buf;
buf.cutn(&data_buf, data_len);

// step 3: handle failure
switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT:
case PStreamHeader::APPEND_DATA: {
add_failed_tablet(hdr.tablet_id(), st);
} break;
case PStreamHeader::CLOSE_LOAD: {
brpc::StreamClose(_stream_id);
} break;
case PStreamHeader::GET_SCHEMA: {
// Just log and let wait_for_schema timeout
std::ostringstream oss;
for (const auto& tablet : hdr.tablets()) {
oss << " " << tablet.tablet_id();
}
LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
<< *this;
} break;
default:
LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
DCHECK(false);
}
}
}

Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
_success_tablets.push_back(tablet_id);
}

// for tests only
void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
Expand All @@ -216,6 +215,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);
void _handle_failure(butil::IOBuf& buf, Status st);

Status _check_cancel() {
if (!_is_cancelled.load()) {
Expand Down
10 changes: 8 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,18 @@ Status VTabletWriterV2::close(Status exec_status) {

// close DeltaWriters
{
std::unordered_map<int64_t, int32_t> segments_for_tablet;
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
auto st = _delta_writer_for_tablet->close(_profile);
auto st = _delta_writer_for_tablet->close(segments_for_tablet, _profile);
_delta_writer_for_tablet.reset();
if (!st.ok()) {
RETURN_IF_ERROR(_cancel(st));
}
// only the last sink closing delta writers will have segment num
if (!segments_for_tablet.empty()) {
_load_stream_map->save_segments_for_tablet(segments_for_tablet);
}
}

_calc_tablets_to_commit();
Expand Down Expand Up @@ -662,7 +667,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
if (VLOG_DEBUG_IS_ON) {
partition_ids.push_back(tablet.partition_id());
}
tablets_to_commit.push_back(tablet);
PTabletID t(tablet);
tablets_to_commit.push_back(t);
}
}
if (VLOG_DEBUG_IS_ON) {
Expand Down
10 changes: 6 additions & 4 deletions be/test/vec/exec/delta_writer_v2_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
EXPECT_TRUE(map->close().ok());
EXPECT_TRUE(map2->close().ok());
EXPECT_TRUE(map3->close().ok());
std::unordered_map<int64_t, int32_t> sft;
EXPECT_TRUE(map->close(sft).ok());
EXPECT_TRUE(map2->close(sft).ok());
EXPECT_TRUE(map3->close(sft).ok());
EXPECT_EQ(0, pool.size());
}

Expand Down Expand Up @@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
static_cast<void>(map->close());
std::unordered_map<int64_t, int32_t> sft;
static_cast<void>(map->close(sft));
EXPECT_EQ(0, pool.size());
}

Expand Down
Loading

0 comments on commit 8648e47

Please sign in to comment.