Skip to content

Commit

Permalink
[#23013] XClusterDDLRepl: Link producer table to consumer table by ta…
Browse files Browse the repository at this point in the history
…ble id

Summary:
Currently we use table names to match tables when setting up replication, but for DDL replication
this can cause issues. E.g., if a table is created and then renamed, we can no longer match the
tables. Alternatively, if a table is created, dropped, and then recreated, the producer would have
two tables with the same name (since the first one is only hidden).

This change fixes that by capturing the table id of every created relation and storing it in the
JSON in ddl_queue. The ddl_queue handler then updates the tserver's xcluster_context with this
information, so that when the subsequent PgCreateTable requests go through, they can check this map
and set the producer table id in the xcluster_source_table_id field. This is then picked up by the
add_table_to_xcluster_target task and used to perform the replication setup.

**Upgrade/Rollback safety:**
Only adding in new fields, which are guarded behind ddl replication flags
Jira: DB-11942

Test Plan:
```
ybd --cxx-test xcluster_ddl_replication-test --gtest_filter "XClusterDDLReplicationTest.DuplicateTableNames"
```

Reviewers: xCluster, hsunder

Reviewed By: hsunder

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D35874
  • Loading branch information
hulien22 committed Jul 2, 2024
1 parent 471c7c4 commit 8142fdc
Show file tree
Hide file tree
Showing 40 changed files with 549 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ SET yb_xcluster_ddl_replication.replication_role = BIDIRECTIONAL;
-- Verify that non-colocated table is captured.
CREATE TABLE non_coloc_foo(i int PRIMARY KEY) WITH (COLOCATION = false);
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE non_coloc_foo(i int PRIMARY KEY) WITH (COLOCATION = false);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE"}
yb_data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE non_coloc_foo(i int PRIMARY KEY) WITH (COLOCATION = false);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE", "new_rel_map": [{"rel_name": "non_coloc_foo", "relfile_oid": 16414}]}
(1 row)

SELECT * FROM yb_xcluster_ddl_replication.replicated_ddls ORDER BY start_time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ SET ROLE new_role;
CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;
SET ROLE NONE;
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);", "schema": "create_index", "version": 1, "command_tag": "CREATE TABLE"}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_simple ON foo(a);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "yugabyte", "query": "CREATE UNIQUE INDEX foo_idx_unique ON foo(b);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "new_role", "query": "CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
yb_data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);", "schema": "create_index", "version": 1, "command_tag": "CREATE TABLE", "new_rel_map": [{"rel_name": "foo", "relfile_oid": 16437}]}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_simple ON foo(a);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX", "new_rel_map": [{"rel_name": "foo_idx_simple", "relfile_oid": 16442}]}
{"user": "yugabyte", "query": "CREATE UNIQUE INDEX foo_idx_unique ON foo(b);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX", "new_rel_map": [{"rel_name": "foo_idx_unique", "relfile_oid": 16443}]}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX", "new_rel_map": [{"rel_name": "foo_idx_filtered", "relfile_oid": 16444}]}
{"user": "new_role", "query": "CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX", "new_rel_map": [{"rel_name": "foo_idx_include", "relfile_oid": 16445}]}
(5 rows)

SELECT * FROM yb_xcluster_ddl_replication.replicated_ddls ORDER BY start_time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ CREATE TABLE extra_foo(i int PRIMARY KEY) WITH (COLOCATION = false) SPLIT INTO 1
-- Verify that info for unique constraint indexes are also captured.
CREATE TABLE unique_foo(i int PRIMARY KEY, u text UNIQUE);
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE foo(i int PRIMARY KEY);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE"}
yb_data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE foo(i int PRIMARY KEY);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE", "new_rel_map": [{"rel_name": "foo", "relfile_oid": 16408}]}
{"user": "yugabyte", "query": "CREATE TABLE manual_foo(i int PRIMARY KEY);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE", "manual_replication": true}
{"user": "yugabyte", "query": "CREATE TABLE extra_foo(i int PRIMARY KEY) WITH (COLOCATION = false) SPLIT INTO 1 TABLETS;", "schema": "public", "version": 1, "command_tag": "CREATE TABLE"}
{"user": "yugabyte", "query": "CREATE TABLE unique_foo(i int PRIMARY KEY, u text UNIQUE);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE"}
{"user": "yugabyte", "query": "CREATE TABLE extra_foo(i int PRIMARY KEY) WITH (COLOCATION = false) SPLIT INTO 1 TABLETS;", "schema": "public", "version": 1, "command_tag": "CREATE TABLE", "new_rel_map": [{"rel_name": "extra_foo", "relfile_oid": 16418}]}
{"user": "yugabyte", "query": "CREATE TABLE unique_foo(i int PRIMARY KEY, u text UNIQUE);", "schema": "public", "version": 1, "command_tag": "CREATE TABLE", "new_rel_map": [{"rel_name": "unique_foo", "relfile_oid": 16423}, {"rel_name": "unique_foo_u_key", "relfile_oid": 16428}]}
(4 rows)

