Skip to content

Commit

Permalink
[#23179] CDCSDK: Support data types with dynamically alloted oids in CDC
Browse files Browse the repository at this point in the history
Summary:
This diff adds support for data types with dynamically alloted oids in CDC (for ex: hstore, enum array, etc). Such types contain invalid pg_type_oid for the corresponding columns in docdb schema.

In the current implemtation, in `ybc_pggate`, while decoding the cdc records we look at the `type_map_` to obtain YBCPgTypeEntity, which is then used for decoding. However the `type_map_` does not contain any entries for the data types with dynamically alloted oids. As a result, this causes segmentation fault. To prevent such crashes, CDC prevents addition of tables with such columns to the stream.

This diff removes the filtering logic and adds the tables to the stream even if it has such a type column. A function pointer will now be passed to `YBCPgGetCDCConsistentChanges`, which takes attribute number and the table_oid and returns the appropriate type entity by querying the `pg_type` catalog table. While decoding if a column is encountered with invalid pg_type_oid then, the passed function is invoked and type entity is obtained for decoding.

**Upgrade/Rollback safety:**
This diff adds a field `optional int32 attr_num` to DatumMessagePB. These changes are protected by the autoflag `ysql_yb_enable_replication_slot_consumption` which already exists but has not yet been released.
Jira: DB-12118

Test Plan:
Jenkins: urgent

All the existing cdc tests

./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot#replicationConnectionConsumptionAllDataTypesWithYbOutput'

Reviewers: skumar, stiwary, asrinivasan, dmitry

Reviewed By: stiwary, dmitry

Subscribers: steve.varnau, skarri, yql, ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36689
  • Loading branch information
Sumukh-Phalgaonkar committed Jul 19, 2024
1 parent ac9164b commit 5a76f6a
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -919,9 +919,12 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce
+ "col_tsrange TSRANGE, "
+ "col_tstzrange TSTZRANGE, "
+ "col_daterange DATERANGE, "
+ "col_discount coupon_discount_type)";
+ "col_hstore HSTORE, "
+ "col_discount coupon_discount_type, "
+" col_discount_array coupon_discount_type[])";

try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE EXTENSION IF NOT EXISTS hstore;");
stmt.execute("CREATE TYPE coupon_discount_type AS ENUM ('FIXED', 'PERCENTAGE');");
stmt.execute(create_stmt);
if (pluginName.equals(PG_OUTPUT_PLUGIN_NAME)) {
Expand All @@ -947,7 +950,8 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce
+ "'550e8400-e29b-41d4-a716-446655440000', B'101010', '2024-02-01 12:34:56+00:00', "
+ "'[1,10)', '[100,1000)', '[2024-01-01, 2024-12-31)', "
+ "'[2024-01-01 00:00:00+00:00, 2024-12-31 15:59:59+00:00)', "
+ "'[2024-01-01, 2024-12-31)', 'FIXED');");
+ "'[2024-01-01, 2024-12-31)','key1 => value1, key2 => value2'::hstore, 'FIXED', "
+ "array['FIXED', 'PERCENTAGE']::coupon_discount_type[]);");
}

PGReplicationStream stream = replConnection.replicationStream()
Expand All @@ -960,12 +964,14 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce

List<PgOutputMessage> result = new ArrayList<PgOutputMessage>();
// 1 Relation, begin, type, insert and commit record.
result.addAll(receiveMessage(stream, 5));
result.addAll(receiveMessage(stream, 7));

