Skip to content

Commit

Permalink
[#10696] YSQL: Avoid reading sys catalog from followers
Browse files Browse the repository at this point in the history
Summary:
YSQL uses a dedicated session to read data from the system catalog. This session must not allow
reading from followers, because the catalog requires the freshest possible data.

Test Plan:
Jenkins

New unit test was introduced

```
./yb_build.sh --java-test 'org.yb.pgsql.TestPgFollowerReads#testPgSysCatalogNoFollowerReads'
```

Reviewers: sergei, amitanand

Reviewed By: amitanand

Subscribers: yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D22941
  • Loading branch information
d-uspenskiy committed Feb 17, 2023
1 parent 65aadd1 commit d050f04
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 70 deletions.
67 changes: 50 additions & 17 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -603,8 +604,7 @@ protected AggregatedValue getStatementStat(String statName) throws Exception {
ts.getLocalhostIP(),
ts.getPgsqlWebPort()));
Scanner scanner = new Scanner(url.openConnection().getInputStream());
JsonParser parser = new JsonParser();
JsonElement tree = parser.parse(scanner.useDelimiter("\\A").next());
JsonElement tree = JsonParser.parseString(scanner.useDelimiter("\\A").next());
JsonObject obj = tree.getAsJsonObject();
YSQLStat ysqlStat = new Metrics(obj, true).getYSQLStat(statName);
if (ysqlStat != null) {
Expand Down Expand Up @@ -660,30 +660,51 @@ protected void verifyStatementStatWithReset(Statement stmt, String sql, String s
verifyStatementStats(stmt, sql, statName, numLoopsAfterReset, oldValue);
}

private JsonArray[] getRawMetric(
Function<MiniYBDaemon, Integer> portFetcher) throws Exception {
/**
 * Builds the {@code /metrics} endpoint URL for each of the given servers.
 *
 * @param servers daemons whose metrics endpoints are wanted
 * @param portFetcher picks the port of a daemon on which the metrics page is requested
 * @return one URL per server, in the iteration order of {@code servers}
 * @throws MalformedURLException if a formatted endpoint is not a valid URL
 */
private URL[] getMetricSources(
    Collection<MiniYBDaemon> servers, Function<MiniYBDaemon, Integer> portFetcher)
    throws MalformedURLException {
  final URL[] urls = new URL[servers.size()];
  int next = 0;
  for (MiniYBDaemon daemon : servers) {
    final String endpoint = String.format(
        "http://%s:%d/metrics", daemon.getLocalhostIP(), portFetcher.apply(daemon));
    urls[next] = new URL(endpoint);
    next++;
  }
  return urls;
}

/**
 * Returns the metrics URLs of all tablet servers in the mini cluster, addressed through each
 * daemon's web port.
 *
 * @throws MalformedURLException if an endpoint URL cannot be built
 */
protected URL[] getTSMetricSources() throws MalformedURLException {
  final Collection<MiniYBDaemon> tabletServers = miniCluster.getTabletServers().values();
  return getMetricSources(tabletServers, daemon -> daemon.getWebPort());
}

/**
 * Returns the metrics URLs of all tablet servers in the mini cluster, addressed through each
 * daemon's PGSQL web port.
 *
 * @throws MalformedURLException if an endpoint URL cannot be built
 */
protected URL[] getYSQLMetricSources() throws MalformedURLException {
  final Collection<MiniYBDaemon> tabletServers = miniCluster.getTabletServers().values();
  return getMetricSources(tabletServers, daemon -> daemon.getPgsqlWebPort());
}

/**
 * Returns the metrics URLs of all masters in the mini cluster, addressed through each daemon's
 * web port.
 *
 * @throws MalformedURLException if an endpoint URL cannot be built
 */
protected URL[] getMasterMetricSources() throws MalformedURLException {
  final Collection<MiniYBDaemon> masters = miniCluster.getMasters().values();
  return getMetricSources(masters, daemon -> daemon.getWebPort());
}

private JsonArray[] getRawMetric(URL[] sources) throws Exception {
Collection<MiniYBDaemon> servers = miniCluster.getTabletServers().values();
JsonArray[] result = new JsonArray[servers.size()];
int index = 0;
for (MiniYBDaemon ts : servers) {
URLConnection connection = new URL(String.format("http://%s:%d/metrics",
ts.getLocalhostIP(),
portFetcher.apply(ts))).openConnection();
for (URL url : sources) {
URLConnection connection = url.openConnection();
connection.setUseCaches(false);
Scanner scanner = new Scanner(connection.getInputStream());
result[index++] =
new JsonParser().parse(scanner.useDelimiter("\\A").next()).getAsJsonArray();
JsonParser.parseString(scanner.useDelimiter("\\A").next()).getAsJsonArray();
scanner.close();
}
return result;
}

protected JsonArray[] getRawTSMetric() throws Exception {
return getRawMetric((ts) -> ts.getWebPort());
return getRawMetric(getTSMetricSources());
}

protected JsonArray[] getRawYSQLMetric() throws Exception {
return getRawMetric((ts) -> ts.getPgsqlWebPort());
return getRawMetric(getYSQLMetricSources());
}

protected AggregatedValue getMetric(String metricName) throws Exception {
Expand All @@ -700,10 +721,10 @@ protected AggregatedValue getMetric(String metricName) throws Exception {
return value;
}

protected Long getTserverMetricCountForTable(String metricName, String tableName)
throws Exception {
protected long getMetricCountForTable(
URL[] sources, String metricName, String tableName) throws Exception {
long count = 0;
for (JsonArray rawMetric : getRawTSMetric()) {
for (JsonArray rawMetric : getRawMetric(sources)) {
for (JsonElement elem : rawMetric.getAsJsonArray()) {
JsonObject obj = elem.getAsJsonObject();
if (obj.get("type").getAsString().equals("tablet") &&
Expand All @@ -723,13 +744,17 @@ protected Long getTserverMetricCountForTable(String metricName, String tableName
return count;
}

protected AggregatedValue getTServerMetric(String metricName) throws Exception {
/**
 * Convenience wrapper around {@code getMetricCountForTable} that reads the metric from the
 * tablet server metric sources.
 *
 * @param metricName name of the metric to look up
 * @param tableName name of the table the metric is requested for
 * @return the value produced by {@code getMetricCountForTable} for the tablet server sources
 */
protected long getTserverMetricCountForTable(
    String metricName, String tableName) throws Exception {
  final URL[] tserverSources = getTSMetricSources();
  return getMetricCountForTable(tserverSources, metricName, tableName);
}

protected AggregatedValue getServerMetric(URL[] sources, String metricName) throws Exception {
AggregatedValue value = new AggregatedValue();
for (JsonArray rawMetric : getRawTSMetric()) {
for (JsonArray rawMetric : getRawMetric(sources)) {
for (JsonElement elem : rawMetric.getAsJsonArray()) {
JsonObject obj = elem.getAsJsonObject();
if (obj.get("type").getAsString().equals("server")) {
assertEquals(obj.get("id").getAsString(), "yb.tabletserver");
Metrics.Histogram histogram = new Metrics(obj).getHistogram(metricName);
value.count += histogram.totalCount;
value.value += histogram.totalSum;
Expand All @@ -739,6 +764,14 @@ protected AggregatedValue getTServerMetric(String metricName) throws Exception {
return value;
}

/**
 * Looks up the {@code handler_latency_yb_tserver_TabletServerService_Read} server metric on the
 * given sources via {@code getServerMetric}.
 *
 * @param sources metrics endpoints to query
 */
protected AggregatedValue getReadRPCMetric(URL[] sources) throws Exception {
  final String readHandlerMetric = "handler_latency_yb_tserver_TabletServerService_Read";
  return getServerMetric(sources, readHandlerMetric);
}

/**
 * Convenience wrapper around {@code getServerMetric} that reads the metric from the tablet
 * server metric sources.
 *
 * @param metricName name of the server-level metric to look up
 */
protected AggregatedValue getTServerMetric(String metricName) throws Exception {
  final URL[] tserverSources = getTSMetricSources();
  return getServerMetric(tserverSources, metricName);
}

protected List<String> getTabletsForTable(
String database, String tableName) throws Exception {
try {
Expand Down
34 changes: 23 additions & 11 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFollowerReads.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.yb.minicluster.RocksDBMetrics;

import org.yb.util.BuildTypeUtil;
import org.yb.YBTestRunner;
import org.yb.util.RegexMatcher;
import org.yb.YBTestRunner;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.yb.AssertionWrappers.*;

Expand Down Expand Up @@ -154,7 +144,7 @@ public void testSetIsolationLevelsWithReadFromFollowersSessionVariable() throws
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ");

// yb_read_from_followers enabled with SERIALIZABL
// yb_read_from_followers enabled with SERIALIZABLE
statement.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE");

Expand Down Expand Up @@ -458,4 +448,26 @@ public void testOrderedSelectConsistentPrefix() throws Exception {
public void testSelectConsistentPrefix() throws Exception {
testConsistentPrefix(7000, /* use_ordered_by */ false, /* get_count */ false);
}

/**
 * Checks that follower reads are not used when sys catalog reads are to be performed.
 *
 * With follower reads enabled in a read-only transaction, the test runs a query, requires the
 * master read RPC count to grow across it, and then requires the
 * {@code consistent_prefix_read_requests} count for {@code sys.catalog} on the masters to be
 * zero.
 */
@Test
public void testPgSysCatalogNoFollowerReads() throws Exception {
  final String stalenessSetting =
      "SET yb_follower_read_staleness_ms = " + (2 * kMaxClockSkewMs + 1);
  try (Statement statement = connection.createStatement()) {
    // Enable follower reads for the session and open a read-only transaction.
    statement.execute(stalenessSetting);
    statement.execute("SET yb_read_from_followers = true");
    statement.execute("BEGIN TRANSACTION READ ONLY");

    // Sample the master read RPC counter right before and right after the query.
    final long readsBefore = getMasterReadRPCCount();
    statement.execute("SELECT EXTRACT(month FROM NOW())");
    final long readsAfter = getMasterReadRPCCount();
    statement.execute("COMMIT");

    assertGreaterThan(readsAfter, readsBefore);

    final long sysCatalogFollowerReads = getMetricCountForTable(
        getMasterMetricSources(), "consistent_prefix_read_requests", "sys.catalog");
    assertEquals(0L, sysCatalogFollowerReads);
  }
}

/**
 * Returns the {@code count} field of the read RPC metric gathered from the master metric
 * sources.
 */
private long getMasterReadRPCCount() throws Exception {
  final long readRpcCount = getReadRPCMetric(getMasterMetricSources()).count;
  return readRpcCount;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package org.yb.pgsql;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;
import org.yb.minicluster.Metrics;

import java.sql.Connection;
import java.sql.Statement;
Expand Down Expand Up @@ -133,16 +129,6 @@ private void testSelectNoOptimization(String rowMarker) throws Exception {
}

private long getReadOpsCount() throws Exception {
JsonArray[] metrics = getRawTSMetric();
assertEquals(1, metrics.length);
for (JsonElement el : metrics[0]) {
JsonObject obj = el.getAsJsonObject();
String metricType = obj.get("type").getAsString();
if (metricType.equals("server") && obj.get("id").getAsString().equals("yb.tabletserver")) {
return new Metrics(obj).getHistogram(
"handler_latency_yb_tserver_TabletServerService_Read").totalCount;
}
}
throw new Exception("handler_latency_yb_tserver_TabletServerService_Read metrict not found");
return getReadRPCMetric(getTSMetricSources()).count;
}
}
1 change: 1 addition & 0 deletions src/yb/tserver/pg_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ message PgPerformOptionsPB {
bool use_xcluster_database_consistency = 15;
uint32 active_sub_transaction_id = 16;
CachingInfo caching_info = 17;
bool read_from_followers = 18;
}

message PgPerformRequestPB {
Expand Down
6 changes: 5 additions & 1 deletion src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,13 @@ Result<PgClientSessionOperations> PrepareOperations(
session->Abort();
}
});
const auto read_from_followers = req->options().read_from_followers();
for (auto& op : *req->mutable_ops()) {
if (op.has_read()) {
auto& read = *op.mutable_read();
RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table));
auto read_op = std::make_shared<client::YBPgsqlReadOp>(table, sidecars, &read);
if (op.read_from_followers()) {
if (read_from_followers) {
read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
}
ops.push_back(read_op);
Expand Down Expand Up @@ -833,6 +834,9 @@ PgClientSession::SetupSession(const PgPerformRequestPB& req, CoarseTimePoint dea
const auto& options = req.options();
PgClientSessionKind kind;
if (options.use_catalog_session()) {
SCHECK(!options.read_from_followers(),
InvalidArgument,
"Reading catalog from followers is not allowed");
kind = PgClientSessionKind::kCatalog;
EnsureSession(kind);
} else if (options.ddl_mode()) {
Expand Down
3 changes: 0 additions & 3 deletions src/yb/yql/pggate/pg_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,6 @@ class PgClient::Impl {
if (op->is_read()) {
auto& read_op = down_cast<PgsqlReadOp&>(*op);
union_op.ref_read(&read_op.read_request());
if (read_op.read_from_followers()) {
union_op.set_read_from_followers(true);
}
} else {
auto& write_op = down_cast<PgsqlWriteOp&>(*op);
if (write_op.write_time()) {
Expand Down
6 changes: 0 additions & 6 deletions src/yb/yql/pggate/pg_doc_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,6 @@ Status PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) {
RETURN_NOT_OK(PgDocOp::ExecuteInit(exec_params));

read_op_->read_request().set_return_paging_state(true);
// TODO(10696): This is probably the only place in pg_doc_op where pg_session is being
// used as a source of truth. All other uses treat it as stateless. Refactor to move this
// state elsewhere.
if (pg_session_->ShouldUseFollowerReads()) {
read_op_->set_read_from_followers();
}
SetRequestPrefetchLimit();
SetBackfillSpec();
SetRowMark();
Expand Down
1 change: 0 additions & 1 deletion src/yb/yql/pggate/pg_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ PgsqlOpPtr PgsqlReadOp::DeepCopy(const std::shared_ptr<void>& shared_ptr) const
auto result = ArenaMakeShared<PgsqlReadOp>(
std::shared_ptr<ThreadSafeArena>(shared_ptr, &arena()), &arena(), is_region_local());
result->read_request() = read_request();
result->read_from_followers_ = read_from_followers_;
return result;
}

Expand Down
9 changes: 0 additions & 9 deletions src/yb/yql/pggate/pg_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,6 @@ class PgsqlReadOp : public PgsqlOp {
return true;
}

void set_read_from_followers() {
read_from_followers_ = true;
}

bool read_from_followers() const {
return read_from_followers_;
}

PgsqlOpPtr DeepCopy(const std::shared_ptr<void>& shared_ptr) const;

std::string RequestToString() const override;
Expand All @@ -132,7 +124,6 @@ class PgsqlReadOp : public PgsqlOp {
Status InitPartitionKey(const PgTableDesc& table) override;

LWPgsqlReadRequestPB read_request_;
bool read_from_followers_ = false;
};

std::shared_ptr<PgsqlReadRequestPB> InitSelect(
Expand Down
4 changes: 0 additions & 4 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -713,10 +713,6 @@ Status PgSession::GetIndexBackfillProgress(std::vector<PgObjectId> index_ids,
return pg_client_.GetIndexBackfillProgress(index_ids, backfill_statuses);
}

bool PgSession::ShouldUseFollowerReads() const {
return pg_txn_manager_->ShouldUseFollowerReads();
}

void PgSession::SetTimeout(const int timeout_ms) {
pg_client_.SetTimeout(timeout_ms * 1ms);
}
Expand Down
2 changes: 0 additions & 2 deletions src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
return pg_client_;
}

bool ShouldUseFollowerReads() const;

Status SetActiveSubTransaction(SubTransactionId id);
Status RollbackToSubTransaction(SubTransactionId id);

Expand Down
1 change: 1 addition & 0 deletions src/yb/yql/pggate/pg_txn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options)
}
if (read_time_for_follower_reads_) {
ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time());
options->set_read_from_followers(true);
}
return txn_serial_no_;
}
Expand Down
1 change: 0 additions & 1 deletion src/yb/yql/pggate/pg_txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> {

bool IsTxnInProgress() const { return txn_in_progress_; }
IsolationLevel GetIsolationLevel() const { return isolation_level_; }
bool ShouldUseFollowerReads() const { return read_time_for_follower_reads_.is_valid(); }
bool IsDdlMode() const { return ddl_type_ != DdlType::NonDdl; }

uint64_t SetupPerformOptions(tserver::PgPerformOptionsPB* options);
Expand Down

0 comments on commit d050f04

Please sign in to comment.