From 6e604ba9a00842d4e114ef70708c99b6d2fac2af Mon Sep 17 00:00:00 2001 From: Deepthi Srinivasan Date: Wed, 2 Nov 2022 17:03:24 -0700 Subject: [PATCH] [#13358] YSQL: Part 1 - YSQL DDL Atomicity Summary: Currently the metadata for YSQL Tables is stored in both the PG catalog and DocDB schema. Sometimes when a DDL transaction fails, we can hit an issue where these two sources of metadata go out of sync. This is because the PG catalog is modified using the DocDB transactions framework, but the DocDB schema is stored in the Kudu-inherited structures in the YB-Master sys catalog. Thus when a transaction is aborted, only the changes to the PG catalog are rolled back, while the changes made to the DocDB schema cannot be rolled back. This patch contains the first phase of changes required to fix this issue. We now enforce that only one DDL that modifies the DocDB schema can occur on a table at a time. This simplifies the problem because, if a DDL transaction starts, the DocDB schema now only needs to maintain two versions of the schema - the current committed schema, and the uncommitted schema of the ongoing transaction. This patch then extends the Transaction GC framework introduced in commit: 88e9d5921ff80d4ab277df6e6db7ddb5267c8a8e to also handle ALTER operations and DROP operations. When a DDL transaction modifies the DocDB schema, the transaction metadata and the current schema of the table are stored in the SysTablesEntryPB. YB-Master monitors the state of the transaction. Once the transaction ends, YB-Master detects whether the transaction was successful or not by comparing the DocDB schema and PG catalog. If the transaction is aborted, YB-Master will roll back the changes made to the DocDB schema using the state present in the SysTablesEntryPB. **Correctness** Thus, the two schemas can be described as "eventually consistent" because we can have periods of time where the two schemas do not match. 
However, this mostly does not affect correctness because of the following two properties: - When inconsistent, the DocDB schema always has more columns/constraints than PG schema. - Clients always use the PG schema (which is guaranteed to not return uncommitted state) to prepare their read/write requests. These two properties ensure that we can't have orphaned data or failed integrity checks or use DDL entities created by uncommitted transactions because of DocDB schema being inconsistent. **Feature Flag:** ysql_ddl_rollback_enabled Note that this flag is shared across pg backends and YB-Master. This flag is used by the PG backends to determine whether or not to perform best-effort rollback for ALTER TABLE ADD column introduced in 5a6bfcd9eb9b00abef2dffdf0f40e4da181c3146. Note that we do not support changing this flag //**during**// a DDL operation. If this flag is changed to false after a DDL but before the rollback completes, then all future DDL operations will ignore the presence of YsqlTxnVerifierState. Alter operations in particular will clear the YsqlTxnVerifierState as well. Notes: - Since this patch contains only the first phase of changes, this feature is disabled by default using the flag `ysql_ddl_rollback_enabled`. - This patch only handles Create Table, Alter Table Add Column, Alter Table Add Primary Key, Alter Table Rename Table/Column and Drop Table. - This patch does not have support for databases, materialized views, indexes, colocated tables, PG catalog tables and tablegroups. - GUC `yb_test_fail_next_ddl` has been changed to fail any upcoming DDL operation, not just CREATE TABLE DDL operations. 
Design Doc: https://docs.google.com/document/d/1YAUnSV9X7Gv2ZVpILUe15NbXbVwgFvpV8IRaj6wFMZo/edit?usp=sharing Test Plan: ybd --cxx-test pg_ddl_atomicity-test Reviewers: ena, nicolas, myang Reviewed By: nicolas, myang Subscribers: zyu, pjain, jmaley, hsunder, yguan, dmitry, ybase, yql Differential Revision: https://phabricator.dev.yugabyte.com/D18398 --- .../java/org/yb/pgsql/TestYsqlUpgrade.java | 2 +- src/postgres/src/backend/commands/tablecmds.c | 22 +- src/postgres/src/backend/commands/ybccmds.c | 27 +- src/postgres/src/backend/utils/misc/guc.c | 8 +- .../src/backend/utils/misc/pg_yb_utils.c | 13 +- src/postgres/src/include/pg_yb_utils.h | 2 - src/yb/client/CMakeLists.txt | 2 +- src/yb/client/client-internal.cc | 12 +- src/yb/client/client-internal.h | 14 +- src/yb/client/client-test-util.cc | 85 ++ src/yb/client/client-test-util.h | 18 + src/yb/client/client.cc | 8 +- src/yb/client/client.h | 10 +- src/yb/client/snapshot_test_util.cc | 35 +- src/yb/client/snapshot_test_util.h | 13 +- src/yb/common/common_flags.cc | 8 + src/yb/master/catalog_entity_info.h | 27 + src/yb/master/catalog_entity_info.proto | 21 +- src/yb/master/catalog_loaders.cc | 32 +- src/yb/master/catalog_manager.cc | 329 ++++++- src/yb/master/catalog_manager.h | 21 + src/yb/master/master-path-handlers.cc | 38 +- src/yb/master/master_ddl.proto | 1 + src/yb/master/master_types.proto | 2 + src/yb/master/ysql_transaction_ddl.cc | 372 ++++++-- src/yb/master/ysql_transaction_ddl.h | 61 ++ src/yb/tserver/pg_client_session.cc | 9 +- src/yb/yql/pggate/pggate.cc | 8 + src/yb/yql/pggate/pggate.h | 2 + src/yb/yql/pggate/ybc_pg_typedefs.h | 1 + src/yb/yql/pggate/ybc_pggate.cc | 9 +- src/yb/yql/pggate/ybc_pggate.h | 2 + src/yb/yql/pgwrapper/CMakeLists.txt | 4 +- src/yb/yql/pgwrapper/libpq_utils.cc | 5 + src/yb/yql/pgwrapper/libpq_utils.h | 2 + src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc | 890 ++++++++++++++++++ src/yb/yql/pgwrapper/pg_gin_index-test.cc | 18 +- .../yql/pgwrapper/pg_index_backfill-test.cc | 14 
+- src/yb/yql/pgwrapper/pg_libpq-test.cc | 236 +---- 39 files changed, 1940 insertions(+), 443 deletions(-) create mode 100644 src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java index 986032b1cae0..4ce114cf9b61 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java @@ -441,7 +441,7 @@ public void creatingSystemRelsAfterFailure() throws Exception { + ")"; stmt.execute("SET yb_test_fail_next_ddl TO true"); - runInvalidQuery(stmt, ddlSql, "DDL failed as requested"); + runInvalidQuery(stmt, ddlSql, "Failed DDL operation as requested"); // Letting CatalogManagerBgTasks do the cleanup. Thread.sleep(BuildTypeUtil.adjustTimeout(5000)); diff --git a/src/postgres/src/backend/commands/tablecmds.c b/src/postgres/src/backend/commands/tablecmds.c index b54a47413052..c614ed81c7ae 100644 --- a/src/postgres/src/backend/commands/tablecmds.c +++ b/src/postgres/src/backend/commands/tablecmds.c @@ -991,9 +991,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, InvalidOid /* matviewPgTableId */); } - /* For testing purposes, user might ask us to fail a DLL. */ - YBTestFailDdlIfRequested(); - /* * Open the new relation and acquire exclusive lock on it. This isn't * really necessary for locking out other backends (since they can't see @@ -3896,12 +3893,21 @@ ATController(AlterTableStmt *parsetree, } PG_CATCH(); { - /* Rollback the DocDB changes. 
*/ - ListCell *lc = NULL; - foreach(lc, rollbackHandles) + if (!*YBCGetGFlags()->ysql_ddl_rollback_enabled || + rel->yb_table_properties->is_colocated) { - YBCPgStatement handle = (YBCPgStatement) lfirst(lc); - YBCExecAlterTable(handle, RelationGetRelid(rel)); + /* + * The new way of doing ddl rollback is disabled (or unsupported, + * as in the case of colocated tables) fall back to the old way of + * doing a best-effort rollback which may not always succeed + * (e.g., in case of network failure or PG crash). + */ + ListCell *lc = NULL; + foreach(lc, rollbackHandles) + { + YBCPgStatement handle = (YBCPgStatement) lfirst(lc); + YBCExecAlterTable(handle, RelationGetRelid(rel)); + } } PG_RE_THROW(); } diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index e6ec15b058cc..ce88e510c853 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -34,7 +34,6 @@ #include "catalog/pg_class.h" #include "catalog/pg_constraint.h" #include "catalog/pg_database.h" -#include "catalog/pg_namespace.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "catalog/pg_type_d.h" @@ -697,15 +696,31 @@ YBCDropTable(Relation relation) false, /* if_exists */ &handle), ¬_found); - const bool valid_handle = !not_found; - if (valid_handle) + if (not_found) + { + return; + } + /* + * YSQL DDL Rollback is not yet supported for colocated tables. + */ + if (*YBCGetGFlags()->ysql_ddl_rollback_enabled && + !yb_props->is_colocated) { /* - * We cannot abort drop in DocDB so postpone the execution until - * the rest of the statement/txn is finished executing. + * The following issues a request to the YB-Master to drop the + * table once this transaction commits. */ - YBSaveDdlHandle(handle); + HandleYBStatusIgnoreNotFound(YBCPgExecDropTable(handle), + ¬_found); + return; } + /* + * YSQL DDL Rollback is disabled/unsupported. 
This means DocDB will not + * rollback the drop if the transaction ends up failing. We cannot + * abort drop in DocDB so postpone the execution until the rest of the + * statement/txn finishes executing. + */ + YBSaveDdlHandle(handle); } } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index 873a0ae65933..eef7e216467b 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -2020,8 +2020,8 @@ static struct config_bool ConfigureNamesBool[] = { {"yb_test_system_catalogs_creation", PGC_SUSET, DEVELOPER_OPTIONS, - gettext_noop("Relaxes some internal sanity checks for system catalogs to " - "allow creating them."), + gettext_noop("Relaxes some internal sanity checks for system " + "catalogs to allow creating them."), NULL, GUC_NOT_IN_SAMPLE }, @@ -2032,8 +2032,8 @@ static struct config_bool ConfigureNamesBool[] = { {"yb_test_fail_next_ddl", PGC_USERSET, DEVELOPER_OPTIONS, - gettext_noop("When set, the next DDL (only CREATE TABLE for now) " - "will fail right after DocDB processes the actual database structure change."), + gettext_noop("When set, the next DDL will fail right before " + "commit."), NULL, GUC_NOT_IN_SAMPLE }, diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index 3421bb180754..df938aa297d8 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -1127,6 +1127,11 @@ YBDecrementDdlNestingLevel(bool is_catalog_version_increment, --ddl_transaction_state.nesting_level; if (ddl_transaction_state.nesting_level == 0) { + if (yb_test_fail_next_ddl) + { + yb_test_fail_next_ddl = false; + elog(ERROR, "Failed DDL operation as requested"); + } if (GetCurrentMemoryContext() == ddl_transaction_state.mem_context) MemoryContextSwitchTo(ddl_transaction_state.mem_context->parent); /* @@ -1638,14 +1643,6 @@ void 
YBCFillUniqueIndexNullAttribute(YBCPgYBTupleIdDescriptor* descr) { last_attr->is_null = true; } -void YBTestFailDdlIfRequested() { - if (!yb_test_fail_next_ddl) - return; - - yb_test_fail_next_ddl = false; - elog(ERROR, "DDL failed as requested"); -} - void YbTestGucBlockWhileStrEqual(char **actual, const char *expected, const char *msg) diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index 8138d86000f0..b48b3967d19a 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -565,8 +565,6 @@ YbTableProperties YbTryGetTableProperties(Relation rel); */ bool YBIsSupportedLibcLocale(const char *localebuf); -void YBTestFailDdlIfRequested(); - /* Spin wait while test guc var actual equals expected. */ extern void YbTestGucBlockWhileStrEqual(char **actual, const char *expected, const char *msg); diff --git a/src/yb/client/CMakeLists.txt b/src/yb/client/CMakeLists.txt index c7514269b23b..09a970c3ac0a 100644 --- a/src/yb/client/CMakeLists.txt +++ b/src/yb/client/CMakeLists.txt @@ -109,7 +109,7 @@ endif() # and therefore don't need to worry about export strictness) ADD_YB_LIBRARY(yb_client_test_util SRCS client-test-util.cc - DEPS gmock gtest yb_client) + DEPS gmock gtest yb_client yb_test_util) ADD_YB_LIBRARY(ql-dml-test-base SRCS ql-dml-test-base.cc txn-test-base.cc snapshot_test_util.cc diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 81d2d4fdf669..e59de4a82c8b 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -108,6 +108,7 @@ DEFINE_test_flag(string, assert_tablet_server_select_is_in_zone, "", DECLARE_int64(reset_master_leader_timeout_ms); DECLARE_string(flagfile); +DECLARE_bool(ysql_ddl_rollback_enabled); namespace yb { @@ -558,7 +559,8 @@ Status YBClient::Data::DeleteTable(YBClient* client, const bool is_index_table, CoarseTimePoint deadline, YBTableName* indexed_table_name, - bool wait) { + bool wait, + const 
TransactionMetadata *txn) { DeleteTableRequestPB req; DeleteTableResponsePB resp; int attempts = 0; @@ -569,6 +571,14 @@ Status YBClient::Data::DeleteTable(YBClient* client, if (!table_id.empty()) { req.mutable_table()->set_table_id(table_id); } + if (FLAGS_ysql_ddl_rollback_enabled && txn) { + // If 'txn' is set, this means this delete operation should actually result in the + // deletion of table data only if this transaction is a success. Therefore ensure that + // 'wait' is not set, because it makes no sense to wait for the deletion to complete if we want + // to postpone the deletion until end of transaction. + DCHECK(!wait); + txn->ToPB(req.mutable_transaction()); + } req.set_is_index_table(is_index_table); const Status status = SyncLeaderMasterRpc( deadline, req, &resp, "DeleteTable", &master::MasterDdlProxy::DeleteTableAsync, diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index c13431675c36..0c3cc98e8b90 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -42,6 +42,7 @@ #include "yb/common/common_net.pb.h" #include "yb/common/entity_ids.h" #include "yb/common/index.h" +#include "yb/common/transaction.h" #include "yb/common/wire_protocol.h" #include "yb/master/master_fwd.h" @@ -137,12 +138,13 @@ class YBClient::Data { // Take one of table id or name. 
Status DeleteTable(YBClient* client, - const YBTableName& table_name, - const std::string& table_id, - bool is_index_table, - CoarseTimePoint deadline, - YBTableName* indexed_table_name, - bool wait = true); + const YBTableName& table_name, + const std::string& table_id, + bool is_index_table, + CoarseTimePoint deadline, + YBTableName* indexed_table_name, + bool wait = true, + const TransactionMetadata *txn = nullptr); Status IsDeleteTableInProgress(YBClient* client, const std::string& table_id, diff --git a/src/yb/client/client-test-util.cc b/src/yb/client/client-test-util.cc index 14396ed449f6..df5ff6ede528 100644 --- a/src/yb/client/client-test-util.cc +++ b/src/yb/client/client-test-util.cc @@ -43,16 +43,21 @@ #include #include "yb/client/client_fwd.h" +#include "yb/client/client.h" #include "yb/client/error.h" #include "yb/client/schema.h" #include "yb/client/session.h" #include "yb/client/table_handle.h" #include "yb/client/yb_op.h" +#include "yb/client/yb_table_name.h" #include "yb/common/common.pb.h" #include "yb/common/ql_type.h" #include "yb/common/ql_value.h" +#include "yb/master/master_types.pb.h" + +#include "yb/util/backoff_waiter.h" #include "yb/util/enums.h" #include "yb/util/monotime.h" #include "yb/util/status.h" @@ -60,12 +65,42 @@ #include "yb/util/status_log.h" #include "yb/util/strongly_typed_bool.h" #include "yb/util/test_macros.h" +#include "yb/util/test_util.h" using std::string; namespace yb { namespace client { +namespace { + void VerifyNamespace(client::YBClient *client, + const NamespaceName& db_name, + const bool exists, + const int timeout_secs) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + Result ret = client->NamespaceExists(db_name, YQLDatabase::YQL_DATABASE_PGSQL); + WARN_NOT_OK(ResultToStatus(ret), "NamespaceExists call failed"); + return ret.ok() && ret.get() == exists; + }, MonoDelta::FromSeconds(timeout_secs), + Format("Verify Namespace $0 $1 exists", db_name, (exists ? 
"" : "not")))); + } + + void VerifyTable(client::YBClient* client, + const string& database_name, + const string& table_name, + const int timeout_secs, + const bool exists) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + auto ret = client->TableExists( + client::YBTableName(YQL_DATABASE_PGSQL, database_name, table_name)); + WARN_NOT_OK(ResultToStatus(ret), "TableExists call failed"); + return ret.ok() && ret.get() == exists; + }, MonoDelta::FromSeconds(timeout_secs), + Format("Verify Table $0 $1 exists in database $2", + table_name, (exists ? "" : "not"), database_name))); + } +} // namespace + void LogSessionErrorsAndDie(const FlushStatus& flush_status) { const auto& s = flush_status.status; CHECK(!s.ok()); @@ -158,5 +193,55 @@ std::shared_ptr CreateReadOp( return op; } +Result GetNamespaceIdByNamespaceName(YBClient* client, + const string& namespace_name) { + const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL)); + for (const auto& ns : namespaces) { + if (ns.name() == namespace_name) { + return ns.id(); + } + } + return STATUS_SUBSTITUTE(NotFound, "The namespace $0 does not exist", namespace_name); +} + +Result GetTableIdByTableName(client::YBClient* client, + const string& namespace_name, + const string& table_name) { + const auto tables = VERIFY_RESULT(client->ListTables()); + for (const auto& t : tables) { + if (t.namespace_name() == namespace_name && t.table_name() == table_name) { + return t.table_id(); + } + } + return STATUS_SUBSTITUTE(NotFound, "The table $0 does not exist in namespace $1", + table_name, namespace_name); +} + +void VerifyNamespaceExists(YBClient *client, + const NamespaceName& db_name, + const int timeout_secs) { + VerifyNamespace(client, db_name, true /* exists */, timeout_secs); +} + +void VerifyNamespaceNotExists(YBClient *client, + const NamespaceName& db_name, + const int timeout_secs) { + VerifyNamespace(client, db_name, false /* exists */, timeout_secs); +} + +void VerifyTableExists(YBClient* client, + 
const string& database_name, + const string& table_name, + const int timeout_secs) { + VerifyTable(client, database_name, table_name, timeout_secs, true /* exists */); +} + +void VerifyTableNotExists(YBClient* client, + const string& database_name, + const string& table_name, + const int timeout_secs) { + VerifyTable(client, database_name, table_name, timeout_secs, false /* exists */); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client-test-util.h b/src/yb/client/client-test-util.h index 8322cb7699a8..8d35aa4afd6d 100644 --- a/src/yb/client/client-test-util.h +++ b/src/yb/client/client-test-util.h @@ -74,5 +74,23 @@ YBSchema YBSchemaFromSchema(const Schema& schema); std::shared_ptr CreateReadOp( int32_t key, const TableHandle& table, const std::string& value_column); +Result GetNamespaceIdByNamespaceName( + YBClient* client, const std::string& namespace_name); + +Result GetTableIdByTableName( + YBClient* client, const std::string& namespace_name, const std::string& table_name); + +void VerifyNamespaceExists(YBClient *client, const NamespaceName& db_name, int timeout_secs = 20); + +void VerifyNamespaceNotExists( + YBClient *client, const NamespaceName& db_name, int timeout_secs = 20); + +void VerifyTableExists( + YBClient* client, const std::string& db_name, const std::string& table_name, int timeout_secs); + +void VerifyTableNotExists( + YBClient* client, const std::string& db_name, const std::string& table_name, int timeout_secs); + + } // namespace client } // namespace yb diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 236c1a800f55..52adcc16fe76 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -633,14 +633,18 @@ Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) { wait); } -Status YBClient::DeleteTable(const string& table_id, bool wait, CoarseTimePoint deadline) { +Status YBClient::DeleteTable(const string& table_id, + bool wait, + const TransactionMetadata *txn, + 
CoarseTimePoint deadline) { return data_->DeleteTable(this, YBTableName(), table_id, false /* is_index_table */, PatchAdminDeadline(deadline), nullptr /* indexed_table_name */, - wait); + wait, + txn); } Status YBClient::DeleteIndexTable(const YBTableName& table_name, diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 926f7dc10b81..5f8273ca2868 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -53,6 +53,7 @@ #include "yb/common/common_types.pb.h" #include "yb/common/entity_ids.h" #include "yb/common/retryable_request.h" +#include "yb/common/transaction.h" #include "yb/gutil/macros.h" #include "yb/gutil/port.h" @@ -258,8 +259,13 @@ class YBClient { // Delete the specified table. // Set 'wait' to true if the call must wait for the table to be fully deleted before returning. Status DeleteTable(const YBTableName& table_name, bool wait = true); - Status DeleteTable( - const std::string& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint()); + // 'txn' describes the transaction that is performing this delete operation. For YSQL + // operations, YB-Master will perform the actual deletion only if this transaction is a + // success. + Status DeleteTable(const std::string& table_id, + bool wait = true, + const TransactionMetadata *txn = nullptr, + CoarseTimePoint deadline = CoarseTimePoint()); // Delete the specified index table. // Set 'wait' to true if the call must wait for the table to be fully deleted before returning. 
diff --git a/src/yb/client/snapshot_test_util.cc b/src/yb/client/snapshot_test_util.cc index 867925f31793..eabfeda43344 100644 --- a/src/yb/client/snapshot_test_util.cc +++ b/src/yb/client/snapshot_test_util.cc @@ -297,6 +297,31 @@ Result SnapshotTestUtil::CreateSchedule( return id; } +Result SnapshotTestUtil::CreateSchedule( + const NamespaceName& namespace_name, WaitSnapshot wait_snapshot, + MonoDelta interval, MonoDelta retention) { + + rpc::RpcController controller; + controller.set_timeout(60s); + master::CreateSnapshotScheduleRequestPB req; + auto& options = *req.mutable_options(); + options.set_interval_sec(interval.ToSeconds()); + options.set_retention_duration_sec(retention.ToSeconds()); + auto& tables = *options.mutable_filter()->mutable_tables()->mutable_tables(); + auto* ns = tables.Add()->mutable_namespace_(); + ns->set_name(namespace_name); + ns->set_database_type(YQLDatabase::YQL_DATABASE_PGSQL); + master::CreateSnapshotScheduleResponsePB resp; + RETURN_NOT_OK( + VERIFY_RESULT(MakeBackupServiceProxy()).CreateSnapshotSchedule(req, &resp, &controller)); + auto id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id())); + if (wait_snapshot) { + RETURN_NOT_OK(WaitScheduleSnapshot(id, std::numeric_limits::max(), + HybridTime::kMin, 60s * kTimeMultiplier)); + } + return id; +} + Result SnapshotTestUtil::ListSchedules(const SnapshotScheduleId& id) { master::ListSnapshotSchedulesRequestPB req; master::ListSnapshotSchedulesResponsePB resp; @@ -338,9 +363,15 @@ Status SnapshotTestUtil::WaitScheduleSnapshot( return WaitScheduleSnapshot(schedule_id, std::numeric_limits::max(), min_hybrid_time); } +Status SnapshotTestUtil::WaitScheduleSnapshot( + const SnapshotScheduleId& schedule_id, int max_snapshots, HybridTime min_hybrid_time) { + return WaitScheduleSnapshot(schedule_id, max_snapshots, min_hybrid_time, + ((max_snapshots == 1) ? 
0s : kSnapshotInterval) + kSnapshotInterval / 2); +} + Status SnapshotTestUtil::WaitScheduleSnapshot( const SnapshotScheduleId& schedule_id, int max_snapshots, - HybridTime min_hybrid_time) { + HybridTime min_hybrid_time, MonoDelta timeout) { return WaitFor([this, schedule_id, max_snapshots, min_hybrid_time]() -> Result { auto snapshots = VERIFY_RESULT(ListSnapshots()); EXPECT_LE(snapshots.size(), max_snapshots); @@ -354,7 +385,7 @@ Status SnapshotTestUtil::WaitScheduleSnapshot( } return false; }, - ((max_snapshots == 1) ? 0s : kSnapshotInterval) + kSnapshotInterval / 2, + timeout, "Schedule snapshot"); } diff --git a/src/yb/client/snapshot_test_util.h b/src/yb/client/snapshot_test_util.h index 69fb139a0168..fcde00269957 100644 --- a/src/yb/client/snapshot_test_util.h +++ b/src/yb/client/snapshot_test_util.h @@ -48,13 +48,13 @@ class SnapshotTestUtil { void SetProxy(rpc::ProxyCache* proxy_cache) { proxy_cache_ = proxy_cache; } - void SetCluster(MiniCluster* cluster) { + void SetCluster(MiniClusterBase* cluster) { cluster_ = cluster; } Result MakeBackupServiceProxy() { return master::MasterBackupProxy( - proxy_cache_, VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); + proxy_cache_, VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr())); } Result SnapshotState(const TxnSnapshotId& snapshot_id); @@ -98,6 +98,9 @@ class SnapshotTestUtil { const YBTablePtr table, YQLDatabase db_type, const std::string& db_name, WaitSnapshot wait_snapshot, MonoDelta interval = kSnapshotInterval, MonoDelta retention = kSnapshotRetention); + Result CreateSchedule( + const NamespaceName& database, WaitSnapshot wait_snapshot, + MonoDelta interval = kSnapshotInterval, MonoDelta retention = kSnapshotRetention); Result ListSchedules(const SnapshotScheduleId& id = SnapshotScheduleId::Nil()); @@ -111,9 +114,13 @@ class SnapshotTestUtil { const SnapshotScheduleId& schedule_id, int max_snapshots = 1, HybridTime min_hybrid_time = HybridTime::kMin); + Status WaitScheduleSnapshot( + 
const SnapshotScheduleId& schedule_id, int max_snapshots, + HybridTime min_hybrid_time, MonoDelta timeout); + private: rpc::ProxyCache* proxy_cache_; - MiniCluster* cluster_; + MiniClusterBase* cluster_; }; } // namespace client diff --git a/src/yb/common/common_flags.cc b/src/yb/common/common_flags.cc index 7e2558fa5058..33a82caa5be0 100644 --- a/src/yb/common/common_flags.cc +++ b/src/yb/common/common_flags.cc @@ -57,6 +57,14 @@ DEFINE_UNKNOWN_bool(enable_wait_queues, false, "If true, use pessimistic locking behavior in conflict resolution."); TAG_FLAG(enable_wait_queues, evolving); +DEFINE_RUNTIME_bool(ysql_ddl_rollback_enabled, false, + "If true, failed YSQL DDL transactions that affect both pg catalog and DocDB schema " + "will be rolled back by YB-Master. Note that this is applicable only for few DDL " + "operations such as dropping a table, adding a column, renaming a column/table. This " + "flag should not be changed in the middle of a DDL operation."); +TAG_FLAG(ysql_ddl_rollback_enabled, hidden); +TAG_FLAG(ysql_ddl_rollback_enabled, advanced); + DEFINE_test_flag(bool, enable_db_catalog_version_mode, false, "Enable the per database catalog version mode, a DDL statement is assumed to " "only affect the current database and will only increment catalog version for " diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 4dd470439db1..051a6940d418 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -386,6 +386,33 @@ struct PersistentTableInfo : public Persistent 0; + } + + auto ysql_ddl_txn_verifier_state() const { + // Currently DDL with savepoints is disabled, so this repeated field can have only 1 element. 
+ DCHECK_EQ(pb.ysql_ddl_txn_verifier_state_size(), 1); + return pb.ysql_ddl_txn_verifier_state(0); + } + + bool is_being_deleted_by_ysql_ddl_txn() const { + return has_ysql_ddl_txn_verifier_state() && + ysql_ddl_txn_verifier_state().contains_drop_table_op(); + } + + bool is_being_created_by_ysql_ddl_txn() const { + return has_ysql_ddl_txn_verifier_state() && + ysql_ddl_txn_verifier_state().contains_create_table_op(); + } + // Helper to set the state of the tablet with a custom message. void set_state(SysTablesEntryPB::State state, const std::string& msg); }; diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index 7331821faa74..ddc6dbf8b613 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -163,9 +163,6 @@ message SysTablesEntryPB { repeated IndexInfoPB fully_applied_indexes = 26; optional IndexInfoPB fully_applied_index_info = 27; - // Optional: Table dependent upon transaction success (abort removes table). Used by YSQL. - optional TransactionMetadataPB transaction = 29; - // During an alter table, which involves no schema change but only updating // a permission, is it sometimes acceptable for a client's request (which is // prepared with the current schema) to be accepted by a tserver which is @@ -185,6 +182,24 @@ message SysTablesEntryPB { optional bytes transaction_table_tablespace_id = 34; // Time when the table was hidden. optional fixed64 hide_hybrid_time = 35; + + message YsqlDdlTxnVerifierState { + optional bool contains_create_table_op = 1; + optional bool contains_alter_table_op = 2; + optional bool contains_drop_table_op = 3; + + optional SchemaPB previous_schema = 4; + optional string previous_table_name = 5; + } + + // State that indicates that this table is being changed by a YSQL transaction. + // This repeated field contains only a single element as of now. 
When we support DDL + Savepoints, + // we will have one element for every savepoint modifying this table in this field. + repeated YsqlDdlTxnVerifierState ysql_ddl_txn_verifier_state = 38; + + // YSQL transaction that is currently modifying this table state. The changes being performed by + // it are detailed in 'ysql_ddl_txn_state' above. + optional TransactionMetadataPB transaction = 29; } // The on-disk entry in the sys.catalog table ("metadata" column) for diff --git a/src/yb/master/catalog_loaders.cc b/src/yb/master/catalog_loaders.cc index a472640ae460..45f03ae70992 100644 --- a/src/yb/master/catalog_loaders.cc +++ b/src/yb/master/catalog_loaders.cc @@ -50,6 +50,8 @@ DEFINE_UNKNOWN_bool(master_ignore_deleted_on_load, true, DEFINE_test_flag(uint64, slow_cluster_config_load_secs, 0, "When set, it pauses load of cluster config during sys catalog load."); +DECLARE_bool(ysql_ddl_rollback_enabled); + namespace yb { namespace master { @@ -113,14 +115,24 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad // Tables created as part of a Transaction should check transaction status and be deleted // if the transaction is aborted. 
if (metadata.has_transaction()) { - LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString(); TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction())); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1); - WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, catalog_manager_->ysql_transaction_.get(), - txn, when_done)), - "Could not submit VerifyTransaction to thread pool"); + if (metadata.ysql_ddl_txn_verifier_state_size() > 0) { + if (FLAGS_ysql_ddl_rollback_enabled) { + catalog_manager_->ScheduleYsqlTxnVerification(table, txn); + } + } else { + // This is a table/index for which YSQL transaction verification is not supported yet. + // For these, we only support rolling back creating the table. If the transaction has + // completed, merely check for the presence of this entity in the PG catalog. + LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString(); + std::function when_done = + std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1); + WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, + catalog_manager_->ysql_transaction_.get(), + txn, table, false /* has_ysql_ddl_txn_state */, when_done)), + "Could not submit VerifyTransaction to thread pool"); + } } LOG(INFO) << "Loaded metadata for table " << table->ToString() << ", state: " @@ -390,7 +402,11 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP std::bind(&CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1); WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( std::bind(&YsqlTransactionDdl::VerifyTransaction, - catalog_manager_->ysql_transaction_.get(), txn, when_done)), + catalog_manager_->ysql_transaction_.get(), + txn, + nullptr /* 
table */, + false /* has_ysql_ddl_state */, + when_done)), "Could not submit VerifyTransaction to thread pool"); } break; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 9062ed787506..1f01db91283b 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -84,6 +84,7 @@ #include "yb/common/ql_type.h" #include "yb/common/roles_permissions.h" #include "yb/common/schema.h" +#include "yb/common/transaction.h" #include "yb/common/wire_protocol.h" #include "yb/consensus/consensus.h" @@ -344,6 +345,8 @@ DEFINE_RUNTIME_bool(enable_transactional_ddl_gc, true, "A kill switch for transactional DDL GC. Temporary safety measure."); TAG_FLAG(enable_transactional_ddl_gc, hidden); +DECLARE_bool(ysql_ddl_rollback_enabled); + // TODO: should this be a test flag? DEFINE_RUNTIME_bool(hide_pg_catalog_table_creation_logs, false, "Whether to hide detailed log messages for PostgreSQL catalog table creation. " @@ -3273,7 +3276,8 @@ Status CatalogManager::CreateYsqlSysTable( std::function when_done = std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, when_done)), + std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), + txn, table, false /* has_ysql_txn_ddl_state */, when_done)), "Could not submit VerifyTransaction to thread pool"); } @@ -4001,11 +4005,21 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // Tables with a transaction should be rolled back if the transaction does not get committed. // Store this on the table persistent state until the transaction has been a verified success. 
TransactionMetadata txn; - if (req.has_transaction() && FLAGS_enable_transactional_ddl_gc) { + bool schedule_ysql_txn_verifier = false; + if (req.has_transaction() && + (FLAGS_enable_transactional_ddl_gc || FLAGS_ysql_ddl_rollback_enabled)) { table->mutable_metadata()->mutable_dirty()->pb.mutable_transaction()-> CopyFrom(req.transaction()); txn = VERIFY_RESULT(TransactionMetadata::FromPB(req.transaction())); RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + + // Set the YsqlTxnVerifierState. + if (FLAGS_ysql_ddl_rollback_enabled && !IsIndex(req) && !colocated && + !table->is_matview()) { + table->mutable_metadata()->mutable_dirty()->pb.add_ysql_ddl_txn_verifier_state()-> + set_contains_create_table_op(true); + schedule_ysql_txn_verifier = true; + } } if (PREDICT_FALSE(FLAGS_TEST_simulate_slow_table_create_secs > 0) && @@ -4089,13 +4103,18 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } // Verify Transaction gets committed, which occurs after table create finishes. 
- if (req.has_transaction() && PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { - LOG(INFO) << "Enqueuing table for Transaction Verification: " << req.name(); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); - WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, when_done)), - "Could not submit VerifyTransaction to thread pool"); + if (req.has_transaction()) { + if (schedule_ysql_txn_verifier) { + ScheduleYsqlTxnVerification(table, txn); + } else if (PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { + LOG(INFO) << "Enqueuing table for Transaction Verification: " << req.name(); + std::function when_done = + std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); + WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), + txn, table, false /* has_ysql_ddl_txn_state */, when_done)), + "Could not submit VerifyTransaction to thread pool"); + } } LOG(INFO) << "Successfully created " << object_type << " " << table->ToString() << " in " @@ -5595,6 +5614,59 @@ Status CatalogManager::DeleteTable( } } + // Check whether DDL rollback is enabled. + if (FLAGS_ysql_ddl_rollback_enabled && req->has_transaction()) { + bool ysql_txn_verifier_state_present = false; + bool table_created_by_same_transaction = false; + auto l = table->LockForWrite(); + if (l->has_ysql_ddl_txn_verifier_state()) { + DCHECK(!l->pb_transaction_id().empty()); + + // If the table is already undergoing DDL transaction verification as part of a different + // transaction then fail this request. 
+ if (l->pb_transaction_id() != req->transaction().transaction_id()) { + const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + } + // This DROP operation is part of a DDL transaction that has already made changes + // to this table. + ysql_txn_verifier_state_present = true; + // If this table is being dropped in the same transaction where it was being created, then + // it can be dropped without waiting for the transaction to end. This is because this table + // will be dropped anyway whether this transaction commits or aborts. + table_created_by_same_transaction = l->is_being_created_by_ysql_ddl_txn(); + } + + // If this table has not been created in the same transaction that is dropping it, mark this + // table for deletion upon successful commit of this transaction. + if (!table_created_by_same_transaction && + table->GetTableType() == PGSQL_TABLE_TYPE && !table->is_matview() && + !req->is_index_table() && !table->colocated()) { + // Setup a background task. It monitors the YSQL transaction. If it commits, the task drops + // the table. Otherwise it removes the deletion marker in the ysql_ddl_txn_verifier_state. + TransactionMetadata txn; + auto& pb = l.mutable_data()->pb; + if (!ysql_txn_verifier_state_present) { + pb.mutable_transaction()->CopyFrom(req->transaction()); + txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + pb.add_ysql_ddl_txn_verifier_state()->set_contains_drop_table_op(true); + } else { + DCHECK_EQ(pb.ysql_ddl_txn_verifier_state_size(), 1); + pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_drop_table_op(true); + } + // Upsert to sys_catalog. + RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); + // Update the in-memory state. 
+ TRACE("Committing in-memory state as part of DeleteTable operation"); + l.Commit(); + if (!ysql_txn_verifier_state_present) { + ScheduleYsqlTxnVerification(table, txn); + } + return Status::OK(); + } + } + return DeleteTableInternal(req, resp, rpc); } @@ -6207,6 +6279,16 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, auto l = table->LockForWrite(); RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + // If the table is already undergoing an alter operation, return failure. + if (FLAGS_ysql_ddl_rollback_enabled && l->has_ysql_ddl_txn_verifier_state()) { + DCHECK(req->has_transaction()); + DCHECK(req->transaction().has_transaction_id()); + if (l->pb_transaction_id() != req->transaction().transaction_id()) { + const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + } + } + bool has_changes = false; auto& table_pb = l.mutable_data()->pb; const TableName table_name = l->name(); @@ -6215,6 +6297,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Calculate new schema for the on-disk state, not persisted yet. Schema new_schema; + Schema previous_schema; + RETURN_NOT_OK(SchemaFromPB(l->pb.schema(), &previous_schema)); + string previous_table_name = l->pb.name(); ColumnId next_col_id = ColumnId(l->pb.next_column_id()); if (req->alter_schema_steps_size() || req->has_alter_properties()) { TRACE("Apply alter schema"); @@ -6346,29 +6431,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, SysTablesEntryPB::ALTERING, Substitute("Alter table version=$0 ts=$1", table_pb.version(), LocalTimeAsString())); - // Update sys-catalog with the new table schema. 
- TRACE("Updating metadata on disk"); - std::vector ddl_log_entry_pointers; - ddl_log_entry_pointers.reserve(ddl_log_entries.size()); - for (const auto& entry : ddl_log_entries) { - ddl_log_entry_pointers.push_back(&entry); - } - Status s = sys_catalog_->Upsert(leader_ready_term(), ddl_log_entry_pointers, table); - if (!s.ok()) { - s = s.CloneAndPrepend( - Substitute("An error occurred while updating sys-catalog tables entry: $0", - s.ToString())); - LOG(WARNING) << s.ToString(); - if (table->GetTableType() != PGSQL_TABLE_TYPE && - (req->has_new_namespace() || req->has_new_table_name())) { - LockGuard lock(mutex_); - VLOG_WITH_FUNC(3) << "Acquired the catalog manager lock"; - CHECK_EQ(table_names_map_.erase({new_namespace_id, new_table_name}), 1); - } - // TableMetadaLock follows RAII paradigm: when it leaves scope, - // 'l' will be unlocked, and the mutation will be aborted. - return CheckIfNoLongerLeaderAndSetupError(s, resp); - } + RETURN_NOT_OK(UpdateSysCatalogWithNewSchema( + table, ddl_log_entries, new_namespace_id, new_table_name, resp)); // Remove the old name. Not present if PGSQL. if (table->GetTableType() != PGSQL_TABLE_TYPE && @@ -6379,10 +6443,37 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, table_names_map_.erase({namespace_id, table_name}); } + // Update a task to rollback alter if the corresponding YSQL transaction + // rolls back. + TransactionMetadata txn; + bool schedule_ysql_txn_verifier = false; + if (!FLAGS_ysql_ddl_rollback_enabled) { + // If DDL rollback is no longer enabled, make sure that there is no transaction + // verification state present. 
+ table_pb.clear_ysql_ddl_txn_verifier_state(); + } else if (req->has_transaction() && table->GetTableType() == PGSQL_TABLE_TYPE && + !table->colocated()) { + if (!l->has_ysql_ddl_txn_verifier_state()) { + table_pb.mutable_transaction()->CopyFrom(req->transaction()); + auto *ddl_state = table_pb.add_ysql_ddl_txn_verifier_state(); + SchemaToPB(previous_schema, ddl_state->mutable_previous_schema()); + ddl_state->set_previous_table_name(previous_table_name); + schedule_ysql_txn_verifier = true; + } + txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + DCHECK_EQ(table_pb.ysql_ddl_txn_verifier_state_size(), 1); + table_pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_alter_table_op(true); + } + // Update the in-memory state. TRACE("Committing in-memory state"); l.Commit(); + // Verify Transaction gets committed, which occurs after table alter finishes. + if (schedule_ysql_txn_verifier) { + ScheduleYsqlTxnVerification(table, txn); + } RETURN_NOT_OK(SendAlterTableRequest(table, req)); // Increment transaction status version if needed. 
@@ -6395,6 +6486,38 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, return Status::OK(); } +Status CatalogManager::UpdateSysCatalogWithNewSchema( + const scoped_refptr& table, + const std::vector& ddl_log_entries, + const string& new_namespace_id, + const string& new_table_name, + AlterTableResponsePB* resp) { + TRACE("Updating metadata on disk"); + std::vector ddl_log_entry_pointers; + ddl_log_entry_pointers.reserve(ddl_log_entries.size()); + for (const auto& entry : ddl_log_entries) { + ddl_log_entry_pointers.push_back(&entry); + } + Status s = sys_catalog_->Upsert(leader_ready_term(), ddl_log_entry_pointers, table); + if (!s.ok()) { + s = s.CloneAndPrepend( + Substitute("An error occurred while updating sys-catalog tables entry: $0", + s.ToString())); + LOG(WARNING) << s.ToString(); + if (table->GetTableType() != PGSQL_TABLE_TYPE && + (!new_namespace_id.empty() || !new_table_name.empty())) { + LockGuard lock(mutex_); + VLOG_WITH_FUNC(3) << "Acquired the catalog manager lock"; + CHECK_EQ(table_names_map_.erase({new_namespace_id, new_table_name}), 1); + } + if (resp) + return CheckIfNoLongerLeaderAndSetupError(s, resp); + + return CheckIfNoLongerLeader(s); + } + return Status::OK(); +} + Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req, IsAlterTableDoneResponsePB* resp) { // 1. Lookup the table and verify if it exists. 
@@ -6413,6 +6536,136 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req, return Status::OK(); } +void CatalogManager::ScheduleYsqlTxnVerification(const scoped_refptr& table, + const TransactionMetadata& txn) { + auto l = table->LockForRead(); + LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name() + << " id: " << table->id() << " schema version: " << l->pb.version(); + std::function when_done = + std::bind(&CatalogManager::YsqlTableSchemaChecker, this, table, + l->pb_transaction_id(), _1); + // For now, just print warning if submission to thread pool fails. Fix this as part of + // #13358. + WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, table, + true /* has_ysql_ddl_state */, when_done)), + "Could not submit VerifyTransaction to thread pool"); +} + +Status CatalogManager::YsqlTableSchemaChecker(scoped_refptr table, + const string& txn_id_pb, + bool txn_rpc_success) { + if (!txn_rpc_success) { + return STATUS_FORMAT(IllegalState, "Failed to find Transaction Status for table $0", + table->ToString()); + } + bool is_committed = VERIFY_RESULT(ysql_transaction_->PgSchemaChecker(table)); + return YsqlDdlTxnCompleteCallback(table, txn_id_pb, is_committed); +} + +Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr table, + const string& txn_id_pb, + bool success) { + DCHECK(!txn_id_pb.empty()); + DCHECK(table); + const string& id = "table id: " + table->id(); + auto txn = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb)); + auto l = table->LockForWrite(); + LOG(INFO) << "YsqlDdlTxnCompleteCallback for " << id + << " for transaction " << txn + << ": Success: " << (success ? "true" : "false") + << " ysql_ddl_txn_verifier_state: " + << l->ysql_ddl_txn_verifier_state().DebugString(); + + if (!l->has_ysql_ddl_txn_verifier_state()) { + // The table no longer has any DDL state to clean up. 
It was probably cleared by + // another thread, nothing to do. + LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked but no ysql transaction " + << "verification state found for " << id << " , ignoring"; + return Status::OK(); + } + + if (txn_id_pb != l->pb_transaction_id()) { + // Now the table has transaction state for a different transaction. + // Do nothing. + LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked for transaction " << txn << " but the " + << "schema contains state for a different transaction"; + return Status::OK(); + } + + if (success && !l->is_being_deleted_by_ysql_ddl_txn()) { + // This transaction was successful. We did not drop this table in this + // transaction. In that case, we have nothing else to do. + LOG(INFO) << "Clearing ysql_ddl_txn_verifier_state from " << id; + auto& pb = l.mutable_data()->pb; + pb.clear_ysql_ddl_txn_verifier_state(); + pb.clear_transaction(); + RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); + l.Commit(); + return Status::OK(); + } + + if ((success && l->is_being_deleted_by_ysql_ddl_txn()) || + (!success && l->is_being_created_by_ysql_ddl_txn())) { + // This is either a successful DROP operation or a failed CREATE operation. + // In both cases, drop the table. 
+ LOG(INFO) << "Dropping " << id; + l.Commit(); + DeleteTableRequestPB dtreq; + DeleteTableResponsePB dtresp; + + dtreq.mutable_table()->set_table_name(table->name()); + dtreq.mutable_table()->set_table_id(table->id()); + dtreq.set_is_index_table(false); + return DeleteTableInternal(&dtreq, &dtresp, nullptr /* rpc */); + } + + auto& table_pb = l.mutable_data()->pb; + if (!success && l->ysql_ddl_txn_verifier_state().contains_alter_table_op()) { + LOG(INFO) << "Alter transaction " << txn << " failed, rolling back its schema changes"; + std::vector ddl_log_entries; + ddl_log_entries.emplace_back( + master_->clock()->Now(), + table->id(), + l->pb, + "Rollback of DDL Transaction"); + table_pb.mutable_schema()->CopyFrom(table_pb.ysql_ddl_txn_verifier_state(0).previous_schema()); + const string& new_table_name = table_pb.ysql_ddl_txn_verifier_state(0).previous_table_name(); + l.mutable_data()->pb.set_name(new_table_name); + table_pb.set_version(table_pb.version() + 1); + table_pb.set_updates_only_index_permissions(false); + l.mutable_data()->set_state( + SysTablesEntryPB::ALTERING, + Substitute("Alter table version=$0 ts=$1", table_pb.version(), LocalTimeAsString())); + + table_pb.clear_ysql_ddl_txn_verifier_state(); + table_pb.clear_transaction(); + + // Update sys-catalog with the new table schema. + RETURN_NOT_OK(UpdateSysCatalogWithNewSchema( + table, + ddl_log_entries, + "" /* new_namespace_id */, + new_table_name, + nullptr /* resp */)); + l.Commit(); + LOG(INFO) << "Sending Alter Table request as part of rollback for table " << table->name(); + return SendAlterTableRequestInternal(table, TransactionId::Nil()); + } + + // This must be a failed transaction with only a delete operation in it and no alter operations. + // Hence we don't need to do anything other than clear the transaction state. 
+ DCHECK(!success && l->is_being_deleted_by_ysql_ddl_txn()) + << "Unexpected state for table " << table->ToString() << " with transaction verification " + << "state " << l->ysql_ddl_txn_verifier_state().DebugString(); + + table_pb.clear_ysql_ddl_txn_verifier_state(); + table_pb.clear_transaction(); + RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); + l.Commit(); + return Status::OK(); +} + Result CatalogManager::RegisterNewTabletForSplit( TabletInfo* source_tablet_info, const PartitionPB& partition, TableInfo::WriteLock* table_write_lock, TabletInfo::WriteLock* tablet_write_lock) { @@ -8055,7 +8308,7 @@ void CatalogManager::ProcessPendingNamespace( std::bind(&CatalogManager::VerifyNamespacePgLayer, this, ns, _1); WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), - txn, when_done)), + txn, nullptr /* table */, false /* has_ysql_ddl_state */, when_done)), "Could not submit VerifyTransaction to thread pool"); } } else { @@ -9688,9 +9941,7 @@ Status CatalogManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB& } Status CatalogManager::SendAlterTableRequest(const scoped_refptr& table, - const AlterTableRequestPB* req) { - auto tablets = table->GetTablets(); - + const AlterTableRequestPB* req) { bool is_ysql_table_with_transaction_metadata = table->GetTableType() == TableType::PGSQL_TABLE_TYPE && req != nullptr && @@ -9719,6 +9970,12 @@ Status CatalogManager::SendAlterTableRequest(const scoped_refptr& tab txn_id = VERIFY_RESULT(FullyDecodeTransactionId(req->transaction().transaction_id())); } + return SendAlterTableRequestInternal(table, txn_id); +} + +Status CatalogManager::SendAlterTableRequestInternal(const scoped_refptr& table, + const TransactionId& txn_id) { + auto tablets = table->GetTablets(); for (const scoped_refptr& tablet : tablets) { auto call = std::make_shared(master_, AsyncTaskPool(), tablet, table, txn_id); table->AddTask(call); diff --git 
a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 0f0e68e5ec2e..5e1bd96a3456 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -348,12 +348,30 @@ class CatalogManager : public tserver::TabletPeerLookupIf, AlterTableResponsePB* resp, rpc::RpcContext* rpc); + Status UpdateSysCatalogWithNewSchema( + const scoped_refptr& table, + const std::vector& ddl_log_entries, + const std::string& new_namespace_id, + const std::string& new_table_name, + AlterTableResponsePB* resp); + // Get the information about an in-progress alter operation. Status IsAlterTableDone(const IsAlterTableDoneRequestPB* req, IsAlterTableDoneResponsePB* resp); Result GetTableNamespaceId(TableId table_id) EXCLUDES(mutex_); + void ScheduleYsqlTxnVerification(const scoped_refptr& table, + const TransactionMetadata& txn); + + Status YsqlTableSchemaChecker(scoped_refptr table, + const std::string& txn_id_pb, + bool txn_rpc_success); + + Status YsqlDdlTxnCompleteCallback(scoped_refptr table, + const std::string& txn_id_pb, + bool success); + // Get the information about the specified table. Status GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp) override; @@ -1303,6 +1321,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status SendAlterTableRequest(const scoped_refptr& table, const AlterTableRequestPB* req = nullptr); + Status SendAlterTableRequestInternal(const scoped_refptr& table, + const TransactionId& txn_id); + // Start the background task to send the CopartitionTable() RPC to the leader for this // tablet. 
void SendCopartitionTabletRequest(const scoped_refptr& tablet, diff --git a/src/yb/master/master-path-handlers.cc b/src/yb/master/master-path-handlers.cc index 7c02a7723b81..ae7a05a4f911 100644 --- a/src/yb/master/master-path-handlers.cc +++ b/src/yb/master/master-path-handlers.cc @@ -1267,7 +1267,7 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req, "Unable to determine Tablespace information."); *output << " Unable to determine Tablespace information."; } - *output << " \n \n"; + *output << " \n"; } else { // The table was associated with a tablespace, but that tablespace was not found. *output << " Replication Info:"; @@ -1279,7 +1279,41 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req, << " used to refresh it is disabled."; } - *output << " \n \n"; + *output << " \n"; + } + + if (l->has_ysql_ddl_txn_verifier_state()) { + auto result = FullyDecodeTransactionId(l->pb.transaction().transaction_id()); + *output << " Verifying Ysql DDL Transaction: "; + if (result) + *output << result.get(); + else + *output << "Failed to decode transaction with error:" << result; + *output << " \n"; + + const bool contains_alter = l->pb.ysql_ddl_txn_verifier_state(0).contains_alter_table_op(); + *output << " Ysql DDL transaction Operations: " + << (l->is_being_created_by_ysql_ddl_txn() ? "Create " : "") + << (contains_alter ? " Alter " : "") + << (l->is_being_deleted_by_ysql_ddl_txn() ? 
"Delete" : "") + << " \n"; + if (contains_alter && !l->is_being_created_by_ysql_ddl_txn()) { + *output << " Previous table name: " + << l->pb.ysql_ddl_txn_verifier_state(0).previous_table_name() + << " \n \n"; + Schema previous_schema; + Status s = + SchemaFromPB(l->pb.ysql_ddl_txn_verifier_state(0).previous_schema(), &previous_schema); + if (s.ok()) { + *output << " Previous Schema\n"; + server::HtmlOutputSchemaTable(previous_schema, output); + *output << " Current Schema\n"; + } + } else { + *output << "\n"; + } + } else { + *output << "\n"; } Status s = SchemaFromPB(l->pb.schema(), &schema); diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index b103fc28a5f3..35d869c24a23 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -295,6 +295,7 @@ message GetBackfillJobsResponsePB { message DeleteTableRequestPB { required TableIdentifierPB table = 1; optional bool is_index_table = 2 [ default = false ]; + optional TransactionMetadataPB transaction = 3; } message DeleteTableResponsePB { diff --git a/src/yb/master/master_types.proto b/src/yb/master/master_types.proto index d1def33298dd..ede2787a63f7 100644 --- a/src/yb/master/master_types.proto +++ b/src/yb/master/master_types.proto @@ -141,6 +141,8 @@ message MasterErrorPB { TABLET_NOT_RUNNING = 38; TABLE_NOT_RUNNING = 39; + + TABLE_SCHEMA_CHANGE_IN_PROGRESS = 40; } // The error code. 
diff --git a/src/yb/master/ysql_transaction_ddl.cc b/src/yb/master/ysql_transaction_ddl.cc index b7db9cd6fa4e..d6ef3a599a49 100644 --- a/src/yb/master/ysql_transaction_ddl.cc +++ b/src/yb/master/ysql_transaction_ddl.cc @@ -18,8 +18,6 @@ #include "yb/common/ql_expr.h" #include "yb/common/wire_protocol.h" -#include "yb/docdb/doc_rowwise_iterator.h" - #include "yb/gutil/casts.h" #include "yb/master/sys_catalog.h" @@ -40,6 +38,9 @@ DEFINE_UNKNOWN_int32(ysql_transaction_bg_task_wait_ms, 200, "Amount of time the catalog manager background task thread waits " "between runs"); +using std::string; +using std::vector; + namespace yb { namespace master { @@ -48,9 +49,78 @@ YsqlTransactionDdl::~YsqlTransactionDdl() { rpcs_.Shutdown(); } +Result YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result entry_oid, + TableId relfilenode_oid) { + vector col_names = {"oid"}; + bool is_matview = relfilenode_oid.empty() ? false : true; + if (is_matview) { + col_names.emplace_back("relfilenode"); + } + + Schema projection; + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator(pg_table_id, + "oid", VERIFY_RESULT(std::move(entry_oid)), std::move(col_names), &projection)); + + // If no rows found, the entry does not exist. + if (!VERIFY_RESULT(iter->HasNext())) { + return false; + } + + // The entry exists. Expect only one row. 
+ QLTableRow row; + RETURN_NOT_OK(iter->NextRow(&row)); + CHECK(!VERIFY_RESULT(iter->HasNext())); + if (is_matview) { + const auto relfilenode_col_id = VERIFY_RESULT(projection.ColumnIdByName("relfilenode")).rep(); + const auto& relfilenode = row.GetValue(relfilenode_col_id); + if (relfilenode->uint32_value() != VERIFY_RESULT(GetPgsqlTableOid(relfilenode_oid))) { + return false; + } + } + return true; +} + +Result> +YsqlTransactionDdl::GetPgCatalogTableScanIterator(const TableId& pg_catalog_table_id, + const string& oid_col_name, + uint32_t oid_value, + std::vector col_names, + Schema *projection) { + + auto tablet_peer = sys_catalog_->tablet_peer(); + if (!tablet_peer || !tablet_peer->tablet()) { + return STATUS(ServiceUnavailable, "SysCatalog unavailable"); + } + const tablet::Tablet* catalog_tablet = tablet_peer->tablet(); + DCHECK(catalog_tablet->metadata()); + const Schema& schema = + VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_catalog_table_id))->schema(); + + // Use Scan to query the given table, filtering by lookup_oid_col. 
+ RETURN_NOT_OK(schema.CreateProjectionByNames(col_names, projection, schema.num_key_columns())); + const auto oid_col_id = VERIFY_RESULT(projection->ColumnIdByName(oid_col_name)).rep(); + auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator( + projection->CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_catalog_table_id)); + + auto doc_iter = down_cast(iter.get()); + PgsqlConditionPB cond; + cond.add_operands()->set_column_id(oid_col_id); + cond.set_op(QL_OP_EQUAL); + cond.add_operands()->mutable_value()->set_uint32_value(oid_value); + const std::vector empty_key_components; + docdb::DocPgsqlScanSpec spec( + *projection, rocksdb::kDefaultQueryId, empty_key_components, empty_key_components, + &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */); + RETURN_NOT_OK(doc_iter->Init(spec)); + return iter; +} + void YsqlTransactionDdl::VerifyTransaction( const TransactionMetadata& transaction_metadata, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback) { + SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata; @@ -74,30 +144,41 @@ void YsqlTransactionDdl::VerifyTransaction( // We need to query the TransactionCoordinator here. Can't use TransactionStatusResolver in // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet. 
*rpc_handle = client::GetTransactionStatus( - TransactionRpcDeadline(), - nullptr /* tablet */, - client, - &req, - [this, rpc_handle, transaction_metadata, complete_callback] - (Status status, const tserver::GetTransactionStatusResponsePB& resp) { - auto retained = rpcs_.Unregister(rpc_handle); - TransactionReceived(transaction_metadata, complete_callback, std::move(status), resp); - }); + TransactionRpcDeadline(), + nullptr /* tablet */, + client, + &req, + [this, rpc_handle, transaction_metadata, table, has_ysql_ddl_txn_state, complete_callback] + (Status status, const tserver::GetTransactionStatusResponsePB& resp) { + auto retained = rpcs_.Unregister(rpc_handle); + TransactionReceived(transaction_metadata, table, has_ysql_ddl_txn_state, + complete_callback, std::move(status), resp); + }); (**rpc_handle).SendRpc(); } void YsqlTransactionDdl::TransactionReceived( const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback, Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { + if (has_ysql_ddl_txn_state) { + // This was invoked for a table for which YSQL DDL rollback is enabled. Verify that it + // contains ysql_ddl_txn_state even now. + DCHECK(table->LockForRead()->has_ysql_ddl_txn_verifier_state()); + } + if (!txn_status.ok()) { LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() << ") failed with status " << txn_status; WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); }), "Failed to enqueue callback"); + return; // #5981: Improve failure handling to retry transient errors or recognize transaction complete. 
- } else if (resp.has_error()) { + } + if (resp.has_error()) { const Status s = StatusFromPB(resp.error().status()); const tserver::TabletServerErrorPB::Code code = resp.error().code(); LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() @@ -107,86 +188,217 @@ void YsqlTransactionDdl::TransactionReceived( WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); }), "Failed to enqueue callback"); // #5981: Maybe have the same heuristic as above? - } else { - YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() - << ", resp: " << resp.ShortDebugString(); - bool is_pending = (resp.status_size() == 0); - for (int i = 0; i < resp.status_size() && !is_pending; ++i) { - // NOTE: COMMITTED state is also "pending" because we need APPLIED. - is_pending = resp.status(i) == TransactionStatus::PENDING || - resp.status(i) == TransactionStatus::COMMITTED; - } - if (is_pending) { - // Re-enqueue if transaction is still pending. - WARN_NOT_OK(thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, this, transaction, complete_callback)), - "Could not submit VerifyTransaction to thread pool"); - } else { - // If this transaction isn't pending, then the transaction is in a terminal state. - // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. - WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { - WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); - }), "Failed to enqueue callback"); - } + return; } + YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() + << ", resp: " << resp.ShortDebugString(); + bool is_pending = (resp.status_size() == 0); + for (int i = 0; i < resp.status_size() && !is_pending; ++i) { + // NOTE: COMMITTED state is also "pending" because we need APPLIED. 
+ is_pending = resp.status(i) == TransactionStatus::PENDING || + resp.status(i) == TransactionStatus::COMMITTED; + } + if (is_pending) { + // Re-enqueue if transaction is still pending. + WARN_NOT_OK(thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, this, + transaction, table, has_ysql_ddl_txn_state, complete_callback)), + "Could not submit VerifyTransaction to thread pool"); + return; + } + // If this transaction isn't pending, then the transaction is in a terminal state. + // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. + // Determine whether the transaction was a success by comparing with the PG schema. + WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { + WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); + }), "Failed to enqueue callback"); } -Result YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result entry_oid, - TableId relfilenode_oid) { - auto tablet_peer = sys_catalog_->tablet_peer(); - if (!tablet_peer) { - return STATUS(ServiceUnavailable, "SysCatalog unavailable"); +Result YsqlTransactionDdl::PgSchemaChecker(const scoped_refptr& table) { + const uint32_t database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); + const auto& pg_catalog_table_id = GetPgsqlTableId(database_oid, kPgClassTableOid); + + Schema projection; + uint32_t oid = VERIFY_RESULT(GetPgsqlTableOid(table->id())); + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator(pg_catalog_table_id, + "oid" /* oid_col_name */, + oid, + {"oid", "relname"}, + &projection)); + + QLTableRow row; + auto l = table->LockForRead(); + if (!l->has_ysql_ddl_txn_verifier_state()) { + // The table no longer has transaction verifier state on it, it was probably cleaned up + // concurrently. 
+ return STATUS_FORMAT(Aborted, "Not performing transaction verification for table $0 as it no " + "longer has any transaction verification state", table->ToString()); + } + // Table not found in pg_class. This can only happen in two cases: Table creation failed, + // or a table deletion went through successfully. + if (!VERIFY_RESULT(iter->HasNext())) { + if (l->is_being_deleted_by_ysql_ddl_txn()) { + return true; + } + CHECK(l->is_being_created_by_ysql_ddl_txn()); + return false; } - auto shared_tablet = VERIFY_RESULT(tablet_peer->shared_tablet_safe()); - const tablet::Tablet* catalog_tablet = shared_tablet.get(); - const Schema& pg_database_schema = - VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_table_id))->schema(); - bool is_matview = relfilenode_oid.empty() ? false : true; - std::vector col_names; + // Table found in pg_class. + if (l->is_being_deleted_by_ysql_ddl_txn()) { + LOG(INFO) << "Ysql Drop transaction for " << table->ToString() + << " detected to have failed as table found " + << "in PG catalog"; + return false; + } - if (is_matview) { - col_names = {"oid", "relfilenode"}; - } else { - col_names = {"oid"}; + if (l->is_being_created_by_ysql_ddl_txn()) { + return true; + } + + // Table was being altered. Check whether its current DocDB schema matches + // that of PG catalog. + CHECK(l->ysql_ddl_txn_verifier_state().contains_alter_table_op()); + RETURN_NOT_OK(iter->NextRow(&row)); + const auto relname_col_id = VERIFY_RESULT(projection.ColumnIdByName("relname")).rep(); + const auto& relname_col = row.GetValue(relname_col_id); + const string& table_name = relname_col->string_value(); + + const string fail_msg = "Alter transaction on " + table->ToString() + " failed."; + if (table->name().compare(table_name) != 0) { + // Table name does not match. 
+ LOG(INFO) << fail_msg << Format(" Expected table name: $0 Table name in PG: $1", + table->name(), table_name); + CHECK_EQ(table_name, l->ysql_ddl_txn_verifier_state().previous_table_name()); + return false; } - // Use Scan to query the 'pg_database' table, filtering by our 'oid'. + vector pg_cols = VERIFY_RESULT(ReadPgAttribute(table)); + // In DocDB schema, columns are sorted based on 'order'. + sort(pg_cols.begin(), pg_cols.end(), [](const auto& lhs, const auto& rhs) { + return lhs.order < rhs.order; + }); + + Schema schema; + RETURN_NOT_OK(table->GetSchema(&schema)); + if (MatchPgDocDBSchemaColumns(table, schema, pg_cols)) { + // The PG catalog schema matches the current DocDB schema. The transaction was a success. + return true; + } + + Schema previous_schema; + RETURN_NOT_OK(SchemaFromPB(l->ysql_ddl_txn_verifier_state().previous_schema(), &previous_schema)); + if (MatchPgDocDBSchemaColumns(table, previous_schema, pg_cols)) { + // The PG catalog schema matches the DocDB schema of the table prior to this transaction. The + // transaction must have aborted. + return false; + } + + // The PG catalog schema does not match either the current schema nor the previous schema. This + // is an unexpected state, do nothing. + return STATUS_FORMAT(IllegalState, "Failed to verify transaction for table $0", + table->ToString()); +} + +bool YsqlTransactionDdl::MatchPgDocDBSchemaColumns( + const scoped_refptr& table, + const Schema& schema, + const vector& pg_cols) { + + const string& fail_msg = "Schema mismatch for table " + table->ToString(); + const std::vector& columns = schema.columns(); + + size_t i = 0; + for (const auto& col : columns) { + // 'ybrowid' is a column present only in DocDB. Skip it. 
+ if (col.name() == "ybrowid") { + continue; + } + + if (i >= pg_cols.size()) { + LOG(INFO) << fail_msg << Format(" Expected num_columns: $0 num_columns in PG: $1", + columns.size(), pg_cols.size()); + return false; + } + + if (col.name().compare(pg_cols[i].attname) != 0) { + LOG(INFO) << fail_msg << Format(" Expected column name with attnum: $0 is :$1" + " but column name at PG is $2", pg_cols[i].order, col.name(), pg_cols[i].attname); + return false; + } + + // Verify whether attnum matches. + if (col.order() != pg_cols[i].order) { + LOG(INFO) << fail_msg << Format(" At index $0 expected attnum is $1 but actual attnum is $2", + i, col.order(), pg_cols[i].order); + return false; + } + i++; + } + + return true; +} + +Result> +YsqlTransactionDdl::ReadPgAttribute(scoped_refptr table) { + // Build schema using values read from pg_attribute. + auto tablet_peer = sys_catalog_->tablet_peer(); + const tablet::TabletPtr tablet = tablet_peer->shared_tablet(); + + const uint32_t database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); + const uint32_t table_oid = VERIFY_RESULT(GetPgsqlTableOid(table->id())); + const auto& pg_attribute_table_id = GetPgsqlTableId(database_oid, kPgAttributeTableOid); + Schema projection; - RETURN_NOT_OK(pg_database_schema.CreateProjectionByNames(col_names, &projection, - pg_database_schema.num_key_columns())); - const auto oid_col_id = VERIFY_RESULT(projection.ColumnIdByName("oid")).rep(); - auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator( - projection.CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_table_id)); - auto e_oid_val = VERIFY_RESULT(std::move(entry_oid)); - { - auto doc_iter = down_cast(iter.get()); - PgsqlConditionPB cond; - cond.add_operands()->set_column_id(oid_col_id); - cond.set_op(QL_OP_EQUAL); - cond.add_operands()->mutable_value()->set_uint32_value(e_oid_val); - const std::vector empty_key_components; - docdb::DocPgsqlScanSpec spec( - projection, rocksdb::kDefaultQueryId, empty_key_components, 
empty_key_components, - &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */); - RETURN_NOT_OK(doc_iter->Init(spec)); - } - - // Expect exactly one row, which means the transaction was a success. - QLTableRow row; - if (VERIFY_RESULT(iter->HasNext())) { + uint32_t oid = VERIFY_RESULT(GetPgsqlTableOid(table->id())); + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator( + pg_attribute_table_id, + "attrelid" /* col_name */, + oid, + {"attrelid", "attnum", "attname", "atttypid"}, + &projection)); + + const auto attname_col_id = VERIFY_RESULT(projection.ColumnIdByName("attname")).rep(); + const auto atttypid_col_id = VERIFY_RESULT(projection.ColumnIdByName("atttypid")).rep(); + const auto attnum_col_id = VERIFY_RESULT(projection.ColumnIdByName("attnum")).rep(); + + vector pg_cols; + while (VERIFY_RESULT(iter->HasNext())) { + QLTableRow row; RETURN_NOT_OK(iter->NextRow(&row)); - if (is_matview) { - const auto relfilenode_col_id = VERIFY_RESULT(projection.ColumnIdByName("relfilenode")).rep(); - const auto& relfilenode = row.GetValue(relfilenode_col_id); - if (relfilenode->uint32_value() != VERIFY_RESULT(GetPgsqlTableOid(relfilenode_oid))) { - return false; - } - return true; + + const auto& attname_col = row.GetValue(attname_col_id); + const auto& atttypid_col = row.GetValue(atttypid_col_id); + const auto& attnum_col = row.GetValue(attnum_col_id); + if (!attname_col || !atttypid_col || !attnum_col) { + std::string corrupted_col = + !attname_col ? "attname" : !atttypid_col ? "atttypid" : "attnum"; + return STATUS_FORMAT( + Corruption, + "Could not read $0 column from pg_attribute for attrelid: $1 database_oid: $2", + corrupted_col, table_oid, database_oid); } - return true; + + const int32_t attnum = attnum_col->int16_value(); + if (attnum < 0) { + // Ignore system columns. 
+ VLOG(3) << "Ignoring system column (attnum = " << attnum_col->int16_value() + << ") for attrelid:" << table_oid; + continue; + } + string attname = attname_col->string_value(); + uint32_t atttypid = atttypid_col->uint32_value(); + if (atttypid == 0) { + // Ignore dropped columns. + VLOG(3) << "Ignoring dropped column " << attname << " (atttypid = 0)" + << " for attrelid:" << table_oid; + continue; + } + VLOG(3) << "attrelid: " << table_oid << " attname: " << attname << " atttypid: " << atttypid; + pg_cols.emplace_back(attnum, attname); } - return false; + + return pg_cols; } } // namespace master diff --git a/src/yb/master/ysql_transaction_ddl.h b/src/yb/master/ysql_transaction_ddl.h index 699f9598b614..4bb653465687 100644 --- a/src/yb/master/ysql_transaction_ddl.h +++ b/src/yb/master/ysql_transaction_ddl.h @@ -19,7 +19,11 @@ #include "yb/client/client_fwd.h" #include "yb/common/entity_ids.h" +#include "yb/common/transaction.h" +#include "yb/docdb/doc_rowwise_iterator.h" + +#include "yb/master/catalog_entity_info.h" #include "yb/master/master_fwd.h" #include "yb/rpc/rpc.h" @@ -34,7 +38,41 @@ class GetTransactionStatusResponsePB; namespace master { +/* + * Currently the metadata for YSQL Tables is stored in both the PG catalog and DocDB schema. + * This class helps maintain consistency between the two schemas. When a DDL transaction fails, + * since the PG catalog is modified using the DocDB transactions framework, the changes to the + * PG catalog are automatically rolled-back. This class helps perform similar rollback for the + * DocDB schema upon transaction failure. + * When a DDL transaction modifies the DocDB schema, the transaction metadata, and the current + * schema of the table is stored in the SysTablesEntryPB. At this point, future DDL operations on + * this table are blocked until verification is complete. A poller is scheduled through this + * class. It monitors whether the transaction is complete. 
Once the transaction is detected to + * be complete, it compares the PG catalog and the DocDB schema and finds whether the transaction + * is a success or a failure. + * Based on whether the transaction was a success or failure, the CatalogManager will effect + * rollback or roll-forward of the DocDB schema. + * + * Note that the above protocol introduces eventual consistency between the two types of metadata. + * However this mostly does not affect correctness because of the following two properties: + * 1) When inconsistent, the DocDB schema always has more columns/constraints than PG schema. + * 2) Clients always use the PG schema (which is guaranteed to not return uncommitted state) + * to prepare their read/write requests. + * + * These two properties ensure that we can't have orphaned data or failed integrity checks + * or use DDL entities created by uncommitted transactions. + */ + class YsqlTransactionDdl { + struct PgColumnFields { + // Order determines the order in which the columns were created. This is equal to the + // 'attnum' field in the pg_attribute table in PG catalog. 
+ int order; + std::string attname; + + PgColumnFields(int attnum, std::string name) : order(attnum), attname(name) {} + }; + public: YsqlTransactionDdl( const SysCatalogTable* sys_catalog, std::shared_future client_future, @@ -49,15 +87,38 @@ class YsqlTransactionDdl { } void VerifyTransaction(const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback); + Result PgEntryExists(TableId tableId, Result entry_oid, TableId relfilenode_oid); + Result PgSchemaChecker(const scoped_refptr& table); + protected: void TransactionReceived(const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback, Status txn_status, const tserver::GetTransactionStatusResponsePB& response); + bool MatchPgDocDBSchemaColumns(const scoped_refptr& table, + const Schema& schema, + const std::vector& pg_cols); + + Result> ReadPgAttribute(scoped_refptr table); + + // Scan table 'pg_catalog_table_id' for all rows that satisfy the SQL filter + // 'WHERE oid_col_name = oid_value'. Each returned row contains the columns specified in + // 'col_names'. 
+ Result> GetPgCatalogTableScanIterator( + const TableId& pg_catalog_table_id, + const std::string& oid_col_name, + uint32_t oid_value, + std::vector col_names, + Schema *projection); + const SysCatalogTable* sys_catalog_; std::shared_future client_future_; ThreadPool* thread_pool_; diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 901a700eaf40..eaf0e907c5db 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -54,6 +54,7 @@ using std::string; DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); +DECLARE_bool(ysql_ddl_rollback_enabled); namespace yb { namespace tserver { @@ -430,7 +431,13 @@ Status PgClientSession::DropTable( return Status::OK(); } - RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline())); + const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata( + true /* use_transaction */, context->GetClientDeadline())); + // If ddl rollback is enabled, the table will not be deleted now, so we cannot wait for the + // table deletion to complete. The table will be deleted in the background only after the + // transaction has been determined to be a success. 
+ RETURN_NOT_OK(client().DeleteTable(yb_table_id, !FLAGS_ysql_ddl_rollback_enabled, metadata, + context->GetClientDeadline())); table_cache_.Invalidate(yb_table_id); return Status::OK(); } diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index e37209a00ccc..c89020a7f2df 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1053,6 +1053,14 @@ Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) { return STATUS(InvalidArgument, "Invalid statement handle"); } +Status PgApiImpl::ExecDropTable(PgStatement *handle) { + if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLE)) { + return STATUS(InvalidArgument, "Invalid statement handle"); + } + + return down_cast(handle)->Exec(); +} + Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) { tserver::PgBackfillIndexRequestPB req; table_id.ToPB(req.mutable_table_id()); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index ffea9892859e..9762c8c27f7d 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -353,6 +353,8 @@ class PgApiImpl { Status ExecPostponedDdlStmt(PgStatement *handle); + Status ExecDropTable(PgStatement *handle); + Status BackfillIndex(const PgObjectId& table_id); //------------------------------------------------------------------------------------------------ diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index c8ea7c25b969..4956bc241baf 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -343,6 +343,7 @@ typedef struct PgGFlagsAccessor { const uint64_t* ysql_session_max_batch_size; const bool* ysql_sleep_before_retry_on_txn_conflict; const bool* ysql_colocate_database_by_default; + const bool* ysql_ddl_rollback_enabled; } YBCPgGFlagsAccessor; typedef struct YbTablePropertiesData { diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 5eb4242b5071..ccd7b6b84b3d 100644 --- 
a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -71,6 +71,8 @@ DECLARE_int32(delay_alter_sequence_sec); DECLARE_int32(client_read_write_timeout_ms); +DECLARE_bool(ysql_ddl_rollback_enabled); + DEFINE_UNKNOWN_bool(ysql_enable_reindex, false, "Enable REINDEX INDEX statement."); TAG_FLAG(ysql_enable_reindex, advanced); @@ -736,6 +738,10 @@ YBCStatus YBCPgExecPostponedDdlStmt(YBCPgStatement handle) { return ToYBCStatus(pgapi->ExecPostponedDdlStmt(handle)); } +YBCStatus YBCPgExecDropTable(YBCPgStatement handle) { + return ToYBCStatus(pgapi->ExecDropTable(handle)); +} + YBCStatus YBCPgBackfillIndex( const YBCPgOid database_oid, const YBCPgOid index_oid) { @@ -1279,7 +1285,8 @@ const YBCPgGFlagsAccessor* YBCGetGFlags() { .ysql_sequence_cache_minval = &FLAGS_ysql_sequence_cache_minval, .ysql_session_max_batch_size = &FLAGS_ysql_session_max_batch_size, .ysql_sleep_before_retry_on_txn_conflict = &FLAGS_ysql_sleep_before_retry_on_txn_conflict, - .ysql_colocate_database_by_default = &FLAGS_ysql_colocate_database_by_default + .ysql_colocate_database_by_default = &FLAGS_ysql_colocate_database_by_default, + .ysql_ddl_rollback_enabled = &FLAGS_ysql_ddl_rollback_enabled }; return &accessor; } diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 71e0da8bf324..bb752b56e883 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -303,6 +303,8 @@ YBCStatus YBCPgNewDropIndex(YBCPgOid database_oid, YBCStatus YBCPgExecPostponedDdlStmt(YBCPgStatement handle); +YBCStatus YBCPgExecDropTable(YBCPgStatement handle); + YBCStatus YBCPgBackfillIndex( const YBCPgOid database_oid, const YBCPgOid index_oid); diff --git a/src/yb/yql/pgwrapper/CMakeLists.txt b/src/yb/yql/pgwrapper/CMakeLists.txt index cf3a213bc356..9fea4ebb6877 100644 --- a/src/yb/yql/pgwrapper/CMakeLists.txt +++ b/src/yb/yql/pgwrapper/CMakeLists.txt @@ -60,7 +60,8 @@ set(PG_WRAPPER_TEST_BASE_SRCS pg_tablet_split_test_base.cc pg_wrapper_test_base.cc) 
set(PG_WRAPPER_TEST_BASE_DEPS - pq_utils) + pq_utils + yb_client_test_util) ADD_YB_LIBRARY(pg_wrapper_test_base SRCS ${PG_WRAPPER_TEST_BASE_SRCS} DEPS ${PG_WRAPPER_TEST_BASE_DEPS}) @@ -112,6 +113,7 @@ ADD_YB_TEST(geo_transactions_promotion-test) ADD_YB_TEST(pg_explicit_lock-test) ADD_YB_TEST(alter_schema_abort_txn-test) ADD_YB_TEST(pg_cache_refresh-test) +ADD_YB_TEST(pg_ddl_atomicity-test) # This is really a tool, not a test, but uses a lot of existing test infrastructure. ADD_YB_TEST(create_initial_sys_catalog_snapshot) diff --git a/src/yb/yql/pgwrapper/libpq_utils.cc b/src/yb/yql/pgwrapper/libpq_utils.cc index 2fde0c7bcfe7..cd9bbab93da8 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.cc +++ b/src/yb/yql/pgwrapper/libpq_utils.cc @@ -364,6 +364,11 @@ Status PGConn::RollbackTransaction() { return Execute("ROLLBACK"); } +Status PGConn::TestFailDdl(const std::string& ddl_to_fail) { + RETURN_NOT_OK(Execute("SET yb_test_fail_next_ddl=true")); + return Execute(ddl_to_fail); +} + Result PGConn::HasIndexScan(const std::string& query) { return VERIFY_RESULT(HasScanType(query, "Index")) || VERIFY_RESULT(HasScanType(query, "Index Only")); diff --git a/src/yb/yql/pgwrapper/libpq_utils.h b/src/yb/yql/pgwrapper/libpq_utils.h index b8a90a5deadc..1e1f0962ef38 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.h +++ b/src/yb/yql/pgwrapper/libpq_utils.h @@ -134,6 +134,8 @@ class PGConn { Status CommitTransaction(); Status RollbackTransaction(); + Status TestFailDdl(const std::string& ddl_to_fail); + // Would this query use an index [only] scan? Result HasIndexScan(const std::string& query); Result HasScanType(const std::string& query, const std::string expected_scan_type); diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc new file mode 100644 index 000000000000..66791faaf08a --- /dev/null +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc @@ -0,0 +1,890 @@ +// Copyright (c) YugaByte, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#include "yb/client/client_fwd.h" +#include "yb/client/snapshot_test_util.h" +#include "yb/client/table_info.h" +#include "yb/client/yb_table_name.h" +#include "yb/client/client-test-util.h" + +#include "yb/common/common.pb.h" +#include "yb/common/pgsql_error.h" +#include "yb/common/schema.h" + +#include "yb/master/master_client.pb.h" + +#include "yb/util/async_util.h" +#include "yb/util/backoff_waiter.h" +#include "yb/util/monotime.h" +#include "yb/util/test_thread_holder.h" +#include "yb/util/timestamp.h" +#include "yb/util/tsan_util.h" + +#include "yb/yql/pgwrapper/libpq_test_base.h" +#include "yb/yql/pgwrapper/libpq_utils.h" + +DECLARE_bool(ysql_ddl_rollback_enabled); + +using std::string; +using std::vector; + +namespace yb { +namespace pgwrapper { + +class PgDdlAtomicityTest : public LibPqTestBase { + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); + } + + protected: + void CreateTable(const string& tablename) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(CreateTableStmt(tablename))); + // Sleep until the transaction ddl state is cleared from this table. + // TODO (deepthi): This will no longer be needed after Phase-II. 
+ sleep(2); + } + + string CreateTableStmt(const string& tablename) { + return "CREATE TABLE " + tablename + " (key INT PRIMARY KEY)"; + } + + string RenameTableStmt(const string& tablename) { + return RenameTableStmt(tablename, "foobar"); + } + + string RenameTableStmt(const string& tablename, const string& table_new_name) { + return Format("ALTER TABLE $0 RENAME TO $1", tablename, table_new_name); + } + + string AddColumnStmt(const string& tablename) { + return AddColumnStmt(tablename, "value"); + } + + string AddColumnStmt(const string& tablename, const string& col_name_to_add) { + return Format("ALTER TABLE $0 ADD COLUMN $1 TEXT", tablename, col_name_to_add); + } + + string RenameColumnStmt(const string& tablename) { + return RenameColumnStmt(tablename, "key", "key2"); + } + + string RenameColumnStmt(const string& tablename, + const string& col_to_rename, + const string& col_new_name) { + return Format("ALTER TABLE $0 RENAME COLUMN $1 TO $2", tablename, col_to_rename, col_new_name); + } + + string DropTableStmt(const string& tablename) { + return "DROP TABLE " + tablename; + } + + string db() { + return "yugabyte"; + } + + void RestartMaster() { + LOG(INFO) << "Restarting Master"; + auto master = cluster_->GetLeaderMaster(); + master->Shutdown(); + ASSERT_OK(master->Restart()); + ASSERT_OK(LoggedWaitFor([&]() -> Result { + auto s = cluster_->GetIsMasterLeaderServiceReady(master); + return s.ok(); + }, MonoDelta::FromSeconds(60), "Wait for Master to be ready.")); + } + + void SetFlagOnAllProcessesWithRollingRestart(const string& flag) { + LOG(INFO) << "Restart the cluster and set " << flag; + for (size_t i = 0; i != cluster_->num_masters(); ++i) { + cluster_->master(i)->mutable_flags()->push_back(flag); + } + + RestartMaster(); + + for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { + cluster_->tablet_server(i)->mutable_flags()->push_back(flag); + } + for (auto* tserver : cluster_->tserver_daemons()) { + tserver->Shutdown(); + 
ASSERT_OK(tserver->Restart()); + sleep(5); + } + } + + void VerifySchema(client::YBClient* client, + const string& database_name, + const string& table_name, + const vector& expected_column_names) { + ASSERT_TRUE(ASSERT_RESULT( + CheckIfSchemaMatches(client, database_name, table_name, expected_column_names))); + } + + Result CheckIfSchemaMatches(client::YBClient* client, + const string& database_name, + const string& table_name, + const vector& expected_column_names) { + std::string table_id = VERIFY_RESULT(GetTableIdByTableName(client, database_name, table_name)); + + std::shared_ptr table_info = std::make_shared(); + Synchronizer sync; + RETURN_NOT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback())); + RETURN_NOT_OK(sync.Wait()); + + const auto& columns = table_info->schema.columns(); + if (expected_column_names.size() != columns.size()) { + LOG(INFO) << "Expected " << expected_column_names.size() << " for " << table_name << " but " + << "found " << columns.size() << " columns"; + return false; + } + for (size_t i = 0; i < expected_column_names.size(); ++i) { + if (columns[i].name().compare(expected_column_names[i]) != 0) { + LOG(INFO) << "Expected column " << expected_column_names[i] << " but found " + << columns[i].name(); + return false; + } + } + return true; + } +}; + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseGC)) { + TableName test_name = "test_pgsql"; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl("CREATE DATABASE " + test_name)); + + // Verify DocDB Database creation, even though it failed in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyNamespaceExists(client.get(), test_name); + + // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts. + // Confirm that DocDB async deletes the namespace. 
+ VerifyNamespaceNotExists(client.get(), test_name); +} + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestCreateDbFailureAndRestartGC)) { + NamespaceName test_name = "test_pgsql"; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl("CREATE DATABASE " + test_name)); + + // Verify DocDB Database creation, even though it fails in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyNamespaceExists(client.get(), test_name); + + // Restart the master before the BG task can kick in and GC the failed transaction. + RestartMaster(); + + // Re-init client after restart. + client = ASSERT_RESULT(cluster_->CreateClient()); + + // Confirm that Catalog Loader deletes the namespace on master restart. + VerifyNamespaceNotExists(client.get(), test_name); +} + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableGC)) { + TableName test_name = "test_pgsql_table"; + TableName test_name_idx = test_name + "_idx"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + // Lower the delays so we successfully create this first table. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10")); + + // Create Table that Index will be set on. + auto conn = ASSERT_RESULT(Connect()); + + ASSERT_OK(conn.Execute(CreateTableStmt(test_name))); + + // After successfully creating the first table, set flags to delay the background task. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000")); + + ASSERT_NOK(conn.TestFailDdl("CREATE INDEX " + test_name_idx + " ON " + test_name + "(key)")); + + // Wait for DocDB index creation, even though it will fail in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyTableExists(client.get(), db(), test_name_idx, 10); + + // DocDB will notice the PG layer failure because the transaction aborts. 
+ // Confirm that DocDB async deletes the index. + VerifyTableNotExists(client.get(), db(), test_name_idx, 40); +} + +// Class for sanity test. +class PgDdlAtomicitySanityTest : public PgDdlAtomicityTest { + protected: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.push_back("--ysql_ddl_rollback_enabled=true"); + options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); + } +}; + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTableRollback)) { + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_table_test"; + + vector tables = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables[ii]))); + } + + // Wait for the transaction state left by create table statement to clear. + sleep(5); + + // Deliberately cause failure of the following Alter Table statements. + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Wait for rollback. + sleep(5); + for (size_t ii = 0; ii < tables.size(); ++ii) { + VerifySchema(client.get(), db(), tables[ii], {"key"}); + } + + // Verify that DDL succeeds after rollback is complete. + // TODO: Need to start a different connection here for rename to work as expected until #14395 + // is fixed. 
+ conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test))); + ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test))); + ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(PrimaryKeyRollback)) { + TableName add_pk_table = "add_pk_table"; + TableName drop_pk_table = "drop_pk_table"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (id int)", add_pk_table)); + ASSERT_OK(conn.Execute(CreateTableStmt(drop_pk_table))); + + // Wait for transaction verification on the tables to complete. + sleep(1); + + // Delay rollback. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + // Fail Alter operation that adds/drops primary key. + ASSERT_NOK(conn.TestFailDdl(Format("ALTER TABLE $0 ADD PRIMARY KEY(id)", add_pk_table))); + ASSERT_NOK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP CONSTRAINT $0_pkey;", drop_pk_table))); + + // Verify presence of temp table created for adding a primary key. + VerifyTableExists(client.get(), db(), add_pk_table + "_temp_old", 10); + VerifyTableExists(client.get(), db(), drop_pk_table + "_temp_old", 10); + + // Verify that the background task detected the failure of the DDL operation and removed the + // temp table. + VerifyTableNotExists(client.get(), db(), add_pk_table + "_temp_old", 40); + VerifyTableNotExists(client.get(), db(), drop_pk_table + "_temp_old", 40); + + // Verify that PK constraint is not present on the table. + ASSERT_OK(conn.Execute("INSERT INTO " + add_pk_table + " VALUES (1), (1)")); + + // Verify that PK constraint is still present on the table. 
+ ASSERT_NOK(conn.Execute("INSERT INTO " + drop_pk_table + " VALUES (1), (1)")); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DdlRollbackMasterRestart)) { + TableName create_table_test = "create_test"; + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly + // disabled. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); + ASSERT_NOK(conn.TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Verify that table was created on DocDB. + VerifyTableExists(client.get(), db(), create_table_test, 10); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + + // Verify that the tables were altered on DocDB. + VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Restart the master before the BG task can kick in and GC the failed transaction. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "0")); + RestartMaster(); + + // Verify that rollback reflected on all the tables after restart. 
+ client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. + + VerifyTableNotExists(client.get(), db(), create_table_test, 20); + VerifyTableExists(client.get(), db(), rename_table_test, 20); + // Verify all the other tables are unchanged by all of the DDLs. + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(30), Format("Wait for rollback for table $0", table))); + } +} + +// TODO (deepthi) : This test is flaky because of #14995. Re-enable it back after #14995 is fixed. +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST(FailureRecoveryTest)) { + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly + // disabled. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Verify that table was created on DocDB. + VerifyTableExists(client.get(), db(), drop_table_test, 10); + + // Verify that the tables were altered on DocDB. 
+ VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Disable DDL rollback. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_ddl_rollback_enabled", "false")); + ASSERT_OK(cluster_->SetFlagOnTServers("ysql_ddl_rollback_enabled", "false")); + + // Verify that best effort rollback works when ysql_ddl_rollback_enabled is false. + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1)", add_col_test)); + // The following DDL fails because the table already contains data, so it is not possible to + // add a new column with a not null constraint. + ASSERT_NOK(conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN value2 TEXT NOT NULL", add_col_test)); + // Verify that the above DDL was rolled back by YSQL. + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Restart the cluster with DDL rollback disabled. + SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=false"); + // Verify that rollback did not occur even after the restart. + client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Verify that it is still possible to run DDL on an affected table. + // Tables having unverified transaction state on them can still be altered if the DDL rollback + // is not enabled. + // TODO: Need to start a different connection here for rename to work as expected until #14395 + // is fixed. + conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test, "foobar2"))); + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value2"))); + + // Re-enable DDL rollback properly with restart. 
+ SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=true"); + + client = ASSERT_RESULT(cluster_->CreateClient()); + conn = ASSERT_RESULT(Connect()); + // The tables with the transaction state on them must have had their state cleared upon restart + // since the flag is now enabled again. + ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); + ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); + + // However add_col_test will be corrupted because we never performed transaction rollback on it. + // It should ideally have only one added column "value2", but we never rolled back the addition of + // "value". + VerifySchema(client.get(), db(), add_col_test, {"key", "value", "value2"}); + + // Add a column to add_col_test which is now corrupted. + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value3"))); + // Wait for transaction verification to run. + sleep(2); + // However since the PG schema and DocDB schema have become out-of-sync, the above DDL cannot + // be verified. All future DDL operations will now fail. 
+ ASSERT_NOK(conn.Execute(RenameTableStmt(add_col_test, "foobar3"))); + ASSERT_NOK(conn.Execute(RenameColumnStmt(add_col_test))); + ASSERT_NOK(conn.Execute(DropTableStmt(add_col_test))); +} + +class PgDdlAtomicityConcurrentDdlTest : public PgDdlAtomicitySanityTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgDdlAtomicitySanityTest::UpdateMiniClusterOptions(options); + } + + protected: + bool testFailedDueToTxnVerification(PGConn *conn, const string& cmd) { + Status s = conn->Execute(cmd); + if (s.ok()) { + LOG(ERROR) << "Command " << cmd << " executed successfully when failure expected"; + return false; + } + return s.ToString().find("Table is undergoing DDL transaction verification") != string::npos; + } + + void testConcurrentDDL(const string& stmt1, const string& stmt2) { + // Test that until transaction verification is complete, another DDL operating on + // the same object cannot go through. + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(stmt1)); + // The first DDL was successful, but the second should fail because rollback has + // been delayed using 'ysql_transaction_bg_task_wait_ms'. + auto conn2 = ASSERT_RESULT(Connect()); + ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); + } + + void testConcurrentFailedDDL(const string& stmt1, const string& stmt2) { + // Same as 'testConcurrentDDL' but here the first DDL statement is a failure. However + // other DDLs still cannot happen unless rollback is complete. 
+ auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl(stmt1)); + auto conn2 = ASSERT_RESULT(Connect()); + ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); + } +}; + +TEST_F(PgDdlAtomicityConcurrentDdlTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDdl)) { + const string kCreateAndAlter = "create_and_alter_test"; + const string kCreateAndDrop = "create_and_drop_test"; + const string kDropAndAlter = "drop_and_alter_test"; + const string kAlterAndAlter = "alter_and_alter_test"; + const string kAlterAndDrop = "alter_and_drop_test"; + + const vector tables_to_create = {kDropAndAlter, kAlterAndAlter, kAlterAndDrop}; + + auto conn = ASSERT_RESULT(Connect()); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); + } + + // Wait for YsqlDdlTxnVerifierState to be cleaned up from the tables. + sleep(5); + + // Test that we can't run a second DDL on a table until the YsqlTxnVerifierState on the first + // table is cleared. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "60000")); + testConcurrentDDL(CreateTableStmt(kCreateAndAlter), AddColumnStmt(kCreateAndAlter)); + testConcurrentDDL(CreateTableStmt(kCreateAndDrop), DropTableStmt(kCreateAndDrop)); + testConcurrentDDL(AddColumnStmt(kAlterAndDrop), DropTableStmt(kAlterAndDrop)); + testConcurrentDDL(RenameColumnStmt(kAlterAndAlter), RenameTableStmt(kAlterAndAlter)); + + // Test that we can't run a second DDL on a table until the YsqlTxnVerifierState on the first + // table is cleared even if the first DDL was a failure. + testConcurrentFailedDDL(DropTableStmt(kDropAndAlter), RenameColumnStmt(kDropAndAlter)); +} + +class PgDdlAtomicityTxnTest : public PgDdlAtomicitySanityTest { + public: + void runFailingDdlTransaction(const string& ddl_statements) { + // Normally every DDL auto-commits, and thus we usually have only one DDL statement in a + // transaction. 
However, when DDLs are invoked in a function, all the DDLs will be executed + // in a single atomic transaction if the function itself was invoked as part of a DDL statement. + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE FUNCTION ddl_txn_func() RETURNS TABLE (k INT) AS $$ " + "BEGIN " + ddl_statements + + " CREATE TABLE t AS SELECT (1) AS k;" + // The following statement will fail due to NOT NULL constraint. + " ALTER TABLE t ADD COLUMN v3 int not null;" + " RETURN QUERY SELECT k FROM t;" + " END $$ LANGUAGE plpgsql")); + ASSERT_NOK(conn.Execute("CREATE TABLE txntest AS SELECT k FROM ddl_txn_func()")); + // Sleep 1s for the rollback to complete. + sleep(1); + } + + string table() { + return "test_table"; + } +}; + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterDropTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + AddColumnStmt(table()) + "; " + + RenameColumnStmt(table()) + "; " + + RenameTableStmt(table()) + "; " + + DropTableStmt("foobar") + ";"; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateDropTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + DropTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + AddColumnStmt(table()) + "; " + + RenameColumnStmt(table()) + "; " + + RenameTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. 
+ auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTest)) { + CreateTable(table()); + string ddl_statements = RenameColumnStmt(table()) + "; " + DropTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + + // Table should exist with old schema intact. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifySchema(client.get(), db(), table(), {"key"}); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AddColRenameColTest)) { + CreateTable(table()); + string ddl_statements = AddColumnStmt(table()) + "; " + RenameColumnStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + + // Table should exist with old schema intact. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifySchema(client.get(), db(), table(), {"key"}); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithAddColTest)) { + auto client = ASSERT_RESULT(cluster_->CreateClient()); + const string& table = "dml_with_add_col_test"; + CreateTable(table); + auto conn1 = ASSERT_RESULT(Connect()); + auto conn2 = ASSERT_RESULT(Connect()); + + // Conn1: Begin write to the table. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn2: Initiate rollback of the alter. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + ASSERT_NOK(conn2.TestFailDdl(AddColumnStmt(table))); + + // Conn1: Since we concurrently added a column to the table, the add-column operation would have + // detected the distributed transaction locks acquired by this transaction on its tablets and + // aborted it. Add-column operation aborts all ongoing transactions on the table without + // exception as the transaction could now be in an erroneous state having used the schema + // without the newly added column. 
+ ASSERT_NOK(conn1.Execute("COMMIT")); + + // Conn1: Non-transactional insert retry due to Schema version mismatch, + // refreshing the table cache. + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn1: Start new transaction. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (2)")); + + // Rollback happens while the transaction is in progress. + // Wait for the rollback to complete. + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(30), "Wait for Add Column to be rolled back.")); + + // Transaction at conn1 succeeds. Normally, drop-column operation would also have aborted all + // ongoing transactions on this table. However this is a drop-column operation initiated as part + // of rollback. The column being dropped by this operation was added as part of an uncommitted + // transaction, so this column would not have been visible to any transaction. It is safe for + // this drop-column operation to operate silently without aborting any transaction. Therefore this + // transaction succeeds. + ASSERT_OK(conn1.Execute("COMMIT")); +} + +// Test that DML transactions concurrent with an aborted DROP TABLE transaction +// can commit successfully (both before and after the rollback is complete). +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithDropTableTest)) { + auto client = ASSERT_RESULT(cluster_->CreateClient()); + const string& table = "dml_with_drop_test"; + CreateTable(table); + auto conn1 = ASSERT_RESULT(Connect()); + auto conn2 = ASSERT_RESULT(Connect()); + auto conn3 = ASSERT_RESULT(Connect()); + + // Conn1: Begin write to the table. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn2: Also begin write to the table. 
+ ASSERT_OK(conn2.Execute("BEGIN")); + ASSERT_OK(conn2.Execute("INSERT INTO " + table + " VALUES (2)")); + + // Conn3: Initiate rollback of DROP table. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "5000")); + ASSERT_NOK(conn3.TestFailDdl(DropTableStmt(table))); + + // Conn1: This should succeed. + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (3)")); + ASSERT_OK(conn1.Execute("COMMIT")); + + // Wait for rollback to complete. + sleep(5); + + // Conn2 must also commit successfully. + ASSERT_OK(conn2.Execute("INSERT INTO " + table + " VALUES (4)")); + ASSERT_OK(conn2.Execute("COMMIT")); +} + +class PgDdlAtomicityNegativeTestBase : public PgDdlAtomicitySanityTest { + protected: + void SetUp() override { + LibPqTestBase::SetUp(); + + conn_ = std::make_unique(ASSERT_RESULT(ConnectToDB(kDatabaseName))); + } + + virtual string CreateTableStmt(const string& tablename) { + return PgDdlAtomicitySanityTest::CreateTableStmt(tablename); + } + + virtual string CreateRollbackEnabledTableStmt(const string& tablename) = 0; + + virtual string CreateIndexStmt(const string& tablename, const string& indexname) { + return Format("CREATE INDEX $0 ON $1(key)", indexname, tablename); + } + + // Test function. 
+ void negativeTest(); + void negativeDropTableTxnTest(); + + std::unique_ptr conn_; + string kDatabaseName = "yugabyte"; +}; + +void PgDdlAtomicityNegativeTestBase::negativeTest() { + TableName create_table_test = "create_test"; + TableName create_table_idx = "create_test_idx"; + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn_->Execute(CreateTableStmt(tables_to_create[ii]))); + } + + ASSERT_NOK(conn_->TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn_->TestFailDdl(CreateIndexStmt(rename_table_test, create_table_idx))); + ASSERT_NOK(conn_->TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn_->TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn_->TestFailDdl(AddColumnStmt(add_col_test))); + + // Wait for rollback to complete. + sleep(5); + + // Create is rolled back using existing transaction GC infrastructure. + VerifyTableNotExists(client.get(), kDatabaseName, create_table_test, 15); + VerifyTableNotExists(client.get(), kDatabaseName, create_table_idx, 15); + + // Verify that Alter table transactions are not rolled back because rollback is not enabled yet + // for certain cases like colocation and tablegroups. + VerifySchema(client.get(), kDatabaseName, "foobar", {"key"}); + VerifySchema(client.get(), kDatabaseName, rename_col_test, {"key2"}); + VerifySchema(client.get(), kDatabaseName, add_col_test, {"key", "value"}); +} + +void PgDdlAtomicityNegativeTestBase::negativeDropTableTxnTest() { + // Verify that dropping a table with rollback enabled and another table with rollback disabled + // in the same transaction works as expected. 
+ TableName rollback_enabled_table = "rollback_enabled_table"; + TableName rollback_disabled_table = "rollback_disabled_table"; + + ASSERT_OK(conn_->Execute(CreateTableStmt(rollback_disabled_table))); + ASSERT_OK(conn_->Execute(CreateRollbackEnabledTableStmt(rollback_enabled_table))); + + // Wait for rollback state to clear from the created tables. + sleep(1); + + ASSERT_NOK(conn_->TestFailDdl(Format("DROP TABLE $0, $1", + rollback_enabled_table, rollback_disabled_table))); + // The YB-Master background task ensures that 'rollback_enabled_table' is not deleted. For + // 'rollback_disabled_table', PG layer delays sending the delete request until the transaction + // completes. Thus, verify that the tables do not get deleted even after some time. + sleep(5); + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableExists(client.get(), kDatabaseName, rollback_disabled_table, 15); + VerifyTableExists(client.get(), kDatabaseName, rollback_enabled_table, 15); + + // DDLs succeed after rollback. 
+ ASSERT_OK(conn_->Execute(RenameColumnStmt(rollback_disabled_table))); + ASSERT_OK(conn_->Execute(AddColumnStmt(rollback_enabled_table))); +} + +class PgDdlAtomicityNegativeTestColocated : public PgDdlAtomicityNegativeTestBase { + void SetUp() override { + LibPqTestBase::SetUp(); + + kDatabaseName = "colocateddbtest"; + PGConn conn_init = ASSERT_RESULT(Connect()); + ASSERT_OK(conn_init.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName)); + + conn_ = std::make_unique(ASSERT_RESULT(ConnectToDB(kDatabaseName))); + } + + string CreateRollbackEnabledTableStmt(const string& tablename) override { + return Format("CREATE TABLE $0 (a INT) WITH (colocated = false);", tablename); + } +}; + +TEST_F(PgDdlAtomicityNegativeTestColocated, YB_DISABLE_TEST_IN_TSAN(ColocatedTest)) { + negativeTest(); + negativeDropTableTxnTest(); +} + +class PgDdlAtomicityNegativeTestTablegroup : public PgDdlAtomicityNegativeTestBase { + void SetUp() override { + LibPqTestBase::SetUp(); + + conn_ = std::make_unique(ASSERT_RESULT(Connect())); + ASSERT_OK(conn_->ExecuteFormat("CREATE TABLEGROUP $0", kTablegroup)); + } + + string CreateTableStmt(const string& tablename) override { + return (PgDdlAtomicitySanityTest::CreateTableStmt(tablename) + " TABLEGROUP " + kTablegroup); + } + + string CreateRollbackEnabledTableStmt(const string& tablename) override { + return PgDdlAtomicitySanityTest::CreateTableStmt(tablename); + } + + const string kTablegroup = "test_tgroup"; +}; + +TEST_F(PgDdlAtomicityNegativeTestTablegroup, YB_DISABLE_TEST_IN_TSAN(TablegroupTest)) { + negativeTest(); + negativeDropTableTxnTest(); +} + +class PgDdlAtomicitySnapshotTest : public PgDdlAtomicitySanityTest { + void SetUp() override { + LibPqTestBase::SetUp(); + + snapshot_util_ = std::make_unique(); + client_ = ASSERT_RESULT(cluster_->CreateClient()); + snapshot_util_->SetProxy(&client_->proxy_cache()); + snapshot_util_->SetCluster(cluster_.get()); + } + + public: + std::unique_ptr snapshot_util_; + 
std::unique_ptr client_; +}; + +TEST_F(PgDdlAtomicitySnapshotTest, YB_DISABLE_TEST_IN_TSAN(SnapshotTest)) { + // Create requisite tables. + TableName create_table_test = "create_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = {add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + const int snapshot_interval_secs = 10; + // Create a snapshot schedule before running all the failed DDLs. + auto schedule_id = ASSERT_RESULT( + snapshot_util_->CreateSchedule(db(), + client::WaitSnapshot::kTrue, + MonoDelta::FromSeconds(snapshot_interval_secs))); + + // Run all the failed DDLs. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "35000")); + ASSERT_NOK(conn.TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Wait 10s to ensure that a snapshot is taken right after these DDLs failed. + sleep(snapshot_interval_secs); + + // Get the hybrid time before the rollback can happen. + Timestamp current_time(ASSERT_RESULT(WallClock()->Now()).time_point); + HybridTime hybrid_time_before_rollback = HybridTime::FromMicros(current_time.ToInt64()); + + // Verify that rollback for Alter and Create has indeed not happened yet. + VerifyTableExists(client.get(), db(), create_table_test, 10); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Verify that rollback happens eventually. 
+ VerifyTableNotExists(client.get(), db(), create_table_test, 60); + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + } + + /* + TODO (deepthi): Uncomment the following code after #14679 is fixed. + // Run different failing DDL operations on the tables. + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(drop_table_test))); + */ + + // Restore to before rollback. + LOG(INFO) << "Start restoration to timestamp " << hybrid_time_before_rollback; + auto snapshot_id = + ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(schedule_id, hybrid_time_before_rollback)); + ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, hybrid_time_before_rollback)); + + LOG(INFO) << "Restoration complete"; + + // We restored to a point before the first rollback occurred. That DDL transaction should have + // been re-detected to be a failure and rolled back again. 
+ VerifyTableNotExists(client.get(), db(), create_table_test, 60); + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + } +} + +} // namespace pgwrapper +} // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_gin_index-test.cc b/src/yb/yql/pgwrapper/pg_gin_index-test.cc index 4e37b4a7adf9..6d2263148770 100644 --- a/src/yb/yql/pgwrapper/pg_gin_index-test.cc +++ b/src/yb/yql/pgwrapper/pg_gin_index-test.cc @@ -13,6 +13,7 @@ #include #include "yb/client/client.h" +#include "yb/client/client-test-util.h" #include "yb/client/table_info.h" #include "yb/client/yb_table_name.h" @@ -51,23 +52,6 @@ class PgGinIndexTest : public LibPqTestBase { std::unique_ptr conn_; }; -namespace { - -// A copy of the same function in pg_libpq-test.cc. Eventually, issue #6868 should provide a way to -// do this easily for both this file and that. -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - -} // namespace - // Test creating a ybgin index on an array whose element type is unsupported for primary key. 
TEST_F(PgGinIndexTest, YB_DISABLE_TEST_IN_TSAN(UnsupportedArrayElementType)) { ASSERT_OK(conn_->ExecuteFormat("CREATE TABLE $0 (a tsvector[])", kTableName)); diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index f75930c4eb2d..ee595aba3e54 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -15,6 +15,7 @@ #include #include +#include "yb/client/client-test-util.h" #include "yb/client/table_info.h" #include "yb/common/schema.h" @@ -146,19 +147,6 @@ class PgIndexBackfillTest : public LibPqTestBase { namespace { -// A copy of the same function in pg_libpq-test.cc. Eventually, issue #6868 should provide a way to -// do this easily for both this file and that. -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - Result TotalBackfillRpcMetric(ExternalMiniCluster* cluster, const char* type) { int total_rpc_calls = 0; constexpr auto metric_name = "handler_latency_yb_tserver_TabletServerAdminService_BackfillIndex"; diff --git a/src/yb/yql/pgwrapper/pg_libpq-test.cc b/src/yb/yql/pgwrapper/pg_libpq-test.cc index 99581a653740..52d19321aa10 100644 --- a/src/yb/yql/pgwrapper/pg_libpq-test.cc +++ b/src/yb/yql/pgwrapper/pg_libpq-test.cc @@ -28,6 +28,7 @@ #include "yb/client/client_fwd.h" #include "yb/client/table_info.h" #include "yb/client/yb_table_name.h" +#include "yb/client/client-test-util.h" #include "yb/common/common.pb.h" #include "yb/common/pgsql_error.h" @@ -886,32 +887,6 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(InTxnDelete)) { ASSERT_NO_FATALS(AssertRows(&conn, 1)); } -namespace { - -Result GetNamespaceIdByNamespaceName( - 
client::YBClient* client, const string& namespace_name) { - const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL)); - for (const auto& ns : namespaces) { - if (ns.name() == namespace_name) { - return ns.id(); - } - } - return STATUS(NotFound, "The namespace does not exist"); -} - -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - -} // namespace - TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CompoundKeyColumnOrder)) { const string namespace_name = "yugabyte"; const string table_name = "test"; @@ -2411,215 +2386,6 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryEnabled)) { TestCacheRefreshRetry(false /* is_retry_disabled */); } -class PgLibPqDatabaseTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); - options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); - } -}; - -TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutGC)) { - NamespaceName test_name = "test_pgsql"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Database: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); - } - - // Verify DocDB Database creation, even though it failed in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. 
- ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), "" /* prefix */); - return ret.ok() && ret.get(); - }, MonoDelta::FromSeconds(60), - "Verify Namespace was created in DocDB")); - - // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), "ret"); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); -} - -TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutAndRestartGC)) { - NamespaceName test_name = "test_pgsql"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Database: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); - } - - // Verify DocDB Database creation, even though it fails in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(60), - "Verify Namespace was created in DocDB")); - - LOG(INFO) << "Restarting Master."; - - // Restart the master before the BG task can kick in and GC the failed transaction. 
- auto master = cluster_->GetLeaderMaster(); - master->Shutdown(); - ASSERT_OK(master->Restart()); - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto s = cluster_->GetIsMasterLeaderServiceReady(master); - return s.ok(); - }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); - - // Confirm that Catalog Loader deletes the namespace on master restart. - client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); -} - -class PgLibPqTableTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - // Use small clock skew, to decrease number of read restarts. - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); - options->extra_master_flags.push_back("--TEST_simulate_slow_table_create_secs=2"); - options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=3000"); - } -}; - -TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Table: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. 
- ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); - - // DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); -} - -TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutAndRestartGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Table: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); - - LOG(INFO) << "Restarting Master."; - - // Restart the master before the BG task can kick in and GC the failed transaction. 
- auto master = cluster_->GetLeaderMaster(); - master->Shutdown(); - ASSERT_OK(master->Restart()); - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto s = cluster_->GetIsMasterLeaderServiceReady(master); - return s.ok(); - }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); - - // Confirm that Catalog Loader deletes the namespace on master restart. - client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); -} - -class PgLibPqIndexTableTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=10"); - } -}; - -TEST_F(PgLibPqIndexTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableTimeoutGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - NamespaceName test_name_idx = test_name + "_idx"; - - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Lower the delays so we successfully create this first table. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10")); - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "0")); - - // Create Table that Index will be set on. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_OK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // After successfully creating the first table, set to flags similar to: PgLibPqTableTimeoutTest. 
- ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000")); - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "12")); - - // Create Index: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE INDEX " + test_name_idx + " ON " + test_name + "(key)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(40), "Verify Index Table was created in DocDB")); - - // DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(40), "Verify Index Table was removed by Transaction GC")); -} - class PgLibPqTestEnumType: public PgLibPqTest { public: void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {