Skip to content

Commit

Permalink
[#3522] Fix Threading Issues with CDC Consumer Writes
Browse files Browse the repository at this point in the history
Summary: The TwoDCOutputClient was accessing its TwoDCWriteImplementation variable from multiple threads: the CDC Poller thread and the Invoker callback thread.  Brought it under the existing class-wide variable lock, added proper thread-safety annotations, and refactored to minimize lock usage on the main data path.

Test Plan: BatchSize/TwoDCTest.ApplyOperationsRandomFailures/1 -n 10 --tp 1

Reviewers: rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D9833
  • Loading branch information
nspiegelberg committed Nov 5, 2020
1 parent 80bf522 commit 220bd5e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 51 deletions.
7 changes: 4 additions & 3 deletions ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,8 @@ TEST_P(TwoDCTest, ApplyOperationsRandomFailures) {
SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability);

uint32_t replication_factor = NonTsanVsTsan(3, 1);
auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor));
// Use unequal tablet counts so we have an M:N mapping and output to multiple tablets.
auto tables = ASSERT_RESULT(SetUpWithParams({3}, {5}, replication_factor));

std::vector<std::shared_ptr<client::YBTable>> producer_tables;
// tables contains both producer and consumer universe tables (alternately).
Expand All @@ -1443,8 +1444,8 @@ TEST_P(TwoDCTest, ApplyOperationsRandomFailures) {
consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, consumer_tables));

// After creating the cluster, make sure all producer tablets are being polled for.
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));
ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 1));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5));
ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 3));

// Write 1000 entries to each cluster.
std::thread t1([&]() { WriteWorkload(0, 1000, producer_client(), tables[0]->name()); });
Expand Down
120 changes: 72 additions & 48 deletions ent/src/yb/tserver/twodc_output_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,20 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {

void TabletLookupCallbackFastTrack(const size_t record_idx);

void WriteIfAllRecordsProcessed();
// Processes the Record and sends the CDCWrite for it.
void ProcessRecord(const std::string& tablet_id, const cdc::CDCRecordPB& record);

void SendNextCDCWriteToTablet();
void SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB> write_request);

// Increment processed record count.
// Returns true if all records are processed, false if there are still some pending records.
bool IncProcessedRecordCount();
bool IncProcessedRecordCount() REQUIRES(lock_);

void HandleResponse();
void HandleError(const Status& s, bool done);
cdc::OutputClientResponse PrepareResponse() REQUIRES(lock_);
void SendResponse(const cdc::OutputClientResponse& resp) EXCLUDES(lock_);

void HandleResponse() EXCLUDES(lock_);
void HandleError(const Status& s, bool done) EXCLUDES(lock_);

bool UseLocalTserver();

