Skip to content

Commit

Permalink
[YSQL] #5232: Provide follower reads through YSQL
Browse files Browse the repository at this point in the history
Summary:
Currently we support consistent prefix reads (read from followers) for both YCQL
and YEDIS, but not for YSQL. This diff adds new session variable to specify if
read from followers should be enabled. The value for this variable is being sent as
part of the execution parameters.

When read from followers is enabled, this is translated to CONSISTENT_PREFIX
consistency level which is sent to the tablet RPC layer. In this layer, if
CONSISTENT_PREFIX consistency is specified, the closest replica that can serve
the read is selected regardless of whether or not it it's the leader for such tablet.

This diff also adds new metrics to count how the number of CONSISTENT_PREFIX
requests, and also the number of YSQL rows read as part of a consistent prefix
request.

Test Plan:
Set the `yb_read_from_followers`, execute a select statement and verify that in the rpc layer, the read has consistent_prefix consistency.
New unit tests

Also for the compatibility with isolation levels:
```ysqlsh (11.2-YB-2.3.3.0-b0)
Type "help" for help.

yugabyte=# SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET
yugabyte=# SET yb_read_from_followers = true;
ERROR:  cannot enable yb_read_from_followers with the current transaction isolation mode
HINT:  Use READ UNCOMMITTED or READ COMMITTED to enable yb_read_from_followers.
yugabyte=# SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET
yugabyte=# SET yb_read_from_followers = true;
SET
yugabyte=# SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ;
ERROR:  cannot use this transaction isolation level with yb_read_from_followers enabled
HINT:  Disable yb_read_from_followers to use REPEATABLE READ or SERIALIZABLE.
```

Reviewers: bogdan, amitanand, mihnea, zyu

Reviewed By: mihnea, zyu

Subscribers: bogdan, zyu, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9205
  • Loading branch information
hectorgcr committed Oct 10, 2020
1 parent 48018d6 commit 5ee5764
Show file tree
Hide file tree
Showing 34 changed files with 428 additions and 26 deletions.
23 changes: 23 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,29 @@ protected AgregatedValue getMetric(String metricName) throws Exception {
return value;
}

protected Long getTserverMetricCountForTable(String metricName, String tableName)
throws Exception {
long count = 0;
for (JsonArray rawMetric : getRawTSMetric()) {
for (JsonElement elem : rawMetric.getAsJsonArray()) {
JsonObject obj = elem.getAsJsonObject();
if (obj.get("type").getAsString().equals("tablet") &&
obj.getAsJsonObject("attributes").get("table_name").getAsString().equals(tableName)) {
for (JsonElement subelem : obj.getAsJsonArray("metrics")) {
if (!subelem.isJsonObject()) {
continue;
}
JsonObject metric = subelem.getAsJsonObject();
if (metric.has("name") && metric.get("name").getAsString().equals(metricName)) {
count += metric.get("value").getAsLong();
}
}
}
}
}
return count;
}

protected long getMetricCounter(String metricName) throws Exception {
return getMetric(metricName).count;
}
Expand Down
206 changes: 206 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -325,6 +326,211 @@ private void verifyStatementPushdownMetric(Statement statement,
pushdown_expected ? 1 : 0, 1, true);
}

private Long getCountForTable(String metricName, String tableName) throws Exception {
return getTserverMetricCountForTable(metricName, tableName);
}

