From 136e713d0d8204fd3e1076329c614c80ed13d1e6 Mon Sep 17 00:00:00 2001 From: Adithya Bharadwaj Date: Wed, 30 Nov 2022 20:55:35 +0530 Subject: [PATCH] [#15136] CDCSDK: Fix for crash when running cdcsdk with before image Summary: We observed a crash while running TPCC workload with CDCSDK enabled. The stack trace is: ``` (gdb) bt #0 0x0000557f25b11910 in yb::DatumMessagePB::MergeFrom(yb::DatumMessagePB const&) () #1 0x0000557f258a41ef in yb::cdc::PopulateBeforeImage(std::__1::shared_ptr const&, yb::ReadHybridTime const&, yb::cdc::RowMessage*, std::__1::unordered_map, std::__1::allocator >, std::__1::hash, std::__1::equal_to, std::__1::allocator, std::__1::allocator > > > > const&, std::__1::unordered_map >, std::__1::hash, std::__1::equal_to, std::__1::allocator > > > > const&, yb::docdb::SubDocKey const&, yb::Schema const&, unsigned int) () #2 0x0000557f258a7304 in yb::cdc::PopulateCDCSDKIntentRecord(yb::OpId const&, yb::StronglyTypedUuid const&, std::__1::vector > const&, yb::cdc::StreamMetadata const&, std::__1::shared_ptr const&, std::__1::unordered_map, std::__1::allocator >, std::__1::hash, std::__1::equal_to, std::__1::allocator, std::__1::allocator > > > > const&, std::__1::unordered_map >, std::__1::hash, std::__1::equal_to, std::__1::allocator > > > > const&, yb::cdc::GetChangesResponsePB*, yb::ScopedTrackedConsumption*, unsigned int*, std::__1::basic_string, std::__1::allocator >*, yb::Schema*, unsigned int, unsigned long const&) () #3 0x0000557f258aaa27 in yb::cdc::ProcessIntents(yb::OpId const&, yb::StronglyTypedUuid const&, yb::cdc::StreamMetadata const&, std::__1::unordered_map, std::__1::allocator >, std::__1::hash, std::__1::equal_to, std::__1::allocator, std::__1::allocator > > > > const&, std::__1::unordered_map >, std::__1::hash, std::__1::equal_to, std::__1::allocator > > > > const&, yb::cdc::GetChangesResponsePB*, yb::ScopedTrackedConsumption*, yb::cdc::CDCSDKCheckpointPB*, std::__1::shared_ptr const&, std::__1::vector >*, yb::docdb::ApplyTransactionState*, yb::client::YBClient*, std::__1::shared_ptr*, unsigned int*, unsigned long const&) () #4 0x0000557f258b00c1 in yb::cdc::GetChangesForCDCSDK(std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, yb::cdc::CDCSDKCheckpointPB const&, yb::cdc::StreamMetadata const&, std::__1::shared_ptr const&, std::__1::shared_ptr const&, std::__1::unordered_map, std::__1::allocator >, std::__1::hash, std::__1::equal_to, std::__1::allocator, std::__1::allocator > > > > const&, std::__1::unordered_map >, std::__1::hash, std::__1::equal_to, std::__1::allocator > > > > const&, yb::client::YBClient*, yb::consensus::ReplicateMsgsHolder*, yb::cdc::GetChangesResponsePB*, std::__1::basic_string, std::__1::allocator >*, std::__1::shared_ptr*, unsigned int*, yb::OpId*, long*, std::__1::chrono::time_point > >) () #5 0x0000557f2586c448 in yb::cdc::CDCServiceImpl::GetChanges(yb::cdc::GetChangesRequestPB const*, yb::cdc::GetChangesResponsePB*, yb::rpc::RpcContext) () #6 0x0000557f25908246 in std::__1::__function::__func const&)::$_3, std::__1::allocator const&)::$_3>, void (std::__1::shared_ptr)>::operator()(std::__1::shared_ptr&&) () #7 0x0000557f2590a6af in yb::cdc::CDCServiceIf::Handle(std::__1::shared_ptr) () #8 0x0000557f26227a1e in yb::rpc::ServicePoolImpl::Handle(std::__1::shared_ptr) () #9 0x0000557f2616db2f in yb::rpc::InboundCall::InboundCallTask::Run() () #10 0x0000557f26236583 in yb::rpc::(anonymous namespace)::Worker::Execute() () #11 0x0000557f268698cf in yb::Thread::SuperviseThread(void*) () #12 0x00007fa6fce89694 in ?? () #13 0x0000000000000000 in ?? () ``` The problem is in the method: PopulateBeforeImage When we drop a column, the the row won't have data for the dropped column, and hence will not be added to the "old_tuple" member of RowMessage. This will mean the size of "old_tuple" does not match the number of columns in the schema. Which means this line: "row_message->old_tuple(static_cast(index))" could lead to an out of bounds exception. Instead, now we are keeping track of the found columns in the row. Test Plan: Running existing ctests Reviewers: srangavajjula, sdash, skumar Reviewed By: sdash, skumar Differential Revision: https://phabricator.dev.yugabyte.com/D21338 --- ent/src/yb/cdc/cdcsdk_producer.cc | 17 ++++---- .../yb/integration-tests/cdcsdk_ysql-test.cc | 42 +++++++++++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/ent/src/yb/cdc/cdcsdk_producer.cc b/ent/src/yb/cdc/cdcsdk_producer.cc index cc18825a82bd..ea38311df195 100644 --- a/ent/src/yb/cdc/cdcsdk_producer.cc +++ b/ent/src/yb/cdc/cdcsdk_producer.cc @@ -227,6 +227,7 @@ Status PopulateBeforeImage( std::vector columns(schema.columns()); + size_t found_columns = 0; if (row.ColumnCount() == columns.size()) { for (size_t index = 0; index < row.ColumnCount(); ++index) { bool column_updated = false; @@ -236,25 +237,27 @@ Status PopulateBeforeImage( tablet_peer, columns[index], PrimitiveValue(), enum_oid_label_map, composite_atts_map, row_message->add_old_tuple(), &ql_value.value())); if (row_message->op() == RowMessage_Op_UPDATE) { + const auto& old_tuple_column_name = + row_message->old_tuple(static_cast(found_columns)).column_name(); for (int new_tuple_index = 0; new_tuple_index < row_message->new_tuple_size(); ++new_tuple_index) { if (row_message->new_tuple(static_cast(new_tuple_index)).column_name() == - columns[index].name()) { + old_tuple_column_name) { column_updated = true; break; } } if (!column_updated) { - *(row_message->add_new_tuple()) = row_message->old_tuple(static_cast(index)); + auto new_tuple_pb = row_message->mutable_new_tuple()->Add(); + new_tuple_pb->CopyFrom(row_message->old_tuple(static_cast(found_columns))); } } + found_columns += 1; } } - } else { - if (row_message->op() != RowMessage_Op_DELETE) { - for (size_t index = 0; index < schema.num_columns(); ++index) { - row_message->add_old_tuple(); - } + } else if (row_message->op() != RowMessage_Op_DELETE) { + for (size_t index = 0; index < schema.num_columns(); ++index) { + row_message->add_old_tuple(); } } return Status::OK(); diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index 5745bbd0c501..e26250e7d6a4 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -9778,6 +9778,48 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestExpiredStreamWithCompaction)) ASSERT_LE(count_compaction_after_expired, count_after_compaction); } +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColumnDropBeforeImage)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0; + ASSERT_OK(SetUpWithParams(3, 1, false)); + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), 1); + CDCStreamId stream_id = + ASSERT_RESULT(CreateDBStream(CDCCheckpointType::IMPLICIT, CDCRecordType::ALL)); + auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(set_resp.has_error()); + + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.Execute("INSERT INTO test_table VALUES (1, 2)")); + ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1")); + ASSERT_OK(conn.Execute("ALTER TABLE test_table ADD COLUMN value_2 INT")); + ASSERT_OK(conn.Execute("UPDATE test_table SET value_2 = 4 WHERE key = 1")); + ASSERT_OK(conn.Execute("ALTER TABLE test_table DROP COLUMN value_2")); + + // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. + const uint32_t expected_count[] = {3, 1, 2, 0, 0, 0}; + uint32_t count[] = {0, 0, 0, 0, 0, 0}; + + ExpectedRecordWithThreeColumns expected_records[] = { + {0, 0, 0}, {1, 2, INT_MAX}, {1, 3, INT_MAX}, {0, 0, INT_MAX}, {1, 3, 4}, {}}; + ExpectedRecordWithThreeColumns expected_before_image_records[] = { + {}, {}, {1, 2, INT_MAX}, {}, {1, 3, INT_MAX}, {}}; + + GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + for (uint32_t i = 0; i < record_size; ++i) { + const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i); + CheckRecordWithThreeColumns( + record, expected_records[i], count, true, expected_before_image_records[i]); + } + LOG(INFO) << "Got " << count[1] << " insert record and " << count[2] << " update record"; + + CheckCount(expected_count, count); +} + } // namespace enterprise } // namespace cdc } // namespace yb