Skip to content

Commit

Permalink
[#22135] YSQL: Avoid read restart errors with ANALYZE
Browse files Browse the repository at this point in the history
Summary:
In the current state of the database, ANALYZE can run for a long time on large tables. This long duration increases the chances of errors. We want to minimize such error situations since running analyze again when there is an error can be expensive.

First, we tackle the read restart errors. ANALYZE does not require strict read-after-commit-visibility guarantee, i.e. slightly stale reads are not an issue for the ANALYZE operation. Therefore, we want to avoid these errors for ANALYZE in particular. For this reason, we do not use an ambiguity window (i.e. collapse the ambiguity window to a single point) for ANALYZE.

Moreover, in the current state of the database, DDLs are executed in a "special" transaction separate from the usual transaction code path. This means that multi-table ANALYZE operations such as `ANALYZE;` use a single read point for the entirety of the operation. This is undesirable since there may be a lot of tables in the database and that increases the risk of a snapshot too old error. For this reason, we explicitly pass a fresh read time for ANALYZE of each table from the Pg layer to the tserver proxy. Pg does not exhibit this problem since (a) it runs ANALYZE of each table in a separate transaction (b) it does not cleanup MVCC records that are in use.
Jira: DB-11062

Test Plan:
Jenkins

#### Test 1

```
./yb_build.sh --cxx-test pg_analyze_read_time-test --gtest_filter PgAnalyzeReadTimeTest.InsertRowsConcurrentlyWithAnalyze
```

Insert rows concurrently with analyze to trigger read restart errors.

#### Test 2

```
./yb_build.sh --cxx-test pg_analyze_read_time-test --gtest_filter PgAnalyzeReadTimeTest.AnalyzeMultipleTables
```

Analyze two tables and do a full compaction between the two analyze.

#### Test 3

```lang=sh
$ ./bin/ysqlsh
yugabyte=# create table keys(k int);
CREATE TABLE
... concurrently insert rows using ysql_bench and wait for a while
yugabyte=# analyze keys;
... fails with a read restart error prior to this change but not with this change.
```

To insert rows concurrently use the following sql script

```name=insert.sql,lang=sql
\set random_id random(1, 1000000)
INSERT INTO keys (k) VALUES (:random_id);
```

Ran ysql_bench using

```lang=sh
build/latest/postgres/bin/ysql_bench -t 100000 -f ../insert.sql -n -R 200
```

Reviewers: pjain, bkolagani, yguan

Reviewed By: bkolagani, yguan

Subscribers: ybase, smishra, svc_phabricator, steve.varnau, yql

Differential Revision: https://phorge.dev.yugabyte.com/D37648
  • Loading branch information
pao214 committed Sep 18, 2024
1 parent 9d8366b commit d298d44
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 8 deletions.
33 changes: 27 additions & 6 deletions src/yb/yql/pggate/pg_sample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
#include <vector>
#include <utility>

#include "yb/common/read_hybrid_time.h"

#include "yb/util/atomic.h"

#include "yb/gutil/casts.h"

DEFINE_test_flag(int64, delay_after_table_analyze_ms, 0,
"Add this delay after each table is analyzed.");

namespace yb::pggate {

// Internal class to work as the secondary_index_query_ to select sample tuples.
Expand All @@ -30,15 +37,19 @@ namespace yb::pggate {
class PgSamplePicker : public PgSelectIndex {
public:
PgSamplePicker(
PgSession::ScopedRefPtr pg_session, const PgObjectId& table_id, bool is_region_local)
: PgSelectIndex(std::move(pg_session), table_id, is_region_local) {}
PgSession::ScopedRefPtr pg_session, const PgObjectId& table_id, bool is_region_local,
HybridTime read_time)
: PgSelectIndex(std::move(pg_session), table_id, is_region_local), read_time_(read_time) {}

Status Prepare() override {
target_ = PgTable(VERIFY_RESULT(LoadTable()));
bind_ = PgTable(nullptr);
auto read_op = ArenaMakeShared<PgsqlReadOp>(
arena_ptr(), &arena(), *target_, is_region_local_,
pg_session_->metrics().metrics_capture());
// Use the same time as PgSample. Otherwise, ybctids may be gone
// when PgSample tries to fetch the rows.
read_op->set_read_time(ReadHybridTime::SingleTime(read_time_));
read_req_ = std::shared_ptr<LWPgsqlReadRequestPB>(read_op, &read_op->read_request());
doc_op_ = std::make_shared<PgDocReadOp>(pg_session_, &target_, std::move(read_op));
return Status::OK();
Expand Down Expand Up @@ -100,6 +111,7 @@ class PgSamplePicker : public PgSelectIndex {
}

Result<EstimatedRowCount> GetEstimatedRowCount() const {
AtomicFlagSleepMs(&FLAGS_TEST_delay_after_table_analyze_ms);
return down_cast<const PgDocReadOp*>(doc_op_.get())->GetEstimatedRowCount();
}

Expand All @@ -110,12 +122,15 @@ class PgSamplePicker : public PgSelectIndex {
bool reservoir_ready_ = false;
// Vector of Slices pointing to the values in the reservoir
std::vector<Slice> ybctids_;
// Use the same read time on the ybctid sampler as the row fetcher.
HybridTime read_time_;
};

PgSample::PgSample(
PgSession::ScopedRefPtr pg_session,
int targrows, const PgObjectId& table_id, bool is_region_local)
: PgDmlRead(pg_session, table_id, is_region_local), targrows_(targrows) {}
int targrows, const PgObjectId& table_id, bool is_region_local, HybridTime read_time)
: PgDmlRead(pg_session, table_id, is_region_local), targrows_(targrows),
read_time_(read_time) {}

Status PgSample::Prepare() {
// Setup target and bind descriptor.
Expand All @@ -124,16 +139,23 @@ Status PgSample::Prepare() {

// Setup sample picker as secondary index query
secondary_index_query_ = std::make_unique<PgSamplePicker>(
pg_session_, table_id_, is_region_local_);
pg_session_, table_id_, is_region_local_, read_time_);
RETURN_NOT_OK(secondary_index_query_->Prepare());

// Prepare read op to fetch rows
auto read_op = ArenaMakeShared<PgsqlReadOp>(
arena_ptr(), &arena(), *target_, is_region_local_,
pg_session_->metrics().metrics_capture());
// Clamp the read uncertainty window to avoid read restart errors.
read_op->set_read_time(ReadHybridTime::SingleTime(read_time_));
read_req_ = std::shared_ptr<LWPgsqlReadRequestPB>(read_op, &read_op->read_request());
doc_op_ = make_shared<PgDocReadOp>(pg_session_, &target_, std::move(read_op));

VLOG_WITH_FUNC(3)
<< "Sampling table: " << target_->table_name().table_name()
<< " for " << targrows_ << " rows"
<< " using read time: " << read_time_;

return Status::OK();
}

Expand All @@ -157,4 +179,3 @@ Result<PgSamplePicker&> PgSample::SamplePicker() {
}

} // namespace yb::pggate

7 changes: 6 additions & 1 deletion src/yb/yql/pggate/pg_sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include "yb/common/hybrid_time.h"

#include "yb/yql/pggate/pg_select_index.h"

#include "yb/yql/pggate/pg_tools.h"
Expand All @@ -29,7 +31,7 @@ class PgSample : public PgDmlRead {
public:
PgSample(
PgSession::ScopedRefPtr pg_session, int targrows, const PgObjectId& table_id,
bool is_region_local);
bool is_region_local, HybridTime read_time);

StmtOp stmt_op() const override { return StmtOp::STMT_SAMPLE; }

Expand All @@ -52,6 +54,9 @@ class PgSample : public PgDmlRead {

// How many sample rows are needed
const int targrows_;

// Holds the read time used for executing ANALYZE on the table.
HybridTime read_time_;
};

} // namespace yb::pggate
3 changes: 2 additions & 1 deletion src/yb/yql/pggate/pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,8 @@ Status PgApiImpl::NewSample(const PgObjectId& table_id,
bool is_region_local,
PgStatement **handle) {
*handle = nullptr;
auto sample = std::make_unique<PgSample>(pg_session_, targrows, table_id, is_region_local);
auto sample = std::make_unique<PgSample>(pg_session_, targrows, table_id, is_region_local,
clock_->Now());
RETURN_NOT_OK(sample->Prepare());
RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(sample), handle));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions src/yb/yql/pgwrapper/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ ADD_YB_TEST(colocation-test)
ADD_YB_TEST(geo_transactions-test)
ADD_YB_TEST(geo_transactions_promotion-test)
ADD_YB_TEST(pg_alter_add_column_default-test)
ADD_YB_TEST(pg_analyze_read_time-test)
ADD_YB_TEST(pg_ash-test)
ADD_YB_TEST(pg_auto_analyze-test)
ADD_YB_TEST(pg_backends-test)
Expand Down
137 changes: 137 additions & 0 deletions src/yb/yql/pgwrapper/pg_analyze_read_time-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include <atomic>
#include <thread>