Expand All @@ -111,7 +115,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {
// This will cache the response to an ApplyChanges() request.
cdc::GetChangesResponsePB twodc_resp_copy_;

std::unique_ptr<TwoDCWriteInterface> write_strategy_;
std::unique_ptr<TwoDCWriteInterface> write_strategy_ GUARDED_BY(lock_);
};

Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_resp) {
Expand Down Expand Up @@ -157,6 +161,7 @@ Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_r
if (poller_resp->records(i).key_size() == 0) {
// Transaction status record, ignore for now.
// Support for handling transactions will be added in future.
std::lock_guard<decltype(lock_)> l(lock_);
IncProcessedRecordCount();
} else {
twodc_resp_copy_.add_records()->CopyFrom(poller_resp->records(i));
Expand Down Expand Up @@ -190,25 +195,26 @@ bool TwoDCOutputClient::UseLocalTserver() {
}


void TwoDCOutputClient::WriteIfAllRecordsProcessed() {
bool done = IncProcessedRecordCount();
void TwoDCOutputClient::ProcessRecord(const std::string& tablet_id,
const cdc::CDCRecordPB& record) {
bool done = false;
std::unique_ptr<WriteRequestPB> write_request;
{
std::lock_guard<decltype(lock_)> l(lock_);
write_strategy_->ProcessRecord(tablet_id, record);
done = IncProcessedRecordCount();
if (done && error_status_.ok()) {
write_request = write_strategy_->GetNextWriteRequest();
}
}
if (done) {
// Found tablets for all records, now we should write the records.
// But first, check if there were any errors during tablet lookup for any record.
bool has_error = false;
{
std::lock_guard<decltype(lock_)> l(lock_);
if (!error_status_.ok()) {
has_error = true;
}
}

if (has_error) {
// Return error, if any, without applying records.
HandleResponse();
} else {
if (write_request) {
// Apply the writes on consumer.
SendNextCDCWriteToTablet();
SendNextCDCWriteToTablet(std::move(write_request));
} else {
// No write_request on error. Respond, without applying records.
HandleResponse();
}
}
}
Expand All @@ -217,26 +223,23 @@ void TwoDCOutputClient::TabletLookupCallback(
const size_t record_idx,
const Result<client::internal::RemoteTabletPtr>& tablet) {
if (!tablet.ok()) {
bool done = IncProcessedRecordCount();
bool done = false;
{
std::lock_guard<decltype(lock_)> l(lock_);
done = IncProcessedRecordCount();
}
HandleError(tablet.status(), done);
return;
}

write_strategy_->ProcessRecord(tablet->get()->tablet_id(), twodc_resp_copy_.records(record_idx));

WriteIfAllRecordsProcessed();
ProcessRecord(tablet->get()->tablet_id(), twodc_resp_copy_.records(record_idx));
}

void TwoDCOutputClient::TabletLookupCallbackFastTrack(const size_t record_idx) {
write_strategy_->ProcessRecord(consumer_tablet_info_.tablet_id,
twodc_resp_copy_.records(record_idx));

WriteIfAllRecordsProcessed();
ProcessRecord(consumer_tablet_info_.tablet_id, twodc_resp_copy_.records(record_idx));
}

void TwoDCOutputClient::SendNextCDCWriteToTablet() {
auto write_request = write_strategy_->GetNextWriteRequest();

void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB> write_request) {
// TODO: This should be parallelized for better performance with M:N setups.
auto deadline = CoarseMonoClock::Now() +
MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms);
write_handle_ = rpcs_->Prepare();
Expand All @@ -256,6 +259,7 @@ void TwoDCOutputClient::SendNextCDCWriteToTablet() {
}

void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResponsePB& response) {
// Handle response.
auto retained = rpcs_->Unregister(&write_handle_);
if (!status.ok()) {
HandleError(status, true /* done */);
Expand All @@ -264,14 +268,25 @@ void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResp
HandleError(StatusFromPB(response.error().status()), true /* done */);
return;
}

cdc_consumer_->IncrementNumSuccessfulWriteRpcs();

if (!write_strategy_->HasMoreWrites()) {
// Last record, return response to caller.
HandleResponse();
// See if we need to handle any more writes.
std::unique_ptr <WriteRequestPB> write_request;
cdc::OutputClientResponse out_response;
{
std::lock_guard<decltype(lock_)> l(lock_);
if (write_strategy_->HasMoreWrites()) {
write_request = write_strategy_->GetNextWriteRequest();
} else {
out_response = PrepareResponse();
}
}

if (write_request) {
SendNextCDCWriteToTablet(std::move(write_request));
} else {
SendNextCDCWriteToTablet();
// Last record, return response to caller.
SendResponse(out_response);
}
}

Expand All @@ -287,23 +302,32 @@ void TwoDCOutputClient::HandleError(const Status& s, bool done) {
}
}

// Builds the response for the current ApplyChanges() batch and resets the
// per-batch progress state.  Caller must hold lock_ (declared REQUIRES(lock_)).
cdc::OutputClientResponse TwoDCOutputClient::PrepareResponse() {
  cdc::OutputClientResponse result;
  result.status = error_status_;
  if (result.status.ok()) {
    // Only report progress when the whole batch succeeded.
    result.processed_record_count = processed_record_count_;
    result.last_applied_op_id = op_id_;
  }
  // Reset batch-scoped state so the next ApplyChanges() starts clean.
  processed_record_count_ = 0;
  op_id_ = consensus::MinimumOpId();
  return result;
}

// Delivers the batch result to the registered ApplyChanges() callback.
// Declared EXCLUDES(lock_): must be invoked without lock_ held, since the
// callback may re-enter this client.
void TwoDCOutputClient::SendResponse(const cdc::OutputClientResponse& resp) {
  apply_changes_clbk_(resp);
}

void TwoDCOutputClient::HandleResponse() {
cdc::OutputClientResponse response;
{
std::lock_guard<decltype(lock_)> l(lock_);
response.status = error_status_;
if (response.status.ok()) {
response.last_applied_op_id = op_id_;
response.processed_record_count = processed_record_count_;
}
op_id_ = consensus::MinimumOpId();
processed_record_count_ = 0;
response = PrepareResponse();
}
apply_changes_clbk_(response);
SendResponse(response);
}

bool TwoDCOutputClient::IncProcessedRecordCount() {
std::lock_guard<rw_spinlock> l(lock_);
processed_record_count_++;
if (processed_record_count_ == record_count_) {
done_processing_ = true;
Expand Down

0 comments on commit 220bd5e

Please sign in to comment.