Skip to content

Commit

Permalink
[#11349] docdb: Fixed RetryableRequests errors handling at follower a…
Browse files Browse the repository at this point in the history
…nd ConsensusRound callbacks invokation

Summary:
**Background**

Since D5660/023c20ad6caf13e5228081ddec07d9f3cf904348 we track Write RPC request ID to prevent duplicate writes caused by retries of Write RPCs.
Request ID is assigned by `YBClient` based on `TabletRequests::request_id_seq` and TServer tracks running requests IDs in `RetryableRequests` structure.

When we split tablet, each of its child tablets gets copy of its Raft log and also `RetryableRequests` structure.
On `YBClient` side, child tablets get `TabletRequests::request_id_seq` value from the parent tablet to be consistent with TServer side.

But if process hosting `YBClient` has been restarted / partioned away or simply not issuing request to specific tablets during sequence of split, it might have no information about particular tablet and doesn't know about `TabletRequests::request_id_seq` for the parent tablet, so can't start from it for child tablets. This case was addressed by D9264/f9cae11ced426476d4f70560b2afc29f921a1737. If `YBClient` (`MetaCache`) doesn't know about the parent tablet, it use special value as `request_id_seq` for child tablet. Then, on getting "request id is less than min" error - it sets `request_id_seq` to min running request ID from the server plus 2^24 (there wouldn't be 2^24 client requests in progress from the same client to the same tablet, so it is safe to do this).

**Problem**

On TServer side `MaintenanceManager` periodically triggers `RetryableRequests` structure cleanup. And due to overloading or having too much tablets, it can happen that at tablet leader `RetryableRequests` structure is already cleaned up, but at the follower is not yet. This can lead to the following failure scenario:
1) Tablet `T` is split into `T1` and `T2`.
1) `YBClient` A starts doing some writes to tablet `T1`. Since A doesn't know about tablet T, it sets `request_id_seq` to `min_running_request_id` for A from the T leader (0) plus 1<<24. After 100 writes it becomes `100 + 1 << 24`.
3) Tablet `T1` is split into `T1.1` and `T1.2`
4) Tablet `T1.1` is split into `T1.1.1` and `T1.1.2`
5) `RetryableRequests` cleanup is done at the leader of `T1.1`, but not at followers.
6) `YBClient` A starts doing write to tablet `T1.1.2`. Since A doesn't know about tablet `T1.1`, it sets `request_id_seq` to `min_running_request_id` for A from `T1.1` leader (0 due to cleanup) plus `1<<24`, that is `1<<24`.
7) `T1.1` leader accepts write with `request_id = 1 <<24` and tries to replicate it at followers.
8) At follower, `min_running_request_id` is `100 + 1 << 24` (inherited from parent tablets). So, follower rejects write request that is already accepted by leader. Error message is: "Request id 16777316 from client ... is less than min running 16777216".

**Solution**

The solution is to update `ReplicaState::AddPendingOperation` to make it run cleanup at the follower in case of getting error when trying to register retryable request ID and try to register again.

Also, some other fixes/improvements have been made:
- Updated `RetryableRequests::Register` to return error instead of invoking callback on error by itself. Callers are responsible for invoking callback if needed.
- Updated `RaftConsensus::StartConsensusOnlyRoundUnlocked` to invoke callback in case of error.
- Updated `YBClient` logging to have prefix with both client ID and `MetaCache` address.
- Added sanity check to avoid duplicate `ConsensusRoundCallback` calls and to verify that callback is always got called before destruction.
- Updated `RaftConsensus::AppendNewRoundsToQueueUnlocked` to reduce code duplication and made it responsible for invoking callback for processed rounds.
- Refactored `OperationDriver::Init` to exit early in case of error.
- Enhanced `CatalogManager::RegisterNewTabletForSplit` logging.
- Fixed `OperationTracker::Add` null pointer handling.
- Added test.

Test Plan: TabletSplitITest.SplitClientRequestsClean

Reviewers: sergei

Reviewed By: sergei

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D15484
  • Loading branch information
