From d26017de216596e2b43f1d60bff0b9320db2bf65 Mon Sep 17 00:00:00 2001
From: Timur Yusupov
Date: Thu, 17 Mar 2022 17:13:37 +0300
Subject: [PATCH] [#11349] docdb: Fixed RetryableRequests error handling at follower and ConsensusRound callback invocation

Summary:
**Background**

Since D5660/023c20ad6caf13e5228081ddec07d9f3cf904348 we track the Write RPC request ID to prevent duplicate writes caused by retries of Write RPCs. The request ID is assigned by `YBClient` based on `TabletRequests::request_id_seq`, and the TServer tracks the IDs of running requests in the `RetryableRequests` structure.

When we split a tablet, each of its child tablets gets a copy of its Raft log and also of its `RetryableRequests` structure. On the `YBClient` side, child tablets inherit the `TabletRequests::request_id_seq` value from the parent tablet, to stay consistent with the TServer side. But if the process hosting `YBClient` has been restarted or partitioned away, or simply hasn't issued requests to the relevant tablets during a sequence of splits, it might have no information about a particular tablet: it doesn't know `TabletRequests::request_id_seq` for the parent tablet, so it can't start from that value for the child tablets. This case was addressed by D9264/f9cae11ced426476d4f70560b2afc29f921a1737: if `YBClient` (`MetaCache`) doesn't know about the parent tablet, it uses a special value as `request_id_seq` for the child tablet. Then, on getting a "request id is less than min" error, it sets `request_id_seq` to the min running request ID from the server plus 2^24 (there won't be 2^24 requests in progress from the same client to the same tablet, so this is safe).

**Problem**

On the TServer side, the `MaintenanceManager` periodically triggers cleanup of the `RetryableRequests` structure. Due to overload, or to having too many tablets, it can happen that the `RetryableRequests` structure has already been cleaned up at the tablet leader but not yet at a follower. This can lead to the following failure scenario:
1) Tablet `T` is split into `T1` and `T2`.
2) `YBClient` A starts doing some writes to tablet `T1`. Since A doesn't know about tablet `T`, it sets `request_id_seq` to `min_running_request_id` for A from the `T1` leader (0) plus `1 << 24`. After 100 writes it becomes `100 + (1 << 24)`.
3) Tablet `T1` is split into `T1.1` and `T1.2`.
4) Tablet `T1.1` is split into `T1.1.1` and `T1.1.2`.
5) `RetryableRequests` cleanup is done at the leader of `T1.1.2`, but not at its followers.
6) `YBClient` A starts writing to tablet `T1.1.2`. Since A doesn't know about tablet `T1.1`, it sets `request_id_seq` to `min_running_request_id` for A from the `T1.1.2` leader (0 due to cleanup) plus `1 << 24`, i.e. `1 << 24`.
7) The `T1.1.2` leader accepts the write with `request_id = 1 << 24` and tries to replicate it at its followers.
8) At a follower, `min_running_request_id` is `100 + (1 << 24)` (inherited from the parent tablets). So the follower rejects a write request that the leader has already accepted. The error message is: "Request id 16777216 from client ... is less than min running 16777316".

**Solution**

The solution is to update `ReplicaState::AddPendingOperation` so that, when registering a retryable request ID fails at the follower, it runs the cleanup there and then tries to register the request again (a simplified sketch of this flow is shown below).
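To make the fix concrete, here is a minimal, self-contained sketch of that register-clean-retry flow. All types here are simplified stand-ins for illustration only: they loosely model the real `RetryableRequests::Register` and `RetryableRequests::CleanExpiredReplicatedAndGetMinOpId`, and `RegisterWithCleanupRetry` is a hypothetical helper standing in for the new logic inside `ReplicaState::AddPendingOperation`.

```
#include <cstdint>
#include <iostream>

using RetryableRequestId = int64_t;

// Simplified stand-in for consensus::RetryableRequests: it only tracks the
// per-client minimum request id, which is what matters for this bug.
struct RetryableRequests {
  RetryableRequestId min_running_request_id = 0;

  // The real Register() rejects request ids below the tracked minimum.
  bool Register(RetryableRequestId id) {
    return id >= min_running_request_id;
  }

  // Stand-in for CleanExpiredReplicatedAndGetMinOpId(): expiring stale state
  // lets the follower's minimum drop back in line with the already-cleaned leader.
  void CleanExpired() {
    min_running_request_id = 0;
  }
};

// The fix: when registration fails on a follower, run the cleanup once and
// retry before rejecting the request.
bool RegisterWithCleanupRetry(RetryableRequests* requests, RetryableRequestId id) {
  if (requests->Register(id)) {
    return true;
  }
  requests->CleanExpired();
  return requests->Register(id);
}

int main() {
  RetryableRequests follower;
  // State inherited from the parent tablets (step 8 of the scenario above).
  follower.min_running_request_id = 100 + (1 << 24);
  // Request id already accepted by the cleaned-up leader (step 7).
  const RetryableRequestId incoming = 1 << 24;
  // Prints "true": with the retry, the follower now accepts what the leader accepted.
  std::cout << std::boolalpha << RegisterWithCleanupRetry(&follower, incoming) << std::endl;
  return 0;
}
```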
Also, some other fixes/improvements have been made:
- Updated `RetryableRequests::Register` to return an error instead of invoking the callback on error by itself. Callers are responsible for invoking the callback if needed.
- Updated `RaftConsensus::StartConsensusOnlyRoundUnlocked` to invoke the callback in case of error.
- Updated `YBClient` logging to have a prefix with both the client ID and the `MetaCache` address.
- Added a sanity check to avoid duplicate `ConsensusRoundCallback` calls and to verify that the callback is always called before destruction.
- Updated `RaftConsensus::AppendNewRoundsToQueueUnlocked` to reduce code duplication and made it responsible for invoking callbacks for processed rounds.
- Refactored `OperationDriver::Init` to exit early in case of error.
- Enhanced `CatalogManager::RegisterNewTabletForSplit` logging.
- Fixed `OperationTracker::Add` null pointer handling.
- Added a test.

Test Plan: TabletSplitITest.SplitClientRequestsClean

Reviewers: sergei

Reviewed By: sergei

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D15484
---
 src/yb/client/client-internal.cc              |   3 +-
 src/yb/client/client-internal.h               |   1 +
 src/yb/client/client.cc                       |  28 +++--
 src/yb/client/client.h                        |   2 +
 src/yb/client/meta_cache.cc                   |  10 +-
 src/yb/consensus/consensus_round.cc           |  15 +++
 src/yb/consensus/consensus_round.h            |  16 ++-
 src/yb/consensus/raft_consensus.cc            | 111 ++++++++++--------
 src/yb/consensus/raft_consensus.h             |   6 +
 src/yb/consensus/replica_state.cc             |  24 +++-
 src/yb/consensus/replica_state.h              |   2 +-
 src/yb/consensus/retryable_requests.cc        |  24 ++--
 src/yb/consensus/retryable_requests.h         |   7 +-
 .../tablet-split-itest-base.cc                |   9 +-
 .../tablet-split-itest-base.h                 |   8 +-
 .../integration-tests/tablet-split-itest.cc   |  91 +++++++++++++-
 src/yb/master/catalog_manager.cc              |  13 +-
 src/yb/tablet/operations/operation_driver.cc  |  24 ++--
 src/yb/tablet/operations/operation_driver.h   |   2 +-
 .../operations/operation_tracker-test.cc      |  14 +--
 src/yb/tablet/operations/operation_tracker.cc |   6 +-
 21 files changed, 287 insertions(+), 129 deletions(-)

diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index df19c1230b70..72558f310957 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -309,7 +309,8 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, UpdateConsumerOnProducerSplit); YBClient::Data::Data() : leader_master_rpc_(rpcs_.InvalidHandle()), latest_observed_hybrid_time_(YBClient::kNoHybridTime), - id_(ClientId::GenerateRandom()) { + id_(ClientId::GenerateRandom()), + log_prefix_(Format("Client $0: ", id_)) { for(auto& cache : tserver_count_cached_) { cache.store(0, std::memory_order_relaxed); } diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index be59d57c2e8d..1d443a258750 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -499,6 +499,7 @@ class YBClient::Data { std::unique_ptr threadpool_; const ClientId id_; + const std::string log_prefix_; // Used to track requests that were sent to a particular tablet, so it could track different // RPCs related to the same write operation and reject duplicates. diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 429ac265997b..f67b71d216e7 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -1101,7 +1101,7 @@ Status YBClient::CreateTablegroup(const std::string& namespace_name, table_name, internal::GetSchema(ybschema), internal::GetSchema(info.schema)); - LOG(ERROR) << msg; + LOG_WITH_PREFIX(ERROR) << msg; return STATUS(AlreadyPresent, msg); } @@ -1148,7 +1148,8 @@ Status YBClient::DeleteTablegroup(const std::string& namespace_id, // A prior attempt to delete the table has succeeded, but // appeared as a failure to the client due to, e.g., an I/O or // network issue.
- LOG(INFO) << "Parent table for tablegroup with ID " << tablegroup_id << " already deleted."; + LOG_WITH_PREFIX(INFO) << "Parent table for tablegroup with ID " << tablegroup_id + << " already deleted."; return Status::OK(); } else { return StatusFromPB(resp.error().status()); @@ -1166,7 +1167,7 @@ Status YBClient::DeleteTablegroup(const std::string& namespace_id, strings::Substitute("Failed waiting for parent table with id $0 to finish being deleted", resp.parent_table_id())); - LOG(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id; + LOG_WITH_PREFIX(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id; return Status::OK(); } @@ -1358,7 +1359,7 @@ Status YBClient::GetPermissions(client::internal::PermissionsCache* permissions_ GetPermissionsResponsePB resp; CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GetPermissions); - VLOG(1) << "Got permissions cache: " << resp.ShortDebugString(); + VLOG_WITH_PREFIX(1) << "Got permissions cache: " << resp.ShortDebugString(); // The first request is a special case. We always replace the cache since we don't have anything. if (!version) { @@ -1953,7 +1954,7 @@ void YBClient::RequestFinished(const TabletId& tablet_id, RetryableRequestId req if (it != tablet.running_requests.end()) { tablet.running_requests.erase(it); } else { - LOG(DFATAL) << "RequestFinished called for an unknown request: " + LOG_WITH_PREFIX(DFATAL) << "RequestFinished called for an unknown request: " << tablet_id << ", " << request_id; } } @@ -1964,7 +1965,8 @@ void YBClient::MaybeUpdateMinRunningRequestId( auto& tablet = data_->tablet_requests_[tablet_id]; if (tablet.request_id_seq == kInitializeFromMinRunning) { tablet.request_id_seq = min_running_request_id + (1 << 24); - VLOG(1) << "Set request_id_seq for tablet " << tablet_id << " to " << tablet.request_id_seq; + VLOG_WITH_PREFIX(1) << "Set request_id_seq for tablet " << tablet_id << " to " + << tablet.request_id_seq; } } @@ -2018,7 +2020,7 @@ Status YBClient::ListMasters(CoarseTimePoint deadline, std::vector* master_uuids->clear(); for (const ServerEntryPB& master : resp.masters()) { if (master.has_error()) { - LOG(ERROR) << "Master " << master.ShortDebugString() << " hit error " + LOG_WITH_PREFIX(ERROR) << "Master " << master.ShortDebugString() << " hit error " << master.error().ShortDebugString(); return StatusFromPB(master.error()); } @@ -2189,11 +2191,11 @@ bool YBClient::IsMultiMaster() const { Result YBClient::NumTabletsForUserTable(TableType table_type) { if (table_type == TableType::PGSQL_TABLE_TYPE && FLAGS_ysql_num_tablets > 0) { - VLOG(1) << "num_tablets = " << FLAGS_ysql_num_tablets + VLOG_WITH_PREFIX(1) << "num_tablets = " << FLAGS_ysql_num_tablets << ": --ysql_num_tablets is specified."; return FLAGS_ysql_num_tablets; } else if (FLAGS_ycql_num_tablets > 0) { - VLOG(1) << "num_tablets = " << FLAGS_ycql_num_tablets + VLOG_WITH_PREFIX(1) << "num_tablets = " << FLAGS_ycql_num_tablets << ": --ycql_num_tablets is specified."; return FLAGS_ycql_num_tablets; } else { @@ -2202,12 +2204,12 @@ Result YBClient::NumTabletsForUserTable(TableType table_type) { int num_tablets = 0; if (table_type == TableType::PGSQL_TABLE_TYPE) { num_tablets = tserver_count * FLAGS_ysql_num_shards_per_tserver; - VLOG(1) << "num_tablets = " << num_tablets << ": " + VLOG_WITH_PREFIX(1) << "num_tablets = " << num_tablets << ": " << "calculated as tserver_count * FLAGS_ysql_num_shards_per_tserver (" << tserver_count << " * " << FLAGS_ysql_num_shards_per_tserver << ")"; } else { num_tablets = tserver_count * 
FLAGS_yb_num_shards_per_tserver; - VLOG(1) << "num_tablets = " << num_tablets << ": " + VLOG_WITH_PREFIX(1) << "num_tablets = " << num_tablets << ": " << "calculated as tserver_count * FLAGS_yb_num_shards_per_tserver (" << tserver_count << " * " << FLAGS_yb_num_shards_per_tserver << ")"; } @@ -2264,5 +2266,9 @@ Result GetTableId(YBClient* client, const YBTableName& table_name) { return VERIFY_RESULT(client->GetYBTableInfo(table_name)).table_id; } +const std::string& YBClient::LogPrefix() const { + return data_->log_prefix_; +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client.h b/src/yb/client/client.h index a42b86346f65..0b315916f380 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -775,6 +775,8 @@ class YBClient { void Shutdown(); + const std::string& LogPrefix() const; + private: class Data; diff --git a/src/yb/client/meta_cache.cc b/src/yb/client/meta_cache.cc index c0b9b7623d94..23fe563c8cfd 100644 --- a/src/yb/client/meta_cache.cc +++ b/src/yb/client/meta_cache.cc @@ -667,7 +667,7 @@ void LookupCallbackVisitor::operator()( MetaCache::MetaCache(YBClient* client) : client_(client), master_lookup_sem_(FLAGS_max_concurrent_master_lookups), - log_prefix_(Format("MetaCache($0): ", static_cast(this))) { + log_prefix_(Format("MetaCache($0)(client_id: $1): ", static_cast(this), client_->id())) { } MetaCache::~MetaCache() { @@ -1107,8 +1107,9 @@ void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) { auto& tablet_requests = client_->data_->tablet_requests_; const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id()); if (requests_it == tablet_requests.end()) { - VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for tablet " - << tablet.split_parent_tablet_id(); + VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for parent tablet " + << tablet.split_parent_tablet_id() + << " (split_depth: " << tablet.split_depth() - 1 << ")"; // This can happen if client wasn't active (for example node was partitioned away) during // sequence of splits that resulted in `tablet` creation, so we don't have info about `tablet` // split parent. 
@@ -1124,7 +1125,8 @@ void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) { return; } VLOG_WITH_PREFIX(2) << "Setting request_id_seq for tablet " << tablet.tablet_id() - << " from tablet " << tablet.split_parent_tablet_id() << " to " + << " (split_depth: " << tablet.split_depth() << ") from tablet " + << tablet.split_parent_tablet_id() << " to " << requests_it->second.request_id_seq; tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq; } diff --git a/src/yb/consensus/consensus_round.cc b/src/yb/consensus/consensus_round.cc index 3d8356bdf83b..0f2a03567412 100644 --- a/src/yb/consensus/consensus_round.cc +++ b/src/yb/consensus/consensus_round.cc @@ -27,8 +27,23 @@ ConsensusRound::ConsensusRound(Consensus* consensus, DCHECK_NOTNULL(replicate_msg_.get()); } +ConsensusRound::~ConsensusRound() { + LOG_IF(DFATAL, callback_ && !callback_called) + << this << ": callback(" << callback_ << ")->ReplicationFinished() hasn't been called for " + << replicate_msg_->ShortDebugString(); +} + +void ConsensusRound::NotifyAddedToLeader(const OpId& op_id, const OpId& committed_op_id) { + callback_->AddedToLeader(op_id, committed_op_id); +} + void ConsensusRound::NotifyReplicationFinished( const Status& status, int64_t leader_term, OpIds* applied_op_ids) { + bool expected = false; + LOG_IF(DFATAL, !callback_called.compare_exchange_strong(expected, true)) + << this + << ": callback(" << callback_ << ")->ReplicationFinished() has already been called for " + << replicate_msg_->ShortDebugString(); callback_->ReplicationFinished(status, leader_term, applied_op_ids); } diff --git a/src/yb/consensus/consensus_round.h b/src/yb/consensus/consensus_round.h index 8f101fde7535..51ad9837b564 100644 --- a/src/yb/consensus/consensus_round.h +++ b/src/yb/consensus/consensus_round.h @@ -40,11 +40,6 @@ class ConsensusRoundCallback { virtual void ReplicationFinished( const Status& status, int64_t leader_term, OpIds* applied_op_ids) = 0; - // Utility method for failed replication. - void ReplicationFailed(const Status& status) { - ReplicationFinished(status, OpId::kUnknownTerm, /* applied_op_ids= */ nullptr); - } - virtual ~ConsensusRoundCallback() = default; }; @@ -79,14 +74,16 @@ class ConsensusRound : public RefCountedThreadSafe { callback_ = callback_holder_.get(); } - ConsensusRoundCallback* callback() const { - return callback_; - } + void NotifyAddedToLeader(const OpId& op_id, const OpId& committed_op_id); // If a continuation was set, notifies it that the round has been replicated. void NotifyReplicationFinished( const Status& status, int64_t leader_term, OpIds* applied_op_ids); + void NotifyReplicationFailed(const Status& status) { + NotifyReplicationFinished(status, OpId::kUnknownTerm, /* applied_op_ids= */ nullptr); + } + // Binds this round such that it may not be eventually executed in any term // other than 'term'. // See CheckBoundTerm(). @@ -110,7 +107,7 @@ class ConsensusRound : public RefCountedThreadSafe { friend class RaftConsensusQuorumTest; friend class RefCountedThreadSafe; - ~ConsensusRound() {} + ~ConsensusRound(); Consensus* const consensus_; // This round's replicate message.
@@ -125,6 +122,7 @@ class ConsensusRound : public RefCountedThreadSafe { ConsensusRoundCallback* callback_ = nullptr; std::unique_ptr callback_holder_; + std::atomic callback_called{false}; }; } // namespace consensus diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 1ef3474f3b13..0447963645b2 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -271,7 +271,7 @@ bool IsChangeConfigOperation(OperationType op_type) { class NonTrackedRoundCallback : public ConsensusRoundCallback { public: explicit NonTrackedRoundCallback(ConsensusRound* round, const StdStatusCallback& callback) - : round_(round), callback_(callback) { + : round_(DCHECK_NOTNULL(round)), callback_(callback) { } void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override { @@ -1146,7 +1146,7 @@ Status RaftConsensus::ReplicateBatch(const ConsensusRounds& rounds) { << " operations as failed with that status"; // Treat all the operations in the batch as failed. for (size_t i = rounds.size(); i != processed_rounds;) { - rounds[--i]->callback()->ReplicationFailed(status); + rounds[--i]->NotifyReplicationFailed(status); } } return status; @@ -1171,30 +1171,7 @@ Status RaftConsensus::DoReplicateBatch(const ConsensusRounds& rounds, size_t* pr for (const auto& round : rounds) { RETURN_NOT_OK(round->CheckBoundTerm(current_term)); } - auto status = AppendNewRoundsToQueueUnlocked(rounds, processed_rounds); - if (!status.ok()) { - // In general we have 3 kinds of rounds in case of failure: - // 1) Rounds that were rejected by retryable requests. - // We should not call ReplicationFinished for them. - // 2) Rounds that were registered with retryable requests. - // We should call state_->NotifyReplicationFinishedUnlocked for them. - // 3) Rounds that were not registered with retryable requests. - // We should call ReplicationFinished directly for them. Could do it after releasing - // the lock. I.e. in ReplicateBatch. - // - // (3) is all rounds starting with index *processed_rounds and above. - // For (1) we reset bound term, so could use it to distinguish between (1) and (2). - for (size_t i = *processed_rounds; i != 0;) { - --i; - if (rounds[i]->bound_term() == OpId::kUnknownTerm) { - // Already rejected by retryable requests. - continue; - } - state_->NotifyReplicationFinishedUnlocked( - rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); - } - return status; - } + RETURN_NOT_OK(AppendNewRoundsToQueueUnlocked(rounds, processed_rounds)); } peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); @@ -1247,15 +1224,54 @@ Status RaftConsensus::AppendNewRoundsToQueueUnlocked( std::vector replicate_msgs; replicate_msgs.reserve(rounds.size()); + + auto status = DoAppendNewRoundsToQueueUnlocked(rounds, processed_rounds, &replicate_msgs); + if (!status.ok()) { + for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) { + RollbackIdAndDeleteOpId(*iter, /* should_exists = */ true); + } + // In general we have 3 kinds of rounds in case of failure: + // 1) Rounds that were rejected by retryable requests. + // We should not call ReplicationFinished for them, because it has been already called. + // 2) Rounds that were registered with retryable requests. + // We should call state_->NotifyReplicationFinishedUnlocked for them. + // 3) Rounds that were not registered with retryable requests. + // We should call ReplicationFinished directly for them. Could do it after releasing + // the lock. I.e. 
in ReplicateBatch. + // + // (3) is all rounds starting with index *processed_rounds and above. + // For (1) we reset bound term, so could use it to distinguish between (1) and (2). + for (size_t i = *processed_rounds; i != 0;) { + --i; + if (rounds[i]->bound_term() == OpId::kUnknownTerm) { + // Already notified. + continue; + } + state_->NotifyReplicationFinishedUnlocked( + rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); + } + } + return status; +} + +Status RaftConsensus::DoAppendNewRoundsToQueueUnlocked( + const ConsensusRounds& rounds, size_t* processed_rounds, + std::vector* replicate_msgs) { const OpId& committed_op_id = state_->GetCommittedOpIdUnlocked(); for (const auto& round : rounds) { ++*processed_rounds; - if (round->replicate_msg()->op_type() == OperationType::WRITE_OP && - !state_->RegisterRetryableRequest(round)) { - round->BindToTerm(OpId::kUnknownTerm); // Mark round as non replicating - continue; + if (round->replicate_msg()->op_type() == OperationType::WRITE_OP) { + auto result = state_->RegisterRetryableRequest(round); + if (!result.ok()) { + round->NotifyReplicationFinished( + result.status(), round->bound_term(), /* applied_op_ids = */ nullptr); + } + if (!result.ok() || !*result) { + round->BindToTerm(OpId::kUnknownTerm); // Mark round as non replicating + continue; + } } OpId op_id = state_->NewIdUnlocked(); @@ -1264,24 +1280,18 @@ Status RaftConsensus::AppendNewRoundsToQueueUnlocked( // the write batch inside the write operation. // // TODO: we could allocate multiple HybridTimes in batch, only reading system clock once. - round->callback()->AddedToLeader(op_id, committed_op_id); + round->NotifyAddedToLeader(op_id, committed_op_id); - Status s = state_->AddPendingOperation(round, OperationMode::kLeader); + auto s = state_->AddPendingOperation(round, OperationMode::kLeader); if (!s.ok()) { RollbackIdAndDeleteOpId(round->replicate_msg(), false /* should_exists */); - - // Iterate rounds in the reverse order and release ids. - while (!replicate_msgs.empty()) { - RollbackIdAndDeleteOpId(replicate_msgs.back(), true /* should_exists */); - replicate_msgs.pop_back(); - } return s; } - replicate_msgs.push_back(round->replicate_msg()); + replicate_msgs->push_back(round->replicate_msg()); } - if (replicate_msgs.empty()) { + if (replicate_msgs->empty()) { return Status::OK(); } @@ -1290,24 +1300,21 @@ Status RaftConsensus::AppendNewRoundsToQueueUnlocked( auto s = CheckLeasesUnlocked(rounds.back()); if (s.ok()) { - s = queue_->AppendOperations(replicate_msgs, committed_op_id, state_->Clock().Now()); + s = queue_->AppendOperations(*replicate_msgs, committed_op_id, state_->Clock().Now()); } // Handle Status::ServiceUnavailable(), which means the queue is full. // TODO: what are we doing about other errors here? Should we also release OpIds in those cases? if (PREDICT_FALSE(!s.ok())) { - LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s << ", queue status: " - << queue_->ToString(); - for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) { - RollbackIdAndDeleteOpId(*iter, true /* should_exists */); - // TODO Possibly evict a dangling peer from the configuration here. - // TODO count of number of ops failed due to consensus queue overflow. - } + LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s + << ", queue status: " << queue_->ToString(); + // TODO Possibly evict a dangling peer from the configuration here. + // TODO count of number of ops failed due to consensus queue overflow. 
return s.CloneAndPrepend("Unable to append operations to consensus queue"); } - state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs.back()->id()); + state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs->back()->id()); return Status::OK(); } @@ -2825,7 +2832,11 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg &DoNothingStatusCB, std::placeholders::_1); round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb))); - return state_->AddPendingOperation(round, OperationMode::kFollower); + auto status = state_->AddPendingOperation(round, OperationMode::kFollower); + if (!status.ok()) { + round->NotifyReplicationFailed(status); + } + return status; } Status RaftConsensus::WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) { diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index beadca46e5d0..b0e8a652359b 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -294,6 +294,8 @@ class RaftConsensus : public std::enable_shared_from_this, virtual CHECKED_STATUS AppendNewRoundToQueueUnlocked(const ConsensusRoundPtr& round); // processed_rounds - out value for number of rounds that were processed. + // This function doesn't invoke callbacks for not processed rounds for performance reasons and it + // is responsibility of the caller to invoke callbacks after lock has been released. virtual CHECKED_STATUS AppendNewRoundsToQueueUnlocked( const ConsensusRounds& rounds, size_t* processed_rounds); @@ -341,6 +343,10 @@ class RaftConsensus : public std::enable_shared_from_this, void MajorityReplicatedNumSSTFilesChanged(uint64_t majority_replicated_num_sst_files) override; + CHECKED_STATUS DoAppendNewRoundsToQueueUnlocked( + const ConsensusRounds& rounds, size_t* processed_rounds, + std::vector* replicate_msgs); + // Control whether printing of log messages should be done for a particular // function call. enum AllowLogging { diff --git a/src/yb/consensus/replica_state.cc b/src/yb/consensus/replica_state.cc index adf6cbb02ee1..55f5feae9b3c 100644 --- a/src/yb/consensus/replica_state.cc +++ b/src/yb/consensus/replica_state.cc @@ -678,8 +678,26 @@ Status ReplicaState::AddPendingOperation(const ConsensusRoundPtr& round, Operati } } else if (op_type == WRITE_OP) { // Leader registers an operation with RetryableRequests even before assigning an op id. - if (mode == OperationMode::kFollower && !retryable_requests_.Register(round)) { - return STATUS(IllegalState, "Cannot register retryable request on follower"); + if (mode == OperationMode::kFollower) { + auto result = retryable_requests_.Register(round); + const auto error_msg = "Cannot register retryable request on follower"; + if (!result.ok()) { + // This can happen if retryable requests have been cleaned up on leader before the follower, + // see https://github.com/yugabyte/yugabyte-db/issues/11349. + // Just run cleanup in this case and retry. + VLOG_WITH_PREFIX(1) << error_msg << ": " << result.status() + << ". 
Cleaning retryable requests"; + auto min_op_id ATTRIBUTE_UNUSED = retryable_requests_.CleanExpiredReplicatedAndGetMinOpId(); + result = retryable_requests_.Register(round); + } + if (!result.ok()) { + return result.status() + .CloneAndReplaceCode(Status::kIllegalState) + .CloneAndPrepend(error_msg); + } + if (!*result) { + return STATUS(IllegalState, error_msg); + } } } else if (op_type == SPLIT_OP) { const auto& split_request = round->replicate_msg()->split_request(); @@ -1319,7 +1337,7 @@ uint64_t ReplicaState::OnDiskSize() const { return cmeta_->on_disk_size(); } -bool ReplicaState::RegisterRetryableRequest(const ConsensusRoundPtr& round) { +Result ReplicaState::RegisterRetryableRequest(const ConsensusRoundPtr& round) { return retryable_requests_.Register(round); } diff --git a/src/yb/consensus/replica_state.h b/src/yb/consensus/replica_state.h index f6673e7959fe..4e83f7287d9c 100644 --- a/src/yb/consensus/replica_state.h +++ b/src/yb/consensus/replica_state.h @@ -412,7 +412,7 @@ class ReplicaState { OpId MinRetryableRequestOpId(); - bool RegisterRetryableRequest(const ConsensusRoundPtr& round); + Result RegisterRetryableRequest(const ConsensusRoundPtr& round); RestartSafeCoarseMonoClock& Clock(); diff --git a/src/yb/consensus/retryable_requests.cc b/src/yb/consensus/retryable_requests.cc index 1eeccacafe07..7260c6e6851b 100644 --- a/src/yb/consensus/retryable_requests.cc +++ b/src/yb/consensus/retryable_requests.cc @@ -213,7 +213,7 @@ class RetryableRequests::Impl { VLOG_WITH_PREFIX(1) << "Start"; } - bool Register(const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) { + Result Register(const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) { auto data = ReplicateData::FromMsg(*round->replicate_msg()); if (!data) { return true; @@ -229,23 +229,19 @@ class RetryableRequests::Impl { data.write().min_running_request_id(), &client_retryable_requests); if (data.request_id() < client_retryable_requests.min_running_request_id) { - round->NotifyReplicationFinished( - STATUS_EC_FORMAT( - Expired, - MinRunningRequestIdStatusData(client_retryable_requests.min_running_request_id), - "Request id $0 is less than min running $1", data.request_id(), - client_retryable_requests.min_running_request_id), - round->bound_term(), nullptr /* applied_op_ids */); - return false; + return STATUS_EC_FORMAT( + Expired, MinRunningRequestIdStatusData(client_retryable_requests.min_running_request_id), + "Request id $0 from client $1 is less than min running $2", data.request_id(), + data.client_id(), client_retryable_requests.min_running_request_id); } auto& replicated_indexed_by_last_id = client_retryable_requests.replicated.get(); auto it = replicated_indexed_by_last_id.lower_bound(data.request_id()); if (it != replicated_indexed_by_last_id.end() && it->first_id <= data.request_id()) { - round->NotifyReplicationFinished( - STATUS(AlreadyPresent, "Duplicate request"), round->bound_term(), - nullptr /* applied_op_ids */); - return false; + return STATUS_FORMAT( + AlreadyPresent, "Duplicate request $0 from client $1 (min running $2)", + data.request_id(), data.client_id(), + client_retryable_requests.min_running_request_id); } auto& running_indexed_by_request_id = client_retryable_requests.running.get(); @@ -560,7 +556,7 @@ void RetryableRequests::operator=(RetryableRequests&& rhs) { impl_ = std::move(rhs.impl_); } -bool RetryableRequests::Register( +Result RetryableRequests::Register( const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) { return 
impl_->Register(round, entry_time); } diff --git a/src/yb/consensus/retryable_requests.h b/src/yb/consensus/retryable_requests.h index 4d41f2c10cae..cd5cd766620a 100644 --- a/src/yb/consensus/retryable_requests.h +++ b/src/yb/consensus/retryable_requests.h @@ -42,9 +42,10 @@ class RetryableRequests { void operator=(RetryableRequests&& rhs); // Tries to register a new running retryable request. - // Returns false if request with such id is already present. - bool Register(const ConsensusRoundPtr& round, - RestartSafeCoarseTimePoint entry_time = RestartSafeCoarseTimePoint()); + // Returns an error, or false if a request with such an id is already present. + Result Register( + const ConsensusRoundPtr& round, + RestartSafeCoarseTimePoint entry_time = RestartSafeCoarseTimePoint()); // Cleans expired replicated requests and returns min op id of running request. yb::OpId CleanExpiredReplicatedAndGetMinOpId(); diff --git a/src/yb/integration-tests/tablet-split-itest-base.cc b/src/yb/integration-tests/tablet-split-itest-base.cc index bb688cd1cd66..7fdac18c182b 100644 --- a/src/yb/integration-tests/tablet-split-itest-base.cc +++ b/src/yb/integration-tests/tablet-split-itest-base.cc @@ -224,14 +224,19 @@ template Result> TabletSplitITestBase::WriteRows( client::TableHandle* table, const uint32_t num_rows, - const int32_t start_key, const int32_t start_value) { + const int32_t start_key, const int32_t start_value, client::YBSessionPtr session) { auto min_hash_code = std::numeric_limits::max(); auto max_hash_code = std::numeric_limits::min(); LOG(INFO) << "Writing " << num_rows << " rows..."; auto txn = this->CreateTransaction(); - auto session = this->CreateSession(txn); + client::YBSessionPtr session_holder; + if (session) { + session->SetTransaction(txn); + } else { + session = this->CreateSession(txn); + } for (int32_t i = start_key, v = start_value; i < start_key + static_cast(num_rows); ++i, ++v) { diff --git a/src/yb/integration-tests/tablet-split-itest-base.h b/src/yb/integration-tests/tablet-split-itest-base.h index 6e1ed5fa5475..c4e099f3eda9 100644 --- a/src/yb/integration-tests/tablet-split-itest-base.h +++ b/src/yb/integration-tests/tablet-split-itest-base.h @@ -79,11 +79,13 @@ class TabletSplitITestBase : public client::TransactionTestBase // Writes `num_rows` rows into the specified table using `CreateInsertRequest`. // Returns a pair with min and max hash code written.
Result> WriteRows( - client::TableHandle* table, uint32_t num_rows, int32_t start_key, int32_t start_value); + client::TableHandle* table, uint32_t num_rows, int32_t start_key, int32_t start_value, + client::YBSessionPtr session = nullptr); Result> WriteRows( - client::TableHandle* table, uint32_t num_rows = 2000, int32_t start_key = 1) { - return WriteRows(table, num_rows, start_key, start_key); + client::TableHandle* table, uint32_t num_rows = 2000, int32_t start_key = 1, + client::YBSessionPtr session = nullptr) { + return WriteRows(table, num_rows, start_key, start_key, session); } Result> WriteRows( diff --git a/src/yb/integration-tests/tablet-split-itest.cc b/src/yb/integration-tests/tablet-split-itest.cc index d1d809f08eff..281d2b31d95d 100644 --- a/src/yb/integration-tests/tablet-split-itest.cc +++ b/src/yb/integration-tests/tablet-split-itest.cc @@ -30,6 +30,7 @@ #include "yb/consensus/consensus.pb.h" #include "yb/consensus/consensus.proxy.h" #include "yb/consensus/consensus_util.h" +#include "yb/consensus/raft_consensus.h" #include "yb/docdb/doc_key.h" @@ -92,8 +93,10 @@ using namespace yb::client::kv_table_test; // NOLINT DECLARE_int64(db_block_size_bytes); DECLARE_int64(db_write_buffer_size); DECLARE_bool(enable_load_balancing); +DECLARE_bool(enable_maintenance_manager); DECLARE_int32(load_balancer_max_concurrent_adds); DECLARE_int32(load_balancer_max_concurrent_removals); +DECLARE_int32(maintenance_manager_polling_interval_ms); DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec); DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); DECLARE_bool(rocksdb_disable_compactions); @@ -107,6 +110,7 @@ DECLARE_bool(TEST_skip_deleting_split_tablets); DECLARE_uint64(tablet_split_limit_per_table); DECLARE_bool(TEST_pause_before_post_split_compaction); DECLARE_int32(TEST_slowdown_backfill_alter_table_rpcs_ms); +DECLARE_int32(retryable_request_timeout_secs); DECLARE_int32(rocksdb_base_background_compactions); DECLARE_int32(rocksdb_max_background_compactions); DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); @@ -573,10 +577,18 @@ TEST_F(TabletSplitITest, SplitTabletDuringReadWriteLoad) { ASSERT_OK(cluster_->RestartSync()); } -void TabletSplitITest::SplitClientRequestsIds(int split_depth) { +namespace { + +void SetSmallDbBlockSize() { // Set data block size low enough, so we have enough data blocks for middle key // detection to work correctly. FLAGS_db_block_size_bytes = 1_KB; +} + +} + +void TabletSplitITest::SplitClientRequestsIds(int split_depth) { + SetSmallDbBlockSize(); const auto kNumRows = 50 * (1 << split_depth); SetNumTablets(1); @@ -622,8 +634,83 @@ TEST_F(TabletSplitITest, SplitClientRequestsIdsDepth2) { SplitClientRequestsIds(2); } +class TabletSplitITestSlowMaintenanceManager : public TabletSplitITest { + public: + void SetUp() override { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_maintenance_manager_polling_interval_ms) = 60 * 1000; + TabletSplitITest::SetUp(); + } +}; + +TEST_F_EX(TabletSplitITest, SplitClientRequestsClean, TabletSplitITestSlowMaintenanceManager) { + constexpr auto kSplitDepth = 3; + constexpr auto kNumRows = 50 * (1 << kSplitDepth); + constexpr auto kRetryableRequestTimeoutSecs = 1; + SetSmallDbBlockSize(); + + // Prevent periodic retryable requests cleanup by maintenance manager.
+ ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_maintenance_manager) = false; + + SetNumTablets(1); + CreateTable(); + + ASSERT_OK(WriteRows(kNumRows, 1)); + ASSERT_OK(CheckRowsCount(kNumRows)); + + auto* catalog_mgr = ASSERT_RESULT(catalog_manager()); + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + LOG(INFO) << "Creating new client, id: " << client->id(); + + for (int i = 0; i < kSplitDepth; ++i) { + auto peers = ListTableActiveTabletLeadersPeers(cluster_.get(), table_->id()); + ASSERT_EQ(peers.size(), 1 << i); + for (const auto& peer : peers) { + const auto tablet = peer->shared_tablet(); + ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync)); + tablet->ForceRocksDBCompactInTest(); + ASSERT_OK(SplitTablet(catalog_mgr, *tablet)); + } + + ASSERT_OK(WaitForTabletSplitCompletion(/* expected_non_split_tablets =*/ 1 << (i + 1))); + + if (i == 0) { + // This will set the client's request_id_seq for tablets with split depth 1 to > 1 << 24 (see + // YBClient::MaybeUpdateMinRunningRequestId) and update min_running_request_id for this + // client on the tserver side. + auto session = client->NewSession(); + ASSERT_OK(WriteRows(&this->table_, kNumRows, 1, session)); + } + } + + auto leader_peers = ListTableActiveTabletLeadersPeers(cluster_.get(), table_->id()); + // Force cleaning retryable requests on leaders of active (split depth = kSplitDepth) tablets. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_retryable_request_timeout_secs) = kRetryableRequestTimeoutSecs; + SleepFor(kRetryableRequestTimeoutSecs * 1s); + for (int i = 0; i < 2; ++i) { + for (auto& leader_peer : leader_peers) { + LOG(INFO) << leader_peer->LogPrefix() << "MinRetryableRequestOpId(): " + << AsString(leader_peer->raft_consensus()->MinRetryableRequestOpId()); + // Delay to make RetryableRequests::CleanExpiredReplicatedAndGetMinOpId (called by + // MinRetryableRequestOpId) do delayed cleanup. + SleepFor(kRetryableRequestTimeoutSecs * 1s); + } + } + + auto session = client->NewSession(); + // Since the client doesn't know about tablets with split depth > 1, it will set request_id_seq for + // active tablets based on min_running_request_id on the leader, but on the leader it has been cleaned up. + // So, request_id_seq will be set to 0 + (1 << 24), which is less than min_running_request_id on the + // follower (where retryable requests have not yet been cleaned up). + // This will test how the follower handles getting a request_id less than min_running_request_id.
+ LOG(INFO) << "Starting write to active tablets after retryable requests cleanup on leaders..."; + ASSERT_OK(WriteRows(&this->table_, kNumRows, 1, session)); + LOG(INFO) << "Write to active tablets completed."; +} + + TEST_F(TabletSplitITest, SplitSingleTabletWithLimit) { - FLAGS_db_block_size_bytes = 1_KB; + SetSmallDbBlockSize(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_validate_all_tablet_candidates) = false; const auto kSplitDepth = 3; const auto kNumRows = 50 * (1 << kSplitDepth); diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 68d58d439796..5f97d4a3c99a 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -5786,7 +5786,8 @@ Result CatalogManager::RegisterNewTabletForSplit( new_tablet_meta.set_state(SysTabletsEntryPB::CREATING); new_tablet_meta.mutable_committed_consensus_state()->CopyFrom( source_tablet_meta.committed_consensus_state()); - new_tablet_meta.set_split_depth(source_tablet_meta.split_depth() + 1); + const auto new_split_depth = source_tablet_meta.split_depth() + 1; + new_tablet_meta.set_split_depth(new_split_depth); new_tablet_meta.set_split_parent_tablet_id(source_tablet_info->tablet_id()); // TODO(tsplit): consider and handle failure scenarios, for example: // - Crash or leader failover before sending out the split tasks. @@ -5813,10 +5814,12 @@ Result CatalogManager::RegisterNewTabletForSplit( auto tablet_map_checkout = tablet_map_.CheckOut(); (*tablet_map_checkout)[new_tablet->id()] = new_tablet; } - LOG(INFO) << "Registered new tablet " << new_tablet->tablet_id() - << " (" << AsString(partition) << ") to split the tablet " - << source_tablet_info->tablet_id() - << " (" << AsString(source_tablet_meta.partition()) + LOG(INFO) << "Registered new tablet " << new_tablet->tablet_id() << " (partition_key_start: " + << Slice(partition.partition_key_start()).ToDebugString(/* max_length = */ 64) + << ", partition_key_end: " + << Slice(partition.partition_key_end()).ToDebugString(/* max_length = */ 64) + << ", split_depth: " << new_split_depth << ") to split the tablet " + << source_tablet_info->tablet_id() << " (" << AsString(source_tablet_meta.partition()) << ") for table " << table->ToString() << ", new partition_list_version: " << new_partition_list_version; diff --git a/src/yb/tablet/operations/operation_driver.cc b/src/yb/tablet/operations/operation_driver.cc index 768a6b8418d8..020102a84554 100644 --- a/src/yb/tablet/operations/operation_driver.cc +++ b/src/yb/tablet/operations/operation_driver.cc @@ -108,6 +108,13 @@ Status OperationDriver::Init(std::unique_ptr* operation, int64_t term operation_ = std::move(*operation); } + auto result = operation_tracker_->Add(this); + + if (!result.ok() && operation) { + *operation = std::move(operation_); + return result; + } + if (term == OpId::kUnknownTerm) { if (operation_) { op_id_copy_.store(operation_->op_id(), boost::memory_order_release); @@ -123,16 +130,11 @@ Status OperationDriver::Init(std::unique_ptr* operation, int64_t term } } - auto result = operation_tracker_->Add(this); - if (!result.ok() && operation) { - *operation = std::move(operation_); - } - if (term == OpId::kUnknownTerm && operation_) { operation_->AddedToFollower(); } - return result; + return Status::OK(); } yb::OpId OperationDriver::GetOpId() { @@ -177,8 +179,8 @@ void OperationDriver::ExecuteAsync() { if (delay != 0 && operation_type() == OperationType::kWrite && operation_->tablet()->tablet_id() != master::kSysCatalogTabletId) { - LOG(INFO) << "T " << 
operation_->tablet()->tablet_id() - << " Debug sleep for: " << MonoDelta(1ms * delay) << "\n" << GetStackTrace(); + LOG_WITH_PREFIX(INFO) << " Debug sleep for: " << MonoDelta(1ms * delay) << "\n" + << GetStackTrace(); std::this_thread::sleep_for(1ms * delay); } @@ -357,7 +359,7 @@ void OperationDriver::ReplicationFinished( } } -void OperationDriver::Abort(const Status& status) { +void OperationDriver::TEST_Abort(const Status& status) { CHECK(!status.ok()); ReplicationState repl_state_copy; @@ -451,11 +453,11 @@ std::string OperationDriver::LogPrefix() const { string state_str = StateString(repl_state_copy, prep_state_copy); // We use the tablet and the peer (T, P) to identify ts and tablet and the hybrid_time (Ts) to // (help) identify the operation. The state string (S) describes the state of the operation. - return Format("T $0 P $1 S $2 Ts $3 $4: ", + return Format("T $0 P $1 S $2 Ts $3 $4 ($5): ", // consensus_ is NULL in some unit tests. PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : "(unknown)", PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : "(unknown)", - state_str, ts_string, operation_type); + state_str, ts_string, operation_type, static_cast(this)); } int64_t OperationDriver::SpaceUsed() { diff --git a/src/yb/tablet/operations/operation_driver.h b/src/yb/tablet/operations/operation_driver.h index 48d0378dd60a..d5bb1ede8468 100644 --- a/src/yb/tablet/operations/operation_driver.h +++ b/src/yb/tablet/operations/operation_driver.h @@ -140,7 +140,7 @@ class OperationDriver : public RefCountedThreadSafe, // multiple stages by multiple executors it might not be possible to stop // the operation immediately, but this will make sure it is aborted // at the next synchronization point. - void Abort(const Status& status); + void TEST_Abort(const Status& status); // Callback from Consensus when replication is complete, and thus the operation // is considered "committed" from the consensus perspective (ie it will be diff --git a/src/yb/tablet/operations/operation_tracker-test.cc b/src/yb/tablet/operations/operation_tracker-test.cc index 2aa3fecc0c5c..486bf853a6b6 100644 --- a/src/yb/tablet/operations/operation_tracker-test.cc +++ b/src/yb/tablet/operations/operation_tracker-test.cc @@ -143,7 +143,7 @@ TEST_F(OperationTrackerTest, TestGetPending) { ASSERT_EQ(driver.get(), pending_operations.front().get()); // And mark the operation as failed, which will cause it to unregister itself. - driver->Abort(STATUS(Aborted, "")); + driver->TEST_Abort(STATUS(Aborted, "")); ASSERT_EQ(0, tracker_.TEST_GetNumPending()); } @@ -165,7 +165,7 @@ void OperationTrackerTest::RunOperationsThread(CountDownLatch* finish_latch) { // Finish all the operations for (const scoped_refptr& driver : drivers) { // And mark the operation as failed, which will cause it to unregister itself. 
- driver->Abort(STATUS(Aborted, "")); + driver->TEST_Abort(STATUS(Aborted, "")); } } @@ -213,11 +213,11 @@ TEST_F(OperationTrackerTest, TestMetrics) { ASSERT_OK(AddDrivers(3, &drivers)); ASSERT_NO_FATALS(CheckMetrics(entity_, 3, 0, 0)); - drivers[0]->Abort(STATUS(Aborted, "")); + drivers[0]->TEST_Abort(STATUS(Aborted, "")); ASSERT_NO_FATALS(CheckMetrics(entity_, 2, 0, 0)); - drivers[1]->Abort(STATUS(Aborted, "")); - drivers[2]->Abort(STATUS(Aborted, "")); + drivers[1]->TEST_Abort(STATUS(Aborted, "")); + drivers[2]->TEST_Abort(STATUS(Aborted, "")); ASSERT_NO_FATALS(CheckMetrics(entity_, 0, 0, 0)); } @@ -260,7 +260,7 @@ TEST_F(OperationTrackerTest, TestTooManyOperations) { ASSERT_NO_FATALS(CheckMemTracker(t)); // If we abort one operation, we should be able to add one more. - drivers.back()->Abort(STATUS(Aborted, "")); + drivers.back()->TEST_Abort(STATUS(Aborted, "")); drivers.pop_back(); ASSERT_NO_FATALS(CheckMemTracker(t)); ASSERT_OK(AddDrivers(1, &drivers)); @@ -268,7 +268,7 @@ TEST_F(OperationTrackerTest, TestTooManyOperations) { // Clean up. for (const scoped_refptr& driver : drivers) { - driver->Abort(STATUS(Aborted, "")); + driver->TEST_Abort(STATUS(Aborted, "")); } } diff --git a/src/yb/tablet/operations/operation_tracker.cc b/src/yb/tablet/operations/operation_tracker.cc index 659c713e3042..75ebb990197c 100644 --- a/src/yb/tablet/operations/operation_tracker.cc +++ b/src/yb/tablet/operations/operation_tracker.cc @@ -151,8 +151,10 @@ Status OperationTracker::Add(OperationDriver* driver) { metrics_->operation_memory_pressure_rejections->Increment(); } - // May be null in unit tests. - Tablet* tablet = driver->operation()->tablet(); + // May be nullptr due to TabletPeer::SetPropagatedSafeTime. + auto* operation = driver->operation(); + // May be nullptr in unit tests even when operation is not nullptr. + auto* tablet = operation ? operation->tablet() : nullptr; string msg = Substitute( "Operation failed, tablet $0 operation memory consumption ($1) "