From 220bd5e1f6bd0d6b708d96453fb1376a11cf442e Mon Sep 17 00:00:00 2001 From: Nicolas Spiegelberg Date: Tue, 3 Nov 2020 14:07:53 -0800 Subject: [PATCH] [#3522] Fix Threading Issues with CDC Consumer Writes Summary: The TwoDCOutputClient was accessing its TwoDCWriteImplementation variable from multiple threads: the CDC Poller thread and the Invoker callback thread. Guarded it with the class-wide lock, added thread-safety annotations, and refactored to minimize lock usage on the main data path. Test Plan: BatchSize/TwoDCTest.ApplyOperationsRandomFailures/1 -n 10 --tp 1 Reviewers: rahuldesirazu Reviewed By: rahuldesirazu Subscribers: ybase, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D9833 --- ent/src/yb/integration-tests/twodc-test.cc | 7 +- ent/src/yb/tserver/twodc_output_client.cc | 120 ++++++++++++--------- 2 files changed, 76 insertions(+), 51 deletions(-) diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index 8e99f8c77538..74b59dca62f7 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -1425,7 +1425,8 @@ TEST_P(TwoDCTest, ApplyOperationsRandomFailures) { SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability); uint32_t replication_factor = NonTsanVsTsan(3, 1); - auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor)); + // Use unequal tablet counts so we have M:N mapping and output to multiple tablets. + auto tables = ASSERT_RESULT(SetUpWithParams({3}, {5}, replication_factor)); std::vector> producer_tables; // tables contains both producer and consumer universe tables (alternately). @@ -1443,8 +1444,8 @@ TEST_P(TwoDCTest, ApplyOperationsRandomFailures) { consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, consumer_tables)); // After creating the cluster, make sure all producer tablets are being polled for. 
- ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); - ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 1)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5)); + ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 3)); // Write 1000 entries to each cluster. std::thread t1([&]() { WriteWorkload(0, 1000, producer_client(), tables[0]->name()); }); diff --git a/ent/src/yb/tserver/twodc_output_client.cc b/ent/src/yb/tserver/twodc_output_client.cc index 06e9cd24f006..41519fbb7883 100644 --- a/ent/src/yb/tserver/twodc_output_client.cc +++ b/ent/src/yb/tserver/twodc_output_client.cc @@ -75,16 +75,20 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { void TabletLookupCallbackFastTrack(const size_t record_idx); - void WriteIfAllRecordsProcessed(); + // Processes the Record and sends the CDCWrite for it. + void ProcessRecord(const std::string& tablet_id, const cdc::CDCRecordPB& record); - void SendNextCDCWriteToTablet(); + void SendNextCDCWriteToTablet(std::unique_ptr write_request); // Increment processed record count. // Returns true if all records are processed, false if there are still some pending records. - bool IncProcessedRecordCount(); + bool IncProcessedRecordCount() REQUIRES(lock_); - void HandleResponse(); - void HandleError(const Status& s, bool done); + cdc::OutputClientResponse PrepareResponse() REQUIRES(lock_); + void SendResponse(const cdc::OutputClientResponse& resp) EXCLUDES(lock_); + + void HandleResponse() EXCLUDES(lock_); + void HandleError(const Status& s, bool done) EXCLUDES(lock_); bool UseLocalTserver(); @@ -111,7 +115,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { // This will cache the response to an ApplyChanges() request. 
cdc::GetChangesResponsePB twodc_resp_copy_; - std::unique_ptr write_strategy_; + std::unique_ptr write_strategy_ GUARDED_BY(lock_); }; Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_resp) { @@ -157,6 +161,7 @@ Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_r if (poller_resp->records(i).key_size() == 0) { // Transaction status record, ignore for now. // Support for handling transactions will be added in future. + std::lock_guard l(lock_); IncProcessedRecordCount(); } else { twodc_resp_copy_.add_records()->CopyFrom(poller_resp->records(i)); @@ -190,25 +195,26 @@ bool TwoDCOutputClient::UseLocalTserver() { } -void TwoDCOutputClient::WriteIfAllRecordsProcessed() { - bool done = IncProcessedRecordCount(); +void TwoDCOutputClient::ProcessRecord(const std::string& tablet_id, + const cdc::CDCRecordPB& record) { + bool done = false; + std::unique_ptr write_request; + { + std::lock_guard l(lock_); + write_strategy_->ProcessRecord(tablet_id, record); + done = IncProcessedRecordCount(); + if (done && error_status_.ok()) { + write_request = write_strategy_->GetNextWriteRequest(); + } + } if (done) { // Found tablets for all records, now we should write the records. - // But first, check if there were any errors during tablet lookup for any record. - bool has_error = false; - { - std::lock_guard l(lock_); - if (!error_status_.ok()) { - has_error = true; - } - } - - if (has_error) { - // Return error, if any, without applying records. - HandleResponse(); - } else { + if (write_request) { // Apply the writes on consumer. - SendNextCDCWriteToTablet(); + SendNextCDCWriteToTablet(std::move(write_request)); + } else { + // No write_request on error. Respond, without applying records. 
+ HandleResponse(); } } } @@ -217,26 +223,23 @@ void TwoDCOutputClient::TabletLookupCallback( const size_t record_idx, const Result& tablet) { if (!tablet.ok()) { - bool done = IncProcessedRecordCount(); + bool done = false; + { + std::lock_guard l(lock_); + done = IncProcessedRecordCount(); + } HandleError(tablet.status(), done); return; } - - write_strategy_->ProcessRecord(tablet->get()->tablet_id(), twodc_resp_copy_.records(record_idx)); - - WriteIfAllRecordsProcessed(); + ProcessRecord(tablet->get()->tablet_id(), twodc_resp_copy_.records(record_idx)); } void TwoDCOutputClient::TabletLookupCallbackFastTrack(const size_t record_idx) { - write_strategy_->ProcessRecord(consumer_tablet_info_.tablet_id, - twodc_resp_copy_.records(record_idx)); - - WriteIfAllRecordsProcessed(); + ProcessRecord(consumer_tablet_info_.tablet_id, twodc_resp_copy_.records(record_idx)); } -void TwoDCOutputClient::SendNextCDCWriteToTablet() { - auto write_request = write_strategy_->GetNextWriteRequest(); - +void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr write_request) { + // TODO: This should be parallelized for better performance with M:N setups. auto deadline = CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms); write_handle_ = rpcs_->Prepare(); @@ -256,6 +259,7 @@ void TwoDCOutputClient::SendNextCDCWriteToTablet() { } void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResponsePB& response) { + // Handle response. auto retained = rpcs_->Unregister(&write_handle_); if (!status.ok()) { HandleError(status, true /* done */); @@ -264,14 +268,25 @@ void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResp HandleError(StatusFromPB(response.error().status()), true /* done */); return; } - cdc_consumer_->IncrementNumSuccessfulWriteRpcs(); - if (!write_strategy_->HasMoreWrites()) { - // Last record, return response to caller. - HandleResponse(); + // See if we need to handle any more writes. 
+ std::unique_ptr write_request; + cdc::OutputClientResponse out_response; + { + std::lock_guard l(lock_); + if (write_strategy_->HasMoreWrites()) { + write_request = write_strategy_->GetNextWriteRequest(); + } else { + out_response = PrepareResponse(); + } + } + + if (write_request) { + SendNextCDCWriteToTablet(std::move(write_request)); } else { - SendNextCDCWriteToTablet(); + // Last record, return response to caller. + SendResponse(out_response); } } @@ -287,23 +302,32 @@ void TwoDCOutputClient::HandleError(const Status& s, bool done) { } } +cdc::OutputClientResponse TwoDCOutputClient::PrepareResponse() { + cdc::OutputClientResponse response; + response.status = error_status_; + if (response.status.ok()) { + response.last_applied_op_id = op_id_; + response.processed_record_count = processed_record_count_; + } + op_id_ = consensus::MinimumOpId(); + processed_record_count_ = 0; + return response; +} + +void TwoDCOutputClient::SendResponse(const cdc::OutputClientResponse& resp) { + apply_changes_clbk_(resp); +} + void TwoDCOutputClient::HandleResponse() { cdc::OutputClientResponse response; { std::lock_guard l(lock_); - response.status = error_status_; - if (response.status.ok()) { - response.last_applied_op_id = op_id_; - response.processed_record_count = processed_record_count_; - } - op_id_ = consensus::MinimumOpId(); - processed_record_count_ = 0; + response = PrepareResponse(); } - apply_changes_clbk_(response); + SendResponse(response); } bool TwoDCOutputClient::IncProcessedRecordCount() { - std::lock_guard l(lock_); processed_record_count_++; if (processed_record_count_ == record_count_) { done_processing_ = true;