diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java index 1eff2ee09ab1..fbf988a80dd0 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java @@ -25,6 +25,7 @@ import java.io.File; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; @@ -603,8 +604,7 @@ protected AggregatedValue getStatementStat(String statName) throws Exception { ts.getLocalhostIP(), ts.getPgsqlWebPort())); Scanner scanner = new Scanner(url.openConnection().getInputStream()); - JsonParser parser = new JsonParser(); - JsonElement tree = parser.parse(scanner.useDelimiter("\\A").next()); + JsonElement tree = JsonParser.parseString(scanner.useDelimiter("\\A").next()); JsonObject obj = tree.getAsJsonObject(); YSQLStat ysqlStat = new Metrics(obj, true).getYSQLStat(statName); if (ysqlStat != null) { @@ -660,30 +660,51 @@ protected void verifyStatementStatWithReset(Statement stmt, String sql, String s verifyStatementStats(stmt, sql, statName, numLoopsAfterReset, oldValue); } - private JsonArray[] getRawMetric( - Function portFetcher) throws Exception { + private URL[] getMetricSources( + Collection servers, Function portFetcher) + throws MalformedURLException { + URL[] result = new URL[servers.size()]; + int index = 0; + for (MiniYBDaemon s : servers) { + result[index++] = new URL(String.format( + "http://%s:%d/metrics", s.getLocalhostIP(), portFetcher.apply(s))); + } + return result; + } + + protected URL[] getTSMetricSources() throws MalformedURLException { + return getMetricSources(miniCluster.getTabletServers().values(), (s) -> s.getWebPort()); + } + + protected URL[] getYSQLMetricSources() throws MalformedURLException { + return getMetricSources(miniCluster.getTabletServers().values(), (s) -> s.getPgsqlWebPort()); + } + + protected URL[] getMasterMetricSources() throws MalformedURLException { + return getMetricSources(miniCluster.getMasters().values(), (s) -> s.getWebPort()); + } + + private JsonArray[] getRawMetric(URL[] sources) throws Exception { Collection servers = miniCluster.getTabletServers().values(); JsonArray[] result = new JsonArray[servers.size()]; int index = 0; - for (MiniYBDaemon ts : servers) { - URLConnection connection = new URL(String.format("http://%s:%d/metrics", - ts.getLocalhostIP(), - portFetcher.apply(ts))).openConnection(); + for (URL url : sources) { + URLConnection connection = url.openConnection(); connection.setUseCaches(false); Scanner scanner = new Scanner(connection.getInputStream()); result[index++] = - new JsonParser().parse(scanner.useDelimiter("\\A").next()).getAsJsonArray(); + JsonParser.parseString(scanner.useDelimiter("\\A").next()).getAsJsonArray(); scanner.close(); } return result; } protected JsonArray[] getRawTSMetric() throws Exception { - return getRawMetric((ts) -> ts.getWebPort()); + return getRawMetric(getTSMetricSources()); } protected JsonArray[] getRawYSQLMetric() throws Exception { - return getRawMetric((ts) -> ts.getPgsqlWebPort()); + return getRawMetric(getYSQLMetricSources()); } protected AggregatedValue getMetric(String metricName) throws Exception { @@ -700,10 +721,10 @@ protected AggregatedValue getMetric(String metricName) throws Exception { return value; } - protected Long getTserverMetricCountForTable(String metricName, String tableName) - throws Exception { + protected long getMetricCountForTable( + URL[] sources, String metricName, String tableName) throws Exception { long count = 0; - for (JsonArray rawMetric : getRawTSMetric()) { + for (JsonArray rawMetric : getRawMetric(sources)) { for (JsonElement elem : rawMetric.getAsJsonArray()) { JsonObject obj = elem.getAsJsonObject(); if (obj.get("type").getAsString().equals("tablet") && @@ -723,13 +744,17 @@ protected Long getTserverMetricCountForTable(String metricName, String tableName return count; } - protected AggregatedValue getTServerMetric(String metricName) throws Exception { + protected long getTserverMetricCountForTable( + String metricName, String tableName) throws Exception { + return getMetricCountForTable(getTSMetricSources(), metricName, tableName); + } + + protected AggregatedValue getServerMetric(URL[] sources, String metricName) throws Exception { AggregatedValue value = new AggregatedValue(); - for (JsonArray rawMetric : getRawTSMetric()) { + for (JsonArray rawMetric : getRawMetric(sources)) { for (JsonElement elem : rawMetric.getAsJsonArray()) { JsonObject obj = elem.getAsJsonObject(); if (obj.get("type").getAsString().equals("server")) { - assertEquals(obj.get("id").getAsString(), "yb.tabletserver"); Metrics.Histogram histogram = new Metrics(obj).getHistogram(metricName); value.count += histogram.totalCount; value.value += histogram.totalSum; @@ -739,6 +764,14 @@ protected AggregatedValue getTServerMetric(String metricName) throws Exception { return value; } + protected AggregatedValue getReadRPCMetric(URL[] sources) throws Exception { + return getServerMetric(sources, "handler_latency_yb_tserver_TabletServerService_Read"); + } + + protected AggregatedValue getTServerMetric(String metricName) throws Exception { + return getServerMetric(getTSMetricSources(), metricName); + } + protected List getTabletsForTable( String database, String tableName) throws Exception { try { diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFollowerReads.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFollowerReads.java index 0312fd6559e1..873688381e67 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFollowerReads.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFollowerReads.java @@ -18,27 +18,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.yb.minicluster.RocksDBMetrics; - import org.yb.util.BuildTypeUtil; import org.yb.YBTestRunner; -import org.yb.util.RegexMatcher; -import org.yb.YBTestRunner; -import java.math.BigDecimal; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import static org.yb.AssertionWrappers.*; @@ -154,7 +144,7 @@ public void testSetIsolationLevelsWithReadFromFollowersSessionVariable() throws statement.execute( "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ"); - // yb_read_from_followers enabled with SERIALIZABL + // yb_read_from_followers enabled with SERIALIZABLE statement.execute( "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE"); @@ -458,4 +448,26 @@ public void testOrderedSelectConsistentPrefix() throws Exception { public void testSelectConsistentPrefix() throws Exception { testConsistentPrefix(7000, /* use_ordered_by */ false, /* get_count */ false); } + + // The test checks that follower reads are not used if sys catalog reads are to be performed. + @Test + public void testPgSysCatalogNoFollowerReads() throws Exception { + try (Statement stmt = connection.createStatement()) { + stmt.execute("SET yb_follower_read_staleness_ms = " + (2 * kMaxClockSkewMs + 1)); + stmt.execute("SET yb_read_from_followers = true"); + stmt.execute("BEGIN TRANSACTION READ ONLY"); + long startReadRPCCount = getMasterReadRPCCount(); + stmt.execute("SELECT EXTRACT(month FROM NOW())"); + long endReadRPCCount = getMasterReadRPCCount(); + stmt.execute("COMMIT"); + assertGreaterThan(endReadRPCCount, startReadRPCCount); + long sysCatalogFolowerReads = getMetricCountForTable( + getMasterMetricSources(), "consistent_prefix_read_requests", "sys.catalog"); + assertEquals(0L, sysCatalogFolowerReads); + } + } + + private long getMasterReadRPCCount() throws Exception { + return getReadRPCMetric(getMasterMetricSources()).count; + } } diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgForeignKeyOptimization.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgForeignKeyOptimization.java index fe833001ebc0..cd2326795cd4 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgForeignKeyOptimization.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgForeignKeyOptimization.java @@ -1,12 +1,8 @@ package org.yb.pgsql; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import org.junit.Test; import org.junit.runner.RunWith; import org.yb.YBTestRunner; -import org.yb.minicluster.Metrics; import java.sql.Connection; import java.sql.Statement; @@ -133,16 +129,6 @@ private void testSelectNoOptimization(String rowMarker) throws Exception { } private long getReadOpsCount() throws Exception { - JsonArray[] metrics = getRawTSMetric(); - assertEquals(1, metrics.length); - for (JsonElement el : metrics[0]) { - JsonObject obj = el.getAsJsonObject(); - String metricType = obj.get("type").getAsString(); - if (metricType.equals("server") && obj.get("id").getAsString().equals("yb.tabletserver")) { - return new Metrics(obj).getHistogram( - "handler_latency_yb_tserver_TabletServerService_Read").totalCount; - } - } - throw new Exception("handler_latency_yb_tserver_TabletServerService_Read metrict not found"); + return getReadRPCMetric(getTSMetricSources()).count; } } diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index c4c10a0c9fa9..fa03edef40fb 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -388,6 +388,7 @@ message PgPerformOptionsPB { bool use_xcluster_database_consistency = 15; uint32 active_sub_transaction_id = 16; CachingInfo caching_info = 17; + bool read_from_followers = 18; } message PgPerformRequestPB { diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index f7d350c9df6e..994a665a4938 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -290,12 +290,13 @@ Result PrepareOperations( session->Abort(); } }); + const auto read_from_followers = req->options().read_from_followers(); for (auto& op : *req->mutable_ops()) { if (op.has_read()) { auto& read = *op.mutable_read(); RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table)); auto read_op = std::make_shared(table, sidecars, &read); - if (op.read_from_followers()) { + if (read_from_followers) { read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); } ops.push_back(read_op); @@ -833,6 +834,9 @@ PgClientSession::SetupSession(const PgPerformRequestPB& req, CoarseTimePoint dea const auto& options = req.options(); PgClientSessionKind kind; if (options.use_catalog_session()) { + SCHECK(!options.read_from_followers(), + InvalidArgument, + "Reading catalog from followers is not allowed"); kind = PgClientSessionKind::kCatalog; EnsureSession(kind); } else if (options.ddl_mode()) { diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index 4b104b580c3c..f5463df30ea2 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -454,9 +454,6 @@ class PgClient::Impl { if (op->is_read()) { auto& read_op = down_cast(*op); union_op.ref_read(&read_op.read_request()); - if (read_op.read_from_followers()) { - union_op.set_read_from_followers(true); - } } else { auto& write_op = down_cast(*op); if (write_op.write_time()) { diff --git a/src/yb/yql/pggate/pg_doc_op.cc b/src/yb/yql/pggate/pg_doc_op.cc index f5a63ac6a1da..ae2d52129b26 100644 --- a/src/yb/yql/pggate/pg_doc_op.cc +++ b/src/yb/yql/pggate/pg_doc_op.cc @@ -450,12 +450,6 @@ Status PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) { RETURN_NOT_OK(PgDocOp::ExecuteInit(exec_params)); read_op_->read_request().set_return_paging_state(true); - // TODO(10696): This is probably the only place in pg_doc_op where pg_session is being - // used as a source of truth. All other uses treat it as stateless. Refactor to move this - // state elsewhere. - if (pg_session_->ShouldUseFollowerReads()) { - read_op_->set_read_from_followers(); - } SetRequestPrefetchLimit(); SetBackfillSpec(); SetRowMark(); diff --git a/src/yb/yql/pggate/pg_op.cc b/src/yb/yql/pggate/pg_op.cc index 16fbfdadb75b..95f8a18470c8 100644 --- a/src/yb/yql/pggate/pg_op.cc +++ b/src/yb/yql/pggate/pg_op.cc @@ -136,7 +136,6 @@ PgsqlOpPtr PgsqlReadOp::DeepCopy(const std::shared_ptr& shared_ptr) const auto result = ArenaMakeShared( std::shared_ptr(shared_ptr, &arena()), &arena(), is_region_local()); result->read_request() = read_request(); - result->read_from_followers_ = read_from_followers_; return result; } diff --git a/src/yb/yql/pggate/pg_op.h b/src/yb/yql/pggate/pg_op.h index a28bb52dea26..79ca1b9c4f61 100644 --- a/src/yb/yql/pggate/pg_op.h +++ b/src/yb/yql/pggate/pg_op.h @@ -116,14 +116,6 @@ class PgsqlReadOp : public PgsqlOp { return true; } - void set_read_from_followers() { - read_from_followers_ = true; - } - - bool read_from_followers() const { - return read_from_followers_; - } - PgsqlOpPtr DeepCopy(const std::shared_ptr& shared_ptr) const; std::string RequestToString() const override; @@ -132,7 +124,6 @@ class PgsqlReadOp : public PgsqlOp { Status InitPartitionKey(const PgTableDesc& table) override; LWPgsqlReadRequestPB read_request_; - bool read_from_followers_ = false; }; std::shared_ptr InitSelect( diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index 1500465340c0..87d357d55135 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -713,10 +713,6 @@ Status PgSession::GetIndexBackfillProgress(std::vector index_ids, return pg_client_.GetIndexBackfillProgress(index_ids, backfill_statuses); } -bool PgSession::ShouldUseFollowerReads() const { - return pg_txn_manager_->ShouldUseFollowerReads(); -} - void PgSession::SetTimeout(const int timeout_ms) { pg_client_.SetTimeout(timeout_ms * 1ms); } diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 865305d989a1..8503e863210f 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -329,8 +329,6 @@ class PgSession : public RefCountedThreadSafe { return pg_client_; } - bool ShouldUseFollowerReads() const; - Status SetActiveSubTransaction(SubTransactionId id); Status RollbackToSubTransaction(SubTransactionId id); diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index d97250c4291b..b00c02473dc9 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -458,6 +458,7 @@ uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) } if (read_time_for_follower_reads_) { ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time()); + options->set_read_from_followers(true); } return txn_serial_no_; } diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index e3b2dc0c9447..6af5f625ef99 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -72,7 +72,6 @@ class PgTxnManager : public RefCountedThreadSafe { bool IsTxnInProgress() const { return txn_in_progress_; } IsolationLevel GetIsolationLevel() const { return isolation_level_; } - bool ShouldUseFollowerReads() const { return read_time_for_follower_reads_.is_valid(); } bool IsDdlMode() const { return ddl_type_ != DdlType::NonDdl; } uint64_t SetupPerformOptions(tserver::PgPerformOptionsPB* options);