List<PgOutputMessage> expectedResult = new ArrayList<PgOutputMessage>() {
{
add(PgOutputBeginMessage.CreateForComparison(LogSequenceNumber.valueOf("0/4"), 2));
add(PgOutputTypeMessage.CreateForComparison("public", "hstore"));
add(PgOutputTypeMessage.CreateForComparison("public", "coupon_discount_type"));
add(PgOutputTypeMessage.CreateForComparison("public", "_coupon_discount_type"));
if (pluginName.equals(YB_OUTPUT_PLUGIN_NAME)) {
add(PgOutputRelationMessage.CreateForComparison("public", "test_table", 'c',
Arrays.asList(PgOutputRelationMessageColumn.CreateForComparison("a", 23),
Expand Down Expand Up @@ -1003,8 +1009,12 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce
PgOutputRelationMessageColumn.CreateForComparison("col_tsrange", 3908),
PgOutputRelationMessageColumn.CreateForComparison("col_tstzrange", 3910),
PgOutputRelationMessageColumn.CreateForComparison("col_daterange", 3912),
// The Oids for columns below are not fixed. Changing the order of creation of
// objects (extensions, tables etc.) in the test will these Oids.
PgOutputRelationMessageColumn.CreateForComparison("col_hstore", 16385),
PgOutputRelationMessageColumn.CreateForComparison(
"col_discount", /* IGNORED */ 0, /* compareDataType */ false))));
"col_discount", 16518, /* compareDataType */ false),
PgOutputRelationMessageColumn.CreateForComparison("col_discount_array",16517))));
} else {
// The replica identity for test_table in case of pgoutput is DEFAULT.
add(PgOutputRelationMessage.CreateForComparison("public", "test_table", 'd',
Expand Down Expand Up @@ -1043,10 +1053,14 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce
PgOutputRelationMessageColumn.CreateForComparison("col_tsrange", 3908),
PgOutputRelationMessageColumn.CreateForComparison("col_tstzrange", 3910),
PgOutputRelationMessageColumn.CreateForComparison("col_daterange", 3912),
// The Oids for columns below are not fixed. Changing the order of creation of
// objects (extensions, tables etc.) in the test will these Oids.
PgOutputRelationMessageColumn.CreateForComparison("col_hstore", 16385),
PgOutputRelationMessageColumn.CreateForComparison(
"col_discount", /* IGNORED */ 0, /* compareDataType */ false))));
"col_discount", 16518, /* compareDataType */ false),
PgOutputRelationMessageColumn.CreateForComparison("col_discount_array",16517))));
}
add(PgOutputInsertMessage.CreateForComparison(new PgOutputMessageTuple((short) 36,
add(PgOutputInsertMessage.CreateForComparison(new PgOutputMessageTuple((short) 38,
Arrays.asList(new PgOutputMessageTupleColumnValue("1"),
new PgOutputMessageTupleColumnValue("110110"),
new PgOutputMessageTupleColumnValue("t"),
Expand Down Expand Up @@ -1086,7 +1100,9 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce
convertTimestampToSystemTimezone("2024-01-01T00:00:00.00Z"),
convertTimestampToSystemTimezone("2024-12-31T15:59:59.00Z"))),
new PgOutputMessageTupleColumnValue("[2024-01-01,2024-12-31)"),
new PgOutputMessageTupleColumnValue("FIXED")))));
new PgOutputMessageTupleColumnValue("\"key1\"=>\"value1\", \"key2\"=>\"value2\""),
new PgOutputMessageTupleColumnValue("FIXED"),
new PgOutputMessageTupleColumnValue("{FIXED,PERCENTAGE}")))));
add(PgOutputCommitMessage.CreateForComparison(
LogSequenceNumber.valueOf("0/4"), LogSequenceNumber.valueOf("0/5")));
}
Expand Down
5 changes: 3 additions & 2 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -2030,9 +2030,10 @@ YBCDestroyVirtualWalForCDC()

void
YBCGetCDCConsistentChanges(const char *stream_id,
YBCPgChangeRecordBatch **record_batch)
YBCPgChangeRecordBatch **record_batch,
YBCTypeEntityProvider type_entity_provider)
{
HandleYBStatus(YBCPgGetCDCConsistentChanges(stream_id, record_batch));
HandleYBStatus(YBCPgGetCDCConsistentChanges(stream_id, record_batch, type_entity_provider));
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <inttypes.h>

#include "access/xact.h"
#include "catalog/yb_type.h"
#include "commands/ybccmds.h"
#include "pg_yb_utils.h"
#include "replication/slot.h"
Expand Down Expand Up @@ -240,6 +241,26 @@ InitVirtualWal(List *publication_names)
list_free(tables);
}

