From 5a76f6a8ff902ad40651320d9b4156739da845ed Mon Sep 17 00:00:00 2001 From: Sumukh-Phalgaonkar Date: Fri, 19 Jul 2024 15:36:19 +0530 Subject: [PATCH] [#23179] CDCSDK: Support data types with dynamically alloted oids in CDC Summary: This diff adds support for data types with dynamically alloted oids in CDC (for ex: hstore, enum array, etc). Such types contain invalid pg_type_oid for the corresponding columns in docdb schema. In the current implemtation, in `ybc_pggate`, while decoding the cdc records we look at the `type_map_` to obtain YBCPgTypeEntity, which is then used for decoding. However the `type_map_` does not contain any entries for the data types with dynamically alloted oids. As a result, this causes segmentation fault. To prevent such crashes, CDC prevents addition of tables with such columns to the stream. This diff removes the filtering logic and adds the tables to the stream even if it has such a type column. A function pointer will now be passed to `YBCPgGetCDCConsistentChanges`, which takes attribute number and the table_oid and returns the appropriate type entity by querying the `pg_type` catalog table. While decoding if a column is encountered with invalid pg_type_oid then, the passed function is invoked and type entity is obtained for decoding. **Upgrade/Rollback safety:** This diff adds a field `optional int32 attr_num` to DatumMessagePB. These changes are protected by the autoflag `ysql_yb_enable_replication_slot_consumption` which already exists but has not yet been released. Jira: DB-12118 Test Plan: Jenkins: urgent All the existing cdc tests ./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot#replicationConnectionConsumptionAllDataTypesWithYbOutput' Reviewers: skumar, stiwary, asrinivasan, dmitry Reviewed By: stiwary, dmitry Subscribers: steve.varnau, skarri, yql, ybase, ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36689 --- .../org/yb/pgsql/TestPgReplicationSlot.java | 30 ++++++++++++---- src/postgres/src/backend/commands/ybccmds.c | 5 +-- .../logical/yb_virtual_wal_client.c | 23 ++++++++++++- src/postgres/src/include/commands/ybccmds.h | 3 +- src/yb/cdc/cdcsdk_producer.cc | 1 + src/yb/common/common.proto | 2 ++ src/yb/integration-tests/cdcsdk_ysql-test.cc | 13 ------- src/yb/master/master_xrepl-test.cc | 19 ++++++----- src/yb/master/xrepl_catalog_manager.cc | 18 ++++------ src/yb/yql/pggate/ybc_pggate.cc | 34 +++++++++++++------ src/yb/yql/pggate/ybc_pggate.h | 4 ++- 11 files changed, 97 insertions(+), 55 deletions(-) diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java index 2c73dd859e78..4fe714230623 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java @@ -919,9 +919,12 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce + "col_tsrange TSRANGE, " + "col_tstzrange TSTZRANGE, " + "col_daterange DATERANGE, " - + "col_discount coupon_discount_type)"; + + "col_hstore HSTORE, " + + "col_discount coupon_discount_type, " + +" col_discount_array coupon_discount_type[])"; try (Statement stmt = connection.createStatement()) { + stmt.execute("CREATE EXTENSION IF NOT EXISTS hstore;"); stmt.execute("CREATE TYPE coupon_discount_type AS ENUM ('FIXED', 'PERCENTAGE');"); stmt.execute(create_stmt); if (pluginName.equals(PG_OUTPUT_PLUGIN_NAME)) { @@ -947,7 +950,8 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce + "'550e8400-e29b-41d4-a716-446655440000', B'101010', '2024-02-01 12:34:56+00:00', " + "'[1,10)', '[100,1000)', '[2024-01-01, 2024-12-31)', " + "'[2024-01-01 00:00:00+00:00, 2024-12-31 15:59:59+00:00)', " - + "'[2024-01-01, 2024-12-31)', 'FIXED');"); + + "'[2024-01-01, 2024-12-31)','key1 => value1, key2 => value2'::hstore, 'FIXED', " + + "array['FIXED', 'PERCENTAGE']::coupon_discount_type[]);"); } PGReplicationStream stream = replConnection.replicationStream() @@ -960,12 +964,14 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce List result = new ArrayList(); // 1 Relation, begin, type, insert and commit record. - result.addAll(receiveMessage(stream, 5)); + result.addAll(receiveMessage(stream, 7)); List expectedResult = new ArrayList() { { add(PgOutputBeginMessage.CreateForComparison(LogSequenceNumber.valueOf("0/4"), 2)); + add(PgOutputTypeMessage.CreateForComparison("public", "hstore")); add(PgOutputTypeMessage.CreateForComparison("public", "coupon_discount_type")); + add(PgOutputTypeMessage.CreateForComparison("public", "_coupon_discount_type")); if (pluginName.equals(YB_OUTPUT_PLUGIN_NAME)) { add(PgOutputRelationMessage.CreateForComparison("public", "test_table", 'c', Arrays.asList(PgOutputRelationMessageColumn.CreateForComparison("a", 23), @@ -1003,8 +1009,12 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce PgOutputRelationMessageColumn.CreateForComparison("col_tsrange", 3908), PgOutputRelationMessageColumn.CreateForComparison("col_tstzrange", 3910), PgOutputRelationMessageColumn.CreateForComparison("col_daterange", 3912), + // The Oids for columns below are not fixed. Changing the order of creation of + // objects (extensions, tables etc.) in the test will these Oids. + PgOutputRelationMessageColumn.CreateForComparison("col_hstore", 16385), PgOutputRelationMessageColumn.CreateForComparison( - "col_discount", /* IGNORED */ 0, /* compareDataType */ false)))); + "col_discount", 16518, /* compareDataType */ false), + PgOutputRelationMessageColumn.CreateForComparison("col_discount_array",16517)))); } else { // The replica identity for test_table in case of pgoutput is DEFAULT. add(PgOutputRelationMessage.CreateForComparison("public", "test_table", 'd', @@ -1043,10 +1053,14 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce PgOutputRelationMessageColumn.CreateForComparison("col_tsrange", 3908), PgOutputRelationMessageColumn.CreateForComparison("col_tstzrange", 3910), PgOutputRelationMessageColumn.CreateForComparison("col_daterange", 3912), + // The Oids for columns below are not fixed. Changing the order of creation of + // objects (extensions, tables etc.) in the test will these Oids. + PgOutputRelationMessageColumn.CreateForComparison("col_hstore", 16385), PgOutputRelationMessageColumn.CreateForComparison( - "col_discount", /* IGNORED */ 0, /* compareDataType */ false)))); + "col_discount", 16518, /* compareDataType */ false), + PgOutputRelationMessageColumn.CreateForComparison("col_discount_array",16517)))); } - add(PgOutputInsertMessage.CreateForComparison(new PgOutputMessageTuple((short) 36, + add(PgOutputInsertMessage.CreateForComparison(new PgOutputMessageTuple((short) 38, Arrays.asList(new PgOutputMessageTupleColumnValue("1"), new PgOutputMessageTupleColumnValue("110110"), new PgOutputMessageTupleColumnValue("t"), @@ -1086,7 +1100,9 @@ void replicationConnectionConsumptionAllDataTypes(String pluginName) throws Exce convertTimestampToSystemTimezone("2024-01-01T00:00:00.00Z"), convertTimestampToSystemTimezone("2024-12-31T15:59:59.00Z"))), new PgOutputMessageTupleColumnValue("[2024-01-01,2024-12-31)"), - new PgOutputMessageTupleColumnValue("FIXED"))))); + new PgOutputMessageTupleColumnValue("\"key1\"=>\"value1\", \"key2\"=>\"value2\""), + new PgOutputMessageTupleColumnValue("FIXED"), + new PgOutputMessageTupleColumnValue("{FIXED,PERCENTAGE}"))))); add(PgOutputCommitMessage.CreateForComparison( LogSequenceNumber.valueOf("0/4"), LogSequenceNumber.valueOf("0/5"))); } diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index aa7a2b1076a8..7828231df919 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -2030,9 +2030,10 @@ YBCDestroyVirtualWalForCDC() void YBCGetCDCConsistentChanges(const char *stream_id, - YBCPgChangeRecordBatch **record_batch) + YBCPgChangeRecordBatch **record_batch, + YBCTypeEntityProvider type_entity_provider) { - HandleYBStatus(YBCPgGetCDCConsistentChanges(stream_id, record_batch)); + HandleYBStatus(YBCPgGetCDCConsistentChanges(stream_id, record_batch, type_entity_provider)); } void diff --git a/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c b/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c index e05ee247f8c1..31cacf0e6f81 100644 --- a/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c +++ b/src/postgres/src/backend/replication/logical/yb_virtual_wal_client.c @@ -26,6 +26,7 @@ #include #include "access/xact.h" +#include "catalog/yb_type.h" #include "commands/ybccmds.h" #include "pg_yb_utils.h" #include "replication/slot.h" @@ -240,6 +241,26 @@ InitVirtualWal(List *publication_names) list_free(tables); } +static const YBCPgTypeEntity * +GetDynamicTypeEntity(int attr_num, Oid relid) +{ + bool is_in_txn = IsTransactionOrTransactionBlock(); + if (!is_in_txn) + StartTransactionCommand(); + + Relation rel = RelationIdGetRelation(relid); + if (!RelationIsValid(rel)) + elog(ERROR, "Could not open relation with OID %u", relid); + Oid type_oid = GetTypeId(attr_num, RelationGetDescr(rel)); + RelationClose(rel); + const YBCPgTypeEntity* type_entity = YbDataTypeFromOidMod(attr_num, type_oid); + + if (!is_in_txn) + AbortCurrentTransaction(); + + return type_entity; +} + YBCPgVirtualWalRecord * YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, List *publication_names, char **errormsg) @@ -293,7 +314,7 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, } YBCGetCDCConsistentChanges(MyReplicationSlot->data.yb_stream_id, - &cached_records); + &cached_records, &GetDynamicTypeEntity); cached_records_last_sent_row_idx = 0; YbWalSndTotalTimeInYBDecodeMicros = 0; diff --git a/src/postgres/src/include/commands/ybccmds.h b/src/postgres/src/include/commands/ybccmds.h index d243c2c96e18..17f1c8104231 100644 --- a/src/postgres/src/include/commands/ybccmds.h +++ b/src/postgres/src/include/commands/ybccmds.h @@ -143,7 +143,8 @@ extern void YBCUpdatePublicationTableList(const char *stream_id, extern void YBCDestroyVirtualWalForCDC(); extern void YBCGetCDCConsistentChanges(const char *stream_id, - YBCPgChangeRecordBatch **record_batch); + YBCPgChangeRecordBatch **record_batch, + YBCTypeEntityProvider type_entity_provider); extern void YBCUpdateAndPersistLSN(const char *stream_id, XLogRecPtr restart_lsn_hint, diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index b361ebfc16eb..629a854125c1 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -168,6 +168,7 @@ Status AddColumnToMap( // send NULL values to the walsender. This is needed to be able to differentiate between NULL // and Omitted values. if (request_source == CDCSDKRequestSource::WALSENDER) { + cdc_datum_message->set_col_attr_num(col_schema.order()); cdc_datum_message->set_column_type(col_schema.pg_type_oid()); cdc_datum_message->mutable_pg_ql_value()->CopyFrom(ql_value); return Status::OK(); diff --git a/src/yb/common/common.proto b/src/yb/common/common.proto index ffb7e59a729d..ee604b8ba84a 100644 --- a/src/yb/common/common.proto +++ b/src/yb/common/common.proto @@ -639,6 +639,8 @@ message DatumMessagePB { // The Walsender does the conversion from QLValuePB to the PG datum. QLValuePB pg_ql_value = 15; } + // This is applicable only for YSQL table columns. + optional int32 col_attr_num = 16; } message PgDatumPB { diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index bf61a34c2b0e..efec69bf592c 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8611,19 +8611,6 @@ TEST_F(CDCSDKYsqlTest, TestNonEligibleTableShouldNotGetAddedToConsistentSnapshot TestNonEligibleTableShouldNotGetAddedToCDCStream(/* create_consistent_snapshot_stream */ true); } -TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) { - ASSERT_OK(SetUpWithParams(1, 1, false, false)); - auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); - - ASSERT_OK(conn.Execute("CREATE TYPE \"enum_type\" AS ENUM('a', 'b', 'c');")); - ASSERT_OK(conn.Execute("CREATE TABLE test_table (a int primary key, b \"enum_type\"[])")); - auto stream_id = ASSERT_RESULT(CreateDBStream()); - - auto stream = ASSERT_RESULT(GetCDCStream(stream_id)); - // The table with enum array column will not be added to the stream. - ASSERT_EQ(stream.stream().table_id_size(), 0); -} - void CDCSDKYsqlTest::TestDisableOfDynamicTableAdditionOnCDCStream( bool use_consistent_snapshot_stream) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = diff --git a/src/yb/master/master_xrepl-test.cc b/src/yb/master/master_xrepl-test.cc index c3fbefb7de64..0f6a1966a3d1 100644 --- a/src/yb/master/master_xrepl-test.cc +++ b/src/yb/master/master_xrepl-test.cc @@ -556,7 +556,6 @@ TEST_F(MasterTestXRepl, YB_DISABLE_TEST_IN_TSAN(TestCDCStreamCreationWithOldReco CreateCDCStreamRequestPB req; CreateCDCStreamResponsePB resp; req.set_namespace_id(ns_id); - req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName); AddKeyValueToCreateCDCStreamRequestOption(&req, cdc::kIdType, cdc::kNamespaceId); AddKeyValueToCreateCDCStreamRequestOption( &req, cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::CDCSDK)); @@ -574,7 +573,8 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidDuplicationSlotNam auto ns_id = create_namespace_resp.id(); for (auto i = 0; i < num_tables; ++i) { - ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema)); + ASSERT_OK( + CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids)); } ASSERT_RESULT( @@ -644,7 +644,8 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceLimitReached) { auto ns_id = create_namespace_resp.id(); for (auto i = 0; i < num_tables; ++i) { - ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema)); + ASSERT_OK( + CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids)); } ASSERT_RESULT( @@ -791,7 +792,8 @@ TEST_F(MasterTestXRepl, TestCreateDropCDCStreamWithReplicationSlotName) { ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &create_namespace_resp)); auto ns_id = create_namespace_resp.id(); for (auto i = 0; i < num_tables; ++i) { - ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema)); + ASSERT_OK( + CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids)); } // Create and Delete CDC stream with replication slot name in quick succession. @@ -845,9 +847,9 @@ TEST_F(MasterTestXRepl, TestListCDCStreamsCDCSDKWithReplicationSlot) { auto ns_id2 = create_namespace_resp.id(); // 2 tables in cdc_namespace and 1 table in cdc_namespace2 - ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_1", kTableIds[0], kTableSchema)); - ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_2", kTableIds[1], kTableSchema)); - ASSERT_OK(CreatePgsqlTable(ns_id2, "cdc_table_3", kTableIds[2], kTableSchema)); + ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_1", kTableIds[0], kTableSchemaWithTypeOids)); + ASSERT_OK(CreatePgsqlTable(ns_id, "cdc_table_2", kTableIds[1], kTableSchemaWithTypeOids)); + ASSERT_OK(CreatePgsqlTable(ns_id2, "cdc_table_3", kTableIds[2], kTableSchemaWithTypeOids)); auto stream_id = ASSERT_RESULT( CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName, kPgReplicationSlotPgOutput)); @@ -1134,7 +1136,8 @@ TEST_F(MasterTestXRepl, DropNamespaceWithLiveCDCStream) { auto ns_id = create_namespace_resp.id(); for (auto i = 0; i < num_tables; ++i) { - ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema)); + ASSERT_OK( + CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchemaWithTypeOids)); } ASSERT_RESULT( CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName, kPgReplicationSlotPgOutput)); diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 40f079ba43f3..b23dd192fdd7 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -1930,7 +1930,6 @@ bool CatalogManager::IsTableEligibleForCDCSDKStream( const TableInfoPtr& table_info, const std::optional& schema) const { if (schema.has_value()) { bool has_pk = true; - bool has_invalid_pg_typeoid = false; for (const auto& col : schema->columns()) { if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { // ybrowid column is added for tables that don't have user-specified primary key. @@ -1939,19 +1938,16 @@ bool CatalogManager::IsTableEligibleForCDCSDKStream( has_pk = false; break; } - if (col.pg_type_oid() == 0) { - has_invalid_pg_typeoid = true; - } } - if (!has_pk || has_invalid_pg_typeoid) { - if (FLAGS_TEST_cdcsdk_add_indexes_to_stream) { - // allow adding user created indexes to CDC stream. - if (IsUserIndexUnlocked(*table_info)) { - return true; - } - } + + if (!has_pk) { return false; } + + // Allow adding user created indexes to CDC stream. + if (FLAGS_TEST_cdcsdk_add_indexes_to_stream && IsUserIndexUnlocked(*table_info)) { + return true; + } } if (IsMatviewTable(*table_info)) { diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 556df16c6cba..ea0432602194 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -400,6 +400,14 @@ std::string HumanReadableTableType(yb::TableType table_type) { FATAL_INVALID_ENUM_VALUE(yb::TableType, table_type); } +const YBCPgTypeEntity* GetTypeEntity( + int pg_type_oid, int attr_num, YBCPgOid table_oid, YBCTypeEntityProvider type_entity_provider) { + + // TODO(23239): Optimize the lookup of type entities for dynamic types. + return pg_type_oid == kPgInvalidOid ? (*type_entity_provider)(attr_num, table_oid) + : pgapi->FindTypeEntity(pg_type_oid); +} + } // namespace //-------------------------------------------------------------------------------------------------- @@ -2425,7 +2433,9 @@ YBCPgRowMessageAction GetRowMessageAction(yb::cdc::RowMessage row_message_pb) { } YBCStatus YBCPgGetCDCConsistentChanges( - const char *stream_id, YBCPgChangeRecordBatch **record_batch) { + const char* stream_id, + YBCPgChangeRecordBatch** record_batch, + YBCTypeEntityProvider type_entity_provider) { const auto result = pgapi->GetConsistentChangesForCDC(std::string(stream_id)); if (!result.ok()) { return ToYBCStatus(result.status()); @@ -2484,6 +2494,10 @@ YBCStatus YBCPgGetCDCConsistentChanges( old_tuple_idx++; } + const auto table_oid = row_message_pb.has_table_id() + ? PgObjectId(row_message_pb.table_id()).object_oid + : kPgInvalidOid; + auto col_count = narrow_cast(col_name_idx_map.size()); YBCPgDatumMessage *cols = nullptr; if (col_count > 0) { @@ -2501,8 +2515,10 @@ YBCStatus YBCPgGetCDCConsistentChanges( if (!before_op_is_omitted) { const auto old_datum_pb = &row_message_pb.old_tuple(static_cast(col_idxs.second.first)); - const auto *type_entity = - pgapi->FindTypeEntity(static_cast(old_datum_pb->column_type())); + DCHECK(table_oid != kPgInvalidOid); + const auto* type_entity = GetTypeEntity( + static_cast(old_datum_pb->column_type()), old_datum_pb->col_attr_num(), + table_oid, type_entity_provider); auto s = PBToDatum( type_entity, type_attrs, old_datum_pb->pg_ql_value(), &before_op_datum, &before_op_is_null); @@ -2518,8 +2534,10 @@ YBCStatus YBCPgGetCDCConsistentChanges( if (!after_op_is_omitted) { const auto new_datum_pb = &row_message_pb.new_tuple(static_cast(col_idxs.second.second)); - const auto *type_entity = - pgapi->FindTypeEntity(static_cast(new_datum_pb->column_type())); + DCHECK(table_oid != kPgInvalidOid); + const auto* type_entity = GetTypeEntity( + static_cast(new_datum_pb->column_type()), new_datum_pb->col_attr_num(), + table_oid, type_entity_provider); auto s = PBToDatum( type_entity, type_attrs, new_datum_pb->pg_ql_value(), &after_op_datum, &after_op_is_null); @@ -2539,12 +2557,6 @@ YBCStatus YBCPgGetCDCConsistentChanges( } } - // Only present for DML records. - YBCPgOid table_oid = kPgInvalidOid; - if (row_message_pb.has_table_id()) { - auto table_id = PgObjectId(row_message_pb.table_id()); - table_oid = table_id.object_oid; - } new (&resp_rows[row_idx]) YBCPgRowMessage{ .col_count = col_count, diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 12cf3c7f837d..fd082df1f87d 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -25,6 +25,7 @@ extern "C" { typedef void (*YBCAshAcquireBufferLock)(bool); typedef YBCAshSample* (*YBCAshGetNextCircularBufferSlot)(); +typedef const YBCPgTypeEntity* (*YBCTypeEntityProvider)(int, YBCPgOid); typedef void * SliceVector; typedef const void * ConstSliceVector; @@ -877,7 +878,8 @@ YBCStatus YBCPgUpdatePublicationTableList( YBCStatus YBCPgDestroyVirtualWalForCDC(); YBCStatus YBCPgGetCDCConsistentChanges(const char *stream_id, - YBCPgChangeRecordBatch **record_batch); + YBCPgChangeRecordBatch **record_batch, + YBCTypeEntityProvider type_entity_provider); YBCStatus YBCPgUpdateAndPersistLSN( const char* stream_id, YBCPgXLogRecPtr restart_lsn_hint, YBCPgXLogRecPtr confirmed_flush,