#include "yb/common/pgsql_error.h"
#include "yb/util/flags.h"

#include "yb/yql/pgwrapper/pg_mini_test_base.h"
#include "yb/yql/pgwrapper/pg_test_utils.h"

DECLARE_string(ysql_pg_conf_csv);
DECLARE_string(ysql_log_statement);
DECLARE_bool(ysql_beta_features);
DECLARE_string(vmodule);
DECLARE_int32(timestamp_history_retention_interval_sec);
DECLARE_int64(TEST_delay_after_table_analyze_ms);

namespace yb::pgwrapper {

class PgAnalyzeReadTimeTest : public PgMiniTestBase {
public:
void SetUp() override {
// ANALYZE is a beta feature.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_beta_features) = true;
// Easier debugging.
// ASSERT_OK(SET_FLAG(vmodule, "read_query=1"));
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_log_statement) = "all";
PgMiniTestBase::SetUp();
}
};

class PgAnalyzeNoReadRestartsTest : public PgAnalyzeReadTimeTest {
public:
void SetUp() override {
// So that read restart errors are not retried internally.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_pg_conf_csv) =
MaxQueryLayerRetriesConf(0);
PgAnalyzeReadTimeTest::SetUp();
}
};

TEST_F_EX(PgAnalyzeReadTimeTest, InsertRowsConcurrentlyWithAnalyze, PgAnalyzeNoReadRestartsTest) {
constexpr auto kNumInitialRows = 100000;

// Create table with keys from 1 to kNumInitialRows.
auto setup_conn = ASSERT_RESULT(Connect());
ASSERT_OK(setup_conn.Execute("CREATE TABLE keys (k INT) SPLIT INTO 3 TABLETS"));
ASSERT_OK(setup_conn.ExecuteFormat(
"INSERT INTO keys(k) SELECT GENERATE_SERIES(1, $0)", kNumInitialRows));

// Warm the catalog cache so that subsequent inserts are fast.
// Unfortunately, this is necessary because this test depends on timing.
auto insert_conn = ASSERT_RESULT(Connect());
auto key = kNumInitialRows;
// Populates catalog cache.
key++;
ASSERT_OK(insert_conn.ExecuteFormat(
"INSERT INTO keys(k) VALUES ($0)", key));

std::atomic<bool> stop{false};
CountDownLatch begin_analyze(1);
auto analyze_conn = ASSERT_RESULT(Connect());
auto analyze_status_future = std::async(std::launch::async, [&] {
begin_analyze.Wait();
auto status = analyze_conn.Execute("ANALYZE keys");
stop.store(true);
return status;
});

begin_analyze.CountDown();
while (!stop.load() && key < kNumInitialRows + 100) {
key++;
ASSERT_OK(insert_conn.ExecuteFormat(
"INSERT INTO keys(k) VALUES ($0)", key));

// Throttle inserts to avoid overloading the system.
std::this_thread::sleep_for(10ms);
}

ASSERT_OK(analyze_status_future.get());
}