SELECT * FROM yb_xcluster_ddl_replication.replicated_ddls ORDER BY start_time;
Expand Down
62 changes: 33 additions & 29 deletions src/postgres/yb-extensions/yb_xcluster_ddl_replication/json_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,50 @@

#include "utils/fmgrprotos.h"

/*
 * Push key_buf onto the JSONB parse state as an object key (WJB_KEY).
 * The pushed value points at a pstrdup'd copy of key_buf, not at key_buf
 * itself.
 */
void
AddJsonKey(JsonbParseState *state, char *key_buf)
{
	JsonbValue	json_key;

	json_key.type = jbvString;
	json_key.val.string.val = pstrdup(key_buf);
	json_key.val.string.len = strlen(key_buf);

	(void) pushJsonbValue(&state, WJB_KEY, &json_key);
}

/*
 * Append one key/value pair with a numeric value to the JSONB object being
 * built in state.
 *
 * The key is pushed through AddJsonKey and the value is pushed exactly once
 * as WJB_VALUE. The older JsonbPair-based code path is gone, so the pair is
 * no longer emitted twice.
 */
void
AddNumericJsonEntry(JsonbParseState *state, char *key_buf, int64 val)
{
	JsonbValue	value;

	AddJsonKey(state, key_buf);

	value.type = jbvNumeric;
	/*
	 * NOTE(review): val is passed to DirectFunctionCall1 as a raw Datum with
	 * no Int64GetDatum() wrapper; presumably this relies on int64 being a
	 * pass-by-value Datum on all supported builds -- confirm.
	 */
	value.val.numeric = DatumGetNumeric(DirectFunctionCall1(int8_numeric, val));

	(void) pushJsonbValue(&state, WJB_VALUE, &value);
}

/*
 * Append one key/value pair with a boolean value to the JSONB object being
 * built in state.
 *
 * The key is pushed through AddJsonKey and the value is pushed exactly once
 * as WJB_VALUE. The older JsonbPair-based code path is gone, so the pair is
 * no longer emitted twice.
 */
void
AddBoolJsonEntry(JsonbParseState *state, char *key_buf, bool val)
{
	JsonbValue	value;

	AddJsonKey(state, key_buf);

	value.type = jbvBool;
	value.val.boolean = val;

	(void) pushJsonbValue(&state, WJB_VALUE, &value);
}

/*
 * Append one key/value pair with a string value to the JSONB object being
 * built in state.
 *
 * The key is pushed through AddJsonKey and the value is pushed exactly once
 * as WJB_VALUE. The older JsonbPair-based code path is gone, so the pair is
 * no longer emitted twice. The pushed value points at a pstrdup'd copy of
 * val, not at val itself.
 */