static const YBCPgTypeEntity *
GetDynamicTypeEntity(int attr_num, Oid relid)
{
bool is_in_txn = IsTransactionOrTransactionBlock();
if (!is_in_txn)
StartTransactionCommand();

Relation rel = RelationIdGetRelation(relid);
if (!RelationIsValid(rel))
elog(ERROR, "Could not open relation with OID %u", relid);
Oid type_oid = GetTypeId(attr_num, RelationGetDescr(rel));
RelationClose(rel);
const YBCPgTypeEntity* type_entity = YbDataTypeFromOidMod(attr_num, type_oid);

if (!is_in_txn)
AbortCurrentTransaction();

return type_entity;
}

YBCPgVirtualWalRecord *
YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
List *publication_names, char **errormsg)
Expand Down Expand Up @@ -293,7 +314,7 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
}

YBCGetCDCConsistentChanges(MyReplicationSlot->data.yb_stream_id,
&cached_records);
&cached_records, &GetDynamicTypeEntity);

cached_records_last_sent_row_idx = 0;
YbWalSndTotalTimeInYBDecodeMicros = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/postgres/src/include/commands/ybccmds.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ extern void YBCUpdatePublicationTableList(const char *stream_id,
extern void YBCDestroyVirtualWalForCDC();

extern void YBCGetCDCConsistentChanges(const char *stream_id,
YBCPgChangeRecordBatch **record_batch);
YBCPgChangeRecordBatch **record_batch,
YBCTypeEntityProvider type_entity_provider);

extern void YBCUpdateAndPersistLSN(const char *stream_id,
XLogRecPtr restart_lsn_hint,
Expand Down
1 change: 1 addition & 0 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Status AddColumnToMap(
// send NULL values to the walsender. This is needed to be able to differentiate between NULL
// and Omitted values.
if (request_source == CDCSDKRequestSource::WALSENDER) {
cdc_datum_message->set_col_attr_num(col_schema.order());
cdc_datum_message->set_column_type(col_schema.pg_type_oid());
cdc_datum_message->mutable_pg_ql_value()->CopyFrom(ql_value);
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ message DatumMessagePB {
// The Walsender does the conversion from QLValuePB to the PG datum.
QLValuePB pg_ql_value = 15;
}
// This is applicable only for YSQL table columns.
optional int32 col_attr_num = 16;
}

message PgDatumPB {
Expand Down
13 changes: 0 additions & 13 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8611,19 +8611,6 @@ TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToConsistentSnapshot
TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true);
}

TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) {
ASSERT_OK(SetUpWithParams(1, 1, false, false));
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));

ASSERT_OK(conn.Execute("CREATE TYPE \"enum_type\" AS ENUM('a', 'b', 'c');"));
ASSERT_OK(conn.Execute("CREATE TABLE test_table (a int primary key, b \"enum_type\"[])"));
auto stream_id = ASSERT_RESULT(CreateDBStream());

auto stream = ASSERT_RESULT(GetCDCStream(stream_id));
// The table with enum array column will not be added to the stream.
ASSERT_EQ(stream.stream().table_id_size(), 0);
}

void CDCSDKYsqlTest::TestDisableOfDynamicTableAdditionOnCDCStream(
bool use_consistent_snapshot_stream) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) =
Expand Down
19 changes: 11 additions & 8 deletions src/yb/master/master_xrepl-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ TEST_F(MasterTestXRepl, YB_DISABLE_TEST_IN_TSAN(TestCDCStreamCreationWithOldReco
CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;
req.set_namespace_id(ns_id);
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName);
AddKeyValueToCreateCDCStreamRequestOption(&req, cdc::kIdType, cdc::kNamespaceId);
AddKeyValueToCreateCDCStreamRequestOption(
&req, cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::CDCSDK));
Expand All @@ -574,7 +573,8 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidDuplicationSlotNam
auto ns_id = create_namespace_resp.id();

for (auto i = 0; i < num_tables; ++i) {
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
ASSERT_OK(
CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids));
}