ttyusupov committed Mar 21, 2022
1 parent b878c88 commit d26017d
Show file tree
Hide file tree
Showing 21 changed files with 287 additions and 129 deletions.
3 changes: 2 additions & 1 deletion src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, UpdateConsumerOnProducerSplit);
YBClient::Data::Data()
: leader_master_rpc_(rpcs_.InvalidHandle()),
latest_observed_hybrid_time_(YBClient::kNoHybridTime),
id_(ClientId::GenerateRandom()) {
id_(ClientId::GenerateRandom()),
log_prefix_(Format("Client $0: ", id_)) {
for(auto& cache : tserver_count_cached_) {
cache.store(0, std::memory_order_relaxed);
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ class YBClient::Data {
std::unique_ptr<ThreadPool> threadpool_;

const ClientId id_;
const std::string log_prefix_;

// Used to track requests that were sent to a particular tablet, so it could track different
// RPCs related to the same write operation and reject duplicates.
Expand Down
28 changes: 17 additions & 11 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ Status YBClient::CreateTablegroup(const std::string& namespace_name,
table_name,
internal::GetSchema(ybschema),
internal::GetSchema(info.schema));
LOG(ERROR) << msg;
LOG_WITH_PREFIX(ERROR) << msg;
return STATUS(AlreadyPresent, msg);
}

Expand Down Expand Up @@ -1148,7 +1148,8 @@ Status YBClient::DeleteTablegroup(const std::string& namespace_id,
// A prior attempt to delete the table has succeeded, but
// appeared as a failure to the client due to, e.g., an I/O or
// network issue.
LOG(INFO) << "Parent table for tablegroup with ID " << tablegroup_id << " already deleted.";
LOG_WITH_PREFIX(INFO) << "Parent table for tablegroup with ID " << tablegroup_id
<< " already deleted.";
return Status::OK();
} else {
return StatusFromPB(resp.error().status());
Expand All @@ -1166,7 +1167,7 @@ Status YBClient::DeleteTablegroup(const std::string& namespace_id,
strings::Substitute("Failed waiting for parent table with id $0 to finish being deleted",
resp.parent_table_id()));

LOG(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id;
LOG_WITH_PREFIX(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id;
return Status::OK();
}

Expand Down Expand Up @@ -1358,7 +1359,7 @@ Status YBClient::GetPermissions(client::internal::PermissionsCache* permissions_
GetPermissionsResponsePB resp;
CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GetPermissions);

VLOG(1) << "Got permissions cache: " << resp.ShortDebugString();
VLOG_WITH_PREFIX(1) << "Got permissions cache: " << resp.ShortDebugString();

// The first request is a special case. We always replace the cache since we don't have anything.
if (!version) {
Expand Down Expand Up @@ -1953,7 +1954,7 @@ void YBClient::RequestFinished(const TabletId& tablet_id, RetryableRequestId req
if (it != tablet.running_requests.end()) {
tablet.running_requests.erase(it);
} else {
LOG(DFATAL) << "RequestFinished called for an unknown request: "
LOG_WITH_PREFIX(DFATAL) << "RequestFinished called for an unknown request: "
<< tablet_id << ", " << request_id;
}
}
Expand All @@ -1964,7 +1965,8 @@ void YBClient::MaybeUpdateMinRunningRequestId(
auto& tablet = data_->tablet_requests_[tablet_id];
if (tablet.request_id_seq == kInitializeFromMinRunning) {
tablet.request_id_seq = min_running_request_id + (1 << 24);
VLOG(1) << "Set request_id_seq for tablet " << tablet_id << " to " << tablet.request_id_seq;
VLOG_WITH_PREFIX(1) << "Set request_id_seq for tablet " << tablet_id << " to "
<< tablet.request_id_seq;
}
}

Expand Down Expand Up @@ -2018,7 +2020,7 @@ Status YBClient::ListMasters(CoarseTimePoint deadline, std::vector<std::string>*
master_uuids->clear();
for (const ServerEntryPB& master : resp.masters()) {
if (master.has_error()) {
LOG(ERROR) << "Master " << master.ShortDebugString() << " hit error "
LOG_WITH_PREFIX(ERROR) << "Master " << master.ShortDebugString() << " hit error "
<< master.error().ShortDebugString();
return StatusFromPB(master.error());
}
Expand Down Expand Up @@ -2189,11 +2191,11 @@ bool YBClient::IsMultiMaster() const {
Result<int> YBClient::NumTabletsForUserTable(TableType table_type) {
if (table_type == TableType::PGSQL_TABLE_TYPE &&
FLAGS_ysql_num_tablets > 0) {
VLOG(1) << "num_tablets = " << FLAGS_ysql_num_tablets
VLOG_WITH_PREFIX(1) << "num_tablets = " << FLAGS_ysql_num_tablets
<< ": --ysql_num_tablets is specified.";
return FLAGS_ysql_num_tablets;
} else if (FLAGS_ycql_num_tablets > 0) {
VLOG(1) << "num_tablets = " << FLAGS_ycql_num_tablets
VLOG_WITH_PREFIX(1) << "num_tablets = " << FLAGS_ycql_num_tablets
<< ": --ycql_num_tablets is specified.";
return FLAGS_ycql_num_tablets;
} else {
Expand All @@ -2202,12 +2204,12 @@ Result<int> YBClient::NumTabletsForUserTable(TableType table_type) {
int num_tablets = 0;
if (table_type == TableType::PGSQL_TABLE_TYPE) {
num_tablets = tserver_count * FLAGS_ysql_num_shards_per_tserver;
VLOG(1) << "num_tablets = " << num_tablets << ": "
VLOG_WITH_PREFIX(1) << "num_tablets = " << num_tablets << ": "
<< "calculated as tserver_count * FLAGS_ysql_num_shards_per_tserver ("
<< tserver_count << " * " << FLAGS_ysql_num_shards_per_tserver << ")";
} else {
num_tablets = tserver_count * FLAGS_yb_num_shards_per_tserver;
VLOG(1) << "num_tablets = " << num_tablets << ": "
VLOG_WITH_PREFIX(1) << "num_tablets = " << num_tablets << ": "
<< "calculated as tserver_count * FLAGS_yb_num_shards_per_tserver ("
<< tserver_count << " * " << FLAGS_yb_num_shards_per_tserver << ")";
}
Expand Down Expand Up @@ -2264,5 +2266,9 @@ Result<TableId> GetTableId(YBClient* client, const YBTableName& table_name) {
return VERIFY_RESULT(client->GetYBTableInfo(table_name)).table_id;
}

const std::string& YBClient::LogPrefix() const {
return data_->log_prefix_;
}

} // namespace client
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,8 @@ class YBClient {

void Shutdown();

const std::string& LogPrefix() const;

private:
class Data;

Expand Down
10 changes: 6 additions & 4 deletions src/yb/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ void LookupCallbackVisitor::operator()(
MetaCache::MetaCache(YBClient* client)
: client_(client),
master_lookup_sem_(FLAGS_max_concurrent_master_lookups),
log_prefix_(Format("MetaCache($0): ", static_cast<void*>(this))) {
log_prefix_(Format("MetaCache($0)(client_id: $1): ", static_cast<void*>(this), client_->id())) {
}

MetaCache::~MetaCache() {
Expand Down Expand Up @@ -1107,8 +1107,9 @@ void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) {
auto& tablet_requests = client_->data_->tablet_requests_;
const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id());
if (requests_it == tablet_requests.end()) {
VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for tablet "
<< tablet.split_parent_tablet_id();
VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for parent tablet "
<< tablet.split_parent_tablet_id()
<< " (split_depth: " << tablet.split_depth() - 1 << ")";
// This can happen if client wasn't active (for example node was partitioned away) during
// sequence of splits that resulted in `tablet` creation, so we don't have info about `tablet`
// split parent.
Expand All @@ -1124,7 +1125,8 @@ void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) {
return;
}
VLOG_WITH_PREFIX(2) << "Setting request_id_seq for tablet " << tablet.tablet_id()
<< " from tablet " << tablet.split_parent_tablet_id() << " to "
<< " (split_depth: " << tablet.split_depth() << ") from tablet "
<< tablet.split_parent_tablet_id() << " to "
<< requests_it->second.request_id_seq;
tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq;
}
Expand Down
15 changes: 15 additions & 0 deletions src/yb/consensus/consensus_round.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,23 @@ ConsensusRound::ConsensusRound(Consensus* consensus,
DCHECK_NOTNULL(replicate_msg_.get());
}

ConsensusRound::~ConsensusRound() {
LOG_IF(DFATAL, callback_ && !callback_called)
<< this << ": callback(" << callback_ << ")->ReplicationFinished() hasn't been called for "
<< replicate_msg_->ShortDebugString();
}

void ConsensusRound::NotifyAddedToLeader(const OpId& op_id, const OpId& committed_op_id) {
callback_->AddedToLeader(op_id, committed_op_id);
}

void ConsensusRound::NotifyReplicationFinished(
const Status& status, int64_t leader_term, OpIds* applied_op_ids) {
bool expected = false;
LOG_IF(DFATAL, !callback_called.compare_exchange_strong(expected, true))
<< this
<< ": callback(\" << callback_ << \")->ReplicationFinished() has been already called for "
<< replicate_msg_->ShortDebugString();
callback_->ReplicationFinished(status, leader_term, applied_op_ids);
}

Expand Down
16 changes: 7 additions & 9 deletions src/yb/consensus/consensus_round.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ class ConsensusRoundCallback {
virtual void ReplicationFinished(
const Status& status, int64_t leader_term, OpIds* applied_op_ids) = 0;

// Utility method for failed replication.
void ReplicationFailed(const Status& status) {
ReplicationFinished(status, OpId::kUnknownTerm, /* applied_op_ids= */ nullptr);
}

virtual ~ConsensusRoundCallback() = default;
};

Expand Down Expand Up @@ -79,14 +74,16 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
callback_ = callback_holder_.get();
}

ConsensusRoundCallback* callback() const {
return callback_;
}
void NotifyAddedToLeader(const OpId& op_id, const OpId& committed_op_id);

// If a continuation was set, notifies it that the round has been replicated.
void NotifyReplicationFinished(
const Status& status, int64_t leader_term, OpIds* applied_op_ids);

void NotifyReplicationFailed(const Status& status) {
NotifyReplicationFinished(status, OpId::kUnknownTerm, /* applied_op_ids= */ nullptr);
}

// Binds this round such that it may not be eventually executed in any term
// other than 'term'.
// See CheckBoundTerm().
Expand All @@ -110,7 +107,7 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
friend class RaftConsensusQuorumTest;
friend class RefCountedThreadSafe<ConsensusRound>;

~ConsensusRound() {}
~ConsensusRound();

Consensus* const consensus_;
// This round's replicate message.
Expand All @@ -125,6 +122,7 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {

ConsensusRoundCallback* callback_ = nullptr;
std::unique_ptr<ConsensusRoundCallback> callback_holder_;
std::atomic<bool> callback_called{false};
};

} // namespace consensus
Expand Down
Loading

0 comments on commit d26017d

Please sign in to comment.