Skip to content

Commit

Permalink
[#19211][#19212][#19509] YSQL: Add support for create, view, and drop…
Browse files Browse the repository at this point in the history
… of replication slots

Summary:
Introduce support for creating, viewing, and dropping replication slots in YSQL. This is part of the project to support Publication/Replication slot API in YSQL (#18724).

There are two interfaces for the support for create and drop:
1. Functions:
    - `pg_create_logical_replication_slot`
    -  `pg_drop_replication_slot`
2. Walsender commands:
    - CREATE_REPLICATION_SLOT
    - DROP_REPLICATION_SLOT

Both create and drop statements go to yb-master directly. Most of the PG code isn't applicable to YSQL yet and hence it is skipped.

For viewing replication slots, we have a view `pg_replication_slots` which is backed by the function `pg_get_replication_slots`. The schema of the view has been modified by adding an extra yb-specific column `yb_stream_id` which is a text.

Limitations:
1. Only `yboutput` plugin is supported. It'll only be relevant once we add support for consuming replication slots but we are enforcing it from this diff onwards

Apart from the above, this diff fixes two issues:
1. #19509 - Cleanup of held locks in case of an `ereport(elevel >= ERROR)`. This diff fixes that by making sure that we call `LWLockReleaseAll` in `src/postgres/src/backend/access/transam/xact.c` in case of an error. Thanks to Timothy Elgersma.
2. Skipping cache refresh in case of an error in the execution of a replication command. `src/postgres/src/backend/tcop/postgres.c`. This is ok because we only cache DMLs and none of the replication commands are DMLs. We need to do that because the check `yb_is_dml_command` tries to parse the query to check whether it is a DML or not but it doesn't support replication commands. So any `ereport(elevel >= ERROR)` in the execution of a replication command would lead to a syntax error.

TODOs for future:
1. This diff creates a CDC stream with CDCRecordType as `CHANGE`. We need to extend the `pg_create_logical_replication_slot` and `CREATE_REPLICATION_SLOT` syntax to take the CDCRecordType. It'll be done in a future diff
2. DROP_REPLICATION_SLOT commands allows waiting for a slot to become inactive before dropping it. It is unsupported currently and will be done in a future diff
3. Temporary replication slots are unsupported. Will be added in future once we also support consumption via Walsender

Upgrade\Rollback safety:
These changes rely on sys-catalog changes done in yb-master. As a result, all the commands are disabled during upgrade using an autoflag yb_enable_replication_commands (LocalPersisted) and will only be enabled once the user has committed to the new version.

The autoflag was introduced during the Publication syntax support and is being reused here since these are both part of the same project: https://phorge.dev.yugabyte.com/D28721
Jira: DB-8008, DB-8009, DB-8305

Test Plan:
New unit test

```
./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot'
```

New Regress test
```
./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressReplicationSlot'
```

I've also updated most of the CDCSDK tests to now use the ReplicationSlot commands to create a CDCSDK stream instead of an RPC. Remaining tests will be updated in future diffs

Reviewers: dsrinivasan, skumar, asrinivasan, aagrawal

Reviewed By: dsrinivasan

Subscribers: ycdcxcluster, bogdan, ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D29194
  • Loading branch information
dr0pdb committed Oct 29, 2023
1 parent 0b05735 commit 7e22b28
Show file tree
Hide file tree
Showing 51 changed files with 1,329 additions and 165 deletions.
24 changes: 24 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.minicluster.MiniYBCluster;
import org.yb.util.EnvAndSysPropertyUtil;

import com.yugabyte.PGProperty;

import static com.google.common.base.Preconditions.checkNotNull;

public class ConnectionBuilder implements Cloneable {
Expand Down Expand Up @@ -145,13 +148,34 @@ protected ConnectionBuilder clone() {
}
}

public Connection replicationConnect() throws Exception {
Properties props = new Properties();
// Ask the driver to assume that the PG version is 11.2 which is what YSQL is based on. Any
// value above 9.4.0 works here since the support for replication connection was added to PG
// in 9.4.0.
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "110200");
PGProperty.REPLICATION.set(props, "database");

if (preferQueryMode != null && preferQueryMode != "simple") {
throw new RuntimeException("replication connection only supports simple query mode");
}
// https://github.com/pgjdbc/pgjdbc/issues/759
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
return connect(props);
}

public Connection connect() throws Exception {
return connect(new Properties());
}

public Connection connect(Properties additionalProperties) throws Exception {
final InetSocketAddress postgresAddress = connectionEndpoint == ConnectionEndpoint.YSQL_CONN_MGR
? miniCluster.getYsqlConnMgrContactPoints().get(tserverIndex)
: miniCluster.getPostgresContactPoints().get(tserverIndex);
String url = String.format("jdbc:yugabytedb://%s:%d/%s", postgresAddress.getHostName(),
postgresAddress.getPort(), database);
Properties props = new Properties();
props.putAll(additionalProperties);
props.setProperty("user", user);
if (password != null) {
props.setProperty("password", password);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;

/**
* Runs the pg_regress replication_slot-related tests on YB code.
*/
@RunWith(value = YBTestRunner.class)
public class TestPgRegressReplicationSlot extends BasePgSQLTest {
@Override
public int getTestMethodTimeoutSec() {
return 1800;
}

@Test
public void testPgRegressReplicationSlot() throws Exception {
runPgRegressTest("yb_replication_slot_schedule");
}
}
127 changes: 127 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import static org.yb.AssertionWrappers.assertTrue;
import static org.yb.AssertionWrappers.fail;

import java.sql.Connection;
import java.sql.Statement;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.YBTestRunner;

import com.yugabyte.PGConnection;
import com.yugabyte.replication.PGReplicationConnection;
import com.yugabyte.util.PSQLException;

@RunWith(value = YBTestRunner.class)
public class TestPgReplicationSlot extends BasePgSQLTest {
private static final Logger LOG = LoggerFactory.getLogger(TestPgReplicationSlot.class);

@Override
protected int getInitialNumTServers() {
return 3;
}

@Test
public void createAndDropFromDifferentTservers() throws Exception {
Connection conn1 = getConnectionBuilder().withTServer(0).connect();
Connection conn2 = getConnectionBuilder().withTServer(1).connect();

try (Statement statement = conn1.createStatement()) {
statement.execute("select pg_create_logical_replication_slot('test_slot', 'yboutput')");
}
try (Statement statement = conn2.createStatement()) {
statement.execute("select pg_drop_replication_slot('test_slot')");
}
try (Statement statement = conn1.createStatement()) {
statement.execute("select pg_create_logical_replication_slot('test_slot', 'yboutput')");
}
try (Statement statement = conn2.createStatement()) {
statement.execute("select pg_drop_replication_slot('test_slot')");
}
}

@Test
public void replicationConnectionCreateDrop() throws Exception {
Connection conn =
getConnectionBuilder().withTServer(0).replicationConnect();
PGReplicationConnection replConnection = conn.unwrap(PGConnection.class).getReplicationAPI();

replConnection.createReplicationSlot()
.logical()
.withSlotName("test_slot_repl_conn")
.withOutputPlugin("yboutput")
.make();
replConnection.dropReplicationSlot("test_slot_repl_conn");
}

@Test
public void replicationConnectionCreateTemporaryUnsupported() throws Exception {
Connection conn = getConnectionBuilder().withTServer(0).replicationConnect();
PGReplicationConnection replConnection = conn.unwrap(PGConnection.class).getReplicationAPI();

String expectedErrorMessage = "Temporary replication slot is not yet supported";

boolean exceptionThrown = false;
try {
replConnection.createReplicationSlot()
.logical()
.withSlotName("test_slot_repl_conn_temporary")
.withOutputPlugin("yboutput")
.withTemporaryOption()
.make();
} catch (PSQLException e) {
exceptionThrown = true;
if (StringUtils.containsIgnoreCase(e.getMessage(), expectedErrorMessage)) {
LOG.info("Expected exception", e);
} else {
fail(String.format("Unexpected Error Message. Got: '%s', Expected to contain: '%s'",
e.getMessage(), expectedErrorMessage));
}
}

assertTrue("Expected an exception but wasn't thrown", exceptionThrown);
}

@Test
public void replicationConnectionCreatePhysicalUnsupported() throws Exception {
Connection conn = getConnectionBuilder().withTServer(0).replicationConnect();
PGReplicationConnection replConnection = conn.unwrap(PGConnection.class).getReplicationAPI();

String expectedErrorMessage = "YSQL only supports logical replication slots";

boolean exceptionThrown = false;
try {
replConnection.createReplicationSlot()
.physical()
.withSlotName("test_slot_repl_conn_temporary")
.make();
} catch (PSQLException e) {
exceptionThrown = true;
if (StringUtils.containsIgnoreCase(e.getMessage(), expectedErrorMessage)) {
LOG.info("Expected exception", e);
} else {
fail(String.format("Unexpected Error Message. Got: '%s', Expected to contain: '%s'",
e.getMessage(), expectedErrorMessage));
}
}

assertTrue("Expected an exception but wasn't thrown", exceptionThrown);
}
}
16 changes: 7 additions & 9 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -2691,15 +2691,13 @@ AbortTransaction(void)
AtAbort_Memory();
AtAbort_ResourceOwner();

if (YBIsPgLockingEnabled()) {
/*
* Release any LW locks we might be holding as quickly as possible.
* (Regular locks, however, must be held till we finish aborting.)
* Releasing LW locks is critical since we might try to grab them again
* while cleaning up!
*/
LWLockReleaseAll();
}
/*
* Release any LW locks we might be holding as quickly as possible.
* (Regular locks, however, must be held till we finish aborting.)
* Releasing LW locks is critical since we might try to grab them again
* while cleaning up!
*/
LWLockReleaseAll();

/* Clear wait information and command progress indicator */
pgstat_report_wait_end();
Expand Down
3 changes: 2 additions & 1 deletion src/postgres/src/backend/catalog/yb_system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,8 @@ CREATE VIEW pg_replication_slots AS
L.xmin,
L.catalog_xmin,
L.restart_lsn,
L.confirmed_flush_lsn
L.confirmed_flush_lsn,
L.yb_stream_id
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

Expand Down
47 changes: 47 additions & 0 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -1761,3 +1761,50 @@ YBCValidatePlacement(const char *placement_info)
{
HandleYBStatus(YBCPgValidatePlacement(placement_info));
}

/* ------------------------------------------------------------------------- */
/* Replication Slot Functions. */

void
YBCCreateReplicationSlot(const char *slot_name)
{
YBCPgStatement handle;

HandleYBStatus(YBCPgNewCreateReplicationSlot(slot_name,
MyDatabaseId,
&handle));

bool already_present = false;
HandleYBStatusIgnoreAlreadyPresent(YBCPgExecCreateReplicationSlot(handle),
&already_present);
if (already_present)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("replication slot \"%s\" already exists",
slot_name)));
}

void
YBCListReplicationSlots(YBCReplicationSlotDescriptor **replication_slots,
size_t* numreplicationslots)
{
HandleYBStatus(
YBCPgListReplicationSlots(replication_slots, numreplicationslots));
}

void
YBCDropReplicationSlot(const char *slot_name)
{
YBCPgStatement handle;

HandleYBStatus(YBCPgNewDropReplicationSlot(slot_name,
&handle));

bool not_found = false;
HandleYBStatusIgnoreNotFound(YBCPgExecDropReplicationSlot(handle),
&not_found);
if (not_found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", slot_name)));
}
15 changes: 11 additions & 4 deletions src/postgres/src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@

#include "utils/memutils.h"

/* YB includes. */
#include "pg_yb_utils.h"

/* data for errcontext callback */
typedef struct LogicalErrorCallbackState
{
Expand Down Expand Up @@ -84,10 +87,14 @@ CheckLogicalDecodingRequirements(void)
* needs the same check.
*/

if (wal_level < WAL_LEVEL_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires wal_level >= logical")));
/* wal_level is not applicable to YSQL. */
if (!IsYugaByteEnabled())
{
if (wal_level < WAL_LEVEL_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires wal_level >= logical")));
}

if (MyDatabaseId == InvalidOid)
ereport(ERROR,
Expand Down
28 changes: 28 additions & 0 deletions src/postgres/src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
#include "storage/procarray.h"
#include "utils/builtins.h"

/* YB includes. */
#include "commands/ybccmds.h"
#include "pg_yb_utils.h"

/*
* Replication slot on-disk data structure.
*/
Expand Down Expand Up @@ -99,6 +103,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */

const char *YB_OUTPUT_PLUGIN = "yboutput";

static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

Expand Down Expand Up @@ -228,6 +234,17 @@ ReplicationSlotCreate(const char *name, bool db_specific,

ReplicationSlotValidateName(name, ERROR);

/*
* yb-master is the source of truth for replication slots. Skip the
* ReplicationSlotCtl related stuff as it isn't applicable till we support
* consuming replication slots via Walsender.
*/
if (IsYugaByteEnabled())
{
YBCCreateReplicationSlot(name);
return;
}

/*
* If some other backend ran this code concurrently with us, we'd likely
* both allocate the same slot, and that would be bad. We'd also be at
Expand Down Expand Up @@ -526,6 +543,17 @@ ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);

/*
* yb-master is the source of truth for replication slots. Skip the
* ReplicationSlotCtl related stuff as it isn't applicable till we support
* consuming replication slots via Walsender.
*/
if (IsYugaByteEnabled())
{
YBCDropReplicationSlot(name);
return;
}

ReplicationSlotAcquire(name, nowait);

ReplicationSlotDropAcquired();
Expand Down
Loading

0 comments on commit 7e22b28

Please sign in to comment.