From 8648e4767e0459045ccb2782a794de37a21c299f Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 8 Jul 2024 17:58:28 +0800 Subject: [PATCH] [fix](move-memtable) check segment num when closing each tablet (#36753) ## Proposed changes Previously, there was a chance that the sender failed to send some data while the receiver was unaware of it. This could cause data loss if some segments were skipped. This PR fixes the problem by including checks in both the sender and the receiver. When the sender fails to send an rpc, LoadStreamStub will mark the involved tablets as failed. Each sender will send the segment num for each tablet in CLOSE_LOAD, and receivers (LoadStream) will sum them up and check the total segment nums. --- be/src/io/fs/stream_sink_file_writer.cpp | 53 +++++++++-------- be/src/olap/delta_writer_v2.cpp | 3 +- be/src/olap/delta_writer_v2.h | 2 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 2 + be/src/runtime/load_stream.cpp | 13 ++++- be/src/runtime/load_stream.h | 2 + be/src/vec/sink/delta_writer_v2_pool.cpp | 9 ++- be/src/vec/sink/delta_writer_v2_pool.h | 3 +- be/src/vec/sink/load_stream_map_pool.cpp | 18 +++++- be/src/vec/sink/load_stream_map_pool.h | 7 ++- be/src/vec/sink/load_stream_stub.cpp | 58 ++++++++++++++++++- be/src/vec/sink/load_stream_stub.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 10 +++- .../vec/exec/delta_writer_v2_pool_test.cpp | 10 ++-- gensrc/proto/internal_service.proto | 1 + .../test_multi_replica_fault_injection.groovy | 5 +- 16 files changed, 151 insertions(+), 47 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index 1d7f823af1091f..cc55adc1cfb769 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -52,42 +52,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { << ", data_length: " << bytes_req << "file_type" << _file_type; std::span slices {data, data_cnt}; - size_t stream_index = 0; + size_t 
fault_injection_skipped_streams = 0; bool ok = false; - bool skip_stream = false; Status st; for (auto& stream : _streams) { DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - skip_stream = true; + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; } }); DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - skip_stream = true; + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; } }); - if (!skip_stream) { - st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, slices, false, _file_type); - } - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", { - st = Status::InternalError("stream sink file writer append data failed"); - }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); + st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, + slices, false, _file_type); ok = ok || st.ok(); if (!st.ok()) { LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id() @@ -139,8 +123,23 @@ Status StreamSinkFileWriter::_finalize() { VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update 
get_inverted_index_file_size into stat + size_t fault_injection_skipped_streams = 0; bool ok = false; for (auto& stream : _streams) { + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, {}, true, _file_type); ok = ok || st.ok(); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 805f072f6e6d4a..4b9f4231bf1745 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -181,7 +181,7 @@ Status DeltaWriterV2::close() { return _memtable_writer->close(); } -Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { +Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) { SCOPED_RAW_TIMER(&_close_wait_time); std::lock_guard l(_lock); DCHECK(_is_init) @@ -191,6 +191,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { _update_profile(profile); } RETURN_IF_ERROR(_memtable_writer->close_wait(profile)); + num_segments = _rowset_writer->next_segment_id(); _delta_written_success = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 9aded8fb5560fd..beeb3d3ecd3ec5 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -78,7 +78,7 @@ class DeltaWriterV2 { Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. 
- Status close_wait(RuntimeProfile* profile = nullptr); + Status close_wait(int32_t& num_segments, RuntimeProfile* profile = nullptr); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 89bd3045089a93..d2267a3dbd1734 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -130,6 +130,8 @@ class BetaRowsetWriterV2 : public RowsetWriter { int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; + int32_t next_segment_id() { return _segment_creator.next_segment_id(); }; + int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } int64_t segment_writer_ns() override { return _segment_writer_ns; } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 0a35bf6008e807..07d488e578fe12 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -252,6 +252,11 @@ Status TabletStream::close() { if (!_failed_st->ok()) { return *_failed_st; } + if (_next_segid.load() != _num_segments) { + return Status::Corruption( + "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, + _num_segments, _next_segid.load(), print_id(_load_id)); + } Status st = Status::OK(); auto close_func = [this, &mu, &cv, &st]() { @@ -315,11 +320,17 @@ Status IndexStream::close(const std::vector& tablets_to_commit, SCOPED_TIMER(_close_wait_timer); // open all need commit tablets for (const auto& tablet : tablets_to_commit) { + if (_id != tablet.index_id()) { + continue; + } TabletStreamSharedPtr tablet_stream; auto it = _tablet_streams_map.find(tablet.tablet_id()); - if (it == _tablet_streams_map.end() && _id == tablet.index_id()) { + if (it == _tablet_streams_map.end()) { RETURN_IF_ERROR( _init_tablet_stream(tablet_stream, 
tablet.tablet_id(), tablet.partition_id())); + tablet_stream->add_num_segments(tablet.num_segments()); + } else { + it->second->add_num_segments(tablet.num_segments()); } } diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 9e6e0e36a4b721..80e69c784ad789 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -52,6 +52,7 @@ class TabletStream { Status append_data(const PStreamHeader& header, butil::IOBuf* data); Status add_segment(const PStreamHeader& header, butil::IOBuf* data); + void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } Status close(); int64_t id() const { return _id; } @@ -63,6 +64,7 @@ class TabletStream { std::vector> _flush_tokens; std::unordered_map> _segids_mapping; std::atomic _next_segid; + int64_t _num_segments = 0; bthread::Mutex _lock; std::shared_ptr _failed_st; PUniqueId _load_id; diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index 87c18194127e10..bc5233ac30796e 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -43,7 +43,8 @@ std::shared_ptr DeltaWriterV2Map::get_or_create( return writer; } -Status DeltaWriterV2Map::close(RuntimeProfile* profile) { +Status DeltaWriterV2Map::close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile) { int num_use = --_use_cnt; if (num_use > 0) { LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use; @@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { RETURN_IF_ERROR(writer->close()); } LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; - for (auto& [_, writer] : _map) { - RETURN_IF_ERROR(writer->close_wait(profile)); + for (auto& [tablet_id, writer] : _map) { + int32_t num_segments; + RETURN_IF_ERROR(writer->close_wait(num_segments, profile)); + segments_for_tablet[tablet_id] = num_segments; } return Status::OK(); } diff --git 
a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index 912b9216e9f58e..7e58eea31498f6 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -70,7 +70,8 @@ class DeltaWriterV2Map { int64_t tablet_id, std::function()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users - Status close(RuntimeProfile* profile = nullptr); + Status close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile = nullptr); // cancel all delta writers in this DeltaWriterV2Map void cancel(Status status); diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 7a3072ade6e70b..2fcb8deaeb2c85 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit) { std::lock_guard lock(_tablets_to_commit_mutex); auto& tablets = _tablets_to_commit[dst_id]; - tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end()); + for (const auto& tablet : tablets_to_commit) { + tablets.emplace(tablet.tablet_id(), tablet); + } } bool LoadStreamMap::release() { @@ -103,12 +105,24 @@ bool LoadStreamMap::release() { Status LoadStreamMap::close_load(bool incremental) { return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { + std::vector tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; + tablets_to_commit.reserve(tablets.size()); + for (const auto& [tablet_id, tablet] : tablets) { + tablets_to_commit.push_back(tablet); + tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); + } + bool first = true; for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; } - RETURN_IF_ERROR(stream->close_load(tablets)); + if (first) { + 
RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + first = false; + } else { + RETURN_IF_ERROR(stream->close_load({})); + } } return Status::OK(); }); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index 5d08b3d1aff7c5..8728686ce9bd62 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -89,6 +89,10 @@ class LoadStreamMap { void save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit); + void save_segments_for_tablet(const std::unordered_map& segments_for_tablet) { + _segments_for_tablet.insert(segments_for_tablet.cbegin(), segments_for_tablet.cend()); + } + // Return true if the last instance is just released. bool release(); @@ -108,7 +112,8 @@ class LoadStreamMap { std::shared_ptr _enable_unique_mow_for_index; std::mutex _tablets_to_commit_mutex; - std::unordered_map> _tablets_to_commit; + std::unordered_map> _tablets_to_commit; + std::unordered_map _segments_for_tablet; }; class LoadStreamMapPool { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 63f91678989248..f322d67ceaf9c3 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -207,6 +207,11 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span data, bool segment_eos, FileType file_type) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -225,6 +230,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr 
flush_schema) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -369,7 +379,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { std::lock_guard send_lock(_send_mutex); buffer_lock.unlock(); VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync; - return _send_with_retry(output); + auto st = _send_with_retry(output); + if (!st.ok()) { + _handle_failure(output, st); + } + return st; +} + +void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) { + while (buf.size() > 0) { + // step 1: parse header + size_t hdr_len = 0; + buf.cutn((void*)&hdr_len, sizeof(size_t)); + butil::IOBuf hdr_buf; + PStreamHeader hdr; + buf.cutn(&hdr_buf, hdr_len); + butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf); + hdr.ParseFromZeroCopyStream(&wrapper); + + // step 2: cut data + size_t data_len = 0; + buf.cutn((void*)&data_len, sizeof(size_t)); + butil::IOBuf data_buf; + buf.cutn(&data_buf, data_len); + + // step 3: handle failure + switch (hdr.opcode()) { + case PStreamHeader::ADD_SEGMENT: + case PStreamHeader::APPEND_DATA: { + add_failed_tablet(hdr.tablet_id(), st); + } break; + case PStreamHeader::CLOSE_LOAD: { + brpc::StreamClose(_stream_id); + } break; + case PStreamHeader::GET_SCHEMA: { + // Just log and let wait_for_schema timeout + std::ostringstream oss; + for (const auto& tablet : hdr.tablets()) { + oss << " " << tablet.tablet_id(); + } + LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", " + << *this; + } break; + default: + LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; + DCHECK(false); + } + } } Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index dd15eb7bf4c758..b6436a4b81a4e4 100644 --- 
a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -206,7 +206,6 @@ class LoadStreamStub : public std::enable_shared_from_this { _success_tablets.push_back(tablet_id); } - // for tests only void add_failed_tablet(int64_t tablet_id, Status reason) { std::lock_guard lock(_failed_tablets_mutex); _failed_tablets[tablet_id] = reason; @@ -216,6 +215,7 @@ class LoadStreamStub : public std::enable_shared_from_this { Status _encode_and_send(PStreamHeader& header, std::span data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); Status _send_with_retry(butil::IOBuf& buf); + void _handle_failure(butil::IOBuf& buf, Status st); Status _check_cancel() { if (!_is_cancelled.load()) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index fbefd7a6d836cd..b293151ef94d7f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -546,13 +546,18 @@ Status VTabletWriterV2::close(Status exec_status) { // close DeltaWriters { + std::unordered_map segments_for_tablet; SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - auto st = _delta_writer_for_tablet->close(_profile); + auto st = _delta_writer_for_tablet->close(segments_for_tablet, _profile); _delta_writer_for_tablet.reset(); if (!st.ok()) { RETURN_IF_ERROR(_cancel(st)); } + // only the last sink closing delta writers will have segment num + if (!segments_for_tablet.empty()) { + _load_stream_map->save_segments_for_tablet(segments_for_tablet); + } } _calc_tablets_to_commit(); @@ -662,7 +667,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() { if (VLOG_DEBUG_IS_ON) { partition_ids.push_back(tablet.partition_id()); } - tablets_to_commit.push_back(tablet); + PTabletID t(tablet); + tablets_to_commit.push_back(t); } } if (VLOG_DEBUG_IS_ON) { diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp 
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index a67a701c409e59..dc86ce8c3a28aa 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) { EXPECT_EQ(2, pool.size()); EXPECT_EQ(map, map3); EXPECT_NE(map, map2); - EXPECT_TRUE(map->close().ok()); - EXPECT_TRUE(map2->close().ok()); - EXPECT_TRUE(map3->close().ok()); + std::unordered_map sft; + EXPECT_TRUE(map->close(sft).ok()); + EXPECT_TRUE(map2->close(sft).ok()); + EXPECT_TRUE(map3->close(sft).ok()); EXPECT_EQ(0, pool.size()); } @@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3); EXPECT_NE(writer, writer2); - static_cast(map->close()); + std::unordered_map sft; + static_cast(map->close(sft)); EXPECT_EQ(0, pool.size()); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 4457c50917bc3e..8857b65278b5bb 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -72,6 +72,7 @@ message PTabletID { optional int64 partition_id = 1; optional int64 index_id = 2; optional int64 tablet_id = 3; + optional int64 num_segments = 4; } message PTabletInfo { diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 37d8b4f26100c4..8080b52ff483a1 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { // success load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess") // StreamSinkFileWriter appendv write segment failed two replica - 
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "replica num 1 < load required replica num 2") + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed") // StreamSinkFileWriter appendv write segment failed all replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") - + // test segment num check when LoadStreamStub missed tail segments + load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") sql """ set enable_memtable_on_sink_node=false """ } }