class PgAnalyzeMultiTableTest : public PgAnalyzeReadTimeTest {
public:
void SetUp() override {
ANNOTATE_UNPROTECTED_WRITE(
FLAGS_timestamp_history_retention_interval_sec) = 0;
// This test is timing based and 10s provides enough time for compaction.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_delay_after_table_analyze_ms) = 10000;
PgAnalyzeReadTimeTest::SetUp();
}
};

TEST_F_EX(PgAnalyzeReadTimeTest, AnalyzeMultipleTables, PgAnalyzeMultiTableTest) {
constexpr auto kNumInitialRows = 10000;

// Create table with keys from 1 to kNumInitialRows.
auto setup_conn = ASSERT_RESULT(Connect());
ASSERT_OK(setup_conn.Execute("CREATE TABLE keys (k INT)"));
ASSERT_OK(setup_conn.ExecuteFormat(
"INSERT INTO keys(k) SELECT GENERATE_SERIES(1, $0)", kNumInitialRows));
ASSERT_OK(setup_conn.Execute("CREATE TABLE values (v INT)"));
ASSERT_OK(setup_conn.ExecuteFormat(
"INSERT INTO values(v) SELECT GENERATE_SERIES(1, $0)", kNumInitialRows));

auto update_conn = ASSERT_RESULT(Connect());
auto analyze_conn = ASSERT_RESULT(Connect());

CountDownLatch update_thread_started(1);
auto update_status_future = std::async(std::launch::async, [&] {
update_thread_started.CountDown();
auto status = update_conn.Execute("UPDATE values SET v = v + 1");
FlushAndCompactTablets();
LOG(INFO) << "Compaction done!";
return status;
});

update_thread_started.Wait();
auto analyze_status = analyze_conn.Execute("ANALYZE keys, values");
ASSERT_OK(analyze_status);
LOG(INFO) << "Analyze done!";

ASSERT_OK(update_status_future.get());
}

} // namespace yb::pgwrapper

0 comments on commit d298d44

Please sign in to comment.