/*
* TODO: move this test to a different file. For now it makes sense for them to be here
* because they are related to the consistent prefix tests
*/
@Test
public void testSetIsolationLevelsWithReadFromFollowersSessionVariable() throws Exception {
try (Statement statement = connection.createStatement()) {
final String CANT_CHANGE_TXN_LEVEL =
"ERROR: cannot use this transaction isolation level with yb_read_from_followers enabled";
final String CANT_CHANGE_YB_READ_FROM_FOLLOWERS =
"ERROR: cannot enable yb_read_from_followers with the current transaction isolation mode";

// READ UNCOMMITTED with yb_read_from_followers enabled -> ok.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
statement.execute("SET yb_read_from_followers = true");

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");

// READ COMMITTED with yb_read_from_followers enabled -> ok.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("SET yb_read_from_followers = true");

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");

// REPEATABLE READ with yb_read_from_followers enabled -> error.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ");

runInvalidQuery(statement, "SET yb_read_from_followers = true",
CANT_CHANGE_YB_READ_FROM_FOLLOWERS);

// SERIALIZABLE with yb_read_from_followers enabled -> error.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE");
runInvalidQuery(statement, "SET yb_read_from_followers = true",
CANT_CHANGE_YB_READ_FROM_FOLLOWERS);

// Reset the isolation level to the lowest possible.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");

statement.execute("SET yb_read_from_followers = true");

// yb_read_from_followers enabled with READ UNCOMMITTED -> ok.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");

// yb_read_from_followers enabled with READ COMMITTED -> ok.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");

// yb_read_from_followers enabled with REPEATABLE READ -> error.
runInvalidQuery(statement,
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ",
CANT_CHANGE_TXN_LEVEL);

// yb_read_from_followers enabled with SERIALIZABLE -> error.
runInvalidQuery(statement,
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE",
CANT_CHANGE_TXN_LEVEL);

// Reset the isolation level to the lowest possible.
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");

// yb_read_from_followers enabled with START TRANSACTION ISOLATION LEVEL READ UNCOMMITTED
// -> ok.
statement.execute("SET yb_read_from_followers = true");
statement.execute("START TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
statement.execute("ABORT");

// yb_read_from_followers enabled with START TRANSACTION ISOLATION LEVEL READ COMMITTED
// -> ok.
statement.execute("START TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("ABORT");


// yb_read_from_followers enabled with START TRANSACTION ISOLATION LEVEL REPEATABLE READ
// -> error.
runInvalidQuery(statement, "START TRANSACTION ISOLATION LEVEL REPEATABLE READ",
CANT_CHANGE_TXN_LEVEL);

// yb_read_from_followers enabled with START TRANSACTION ISOLATION LEVEL SERIALIZABLE
// -> error.
runInvalidQuery(statement, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE",
CANT_CHANGE_TXN_LEVEL);

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");

// START TRANSACTION ISOLATION LEVEL READ UNCOMMITTED with yb_read_from_followers enabled
// -> ok.
statement.execute("START TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
statement.execute("SET yb_read_from_followers = true");
statement.execute("ABORT");

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");

// START TRANSACTION ISOLATION LEVEL READ COMMITTED with yb_read_from_followers enabled
// -> ok.
statement.execute("START TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("SET yb_read_from_followers = true");
statement.execute("ABORT");

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");
// START TRANSACTION ISOLATION LEVEL REPEATABLE READ with yb_read_from_followers enabled
// -> error.
statement.execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ");
runInvalidQuery(statement, "SET yb_read_from_followers = true",
CANT_CHANGE_YB_READ_FROM_FOLLOWERS);
statement.execute("ABORT");

// Reset session variable.
statement.execute("SET yb_read_from_followers = false");
// START TRANSACTION ISOLATION LEVEL SERIALIZABLE with yb_read_from_followers enabled
// -> error.
statement.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
runInvalidQuery(statement, "SET yb_read_from_followers = true",
CANT_CHANGE_YB_READ_FROM_FOLLOWERS);
statement.execute("ABORT");
}
}

@Test
public void testCountConsistentPrefix() throws Exception {
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE consistentprefixcount(k int primary key)");
for (int i = 0; i < 100; i++) {
statement.execute(String.format("INSERT INTO consistentprefixcount(k) VALUES(%d)", i));
}

statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("SET yb_read_from_followers = true;");
assertOneRow(statement, "SELECT count(*) FROM consistentprefixcount", 100L);

long count = getCountForTable("consistent_prefix_read_requests", "consistentprefixcount");
assertEquals(count, 3); // 3 tablets, 3 consistent prefix requests.
}
}

@Test
public void testOrderedSelectConsistentPrefix() throws Exception {
List<Row> expected_rows = new ArrayList<>();
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE consistentprefixorderedselect(k int primary key)");
for (int i = 0; i < 5000; i++) {
statement.execute(String.format(
"INSERT INTO consistentprefixorderedselect(k) VALUES(%d)", i));
expected_rows.add(new Row(i));
}

statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("SET yb_read_from_followers = true;");
assertRowList(statement,
"SELECT * FROM consistentprefixorderedselect ORDER BY k", expected_rows);

long count = getCountForTable("consistent_prefix_read_requests",
"consistentprefixorderedselect");
// Max number of records per request is 1024, so we will need to issue two requests per
// tablet.
assertEquals(6, count);

count = getCountForTable("pgsql_consistent_prefix_read_rows",
"consistentprefixorderedselect");
assertEquals(5000, count);
}
}

@Test
public void testSelectConsistentPrefix() throws Exception {
List<Row> expected_rows = new ArrayList<>();
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE consistentprefixselect(k int primary key)");
for (int i = 0; i < 7000; i++) {
statement.execute(String.format("INSERT INTO consistentprefixselect(k) VALUES(%d)", i));
expected_rows.add(new Row(i));
}

statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
statement.execute("SET yb_read_from_followers = true;");
statement.executeQuery("SELECT * from consistentprefixselect");

long count = getCountForTable("consistent_prefix_read_requests", "consistentprefixselect");
// Max number of records per request is 1024, so we will need to issue three requests per
// tablet.
assertEquals(9, count);

count = getCountForTable("pgsql_consistent_prefix_read_rows", "consistentprefixselect");
assertEquals(7000, count);
}
}

@Test
public void testAggregatePushdowns() throws Exception {
try (Statement statement = connection.createStatement()) {
Expand Down
14 changes: 11 additions & 3 deletions src/postgres/src/backend/access/ybc/ybcin.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ bool
ybcinproperty(Oid index_oid, int attno, IndexAMProperty prop, const char *propname,
bool *res, bool *isnull)
{
return false;
return false;
}

bool
Expand All @@ -270,7 +270,7 @@ ybcinbeginscan(Relation rel, int nkeys, int norderbys)
return scan;
}

void
void
ybcinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys, int norderbys)
{
if (scan->opaque)
Expand Down Expand Up @@ -303,6 +303,11 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)

YbScanDesc ybscan = (YbScanDesc) scan->opaque;
ybscan->exec_params = scan->yb_exec_params;
if (!ybscan->exec_params) {
ereport(DEBUG1, (errmsg("null exec_params")));
} else {
ybscan->exec_params->read_from_followers = YBReadFromFollowersEnabled();
}
Assert(PointerIsValid(ybscan));

/*
Expand All @@ -321,6 +326,9 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)
}
else
{
if (ybscan->exec_params && ybscan->exec_params->read_from_followers) {
ereport(DEBUG2, (errmsg("ybcingettuple read from followers")));
}
HeapTuple tuple = ybc_getnext_heaptuple(ybscan, is_forward_scan, &scan->xs_recheck);
if (tuple)
{
Expand All @@ -333,7 +341,7 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)
return scan->xs_ctup.t_ybctid != 0;
}

void
void
ybcinendscan(IndexScanDesc scan)
{
YbScanDesc ybscan = (YbScanDesc)scan->opaque;
Expand Down
56 changes: 56 additions & 0 deletions src/postgres/src/backend/commands/variable.c
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,14 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
}
}

if ((newXactIsoLevel == XACT_SERIALIZABLE || newXactIsoLevel == XACT_REPEATABLE_READ) &&
YBReadFromFollowersEnabled()) {
GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
GUC_check_errmsg("cannot use this transaction isolation level with yb_read_from_followers enabled");
GUC_check_errhint("Disable yb_read_from_followers to use REPEATABLE READ or SERIALIZABLE.");
return false;
}

*extra = malloc(sizeof(int));
if (!*extra)
return false;
Expand Down Expand Up @@ -623,6 +631,54 @@ show_XactIsoLevel(void)
}
}

/* Check if the isolation level is compatible with read from followers */
static inline bool
iso_level_compatible(int iso_level) {
switch (iso_level)
{
case XACT_READ_UNCOMMITTED:
return true;
case XACT_READ_COMMITTED:
return true;
case XACT_REPEATABLE_READ:
return false;
case XACT_SERIALIZABLE:
return false;
default:
return false;
}
}

bool
check_follower_reads(bool *newval, void **extra, GucSource source) {
if (*newval == false) {
return true;
}

bool ret = iso_level_compatible(XactIsoLevel);
if (!ret) {
GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
GUC_check_errmsg("cannot enable yb_read_from_followers with the current transaction isolation mode");
GUC_check_errhint("Use READ UNCOMMITTED or READ COMMITTED to enable yb_read_from_followers.");
}
return ret;
}

bool
check_default_XactIsoLevel(int *newval, void **extra, GucSource source) {
if (!YBReadFromFollowersEnabled()) {
return true;
}

bool ret = iso_level_compatible(*newval);
if (!ret) {
GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
GUC_check_errmsg("cannot use this transaction isolation level with yb_read_from_followers enabled");
GUC_check_errhint("Disable yb_read_from_followers to use REPEATABLE READ or SERIALIZABLE.");
}
return ret;
}

/*
* SET TRANSACTION [NOT] DEFERRABLE
*/
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/executor/execUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ CreateExecutorState(void)
estate->yb_exec_params.limit_use_default = true;
estate->yb_exec_params.rowmark = -1;
estate->yb_can_batch_updates = false;
estate->yb_exec_params.read_from_followers = false;

return estate;
}
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/executor/nodeIndexonlyscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ IndexOnlyNext(IndexOnlyScanState *node)

// TODO(hector) Add row marks for INDEX_ONLY_SCAN
scandesc->yb_exec_params->rowmark = -1;
scandesc->yb_exec_params->read_from_followers = YBReadFromFollowersEnabled();
}

/*
Expand Down
4 changes: 4 additions & 0 deletions src/postgres/src/backend/executor/ybc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ ybcBeginForeignScan(ForeignScanState *node, int eflags)
ybc_state->exec_params = &estate->yb_exec_params;

ybc_state->exec_params->rowmark = -1;
ybc_state->exec_params->read_from_followers = YBReadFromFollowersEnabled();
if (YBReadFromFollowersEnabled()) {
ereport(DEBUG2, (errmsg("Doing read from followers")));
}
ListCell *l;
foreach(l, estate->es_rowMarks) {
ExecRowMark *erm = (ExecRowMark *) lfirst(l);
Expand Down
Loading

0 comments on commit 5ee5764

Please sign in to comment.