Skip to content

Commit

Permalink
[yugabyte#6876] 2dc: Fix data race between TwoDCOutputClient::WriteCD…
Browse files Browse the repository at this point in the history
…CRecordDone and TwoDCOutputClient destructor

Summary:
From https://detective-gcp.dev.yugabyte.com/job/github-yugabyte-db-phabricator%2F71843%2Fartifact%2Fbuild%2Ftsan-clang-dynamic-ninja%2Fyb-test-logs%2Ftests-integration-tests__twodc-test%2FTwoDCTestParams__TwoDCTest_TestInsertDeleteWorkloadWithRestart__0.log?class=TwoDCTestParams%2FTwoDCTest&max_lines=3000&name=TestInsertDeleteWorkloadWithRestart%2F0&start_line=3001 :
```
WARNING: ThreadSanitizer: data race (pid=3176)
3018       Read of size 8 at 0x7b48004ccfd8 by main thread (mutexes: write M46578954689579608, read M46860429666290280, write M544226541712483464):
3019         #0 boost::container::operator!=(boost::container::stable_vector_iterator<std::__1::shared_ptr<yb::rpc::RpcCommand>*, false> const&, boost::container::stable_vector_iterator<std::__1::shared_ptr<yb::rpc::RpcCommand>*, false> const&) /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20201111062958-7047513803-centos7-linuxbrew/installed/tsan/include/boost/container/stable_vector.hpp:404:16 (libtserver.so+0x28694d)
3020         yugabyte#1 yb::rpc::Rpcs::Abort(std::initializer_list<boost::container::stable_vector_iterator<std::__1::shared_ptr<yb::rpc::RpcCommand>*, false>*>) /nfusr/centos-gcp-cloud/jenkins-worker-939/jenkins/jenkins-github-yugabyte-db-phabricator-71843/build/tsan-clang-dynamic-ninja/../../src/yb/rpc/rpc.cc:401:22 (libyrpc.so+0x1cf44c)
3021         yugabyte#2 yb::tserver::enterprise::TwoDCOutputClient::~TwoDCOutputClient() /nfusr/centos-gcp-cloud/jenkins-worker-939/jenkins/jenkins-github-yugabyte-db-phabricator-71843/build/tsan-clang-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:65:12 (libtserver.so+0x286e04)
3022         yugabyte#3 yb::tserver::enterprise::TwoDCOutputClient::~TwoDCOutputClient() /nfusr/centos-gcp-cloud/jenkins-worker-939/jenkins/jenkins-github-yugabyte-db-phabricator-71843/build/tsan-clang-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:64:24 (libtserver.so+0x286f79)
3023         yugabyte#4 std::__1::default_delete<yb::cdc::CDCOutputClient>::operator()(yb::cdc::CDCOutputClient*) const /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20201111062958-7047513803-centos7-linuxbrew/installed/tsan/libcxx/include/c++/v1/memory:2321:5 (libtserver.so+0x2890bf)
```
```
   Previous write of size 8 at 0x7b48004ccfd8 by thread T199 (mutexes: write M214619532964243632):
3056         #0 yb::rpc::Rpcs::Unregister(boost::container::stable_vector_iterator<std::__1::shared_ptr<yb::rpc::RpcCommand>*, false>*) /nfusr/centos-gcp-cloud/jenkins-worker-939/jenkins/jenkins-github-yugabyte-db-phabricator-71843/build/tsan-clang-dynamic-ninja/../../src/yb/rpc/rpc.cc:365:11 (libyrpc.so+0x1cf069)
3057         yugabyte#1 yb::tserver::enterprise::TwoDCOutputClient::WriteCDCRecordDone(yb::Status const&, yb::tserver::WriteResponsePB const&) /nfusr/centos-gcp-cloud/jenkins-worker-939/jenkins/jenkins-github-yugabyte-db-phabricator-71843/build/tsan-clang-dynamic-ninja/../../ent/src/yb/tserver/twodc_output_client.cc:268:26 (libtserver.so+0x285e4d)
3058         yugabyte#2 decltype(*(std::__1::forward<yb::tserver::enterprise::TwoDCOutputClient*&>(fp0)).*fp(std::__1::forward<yb::Status const&>(fp1), std::__1::forward<yb::tserver::WriteResponsePB const&>(fp1))) std::__1::__invoke<void (yb::tserver::enterprise::TwoDCOutputClient::*&)(yb::Status const&, yb::tserver::WriteResponsePB const&), yb::tserver::enterprise::TwoDCOutputClient*&, yb::Status const&, yb::tserver::WriteResponsePB const&, void>(void (yb::tserver::enterprise::TwoDCOutputClient::*&)(yb::Status const&, yb::tserver::WriteResponsePB const&), yb::tserver::enterprise::TwoDCOutputClient*&, yb::Status const&, yb::tserver::WriteResponsePB const&) /opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20201111062958-7047513803-centos7-linuxbrew/installed/tsan/libcxx/include/c++/v1/type_traits:4286:1 (libtserver.so+0x288684)
```
The data race is for write_handle_.

This revision adds locking when accessing the write_handle_

Test Plan: Ran test suite via Jenkins

Reviewers: nicolas, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D10354
  • Loading branch information
tedyu authored and Alex Ball committed Mar 9, 2021
1 parent d1faa95 commit 9214950
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions ent/src/yb/tserver/twodc_output_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {
use_local_tserver_(use_local_tserver) {}

~TwoDCOutputClient() {
std::lock_guard<decltype(lock_)> l(lock_);
rpcs_->Abort({&write_handle_});
}

Expand Down Expand Up @@ -103,14 +104,14 @@ class TwoDCOutputClient : public cdc::CDCOutputClient {
cdc::ConsumerTabletInfo consumer_tablet_info_;
std::shared_ptr<CDCClient> local_client_;
rpc::Rpcs* rpcs_;
rpc::Rpcs::Handle write_handle_;
rpc::Rpcs::Handle write_handle_ GUARDED_BY(lock_);
std::function<void(const cdc::OutputClientResponse& response)> apply_changes_clbk_;

bool use_local_tserver_;

std::shared_ptr<client::YBTable> table_;

// Used to protect error_status_, op_id_, done_processing_ and record counts.
// Used to protect error_status_, op_id_, done_processing_, write_handle_ and record counts.
mutable rw_spinlock lock_;
Status error_status_ GUARDED_BY(lock_);
OpIdPB op_id_ GUARDED_BY(lock_) = consensus::MinimumOpId();
Expand Down Expand Up @@ -277,6 +278,7 @@ void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB>
// TODO: This should be parallelized for better performance with M:N setups.
auto deadline = CoarseMonoClock::Now() +
MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms);
std::lock_guard<decltype(lock_)> l(lock_);
write_handle_ = rpcs_->Prepare();
if (write_handle_ != rpcs_->InvalidHandle()) {
// Send in nullptr for RemoteTablet since cdc rpc now gets the tablet_id from the write request.
Expand All @@ -295,7 +297,11 @@ void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB>

void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResponsePB& response) {
// Handle response.
auto retained = rpcs_->Unregister(&write_handle_);
rpc::RpcCommandPtr retained = nullptr;
{
std::lock_guard<decltype(lock_)> l(lock_);
retained = rpcs_->Unregister(&write_handle_);
}
if (!status.ok()) {
HandleError(status, true /* done */);
return;
Expand Down

0 comments on commit 9214950

Please sign in to comment.