ASSERT_RESULT(
Expand Down Expand Up @@ -644,7 +644,8 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceLimitReached) {
auto ns_id = create_namespace_resp.id();

for (auto i = 0; i < num_tables; ++i) {
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
ASSERT_OK(
CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids));
}

ASSERT_RESULT(
Expand Down Expand Up @@ -791,7 +792,8 @@ TEST_F(MasterTestXRepl, TestCreateDropCDCStreamWithReplicationSlotName) {
ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &create_namespace_resp));
auto ns_id = create_namespace_resp.id();
for (auto i = 0; i < num_tables; ++i) {
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
ASSERT_OK(
CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids));
}

// Create and Delete CDC stream with replication slot name in quick succession.
Expand Down Expand Up @@ -845,9 +847,9 @@ TEST_F(MasterTestXRepl, TestListCDCStreamsCDCSDKWithReplicationSlot) {
auto ns_id2 = create_namespace_resp.id();

// 2 tables in cdc_namespace and 1 table in cdc_namespace2
ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_1", kTableIds[0], kTableSchema));
ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_2", kTableIds[1], kTableSchema));
ASSERT_OK(CreatePgsqlTable(ns_id2, "cdc_table_3", kTableIds[2], kTableSchema));
ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_1", kTableIds[0], kTableSchemaWithTypeOids));
ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_2", kTableIds[1], kTableSchemaWithTypeOids));
ASSERT_OK(CreatePgsqlTable(ns_id2, "cdc_table_3", kTableIds[2], kTableSchemaWithTypeOids));

auto stream_id = ASSERT_RESULT(
CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName, kPgReplicationSlotPgOutput));
Expand Down Expand Up @@ -1134,7 +1136,8 @@ TEST_F(MasterTestXRepl, DropNamespaceWithLiveCDCStream) {
auto ns_id = create_namespace_resp.id();

for (auto i = 0; i < num_tables; ++i) {
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
ASSERT_OK(
CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids));
}
ASSERT_RESULT(
CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName, kPgReplicationSlotPgOutput));
Expand Down
18 changes: 7 additions & 11 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,6 @@ bool CatalogManager::IsTableEligibleForCDCSDKStream(
const TableInfoPtr& table_info, const std::optional<Schema>& schema) const {
if (schema.has_value()) {
bool has_pk = true;
bool has_invalid_pg_typeoid = false;
for (const auto& col : schema->columns()) {
if (col.order() == static_cast<int32_t>(PgSystemAttrNum::kYBRowId)) {
// ybrowid column is added for tables that don't have user-specified primary key.
Expand All @@ -1939,19 +1938,16 @@ bool CatalogManager::IsTableEligibleForCDCSDKStream(
has_pk = false;
break;
}
if (col.pg_type_oid() == 0) {
has_invalid_pg_typeoid = true;
}
}
if (!has_pk || has_invalid_pg_typeoid) {
if (FLAGS_TEST_cdcsdk_add_indexes_to_stream) {
// allow adding user created indexes to CDC stream.
if (IsUserIndexUnlocked(*table_info)) {
return true;
}
}

if (!has_pk) {
return false;
}

// Allow adding user created indexes to CDC stream.
if (FLAGS_TEST_cdcsdk_add_indexes_to_stream && IsUserIndexUnlocked(*table_info)) {
return true;
}
}

if (IsMatviewTable(*table_info)) {
Expand Down
34 changes: 23 additions & 11 deletions src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ std::string HumanReadableTableType(yb::TableType table_type) {
FATAL_INVALID_ENUM_VALUE(yb::TableType, table_type);
}

const YBCPgTypeEntity* GetTypeEntity(
int pg_type_oid, int attr_num, YBCPgOid table_oid, YBCTypeEntityProvider type_entity_provider) {

// TODO(23239): Optimize the lookup of type entities for dynamic types.
return pg_type_oid == kPgInvalidOid ? (*type_entity_provider)(attr_num, table_oid)
: pgapi->FindTypeEntity(pg_type_oid);
}

} // namespace

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -2425,7 +2433,9 @@ YBCPgRowMessageAction GetRowMessageAction(yb::cdc::RowMessage row_message_pb) {
}

YBCStatus YBCPgGetCDCConsistentChanges(
const char *stream_id, YBCPgChangeRecordBatch **record_batch) {
const char* stream_id,
YBCPgChangeRecordBatch** record_batch,
YBCTypeEntityProvider type_entity_provider) {
const auto result = pgapi->GetConsistentChangesForCDC(std::string(stream_id));
if (!result.ok()) {
return ToYBCStatus(result.status());
Expand Down Expand Up @@ -2484,6 +2494,10 @@ YBCStatus YBCPgGetCDCConsistentChanges(
old_tuple_idx++;
}

const auto table_oid = row_message_pb.has_table_id()
? PgObjectId(row_message_pb.table_id()).object_oid
: kPgInvalidOid;

auto col_count = narrow_cast<int>(col_name_idx_map.size());
YBCPgDatumMessage *cols = nullptr;
if (col_count > 0) {
Expand All @@ -2501,8 +2515,10 @@ YBCStatus YBCPgGetCDCConsistentChanges(
if (!before_op_is_omitted) {
const auto old_datum_pb =
&row_message_pb.old_tuple(static_cast<int>(col_idxs.second.first));
const auto *type_entity =
pgapi->FindTypeEntity(static_cast<int>(old_datum_pb->column_type()));
DCHECK(table_oid != kPgInvalidOid);
const auto* type_entity = GetTypeEntity(
static_cast<int>(old_datum_pb->column_type()), old_datum_pb->col_attr_num(),
table_oid, type_entity_provider);
auto s = PBToDatum(
type_entity, type_attrs, old_datum_pb->pg_ql_value(), &before_op_datum,
&before_op_is_null);
Expand All @@ -2518,8 +2534,10 @@ YBCStatus YBCPgGetCDCConsistentChanges(
if (!after_op_is_omitted) {
const auto new_datum_pb =
&row_message_pb.new_tuple(static_cast<int>(col_idxs.second.second));
const auto *type_entity =
pgapi->FindTypeEntity(static_cast<int>(new_datum_pb->column_type()));
DCHECK(table_oid != kPgInvalidOid);
const auto* type_entity = GetTypeEntity(
static_cast<int>(new_datum_pb->column_type()), new_datum_pb->col_attr_num(),
table_oid, type_entity_provider);
auto s = PBToDatum(
type_entity, type_attrs, new_datum_pb->pg_ql_value(), &after_op_datum,
&after_op_is_null);
Expand All @@ -2539,12 +2557,6 @@ YBCStatus YBCPgGetCDCConsistentChanges(
}
}

// Only present for DML records.
YBCPgOid table_oid = kPgInvalidOid;
if (row_message_pb.has_table_id()) {
auto table_id = PgObjectId(row_message_pb.table_id());
table_oid = table_id.object_oid;
}

new (&resp_rows[row_idx]) YBCPgRowMessage{
.col_count = col_count,
Expand Down
4 changes: 3 additions & 1 deletion src/yb/yql/pggate/ybc_pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {

typedef void (*YBCAshAcquireBufferLock)(bool);
typedef YBCAshSample* (*YBCAshGetNextCircularBufferSlot)();
typedef const YBCPgTypeEntity* (*YBCTypeEntityProvider)(int, YBCPgOid);

typedef void * SliceVector;
typedef const void * ConstSliceVector;
Expand Down Expand Up @@ -877,7 +878,8 @@ YBCStatus YBCPgUpdatePublicationTableList(
YBCStatus YBCPgDestroyVirtualWalForCDC();

YBCStatus YBCPgGetCDCConsistentChanges(const char *stream_id,
YBCPgChangeRecordBatch **record_batch);
YBCPgChangeRecordBatch **record_batch,
YBCTypeEntityProvider type_entity_provider);

YBCStatus YBCPgUpdateAndPersistLSN(
const char* stream_id, YBCPgXLogRecPtr restart_lsn_hint, YBCPgXLogRecPtr confirmed_flush,
Expand Down

0 comments on commit 5a76f6a

Please sign in to comment.