diff --git a/ent/src/yb/cdc/cdcsdk_producer.cc b/ent/src/yb/cdc/cdcsdk_producer.cc index cc18825a82bd..ea38311df195 100644 --- a/ent/src/yb/cdc/cdcsdk_producer.cc +++ b/ent/src/yb/cdc/cdcsdk_producer.cc @@ -227,6 +227,7 @@ Status PopulateBeforeImage( std::vector columns(schema.columns()); + size_t found_columns = 0; if (row.ColumnCount() == columns.size()) { for (size_t index = 0; index < row.ColumnCount(); ++index) { bool column_updated = false; @@ -236,25 +237,27 @@ Status PopulateBeforeImage( tablet_peer, columns[index], PrimitiveValue(), enum_oid_label_map, composite_atts_map, row_message->add_old_tuple(), &ql_value.value())); if (row_message->op() == RowMessage_Op_UPDATE) { + const auto& old_tuple_column_name = + row_message->old_tuple(static_cast(found_columns)).column_name(); for (int new_tuple_index = 0; new_tuple_index < row_message->new_tuple_size(); ++new_tuple_index) { if (row_message->new_tuple(static_cast(new_tuple_index)).column_name() == - columns[index].name()) { + old_tuple_column_name) { column_updated = true; break; } } if (!column_updated) { - *(row_message->add_new_tuple()) = row_message->old_tuple(static_cast(index)); + auto new_tuple_pb = row_message->mutable_new_tuple()->Add(); + new_tuple_pb->CopyFrom(row_message->old_tuple(static_cast(found_columns))); } } + found_columns += 1; } } - } else { - if (row_message->op() != RowMessage_Op_DELETE) { - for (size_t index = 0; index < schema.num_columns(); ++index) { - row_message->add_old_tuple(); - } + } else if (row_message->op() != RowMessage_Op_DELETE) { + for (size_t index = 0; index < schema.num_columns(); ++index) { + row_message->add_old_tuple(); } } return Status::OK(); diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index 5745bbd0c501..e26250e7d6a4 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -9778,6 +9778,48 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestExpiredStreamWithCompaction)) ASSERT_LE(count_compaction_after_expired, count_after_compaction); } +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColumnDropBeforeImage)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0; + ASSERT_OK(SetUpWithParams(3, 1, false)); + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), 1); + CDCStreamId stream_id = + ASSERT_RESULT(CreateDBStream(CDCCheckpointType::IMPLICIT, CDCRecordType::ALL)); + auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(set_resp.has_error()); + + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.Execute("INSERT INTO test_table VALUES (1, 2)")); + ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1")); + ASSERT_OK(conn.Execute("ALTER TABLE test_table ADD COLUMN value_2 INT")); + ASSERT_OK(conn.Execute("UPDATE test_table SET value_2 = 4 WHERE key = 1")); + ASSERT_OK(conn.Execute("ALTER TABLE test_table DROP COLUMN value_2")); + + // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. + const uint32_t expected_count[] = {3, 1, 2, 0, 0, 0}; + uint32_t count[] = {0, 0, 0, 0, 0, 0}; + + ExpectedRecordWithThreeColumns expected_records[] = { + {0, 0, 0}, {1, 2, INT_MAX}, {1, 3, INT_MAX}, {0, 0, INT_MAX}, {1, 3, 4}, {}}; + ExpectedRecordWithThreeColumns expected_before_image_records[] = { + {}, {}, {1, 2, INT_MAX}, {}, {1, 3, INT_MAX}, {}}; + + GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + for (uint32_t i = 0; i < record_size; ++i) { + const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i); + CheckRecordWithThreeColumns( + record, expected_records[i], count, true, expected_before_image_records[i]); + } + LOG(INFO) << "Got " << count[1] << " insert record and " << count[2] << " update record"; + + CheckCount(expected_count, count); +} + } // namespace enterprise } // namespace cdc } // namespace yb