void
AddStringJsonEntry(JsonbParseState *state, char *key_buf, const char *val)
{
	JsonbValue	value;

	AddJsonKey(state, key_buf);

	value.type = jbvString;
	value.val.string.len = strlen(val);
	value.val.string.val = pstrdup(val);

	(void) pushJsonbValue(&state, WJB_VALUE, &value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "postgres.h"
#include "utils/jsonb.h"

/* Pushes key_buf as an object key (WJB_KEY) onto the JSONB parse state. */
void AddJsonKey(JsonbParseState *state, char *key_buf);
/* Pushes key_buf followed by val converted to a JSONB numeric value. */
void AddNumericJsonEntry(JsonbParseState *state, char *key_buf, int64 val);
/* Pushes key_buf followed by a copy of val as a JSONB string value. */
void AddStringJsonEntry(JsonbParseState *state, char *key_buf, const char *val);
/* Pushes key_buf followed by val as a JSONB boolean value. */
void AddBoolJsonEntry(JsonbParseState *state, char *key_buf, bool val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@
#include "source_ddl_end_handler.h"

#include "executor/spi.h"
#include "json_util.h"
#include "lib/stringinfo.h"

#include "extension_util.h"
#include "pg_yb_utils.h"
#include "utils/jsonb.h"
#include "utils/palloc.h"
#include "utils/rel.h"

// Column numbers passed to SPI_getbinval / SPI_getvalue to read the object id
// and the command tag from each row of the SPI query result.
#define OBJID_COLUMN_ID 1
#define COMMAND_TAG_COLUMN_ID 2

// One newly created relation that is reported in the "new_rel_map" JSON array.
// Entries are palloc'd while scanning the DDL commands and pfree'd once they
// have been written to the JSON output.
typedef struct NewRelMapEntry
{
// Value returned by YbGetRelfileNodeId() for the relation; emitted as "relfile_oid".
Oid relfile_oid;
// pstrdup'd copy of the relation name; emitted as "rel_name".
char *rel_name;
} NewRelMapEntry;

bool
ShouldReplicateCreateRelation(Oid rel_oid)
ShouldReplicateCreateRelation(Oid rel_oid, List **new_rel_list)
{
Relation rel = RelationIdGetRelation(rel_oid);
// Ignore temporary tables and primary indexes (same as main table).
Expand All @@ -45,6 +55,13 @@ ShouldReplicateCreateRelation(Oid rel_oid)
"yb_xcluster_ddl_replication\n%s",
kManualReplicationErrorMsg)));

// Add the new relation to the list of relations to replicate.
NewRelMapEntry *new_rel_entry = palloc(sizeof(struct NewRelMapEntry));
new_rel_entry->relfile_oid = YbGetRelfileNodeId(rel);
new_rel_entry->rel_name = pstrdup(RelationGetRelationName(rel));

*new_rel_list = lappend(*new_rel_list, new_rel_entry);

return true;
}

Expand All @@ -59,27 +76,29 @@ ProcessSourceEventTriggerDDLCommands(JsonbParseState *state)
if (exec_res != SPI_OK_SELECT)
elog(ERROR, "SPI_exec failed (error %d): %s", exec_res, query_buf.data);

TupleDesc spiTupDesc = SPI_tuptable->tupdesc;
TupleDesc spi_tup_desc = SPI_tuptable->tupdesc;

// As long as there is at least one command that needs to be replicated, we
// will set this to true and replicate the entire query string.
List *new_rel_list = NIL;
bool should_replicate_ddl = false;
for (int row = 0; row < SPI_processed; row++)
{
HeapTuple spiTuple = SPI_tuptable->vals[row];
HeapTuple spi_tuple = SPI_tuptable->vals[row];
bool is_null;
Oid objid = DatumGetObjectId(
SPI_getbinval(spiTuple, spiTupDesc, OBJID_COLUMN_ID, &is_null));
SPI_getbinval(spi_tuple, spi_tup_desc, OBJID_COLUMN_ID, &is_null));
if (is_null)
elog(ERROR, "Found NULL value when parsing objid");

const char *command_tag =
SPI_getvalue(spiTuple, spiTupDesc, COMMAND_TAG_COLUMN_ID);
SPI_getvalue(spi_tuple, spi_tup_desc, COMMAND_TAG_COLUMN_ID);

if (strncmp(command_tag, "CREATE TABLE", 12) == 0 ||
strncmp(command_tag, "CREATE INDEX", 12) == 0)
{
should_replicate_ddl |= ShouldReplicateCreateRelation(objid);
should_replicate_ddl |=
ShouldReplicateCreateRelation(objid, &new_rel_list);
}
else
{
Expand All @@ -88,5 +107,28 @@ ProcessSourceEventTriggerDDLCommands(JsonbParseState *state)
}
}

if (new_rel_list)
{
// Add the new_rel_map to the JSON output.
AddJsonKey(state, "new_rel_map");
(void) pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);

ListCell *l;
foreach (l, new_rel_list)
{
NewRelMapEntry *entry = (NewRelMapEntry *) lfirst(l);

(void) pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
AddNumericJsonEntry(state, "relfile_oid", entry->relfile_oid);
AddStringJsonEntry(state, "rel_name", entry->rel_name);
(void) pushJsonbValue(&state, WJB_END_OBJECT, NULL);

pfree(entry->rel_name);
pfree(entry);
}

(void) pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}

return should_replicate_ddl;
}
9 changes: 9 additions & 0 deletions src/yb/client/table_creator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ YBTableCreator& YBTableCreator::is_truncate(bool is_truncate) {
return *this;
}

// Stores the id of the source-cluster table that the table being created is linked to.
// Create() forwards it in the request only when it is non-empty.
// Returns *this so that builder calls can be chained.
YBTableCreator& YBTableCreator::xcluster_source_table_id(const TableId& source_table_id) {
  this->xcluster_source_table_id_ = source_table_id;
  return *this;
}

YBTableCreator& YBTableCreator::schema(const YBSchema* schema) {
schema_ = schema;
return *this;
Expand Down Expand Up @@ -315,6 +320,10 @@ Status YBTableCreator::Create() {
req.set_is_truncate(*is_truncate_);
}

if (!xcluster_source_table_id_.empty()) {
req.set_xcluster_source_table_id(xcluster_source_table_id_);
}

// Note that the check that the sum of min_num_replicas for each placement block being less or
// equal than the overall placement info num_replicas is done on the master side and an error is
// naturally returned if you try to create a table and the numbers mismatch. As such, it is the
Expand Down
5 changes: 5 additions & 0 deletions src/yb/client/table_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class YBTableCreator {

YBTableCreator& is_truncate(bool is_truncate);

YBTableCreator& xcluster_source_table_id(const TableId& source_table_id);

// Sets the schema with which to create the table. Must remain valid for
// the lifetime of the builder. Required.
YBTableCreator& schema(const YBSchema* schema);
Expand Down Expand Up @@ -238,6 +240,9 @@ class YBTableCreator {
// Set to true when the table is being re-written as part of a TRUNCATE operation.
boost::optional<bool> is_truncate_;

// Set by DDL Replication to link the table to the original table in the source cluster.
TableId xcluster_source_table_id_;

const TransactionMetadata* txn_ = nullptr;

DISALLOW_COPY_AND_ASSIGN(YBTableCreator);
Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/pg_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ std::string PgObjectId::ToString() const {
return YB_STRUCT_TO_STRING(database_oid, object_oid);
}

// Renders the namespace_name, schema_name and table_name fields as a string.
// NOTE(review): the exact output format comes from YB_STRUCT_TO_STRING, which is defined
// elsewhere -- confirm before relying on it for anything other than logging.
std::string YsqlFullTableName::ToString() const {
return YB_STRUCT_TO_STRING(namespace_name, schema_name, table_name);
}

} // namespace yb
29 changes: 29 additions & 0 deletions src/yb/common/pg_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/functional/hash/hash.hpp>

#include "yb/common/entity_ids.h"
#include "yb/common/schema.h"

namespace yb {

Expand Down Expand Up @@ -98,4 +99,32 @@ inline size_t hash_value(const PgObjectId& id) {
return value;
}

// A struct for complete PG table names: namespace, schema and table name.
struct YsqlFullTableName {
  NamespaceName namespace_name;
  PgSchemaName schema_name;
  TableName table_name;

  // Two names are equal only when all three components are equal.
  bool operator==(const YsqlFullTableName& other) const {
    return namespace_name == other.namespace_name && schema_name == other.schema_name &&
           table_name == other.table_name;
  }

  std::string ToString() const;

  // Hash functor usable directly as the hasher of an unordered container.
  // operator() is defined inline after hash_value(), which it delegates to.
  struct Hash {
    std::size_t operator()(const YsqlFullTableName& p) const noexcept;
  };
};

using YsqlFullTableNameHash = boost::hash<YsqlFullTableName>;

// Combines the three name components into a single hash value. boost::hash<YsqlFullTableName>
// locates this overload through argument-dependent lookup.
inline size_t hash_value(const YsqlFullTableName& table) {
  size_t value = 0;
  boost::hash_combine(value, table.namespace_name);
  boost::hash_combine(value, table.schema_name);
  boost::hash_combine(value, table.table_name);
  return value;
}

// Previously this member was declared but never defined, so any use of YsqlFullTableName::Hash
// failed at link time. Delegating to hash_value() keeps it consistent with
// YsqlFullTableNameHash.
inline std::size_t YsqlFullTableName::Hash::operator()(
    const YsqlFullTableName& p) const noexcept {
  return hash_value(p);
}

} // namespace yb
Loading

0 comments on commit 8142fdc

Please sign in to comment.