diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java index 986032b1cae0..4ce114cf9b61 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYsqlUpgrade.java @@ -441,7 +441,7 @@ public void creatingSystemRelsAfterFailure() throws Exception { + ")"; stmt.execute("SET yb_test_fail_next_ddl TO true"); - runInvalidQuery(stmt, ddlSql, "DDL failed as requested"); + runInvalidQuery(stmt, ddlSql, "Failed DDL operation as requested"); // Letting CatalogManagerBgTasks do the cleanup. Thread.sleep(BuildTypeUtil.adjustTimeout(5000)); diff --git a/src/postgres/src/backend/commands/tablecmds.c b/src/postgres/src/backend/commands/tablecmds.c index b54a47413052..c614ed81c7ae 100644 --- a/src/postgres/src/backend/commands/tablecmds.c +++ b/src/postgres/src/backend/commands/tablecmds.c @@ -991,9 +991,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, InvalidOid /* matviewPgTableId */); } - /* For testing purposes, user might ask us to fail a DLL. */ - YBTestFailDdlIfRequested(); - /* * Open the new relation and acquire exclusive lock on it. This isn't * really necessary for locking out other backends (since they can't see @@ -3896,12 +3893,21 @@ ATController(AlterTableStmt *parsetree, } PG_CATCH(); { - /* Rollback the DocDB changes. */ - ListCell *lc = NULL; - foreach(lc, rollbackHandles) + if (!*YBCGetGFlags()->ysql_ddl_rollback_enabled || + rel->yb_table_properties->is_colocated) { - YBCPgStatement handle = (YBCPgStatement) lfirst(lc); - YBCExecAlterTable(handle, RelationGetRelid(rel)); + /* + * The new way of doing ddl rollback is disabled (or unsupported, + * as in the case of colocated tables) fall back to the old way of + * doing a best-effort rollback which may not always succeed + * (e.g., in case of network failure or PG crash). 
+ */ + ListCell *lc = NULL; + foreach(lc, rollbackHandles) + { + YBCPgStatement handle = (YBCPgStatement) lfirst(lc); + YBCExecAlterTable(handle, RelationGetRelid(rel)); + } } PG_RE_THROW(); } diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index e6ec15b058cc..ce88e510c853 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -34,7 +34,6 @@ #include "catalog/pg_class.h" #include "catalog/pg_constraint.h" #include "catalog/pg_database.h" -#include "catalog/pg_namespace.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "catalog/pg_type_d.h" @@ -697,15 +696,31 @@ YBCDropTable(Relation relation) false, /* if_exists */ &handle), &not_found); - const bool valid_handle = !not_found; - if (valid_handle) + if (not_found) + { + return; + } + /* + * YSQL DDL Rollback is not yet supported for colocated tables. + */ + if (*YBCGetGFlags()->ysql_ddl_rollback_enabled && + !yb_props->is_colocated) { /* - * We cannot abort drop in DocDB so postpone the execution until - * the rest of the statement/txn is finished executing. + * The following issues a request to the YB-Master to drop the + * table once this transaction commits. */ - YBSaveDdlHandle(handle); + HandleYBStatusIgnoreNotFound(YBCPgExecDropTable(handle), + &not_found); + return; } + /* + * YSQL DDL Rollback is disabled/unsupported. This means DocDB will not + * rollback the drop if the transaction ends up failing. We cannot + * abort drop in DocDB so postpone the execution until the rest of the + * statement/txn finishes executing. 
+ */ + YBSaveDdlHandle(handle); } } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index 873a0ae65933..eef7e216467b 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -2020,8 +2020,8 @@ static struct config_bool ConfigureNamesBool[] = { {"yb_test_system_catalogs_creation", PGC_SUSET, DEVELOPER_OPTIONS, - gettext_noop("Relaxes some internal sanity checks for system catalogs to " - "allow creating them."), + gettext_noop("Relaxes some internal sanity checks for system " + "catalogs to allow creating them."), NULL, GUC_NOT_IN_SAMPLE }, @@ -2032,8 +2032,8 @@ static struct config_bool ConfigureNamesBool[] = { {"yb_test_fail_next_ddl", PGC_USERSET, DEVELOPER_OPTIONS, - gettext_noop("When set, the next DDL (only CREATE TABLE for now) " - "will fail right after DocDB processes the actual database structure change."), + gettext_noop("When set, the next DDL will fail right before " + "commit."), NULL, GUC_NOT_IN_SAMPLE }, diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index 3421bb180754..df938aa297d8 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -1127,6 +1127,11 @@ YBDecrementDdlNestingLevel(bool is_catalog_version_increment, --ddl_transaction_state.nesting_level; if (ddl_transaction_state.nesting_level == 0) { + if (yb_test_fail_next_ddl) + { + yb_test_fail_next_ddl = false; + elog(ERROR, "Failed DDL operation as requested"); + } if (GetCurrentMemoryContext() == ddl_transaction_state.mem_context) MemoryContextSwitchTo(ddl_transaction_state.mem_context->parent); /* @@ -1638,14 +1643,6 @@ void YBCFillUniqueIndexNullAttribute(YBCPgYBTupleIdDescriptor* descr) { last_attr->is_null = true; } -void YBTestFailDdlIfRequested() { - if (!yb_test_fail_next_ddl) - return; - - yb_test_fail_next_ddl = false; - elog(ERROR, "DDL failed as 
requested"); -} - void YbTestGucBlockWhileStrEqual(char **actual, const char *expected, const char *msg) diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index 8138d86000f0..b48b3967d19a 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -565,8 +565,6 @@ YbTableProperties YbTryGetTableProperties(Relation rel); */ bool YBIsSupportedLibcLocale(const char *localebuf); -void YBTestFailDdlIfRequested(); - /* Spin wait while test guc var actual equals expected. */ extern void YbTestGucBlockWhileStrEqual(char **actual, const char *expected, const char *msg); diff --git a/src/yb/client/CMakeLists.txt b/src/yb/client/CMakeLists.txt index c7514269b23b..09a970c3ac0a 100644 --- a/src/yb/client/CMakeLists.txt +++ b/src/yb/client/CMakeLists.txt @@ -109,7 +109,7 @@ endif() # and therefore don't need to worry about export strictness) ADD_YB_LIBRARY(yb_client_test_util SRCS client-test-util.cc - DEPS gmock gtest yb_client) + DEPS gmock gtest yb_client yb_test_util) ADD_YB_LIBRARY(ql-dml-test-base SRCS ql-dml-test-base.cc txn-test-base.cc snapshot_test_util.cc diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 81d2d4fdf669..e59de4a82c8b 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -108,6 +108,7 @@ DEFINE_test_flag(string, assert_tablet_server_select_is_in_zone, "", DECLARE_int64(reset_master_leader_timeout_ms); DECLARE_string(flagfile); +DECLARE_bool(ysql_ddl_rollback_enabled); namespace yb { @@ -558,7 +559,8 @@ Status YBClient::Data::DeleteTable(YBClient* client, const bool is_index_table, CoarseTimePoint deadline, YBTableName* indexed_table_name, - bool wait) { + bool wait, + const TransactionMetadata *txn) { DeleteTableRequestPB req; DeleteTableResponsePB resp; int attempts = 0; @@ -569,6 +571,14 @@ Status YBClient::Data::DeleteTable(YBClient* client, if (!table_id.empty()) { 
req.mutable_table()->set_table_id(table_id); } + if (FLAGS_ysql_ddl_rollback_enabled && txn) { + // If 'txn' is set, this means this delete operation should actually result in the + // deletion of table data only if this transaction is a success. Therefore ensure that + // 'wait' is not set, because it makes no sense to wait for the deletion to complete if we want + // to postpone the deletion until end of transaction. + DCHECK(!wait); + txn->ToPB(req.mutable_transaction()); + } req.set_is_index_table(is_index_table); const Status status = SyncLeaderMasterRpc( deadline, req, &resp, "DeleteTable", &master::MasterDdlProxy::DeleteTableAsync, diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index c13431675c36..0c3cc98e8b90 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -42,6 +42,7 @@ #include "yb/common/common_net.pb.h" #include "yb/common/entity_ids.h" #include "yb/common/index.h" +#include "yb/common/transaction.h" #include "yb/common/wire_protocol.h" #include "yb/master/master_fwd.h" @@ -137,12 +138,13 @@ class YBClient::Data { // Take one of table id or name. 
Status DeleteTable(YBClient* client, - const YBTableName& table_name, - const std::string& table_id, - bool is_index_table, - CoarseTimePoint deadline, - YBTableName* indexed_table_name, - bool wait = true); + const YBTableName& table_name, + const std::string& table_id, + bool is_index_table, + CoarseTimePoint deadline, + YBTableName* indexed_table_name, + bool wait = true, + const TransactionMetadata *txn = nullptr); Status IsDeleteTableInProgress(YBClient* client, const std::string& table_id, diff --git a/src/yb/client/client-test-util.cc b/src/yb/client/client-test-util.cc index 14396ed449f6..df5ff6ede528 100644 --- a/src/yb/client/client-test-util.cc +++ b/src/yb/client/client-test-util.cc @@ -43,16 +43,21 @@ #include #include "yb/client/client_fwd.h" +#include "yb/client/client.h" #include "yb/client/error.h" #include "yb/client/schema.h" #include "yb/client/session.h" #include "yb/client/table_handle.h" #include "yb/client/yb_op.h" +#include "yb/client/yb_table_name.h" #include "yb/common/common.pb.h" #include "yb/common/ql_type.h" #include "yb/common/ql_value.h" +#include "yb/master/master_types.pb.h" + +#include "yb/util/backoff_waiter.h" #include "yb/util/enums.h" #include "yb/util/monotime.h" #include "yb/util/status.h" @@ -60,12 +65,42 @@ #include "yb/util/status_log.h" #include "yb/util/strongly_typed_bool.h" #include "yb/util/test_macros.h" +#include "yb/util/test_util.h" using std::string; namespace yb { namespace client { +namespace { + void VerifyNamespace(client::YBClient *client, + const NamespaceName& db_name, + const bool exists, + const int timeout_secs) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + Result ret = client->NamespaceExists(db_name, YQLDatabase::YQL_DATABASE_PGSQL); + WARN_NOT_OK(ResultToStatus(ret), "NamespaceExists call failed"); + return ret.ok() && ret.get() == exists; + }, MonoDelta::FromSeconds(timeout_secs), + Format("Verify Namespace $0 $1 exists", db_name, (exists ? 
"" : "not")))); + } + + void VerifyTable(client::YBClient* client, + const string& database_name, + const string& table_name, + const int timeout_secs, + const bool exists) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + auto ret = client->TableExists( + client::YBTableName(YQL_DATABASE_PGSQL, database_name, table_name)); + WARN_NOT_OK(ResultToStatus(ret), "TableExists call failed"); + return ret.ok() && ret.get() == exists; + }, MonoDelta::FromSeconds(timeout_secs), + Format("Verify Table $0 $1 exists in database $2", + table_name, (exists ? "" : "not"), database_name))); + } +} // namespace + void LogSessionErrorsAndDie(const FlushStatus& flush_status) { const auto& s = flush_status.status; CHECK(!s.ok()); @@ -158,5 +193,55 @@ std::shared_ptr CreateReadOp( return op; } +Result GetNamespaceIdByNamespaceName(YBClient* client, + const string& namespace_name) { + const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL)); + for (const auto& ns : namespaces) { + if (ns.name() == namespace_name) { + return ns.id(); + } + } + return STATUS_SUBSTITUTE(NotFound, "The namespace $0 does not exist", namespace_name); +} + +Result GetTableIdByTableName(client::YBClient* client, + const string& namespace_name, + const string& table_name) { + const auto tables = VERIFY_RESULT(client->ListTables()); + for (const auto& t : tables) { + if (t.namespace_name() == namespace_name && t.table_name() == table_name) { + return t.table_id(); + } + } + return STATUS_SUBSTITUTE(NotFound, "The table $0 does not exist in namespace $1", + table_name, namespace_name); +} + +void VerifyNamespaceExists(YBClient *client, + const NamespaceName& db_name, + const int timeout_secs) { + VerifyNamespace(client, db_name, true /* exists */, timeout_secs); +} + +void VerifyNamespaceNotExists(YBClient *client, + const NamespaceName& db_name, + const int timeout_secs) { + VerifyNamespace(client, db_name, false /* exists */, timeout_secs); +} + +void VerifyTableExists(YBClient* client, + 
const string& database_name, + const string& table_name, + const int timeout_secs) { + VerifyTable(client, database_name, table_name, timeout_secs, true /* exists */); +} + +void VerifyTableNotExists(YBClient* client, + const string& database_name, + const string& table_name, + const int timeout_secs) { + VerifyTable(client, database_name, table_name, timeout_secs, false /* exists */); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client-test-util.h b/src/yb/client/client-test-util.h index 8322cb7699a8..8d35aa4afd6d 100644 --- a/src/yb/client/client-test-util.h +++ b/src/yb/client/client-test-util.h @@ -74,5 +74,23 @@ YBSchema YBSchemaFromSchema(const Schema& schema); std::shared_ptr CreateReadOp( int32_t key, const TableHandle& table, const std::string& value_column); +Result GetNamespaceIdByNamespaceName( + YBClient* client, const std::string& namespace_name); + +Result GetTableIdByTableName( + YBClient* client, const std::string& namespace_name, const std::string& table_name); + +void VerifyNamespaceExists(YBClient *client, const NamespaceName& db_name, int timeout_secs = 20); + +void VerifyNamespaceNotExists( + YBClient *client, const NamespaceName& db_name, int timeout_secs = 20); + +void VerifyTableExists( + YBClient* client, const std::string& db_name, const std::string& table_name, int timeout_secs); + +void VerifyTableNotExists( + YBClient* client, const std::string& db_name, const std::string& table_name, int timeout_secs); + + } // namespace client } // namespace yb diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 236c1a800f55..52adcc16fe76 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -633,14 +633,18 @@ Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) { wait); } -Status YBClient::DeleteTable(const string& table_id, bool wait, CoarseTimePoint deadline) { +Status YBClient::DeleteTable(const string& table_id, + bool wait, + const TransactionMetadata *txn, + 
CoarseTimePoint deadline) { return data_->DeleteTable(this, YBTableName(), table_id, false /* is_index_table */, PatchAdminDeadline(deadline), nullptr /* indexed_table_name */, - wait); + wait, + txn); } Status YBClient::DeleteIndexTable(const YBTableName& table_name, diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 926f7dc10b81..5f8273ca2868 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -53,6 +53,7 @@ #include "yb/common/common_types.pb.h" #include "yb/common/entity_ids.h" #include "yb/common/retryable_request.h" +#include "yb/common/transaction.h" #include "yb/gutil/macros.h" #include "yb/gutil/port.h" @@ -258,8 +259,13 @@ class YBClient { // Delete the specified table. // Set 'wait' to true if the call must wait for the table to be fully deleted before returning. Status DeleteTable(const YBTableName& table_name, bool wait = true); - Status DeleteTable( - const std::string& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint()); + // 'txn' describes the transaction that is performing this delete operation. For YSQL + // operations, YB-Master will perform the actual deletion only if this transaction is a + // success. + Status DeleteTable(const std::string& table_id, + bool wait = true, + const TransactionMetadata *txn = nullptr, + CoarseTimePoint deadline = CoarseTimePoint()); // Delete the specified index table. // Set 'wait' to true if the call must wait for the table to be fully deleted before returning. 
diff --git a/src/yb/client/snapshot_test_util.cc b/src/yb/client/snapshot_test_util.cc index 867925f31793..eabfeda43344 100644 --- a/src/yb/client/snapshot_test_util.cc +++ b/src/yb/client/snapshot_test_util.cc @@ -297,6 +297,31 @@ Result SnapshotTestUtil::CreateSchedule( return id; } +Result SnapshotTestUtil::CreateSchedule( + const NamespaceName& namespace_name, WaitSnapshot wait_snapshot, + MonoDelta interval, MonoDelta retention) { + + rpc::RpcController controller; + controller.set_timeout(60s); + master::CreateSnapshotScheduleRequestPB req; + auto& options = *req.mutable_options(); + options.set_interval_sec(interval.ToSeconds()); + options.set_retention_duration_sec(retention.ToSeconds()); + auto& tables = *options.mutable_filter()->mutable_tables()->mutable_tables(); + auto* ns = tables.Add()->mutable_namespace_(); + ns->set_name(namespace_name); + ns->set_database_type(YQLDatabase::YQL_DATABASE_PGSQL); + master::CreateSnapshotScheduleResponsePB resp; + RETURN_NOT_OK( + VERIFY_RESULT(MakeBackupServiceProxy()).CreateSnapshotSchedule(req, &resp, &controller)); + auto id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id())); + if (wait_snapshot) { + RETURN_NOT_OK(WaitScheduleSnapshot(id, std::numeric_limits::max(), + HybridTime::kMin, 60s * kTimeMultiplier)); + } + return id; +} + Result SnapshotTestUtil::ListSchedules(const SnapshotScheduleId& id) { master::ListSnapshotSchedulesRequestPB req; master::ListSnapshotSchedulesResponsePB resp; @@ -338,9 +363,15 @@ Status SnapshotTestUtil::WaitScheduleSnapshot( return WaitScheduleSnapshot(schedule_id, std::numeric_limits::max(), min_hybrid_time); } +Status SnapshotTestUtil::WaitScheduleSnapshot( + const SnapshotScheduleId& schedule_id, int max_snapshots, HybridTime min_hybrid_time) { + return WaitScheduleSnapshot(schedule_id, max_snapshots, min_hybrid_time, + ((max_snapshots == 1) ? 
0s : kSnapshotInterval) + kSnapshotInterval / 2); +} + Status SnapshotTestUtil::WaitScheduleSnapshot( const SnapshotScheduleId& schedule_id, int max_snapshots, - HybridTime min_hybrid_time) { + HybridTime min_hybrid_time, MonoDelta timeout) { return WaitFor([this, schedule_id, max_snapshots, min_hybrid_time]() -> Result { auto snapshots = VERIFY_RESULT(ListSnapshots()); EXPECT_LE(snapshots.size(), max_snapshots); @@ -354,7 +385,7 @@ Status SnapshotTestUtil::WaitScheduleSnapshot( } return false; }, - ((max_snapshots == 1) ? 0s : kSnapshotInterval) + kSnapshotInterval / 2, + timeout, "Schedule snapshot"); } diff --git a/src/yb/client/snapshot_test_util.h b/src/yb/client/snapshot_test_util.h index 69fb139a0168..fcde00269957 100644 --- a/src/yb/client/snapshot_test_util.h +++ b/src/yb/client/snapshot_test_util.h @@ -48,13 +48,13 @@ class SnapshotTestUtil { void SetProxy(rpc::ProxyCache* proxy_cache) { proxy_cache_ = proxy_cache; } - void SetCluster(MiniCluster* cluster) { + void SetCluster(MiniClusterBase* cluster) { cluster_ = cluster; } Result MakeBackupServiceProxy() { return master::MasterBackupProxy( - proxy_cache_, VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); + proxy_cache_, VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr())); } Result SnapshotState(const TxnSnapshotId& snapshot_id); @@ -98,6 +98,9 @@ class SnapshotTestUtil { const YBTablePtr table, YQLDatabase db_type, const std::string& db_name, WaitSnapshot wait_snapshot, MonoDelta interval = kSnapshotInterval, MonoDelta retention = kSnapshotRetention); + Result CreateSchedule( + const NamespaceName& database, WaitSnapshot wait_snapshot, + MonoDelta interval = kSnapshotInterval, MonoDelta retention = kSnapshotRetention); Result ListSchedules(const SnapshotScheduleId& id = SnapshotScheduleId::Nil()); @@ -111,9 +114,13 @@ class SnapshotTestUtil { const SnapshotScheduleId& schedule_id, int max_snapshots = 1, HybridTime min_hybrid_time = HybridTime::kMin); + Status WaitScheduleSnapshot( + 
const SnapshotScheduleId& schedule_id, int max_snapshots, + HybridTime min_hybrid_time, MonoDelta timeout); + private: rpc::ProxyCache* proxy_cache_; - MiniCluster* cluster_; + MiniClusterBase* cluster_; }; } // namespace client diff --git a/src/yb/common/common_flags.cc b/src/yb/common/common_flags.cc index 7e2558fa5058..33a82caa5be0 100644 --- a/src/yb/common/common_flags.cc +++ b/src/yb/common/common_flags.cc @@ -57,6 +57,14 @@ DEFINE_UNKNOWN_bool(enable_wait_queues, false, "If true, use pessimistic locking behavior in conflict resolution."); TAG_FLAG(enable_wait_queues, evolving); +DEFINE_RUNTIME_bool(ysql_ddl_rollback_enabled, false, + "If true, failed YSQL DDL transactions that affect both pg catalog and DocDB schema " + "will be rolled back by YB-Master. Note that this is applicable only for few DDL " + "operations such as dropping a table, adding a column, renaming a column/table. This " + "flag should not be changed in the middle of a DDL operation."); +TAG_FLAG(ysql_ddl_rollback_enabled, hidden); +TAG_FLAG(ysql_ddl_rollback_enabled, advanced); + DEFINE_test_flag(bool, enable_db_catalog_version_mode, false, "Enable the per database catalog version mode, a DDL statement is assumed to " "only affect the current database and will only increment catalog version for " diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 4dd470439db1..051a6940d418 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -386,6 +386,33 @@ struct PersistentTableInfo : public Persistent 0; + } + + auto ysql_ddl_txn_verifier_state() const { + // Currently DDL with savepoints is disabled, so this repeated field can have only 1 element. 
+ DCHECK_EQ(pb.ysql_ddl_txn_verifier_state_size(), 1); + return pb.ysql_ddl_txn_verifier_state(0); + } + + bool is_being_deleted_by_ysql_ddl_txn() const { + return has_ysql_ddl_txn_verifier_state() && + ysql_ddl_txn_verifier_state().contains_drop_table_op(); + } + + bool is_being_created_by_ysql_ddl_txn() const { + return has_ysql_ddl_txn_verifier_state() && + ysql_ddl_txn_verifier_state().contains_create_table_op(); + } + // Helper to set the state of the tablet with a custom message. void set_state(SysTablesEntryPB::State state, const std::string& msg); }; diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index 7331821faa74..ddc6dbf8b613 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -163,9 +163,6 @@ message SysTablesEntryPB { repeated IndexInfoPB fully_applied_indexes = 26; optional IndexInfoPB fully_applied_index_info = 27; - // Optional: Table dependent upon transaction success (abort removes table). Used by YSQL. - optional TransactionMetadataPB transaction = 29; - // During an alter table, which involves no schema change but only updating // a permission, is it sometimes acceptable for a client's request (which is // prepared with the current schema) to be accepted by a tserver which is @@ -185,6 +182,24 @@ message SysTablesEntryPB { optional bytes transaction_table_tablespace_id = 34; // Time when the table was hidden. optional fixed64 hide_hybrid_time = 35; + + message YsqlDdlTxnVerifierState { + optional bool contains_create_table_op = 1; + optional bool contains_alter_table_op = 2; + optional bool contains_drop_table_op = 3; + + optional SchemaPB previous_schema = 4; + optional string previous_table_name = 5; + } + + // State that indicates that this table is being changed by a YSQL transaction. + // This repeated field contains only a single element as of now. 
When we support DDL + Savepoints, + // we will have one element for every savepoint modifying this table in this field. + repeated YsqlDdlTxnVerifierState ysql_ddl_txn_verifier_state = 38; + + // YSQL transaction that is currently modifying this table state. The changes being performed by + // it are detailed in 'ysql_ddl_txn_verifier_state' above. + optional TransactionMetadataPB transaction = 29; } // The on-disk entry in the sys.catalog table ("metadata" column) for diff --git a/src/yb/master/catalog_loaders.cc b/src/yb/master/catalog_loaders.cc index a472640ae460..45f03ae70992 100644 --- a/src/yb/master/catalog_loaders.cc +++ b/src/yb/master/catalog_loaders.cc @@ -50,6 +50,8 @@ DEFINE_UNKNOWN_bool(master_ignore_deleted_on_load, true, DEFINE_test_flag(uint64, slow_cluster_config_load_secs, 0, "When set, it pauses load of cluster config during sys catalog load."); +DECLARE_bool(ysql_ddl_rollback_enabled); + namespace yb { namespace master { @@ -113,14 +115,24 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad // Tables created as part of a Transaction should check transaction status and be deleted // if the transaction is aborted. 
if (metadata.has_transaction()) { - LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString(); TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction())); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1); - WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, catalog_manager_->ysql_transaction_.get(), - txn, when_done)), - "Could not submit VerifyTransaction to thread pool"); + if (metadata.ysql_ddl_txn_verifier_state_size() > 0) { + if (FLAGS_ysql_ddl_rollback_enabled) { + catalog_manager_->ScheduleYsqlTxnVerification(table, txn); + } + } else { + // This is a table/index for which YSQL transaction verification is not supported yet. + // For these, we only support rolling back creating the table. If the transaction has + // completed, merely check for the presence of this entity in the PG catalog. + LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString(); + std::function when_done = + std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1); + WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, + catalog_manager_->ysql_transaction_.get(), + txn, table, false /* has_ysql_ddl_txn_state */, when_done)), + "Could not submit VerifyTransaction to thread pool"); + } } LOG(INFO) << "Loaded metadata for table " << table->ToString() << ", state: " @@ -390,7 +402,11 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP std::bind(&CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1); WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc( std::bind(&YsqlTransactionDdl::VerifyTransaction, - catalog_manager_->ysql_transaction_.get(), txn, when_done)), + catalog_manager_->ysql_transaction_.get(), + txn, + nullptr /* 
table */, + false /* has_ysql_ddl_txn_state */, + when_done)), "Could not submit VerifyTransaction to thread pool"); } break; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 9062ed787506..1f01db91283b 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -84,6 +84,7 @@ #include "yb/common/ql_type.h" #include "yb/common/roles_permissions.h" #include "yb/common/schema.h" +#include "yb/common/transaction.h" #include "yb/common/wire_protocol.h" #include "yb/consensus/consensus.h" @@ -344,6 +345,8 @@ DEFINE_RUNTIME_bool(enable_transactional_ddl_gc, true, "A kill switch for transactional DDL GC. Temporary safety measure."); TAG_FLAG(enable_transactional_ddl_gc, hidden); +DECLARE_bool(ysql_ddl_rollback_enabled); + // TODO: should this be a test flag? DEFINE_RUNTIME_bool(hide_pg_catalog_table_creation_logs, false, "Whether to hide detailed log messages for PostgreSQL catalog table creation. " @@ -3273,7 +3276,8 @@ Status CatalogManager::CreateYsqlSysTable( std::function when_done = std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, when_done)), + std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), + txn, table, false /* has_ysql_ddl_txn_state */, when_done)), "Could not submit VerifyTransaction to thread pool"); } @@ -4001,11 +4005,21 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // Tables with a transaction should be rolled back if the transaction does not get committed. // Store this on the table persistent state until the transaction has been a verified success. 
TransactionMetadata txn; - if (req.has_transaction() && FLAGS_enable_transactional_ddl_gc) { + bool schedule_ysql_txn_verifier = false; + if (req.has_transaction() && + (FLAGS_enable_transactional_ddl_gc || FLAGS_ysql_ddl_rollback_enabled)) { table->mutable_metadata()->mutable_dirty()->pb.mutable_transaction()-> CopyFrom(req.transaction()); txn = VERIFY_RESULT(TransactionMetadata::FromPB(req.transaction())); RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + + // Set the YsqlTxnVerifierState. + if (FLAGS_ysql_ddl_rollback_enabled && !IsIndex(req) && !colocated && + !table->is_matview()) { + table->mutable_metadata()->mutable_dirty()->pb.add_ysql_ddl_txn_verifier_state()-> + set_contains_create_table_op(true); + schedule_ysql_txn_verifier = true; + } } if (PREDICT_FALSE(FLAGS_TEST_simulate_slow_table_create_secs > 0) && @@ -4089,13 +4103,18 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } // Verify Transaction gets committed, which occurs after table create finishes. 
- if (req.has_transaction() && PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { - LOG(INFO) << "Enqueuing table for Transaction Verification: " << req.name(); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); - WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, when_done)), - "Could not submit VerifyTransaction to thread pool"); + if (req.has_transaction()) { + if (schedule_ysql_txn_verifier) { + ScheduleYsqlTxnVerification(table, txn); + } else if (PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { + LOG(INFO) << "Enqueuing table for Transaction Verification: " << req.name(); + std::function when_done = + std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1); + WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), + txn, table, false /* has_ysql_ddl_txn_state */, when_done)), + "Could not submit VerifyTransaction to thread pool"); + } } LOG(INFO) << "Successfully created " << object_type << " " << table->ToString() << " in " @@ -5595,6 +5614,59 @@ Status CatalogManager::DeleteTable( } } + // Check whether DDL rollback is enabled. + if (FLAGS_ysql_ddl_rollback_enabled && req->has_transaction()) { + bool ysql_txn_verifier_state_present = false; + bool table_created_by_same_transaction = false; + auto l = table->LockForWrite(); + if (l->has_ysql_ddl_txn_verifier_state()) { + DCHECK(!l->pb_transaction_id().empty()); + + // If the table is already undergoing DDL transaction verification as part of a different + // transaction then fail this request. 
+ if (l->pb_transaction_id() != req->transaction().transaction_id()) { + const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + } + // This DROP operation is part of a DDL transaction that has already made changes + // to this table. + ysql_txn_verifier_state_present = true; + // If this table is being dropped in the same transaction where it was being created, then + // it can be dropped without waiting for the transaction to end. This is because this table + // will be dropped anyway whether this transaction commits or aborts. + table_created_by_same_transaction = l->is_being_created_by_ysql_ddl_txn(); + } + + // If this table has not been created in the same transaction that is dropping it, mark this + // table for deletion upon successful commit of this transaction. + if (!table_created_by_same_transaction && + table->GetTableType() == PGSQL_TABLE_TYPE && !table->is_matview() && + !req->is_index_table() && !table->colocated()) { + // Setup a background task. It monitors the YSQL transaction. If it commits, the task drops + // the table. Otherwise it removes the deletion marker in the ysql_ddl_txn_verifier_state. + TransactionMetadata txn; + auto& pb = l.mutable_data()->pb; + if (!ysql_txn_verifier_state_present) { + pb.mutable_transaction()->CopyFrom(req->transaction()); + txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + pb.add_ysql_ddl_txn_verifier_state()->set_contains_drop_table_op(true); + } else { + DCHECK_EQ(pb.ysql_ddl_txn_verifier_state_size(), 1); + pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_drop_table_op(true); + } + // Upsert to sys_catalog. + RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); + // Update the in-memory state. 
+ TRACE("Committing in-memory state as part of DeleteTable operation"); + l.Commit(); + if (!ysql_txn_verifier_state_present) { + ScheduleYsqlTxnVerification(table, txn); + } + return Status::OK(); + } + } + return DeleteTableInternal(req, resp, rpc); } @@ -6207,6 +6279,16 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, auto l = table->LockForWrite(); RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + // If the table is already undergoing an alter operation, return failure. + if (FLAGS_ysql_ddl_rollback_enabled && l->has_ysql_ddl_txn_verifier_state()) { + DCHECK(req->has_transaction()); + DCHECK(req->transaction().has_transaction_id()); + if (l->pb_transaction_id() != req->transaction().transaction_id()) { + const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + } + } + bool has_changes = false; auto& table_pb = l.mutable_data()->pb; const TableName table_name = l->name(); @@ -6215,6 +6297,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Calculate new schema for the on-disk state, not persisted yet. Schema new_schema; + Schema previous_schema; + RETURN_NOT_OK(SchemaFromPB(l->pb.schema(), &previous_schema)); + string previous_table_name = l->pb.name(); ColumnId next_col_id = ColumnId(l->pb.next_column_id()); if (req->alter_schema_steps_size() || req->has_alter_properties()) { TRACE("Apply alter schema"); @@ -6346,29 +6431,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, SysTablesEntryPB::ALTERING, Substitute("Alter table version=$0 ts=$1", table_pb.version(), LocalTimeAsString())); - // Update sys-catalog with the new table schema. 
- TRACE("Updating metadata on disk"); - std::vector ddl_log_entry_pointers; - ddl_log_entry_pointers.reserve(ddl_log_entries.size()); - for (const auto& entry : ddl_log_entries) { - ddl_log_entry_pointers.push_back(&entry); - } - Status s = sys_catalog_->Upsert(leader_ready_term(), ddl_log_entry_pointers, table); - if (!s.ok()) { - s = s.CloneAndPrepend( - Substitute("An error occurred while updating sys-catalog tables entry: $0", - s.ToString())); - LOG(WARNING) << s.ToString(); - if (table->GetTableType() != PGSQL_TABLE_TYPE && - (req->has_new_namespace() || req->has_new_table_name())) { - LockGuard lock(mutex_); - VLOG_WITH_FUNC(3) << "Acquired the catalog manager lock"; - CHECK_EQ(table_names_map_.erase({new_namespace_id, new_table_name}), 1); - } - // TableMetadaLock follows RAII paradigm: when it leaves scope, - // 'l' will be unlocked, and the mutation will be aborted. - return CheckIfNoLongerLeaderAndSetupError(s, resp); - } + RETURN_NOT_OK(UpdateSysCatalogWithNewSchema( + table, ddl_log_entries, new_namespace_id, new_table_name, resp)); // Remove the old name. Not present if PGSQL. if (table->GetTableType() != PGSQL_TABLE_TYPE && @@ -6379,10 +6443,37 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, table_names_map_.erase({namespace_id, table_name}); } + // Update a task to rollback alter if the corresponding YSQL transaction + // rolls back. + TransactionMetadata txn; + bool schedule_ysql_txn_verifier = false; + if (!FLAGS_ysql_ddl_rollback_enabled) { + // If DDL rollback is no longer enabled, make sure that there is no transaction + // verification state present. 
+ table_pb.clear_ysql_ddl_txn_verifier_state(); + } else if (req->has_transaction() && table->GetTableType() == PGSQL_TABLE_TYPE && + !table->colocated()) { + if (!l->has_ysql_ddl_txn_verifier_state()) { + table_pb.mutable_transaction()->CopyFrom(req->transaction()); + auto *ddl_state = table_pb.add_ysql_ddl_txn_verifier_state(); + SchemaToPB(previous_schema, ddl_state->mutable_previous_schema()); + ddl_state->set_previous_table_name(previous_table_name); + schedule_ysql_txn_verifier = true; + } + txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + DCHECK_EQ(table_pb.ysql_ddl_txn_verifier_state_size(), 1); + table_pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_alter_table_op(true); + } + // Update the in-memory state. TRACE("Committing in-memory state"); l.Commit(); + // Verify Transaction gets committed, which occurs after table alter finishes. + if (schedule_ysql_txn_verifier) { + ScheduleYsqlTxnVerification(table, txn); + } RETURN_NOT_OK(SendAlterTableRequest(table, req)); // Increment transaction status version if needed. 
@@ -6395,6 +6486,50 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   return Status::OK();
 }
 
+// Writes 'table' and the given DDL log entries to the sys-catalog with a single Upsert.
+//
+// If the Upsert fails, the error is logged and, for non-PGSQL tables, the
+// {new_namespace_id, new_table_name} entry is erased from table_names_map_. The failure is then
+// returned through CheckIfNoLongerLeaderAndSetupError() when 'resp' is non-null, or through
+// CheckIfNoLongerLeader() when 'resp' is null.
+//
+// NOTE(review): the inline code in AlterTable() that this helper replaces erased the map entry
+// only when the request had a new namespace or a new table name. The condition here tests for
+// non-empty strings instead; confirm that callers pass empty strings when nothing is being
+// renamed, otherwise the CHECK_EQ below can fire on a plain sys-catalog write failure.
+Status CatalogManager::UpdateSysCatalogWithNewSchema(
+    const scoped_refptr<TableInfo>& table,
+    const std::vector<DdlLogEntry>& ddl_log_entries,
+    const string& new_namespace_id,
+    const string& new_table_name,
+    AlterTableResponsePB* resp) {
+  TRACE("Updating metadata on disk");
+  // Upsert() takes pointers to the log entries.
+  std::vector<const DdlLogEntry*> ddl_log_entry_pointers;
+  ddl_log_entry_pointers.reserve(ddl_log_entries.size());
+  for (const auto& entry : ddl_log_entries) {
+    ddl_log_entry_pointers.push_back(&entry);
+  }
+  Status s = sys_catalog_->Upsert(leader_ready_term(), ddl_log_entry_pointers, table);
+  if (!s.ok()) {
+    s = s.CloneAndPrepend(
+        Substitute("An error occurred while updating sys-catalog tables entry: $0",
+                   s.ToString()));
+    LOG(WARNING) << s.ToString();
+    if (table->GetTableType() != PGSQL_TABLE_TYPE &&
+        (!new_namespace_id.empty() || !new_table_name.empty())) {
+      LockGuard lock(mutex_);
+      VLOG_WITH_FUNC(3) << "Acquired the catalog manager lock";
+      CHECK_EQ(table_names_map_.erase({new_namespace_id, new_table_name}), 1);
+    }
+    if (resp != nullptr) {
+      return CheckIfNoLongerLeaderAndSetupError(s, resp);
+    }
+    return CheckIfNoLongerLeader(s);
+  }
+  return Status::OK();
+}
+
 Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
                                         IsAlterTableDoneResponsePB* resp) {
   // 1. Lookup the table and verify if it exists.
@@ -6413,6 +6536,151 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
   return Status::OK();
 }
 
+// Submits a background task that runs YsqlTransactionDdl::VerifyTransaction for 'txn'; the
+// task is given YsqlTableSchemaChecker, bound to 'table' and the transaction id currently
+// stored on the table, as its completion callback. A failed submission is only logged.
+void CatalogManager::ScheduleYsqlTxnVerification(const scoped_refptr<TableInfo>& table,
+                                                 const TransactionMetadata& txn) {
+  auto l = table->LockForRead();
+  LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name()
+            << " id: " << table->id() << " schema version: " << l->pb.version();
+  std::function<Status(bool)> when_done =
+    std::bind(&CatalogManager::YsqlTableSchemaChecker, this, table,
+              l->pb_transaction_id(), _1);
+  // For now, just print warning if submission to thread pool fails. Fix this as part of
+  // #13358.
+  WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc(
+      std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, table,
+                true /* has_ysql_ddl_state */, when_done)),
+      "Could not submit VerifyTransaction to thread pool");
+}
+
+// Completion callback for the verification task. Fails with IllegalState when the transaction
+// status lookup did not succeed ('txn_rpc_success' is false); otherwise passes
+// PgSchemaChecker's verdict for 'table' to YsqlDdlTxnCompleteCallback as the commit outcome.
+Status CatalogManager::YsqlTableSchemaChecker(scoped_refptr<TableInfo> table,
+                                              const string& txn_id_pb,
+                                              bool txn_rpc_success) {
+  if (!txn_rpc_success) {
+    return STATUS_FORMAT(IllegalState, "Failed to find Transaction Status for table $0",
+                         table->ToString());
+  }
+  bool is_committed = VERIFY_RESULT(ysql_transaction_->PgSchemaChecker(table));
+  return YsqlDdlTxnCompleteCallback(table, txn_id_pb, is_committed);
+}
+
+// Finishes DDL transaction verification for 'table'. 'success' says whether the transaction
+// identified by 'txn_id_pb' committed. Does nothing if the table no longer carries verifier
+// state or carries state for a different transaction. Otherwise:
+//   - committed, table not dropped by the transaction: clears the verifier state;
+//   - committed drop, or aborted create: deletes the table;
+//   - aborted alter: restores the previous schema and table name and sends an alter request;
+//   - aborted drop: clears the verifier state.
+Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr<TableInfo> table,
+                                                  const string& txn_id_pb,
+                                                  bool success) {
+  DCHECK(!txn_id_pb.empty());
+  DCHECK(table);
+  const string id = "table id: " + table->id();
+  auto txn = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb));
+  auto l = table->LockForWrite();
+  LOG(INFO) << "YsqlDdlTxnCompleteCallback for " << id
+            << " for transaction " << txn
+            << ": Success: " << (success ? "true" : "false")
+            << " ysql_ddl_txn_verifier_state: "
+            << l->ysql_ddl_txn_verifier_state().DebugString();
+
+  if (!l->has_ysql_ddl_txn_verifier_state()) {
+    // The table no longer has any DDL state to clean up. It was probably cleared by
+    // another thread, nothing to do.
+    LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked but no ysql transaction "
+              << "verification state found for " << id << " , ignoring";
+    return Status::OK();
+  }
+
+  if (txn_id_pb != l->pb_transaction_id()) {
+    // Now the table has transaction state for a different transaction.
+    // Do nothing.
+    LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked for transaction " << txn << " but the "
+              << "schema contains state for a different transaction";
+    return Status::OK();
+  }
+
+  if (success && !l->is_being_deleted_by_ysql_ddl_txn()) {
+    // This transaction was successful. We did not drop this table in this
+    // transaction. In that case, we have nothing else to do.
+    LOG(INFO) << "Clearing ysql_ddl_txn_verifier_state from " << id;
+    auto& pb = l.mutable_data()->pb;
+    pb.clear_ysql_ddl_txn_verifier_state();
+    pb.clear_transaction();
+    RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table));
+    l.Commit();
+    return Status::OK();
+  }
+
+  if ((success && l->is_being_deleted_by_ysql_ddl_txn()) ||
+      (!success && l->is_being_created_by_ysql_ddl_txn())) {
+    // This is either a successful DROP operation or a failed CREATE operation.
+    // In both cases, drop the table.
+    LOG(INFO) << "Dropping " << id;
+    l.Commit();
+    DeleteTableRequestPB dtreq;
+    DeleteTableResponsePB dtresp;
+
+    dtreq.mutable_table()->set_table_name(table->name());
+    dtreq.mutable_table()->set_table_id(table->id());
+    dtreq.set_is_index_table(false);
+    return DeleteTableInternal(&dtreq, &dtresp, nullptr /* rpc */);
+  }
+
+  auto& table_pb = l.mutable_data()->pb;
+  if (!success && l->ysql_ddl_txn_verifier_state().contains_alter_table_op()) {
+    LOG(INFO) << "Alter transaction " << txn << " failed, rolling back its schema changes";
+    std::vector<DdlLogEntry> ddl_log_entries;
+    ddl_log_entries.emplace_back(
+        master_->clock()->Now(),
+        table->id(),
+        l->pb,
+        "Rollback of DDL Transaction");
+    table_pb.mutable_schema()->CopyFrom(table_pb.ysql_ddl_txn_verifier_state(0).previous_schema());
+    // Take a copy of the name: the verifier state that holds it is cleared before the name is
+    // used again below.
+    const string new_table_name = table_pb.ysql_ddl_txn_verifier_state(0).previous_table_name();
+    l.mutable_data()->pb.set_name(new_table_name);
+    table_pb.set_version(table_pb.version() + 1);
+    table_pb.set_updates_only_index_permissions(false);
+    l.mutable_data()->set_state(
+        SysTablesEntryPB::ALTERING,
+        Substitute("Alter table version=$0 ts=$1", table_pb.version(), LocalTimeAsString()));
+
+    table_pb.clear_ysql_ddl_txn_verifier_state();
+    table_pb.clear_transaction();
+
+    // Update sys-catalog with the new table schema.
+    RETURN_NOT_OK(UpdateSysCatalogWithNewSchema(
+          table,
+          ddl_log_entries,
+          "" /* new_namespace_id */,
+          new_table_name,
+          nullptr /* resp */));
+    l.Commit();
+    LOG(INFO) << "Sending Alter Table request as part of rollback for table " << table->name();
+    return SendAlterTableRequestInternal(table, TransactionId::Nil());
+  }
+
+  // This must be a failed transaction with only a delete operation in it and no alter operations.
+  // Hence we don't need to do anything other than clear the transaction state.
+  DCHECK(!success && l->is_being_deleted_by_ysql_ddl_txn())
+    << "Unexpected state for table " << table->ToString() << " with transaction verification "
+    << "state " << l->ysql_ddl_txn_verifier_state().DebugString();
+
+  table_pb.clear_ysql_ddl_txn_verifier_state();
+  table_pb.clear_transaction();
+  RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table));
+  l.Commit();
+  return Status::OK();
+}
+
 Result<TabletInfoPtr> CatalogManager::RegisterNewTabletForSplit(
     TabletInfo* source_tablet_info, const PartitionPB& partition,
     TableInfo::WriteLock* table_write_lock, TabletInfo::WriteLock* tablet_write_lock) {
@@ -8055,7 +8308,7 @@ void CatalogManager::ProcessPendingNamespace(
           std::bind(&CatalogManager::VerifyNamespacePgLayer, this, ns, _1);
       WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc(
           std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(),
-                    txn, when_done)),
+                    txn, nullptr /* table */, false /* has_ysql_ddl_state */, when_done)),
           "Could not submit VerifyTransaction to thread pool");
     }
   } else {
@@ -9688,9 +9941,7 @@ Status CatalogManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB&
 }
 
 Status CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& table,
-                                             const AlterTableRequestPB* req) {
-  auto tablets = table->GetTablets();
-
+                                             const AlterTableRequestPB* req) {
   bool is_ysql_table_with_transaction_metadata =
       table->GetTableType() == TableType::PGSQL_TABLE_TYPE &&
       req != nullptr &&
@@ -9719,6 +9970,12 @@ Status CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& tab
     txn_id = VERIFY_RESULT(FullyDecodeTransactionId(req->transaction().transaction_id()));
   }
 
+  return SendAlterTableRequestInternal(table, txn_id);
+}
+
+Status CatalogManager::SendAlterTableRequestInternal(const scoped_refptr<TableInfo>& table,
+                                                     const TransactionId& txn_id) {
+  auto tablets = table->GetTablets();
   for (const scoped_refptr<TabletInfo>& tablet : tablets) {
     auto call = std::make_shared<AsyncAlterTable>(master_, AsyncTaskPool(), tablet, table, txn_id);
     table->AddTask(call);
diff --git
a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 0f0e68e5ec2e..5e1bd96a3456 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -348,12 +348,30 @@ class CatalogManager : public tserver::TabletPeerLookupIf, AlterTableResponsePB* resp, rpc::RpcContext* rpc); + Status UpdateSysCatalogWithNewSchema( + const scoped_refptr& table, + const std::vector& ddl_log_entries, + const std::string& new_namespace_id, + const std::string& new_table_name, + AlterTableResponsePB* resp); + // Get the information about an in-progress alter operation. Status IsAlterTableDone(const IsAlterTableDoneRequestPB* req, IsAlterTableDoneResponsePB* resp); Result GetTableNamespaceId(TableId table_id) EXCLUDES(mutex_); + void ScheduleYsqlTxnVerification(const scoped_refptr& table, + const TransactionMetadata& txn); + + Status YsqlTableSchemaChecker(scoped_refptr table, + const std::string& txn_id_pb, + bool txn_rpc_success); + + Status YsqlDdlTxnCompleteCallback(scoped_refptr table, + const std::string& txn_id_pb, + bool success); + // Get the information about the specified table. Status GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp) override; @@ -1303,6 +1321,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status SendAlterTableRequest(const scoped_refptr& table, const AlterTableRequestPB* req = nullptr); + Status SendAlterTableRequestInternal(const scoped_refptr& table, + const TransactionId& txn_id); + // Start the background task to send the CopartitionTable() RPC to the leader for this // tablet. 
void SendCopartitionTabletRequest(const scoped_refptr& tablet, diff --git a/src/yb/master/master-path-handlers.cc b/src/yb/master/master-path-handlers.cc index 7c02a7723b81..ae7a05a4f911 100644 --- a/src/yb/master/master-path-handlers.cc +++ b/src/yb/master/master-path-handlers.cc @@ -1267,7 +1267,7 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req, "Unable to determine Tablespace information."); *output << " Unable to determine Tablespace information."; } - *output << " \n \n"; + *output << " \n"; } else { // The table was associated with a tablespace, but that tablespace was not found. *output << " Replication Info:"; @@ -1279,7 +1279,41 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req, << " used to refresh it is disabled."; } - *output << " \n \n"; + *output << " \n"; + } + + if (l->has_ysql_ddl_txn_verifier_state()) { + auto result = FullyDecodeTransactionId(l->pb.transaction().transaction_id()); + *output << " Verifying Ysql DDL Transaction: "; + if (result) + *output << result.get(); + else + *output << "Failed to decode transaction with error:" << result; + *output << " \n"; + + const bool contains_alter = l->pb.ysql_ddl_txn_verifier_state(0).contains_alter_table_op(); + *output << " Ysql DDL transaction Operations: " + << (l->is_being_created_by_ysql_ddl_txn() ? "Create " : "") + << (contains_alter ? " Alter " : "") + << (l->is_being_deleted_by_ysql_ddl_txn() ? 
"Delete" : "") + << " \n"; + if (contains_alter && !l->is_being_created_by_ysql_ddl_txn()) { + *output << " Previous table name: " + << l->pb.ysql_ddl_txn_verifier_state(0).previous_table_name() + << " \n \n"; + Schema previous_schema; + Status s = + SchemaFromPB(l->pb.ysql_ddl_txn_verifier_state(0).previous_schema(), &previous_schema); + if (s.ok()) { + *output << " Previous Schema\n"; + server::HtmlOutputSchemaTable(previous_schema, output); + *output << " Current Schema\n"; + } + } else { + *output << "\n"; + } + } else { + *output << "\n"; } Status s = SchemaFromPB(l->pb.schema(), &schema); diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index b103fc28a5f3..35d869c24a23 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -295,6 +295,7 @@ message GetBackfillJobsResponsePB { message DeleteTableRequestPB { required TableIdentifierPB table = 1; optional bool is_index_table = 2 [ default = false ]; + optional TransactionMetadataPB transaction = 3; } message DeleteTableResponsePB { diff --git a/src/yb/master/master_types.proto b/src/yb/master/master_types.proto index d1def33298dd..ede2787a63f7 100644 --- a/src/yb/master/master_types.proto +++ b/src/yb/master/master_types.proto @@ -141,6 +141,8 @@ message MasterErrorPB { TABLET_NOT_RUNNING = 38; TABLE_NOT_RUNNING = 39; + + TABLE_SCHEMA_CHANGE_IN_PROGRESS = 40; } // The error code. 
diff --git a/src/yb/master/ysql_transaction_ddl.cc b/src/yb/master/ysql_transaction_ddl.cc index b7db9cd6fa4e..d6ef3a599a49 100644 --- a/src/yb/master/ysql_transaction_ddl.cc +++ b/src/yb/master/ysql_transaction_ddl.cc @@ -18,8 +18,6 @@ #include "yb/common/ql_expr.h" #include "yb/common/wire_protocol.h" -#include "yb/docdb/doc_rowwise_iterator.h" - #include "yb/gutil/casts.h" #include "yb/master/sys_catalog.h" @@ -40,6 +38,9 @@ DEFINE_UNKNOWN_int32(ysql_transaction_bg_task_wait_ms, 200, "Amount of time the catalog manager background task thread waits " "between runs"); +using std::string; +using std::vector; + namespace yb { namespace master { @@ -48,9 +49,78 @@ YsqlTransactionDdl::~YsqlTransactionDdl() { rpcs_.Shutdown(); } +Result YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result entry_oid, + TableId relfilenode_oid) { + vector col_names = {"oid"}; + bool is_matview = relfilenode_oid.empty() ? false : true; + if (is_matview) { + col_names.emplace_back("relfilenode"); + } + + Schema projection; + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator(pg_table_id, + "oid", VERIFY_RESULT(std::move(entry_oid)), std::move(col_names), &projection)); + + // If no rows found, the entry does not exist. + if (!VERIFY_RESULT(iter->HasNext())) { + return false; + } + + // The entry exists. Expect only one row. 
+ QLTableRow row; + RETURN_NOT_OK(iter->NextRow(&row)); + CHECK(!VERIFY_RESULT(iter->HasNext())); + if (is_matview) { + const auto relfilenode_col_id = VERIFY_RESULT(projection.ColumnIdByName("relfilenode")).rep(); + const auto& relfilenode = row.GetValue(relfilenode_col_id); + if (relfilenode->uint32_value() != VERIFY_RESULT(GetPgsqlTableOid(relfilenode_oid))) { + return false; + } + } + return true; +} + +Result> +YsqlTransactionDdl::GetPgCatalogTableScanIterator(const TableId& pg_catalog_table_id, + const string& oid_col_name, + uint32_t oid_value, + std::vector col_names, + Schema *projection) { + + auto tablet_peer = sys_catalog_->tablet_peer(); + if (!tablet_peer || !tablet_peer->tablet()) { + return STATUS(ServiceUnavailable, "SysCatalog unavailable"); + } + const tablet::Tablet* catalog_tablet = tablet_peer->tablet(); + DCHECK(catalog_tablet->metadata()); + const Schema& schema = + VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_catalog_table_id))->schema(); + + // Use Scan to query the given table, filtering by lookup_oid_col. 
+ RETURN_NOT_OK(schema.CreateProjectionByNames(col_names, projection, schema.num_key_columns())); + const auto oid_col_id = VERIFY_RESULT(projection->ColumnIdByName(oid_col_name)).rep(); + auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator( + projection->CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_catalog_table_id)); + + auto doc_iter = down_cast(iter.get()); + PgsqlConditionPB cond; + cond.add_operands()->set_column_id(oid_col_id); + cond.set_op(QL_OP_EQUAL); + cond.add_operands()->mutable_value()->set_uint32_value(oid_value); + const std::vector empty_key_components; + docdb::DocPgsqlScanSpec spec( + *projection, rocksdb::kDefaultQueryId, empty_key_components, empty_key_components, + &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */); + RETURN_NOT_OK(doc_iter->Init(spec)); + return iter; +} + void YsqlTransactionDdl::VerifyTransaction( const TransactionMetadata& transaction_metadata, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback) { + SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata; @@ -74,30 +144,41 @@ void YsqlTransactionDdl::VerifyTransaction( // We need to query the TransactionCoordinator here. Can't use TransactionStatusResolver in // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet. 
*rpc_handle = client::GetTransactionStatus( - TransactionRpcDeadline(), - nullptr /* tablet */, - client, - &req, - [this, rpc_handle, transaction_metadata, complete_callback] - (Status status, const tserver::GetTransactionStatusResponsePB& resp) { - auto retained = rpcs_.Unregister(rpc_handle); - TransactionReceived(transaction_metadata, complete_callback, std::move(status), resp); - }); + TransactionRpcDeadline(), + nullptr /* tablet */, + client, + &req, + [this, rpc_handle, transaction_metadata, table, has_ysql_ddl_txn_state, complete_callback] + (Status status, const tserver::GetTransactionStatusResponsePB& resp) { + auto retained = rpcs_.Unregister(rpc_handle); + TransactionReceived(transaction_metadata, table, has_ysql_ddl_txn_state, + complete_callback, std::move(status), resp); + }); (**rpc_handle).SendRpc(); } void YsqlTransactionDdl::TransactionReceived( const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback, Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { + if (has_ysql_ddl_txn_state) { + // This was invoked for a table for which YSQL DDL rollback is enabled. Verify that it + // contains ysql_ddl_txn_state even now. + DCHECK(table->LockForRead()->has_ysql_ddl_txn_verifier_state()); + } + if (!txn_status.ok()) { LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() << ") failed with status " << txn_status; WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); }), "Failed to enqueue callback"); + return; // #5981: Improve failure handling to retry transient errors or recognize transaction complete. 
- } else if (resp.has_error()) { + } + if (resp.has_error()) { const Status s = StatusFromPB(resp.error().status()); const tserver::TabletServerErrorPB::Code code = resp.error().code(); LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() @@ -107,86 +188,217 @@ void YsqlTransactionDdl::TransactionReceived( WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); }), "Failed to enqueue callback"); // #5981: Maybe have the same heuristic as above? - } else { - YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() - << ", resp: " << resp.ShortDebugString(); - bool is_pending = (resp.status_size() == 0); - for (int i = 0; i < resp.status_size() && !is_pending; ++i) { - // NOTE: COMMITTED state is also "pending" because we need APPLIED. - is_pending = resp.status(i) == TransactionStatus::PENDING || - resp.status(i) == TransactionStatus::COMMITTED; - } - if (is_pending) { - // Re-enqueue if transaction is still pending. - WARN_NOT_OK(thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, this, transaction, complete_callback)), - "Could not submit VerifyTransaction to thread pool"); - } else { - // If this transaction isn't pending, then the transaction is in a terminal state. - // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. - WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { - WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); - }), "Failed to enqueue callback"); - } + return; } + YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() + << ", resp: " << resp.ShortDebugString(); + bool is_pending = (resp.status_size() == 0); + for (int i = 0; i < resp.status_size() && !is_pending; ++i) { + // NOTE: COMMITTED state is also "pending" because we need APPLIED. 
+ is_pending = resp.status(i) == TransactionStatus::PENDING || + resp.status(i) == TransactionStatus::COMMITTED; + } + if (is_pending) { + // Re-enqueue if transaction is still pending. + WARN_NOT_OK(thread_pool_->SubmitFunc( + std::bind(&YsqlTransactionDdl::VerifyTransaction, this, + transaction, table, has_ysql_ddl_txn_state, complete_callback)), + "Could not submit VerifyTransaction to thread pool"); + return; + } + // If this transaction isn't pending, then the transaction is in a terminal state. + // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. + // Determine whether the transaction was a success by comparing with the PG schema. + WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { + WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); + }), "Failed to enqueue callback"); } -Result YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result entry_oid, - TableId relfilenode_oid) { - auto tablet_peer = sys_catalog_->tablet_peer(); - if (!tablet_peer) { - return STATUS(ServiceUnavailable, "SysCatalog unavailable"); +Result YsqlTransactionDdl::PgSchemaChecker(const scoped_refptr& table) { + const uint32_t database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); + const auto& pg_catalog_table_id = GetPgsqlTableId(database_oid, kPgClassTableOid); + + Schema projection; + uint32_t oid = VERIFY_RESULT(GetPgsqlTableOid(table->id())); + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator(pg_catalog_table_id, + "oid" /* oid_col_name */, + oid, + {"oid", "relname"}, + &projection)); + + QLTableRow row; + auto l = table->LockForRead(); + if (!l->has_ysql_ddl_txn_verifier_state()) { + // The table no longer has transaction verifier state on it, it was probably cleaned up + // concurrently. 
+ return STATUS_FORMAT(Aborted, "Not performing transaction verification for table $0 as it no " + "longer has any transaction verification state", table->ToString()); + } + // Table not found in pg_class. This can only happen in two cases: Table creation failed, + // or a table deletion went through successfully. + if (!VERIFY_RESULT(iter->HasNext())) { + if (l->is_being_deleted_by_ysql_ddl_txn()) { + return true; + } + CHECK(l->is_being_created_by_ysql_ddl_txn()); + return false; } - auto shared_tablet = VERIFY_RESULT(tablet_peer->shared_tablet_safe()); - const tablet::Tablet* catalog_tablet = shared_tablet.get(); - const Schema& pg_database_schema = - VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_table_id))->schema(); - bool is_matview = relfilenode_oid.empty() ? false : true; - std::vector col_names; + // Table found in pg_class. + if (l->is_being_deleted_by_ysql_ddl_txn()) { + LOG(INFO) << "Ysql Drop transaction for " << table->ToString() + << " detected to have failed as table found " + << "in PG catalog"; + return false; + } - if (is_matview) { - col_names = {"oid", "relfilenode"}; - } else { - col_names = {"oid"}; + if (l->is_being_created_by_ysql_ddl_txn()) { + return true; + } + + // Table was being altered. Check whether its current DocDB schema matches + // that of PG catalog. + CHECK(l->ysql_ddl_txn_verifier_state().contains_alter_table_op()); + RETURN_NOT_OK(iter->NextRow(&row)); + const auto relname_col_id = VERIFY_RESULT(projection.ColumnIdByName("relname")).rep(); + const auto& relname_col = row.GetValue(relname_col_id); + const string& table_name = relname_col->string_value(); + + const string fail_msg = "Alter transaction on " + table->ToString() + " failed."; + if (table->name().compare(table_name) != 0) { + // Table name does not match. 
+ LOG(INFO) << fail_msg << Format(" Expected table name: $0 Table name in PG: $1", + table->name(), table_name); + CHECK_EQ(table_name, l->ysql_ddl_txn_verifier_state().previous_table_name()); + return false; } - // Use Scan to query the 'pg_database' table, filtering by our 'oid'. + vector pg_cols = VERIFY_RESULT(ReadPgAttribute(table)); + // In DocDB schema, columns are sorted based on 'order'. + sort(pg_cols.begin(), pg_cols.end(), [](const auto& lhs, const auto& rhs) { + return lhs.order < rhs.order; + }); + + Schema schema; + RETURN_NOT_OK(table->GetSchema(&schema)); + if (MatchPgDocDBSchemaColumns(table, schema, pg_cols)) { + // The PG catalog schema matches the current DocDB schema. The transaction was a success. + return true; + } + + Schema previous_schema; + RETURN_NOT_OK(SchemaFromPB(l->ysql_ddl_txn_verifier_state().previous_schema(), &previous_schema)); + if (MatchPgDocDBSchemaColumns(table, previous_schema, pg_cols)) { + // The PG catalog schema matches the DocDB schema of the table prior to this transaction. The + // transaction must have aborted. + return false; + } + + // The PG catalog schema does not match either the current schema nor the previous schema. This + // is an unexpected state, do nothing. + return STATUS_FORMAT(IllegalState, "Failed to verify transaction for table $0", + table->ToString()); +} + +bool YsqlTransactionDdl::MatchPgDocDBSchemaColumns( + const scoped_refptr& table, + const Schema& schema, + const vector& pg_cols) { + + const string& fail_msg = "Schema mismatch for table " + table->ToString(); + const std::vector& columns = schema.columns(); + + size_t i = 0; + for (const auto& col : columns) { + // 'ybrowid' is a column present only in DocDB. Skip it. 
+ if (col.name() == "ybrowid") { + continue; + } + + if (i >= pg_cols.size()) { + LOG(INFO) << fail_msg << Format(" Expected num_columns: $0 num_columns in PG: $1", + columns.size(), pg_cols.size()); + return false; + } + + if (col.name().compare(pg_cols[i].attname) != 0) { + LOG(INFO) << fail_msg << Format(" Expected column name with attnum: $0 is :$1" + " but column name at PG is $2", pg_cols[i].order, col.name(), pg_cols[i].attname); + return false; + } + + // Verify whether attnum matches. + if (col.order() != pg_cols[i].order) { + LOG(INFO) << fail_msg << Format(" At index $0 expected attnum is $1 but actual attnum is $2", + i, col.order(), pg_cols[i].order); + return false; + } + i++; + } + + return true; +} + +Result> +YsqlTransactionDdl::ReadPgAttribute(scoped_refptr table) { + // Build schema using values read from pg_attribute. + auto tablet_peer = sys_catalog_->tablet_peer(); + const tablet::TabletPtr tablet = tablet_peer->shared_tablet(); + + const uint32_t database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); + const uint32_t table_oid = VERIFY_RESULT(GetPgsqlTableOid(table->id())); + const auto& pg_attribute_table_id = GetPgsqlTableId(database_oid, kPgAttributeTableOid); + Schema projection; - RETURN_NOT_OK(pg_database_schema.CreateProjectionByNames(col_names, &projection, - pg_database_schema.num_key_columns())); - const auto oid_col_id = VERIFY_RESULT(projection.ColumnIdByName("oid")).rep(); - auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator( - projection.CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_table_id)); - auto e_oid_val = VERIFY_RESULT(std::move(entry_oid)); - { - auto doc_iter = down_cast(iter.get()); - PgsqlConditionPB cond; - cond.add_operands()->set_column_id(oid_col_id); - cond.set_op(QL_OP_EQUAL); - cond.add_operands()->mutable_value()->set_uint32_value(e_oid_val); - const std::vector empty_key_components; - docdb::DocPgsqlScanSpec spec( - projection, rocksdb::kDefaultQueryId, empty_key_components, 
empty_key_components, - &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */); - RETURN_NOT_OK(doc_iter->Init(spec)); - } - - // Expect exactly one row, which means the transaction was a success. - QLTableRow row; - if (VERIFY_RESULT(iter->HasNext())) { + // Scan pg_attribute for the rows describing this table (attrelid == table_oid). + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator( + pg_attribute_table_id, + "attrelid" /* col_name */, + table_oid, + {"attrelid", "attnum", "attname", "atttypid"}, + &projection)); + + const auto attname_col_id = VERIFY_RESULT(projection.ColumnIdByName("attname")).rep(); + const auto atttypid_col_id = VERIFY_RESULT(projection.ColumnIdByName("atttypid")).rep(); + const auto attnum_col_id = VERIFY_RESULT(projection.ColumnIdByName("attnum")).rep(); + + vector pg_cols; + while (VERIFY_RESULT(iter->HasNext())) { + QLTableRow row; RETURN_NOT_OK(iter->NextRow(&row)); - if (is_matview) { - const auto relfilenode_col_id = VERIFY_RESULT(projection.ColumnIdByName("relfilenode")).rep(); - const auto& relfilenode = row.GetValue(relfilenode_col_id); - if (relfilenode->uint32_value() != VERIFY_RESULT(GetPgsqlTableOid(relfilenode_oid))) { - return false; - } - return true; + + const auto& attname_col = row.GetValue(attname_col_id); + const auto& atttypid_col = row.GetValue(atttypid_col_id); + const auto& attnum_col = row.GetValue(attnum_col_id); + if (!attname_col || !atttypid_col || !attnum_col) { + std::string corrupted_col = + !attname_col ? "attname" : !atttypid_col ? "atttypid" : "attnum"; + return STATUS_FORMAT( + Corruption, + "Could not read $0 column from pg_attribute for attrelid: $1 database_oid: $2", + corrupted_col, table_oid, database_oid); } - return true; + + const int32_t attnum = attnum_col->int16_value(); + if (attnum < 0) { + // Ignore system columns.
+ VLOG(3) << "Ignoring system column (attnum = " << attnum_col->int16_value() + << ") for attrelid:" << table_oid; + continue; + } + string attname = attname_col->string_value(); + uint32_t atttypid = atttypid_col->uint32_value(); + if (atttypid == 0) { + // Ignore dropped columns. + VLOG(3) << "Ignoring dropped column " << attname << " (atttypid = 0)" + << " for attrelid:" << table_oid; + continue; + } + VLOG(3) << "attrelid: " << table_oid << " attname: " << attname << " atttypid: " << atttypid; + pg_cols.emplace_back(attnum, attname); } - return false; + + return pg_cols; } } // namespace master diff --git a/src/yb/master/ysql_transaction_ddl.h b/src/yb/master/ysql_transaction_ddl.h index 699f9598b614..4bb653465687 100644 --- a/src/yb/master/ysql_transaction_ddl.h +++ b/src/yb/master/ysql_transaction_ddl.h @@ -19,7 +19,11 @@ #include "yb/client/client_fwd.h" #include "yb/common/entity_ids.h" +#include "yb/common/transaction.h" +#include "yb/docdb/doc_rowwise_iterator.h" + +#include "yb/master/catalog_entity_info.h" #include "yb/master/master_fwd.h" #include "yb/rpc/rpc.h" @@ -34,7 +38,41 @@ class GetTransactionStatusResponsePB; namespace master { +/* + * Currently the metadata for YSQL Tables is stored in both the PG catalog and DocDB schema. + * This class helps maintain consistency between the two schemas. When a DDL transaction fails, + * since the PG catalog is modified using the DocDB transactions framework, the changes to the + * PG catalog are automatically rolled-back. This class helps perform similar rollback for the + * DocDB schema upon transaction failure. + * When a DDL transaction modifies the DocDB schema, the transaction metadata, and the current + * schema of the table is stored in the SysTablesEntryPB. At this point, future DDL operations on + * this table are blocked until verification is complete. A poller is scheduled through this + * class. It monitors whether the transaction is complete. 
Once the transaction is detected to + * be complete, it compares the PG catalog and the DocDB schema and finds whether the transaction + * is a success or a failure. + * Based on whether the transaction was a success or failure, the CatalogManager will effect + * rollback or roll-forward of the DocDB schema. + * + * Note that the above protocol introduces eventual consistency between the two types of metadata. + * However, this mostly does not affect correctness because of the following two properties: + * 1) When inconsistent, the DocDB schema always has more columns/constraints than PG schema. + * 2) Clients always use the PG schema (which is guaranteed to not return uncommitted state) + * to prepare their read/write requests. + * + * These two properties ensure that we can't have orphaned data or failed integrity checks + * or use DDL entities created by uncommitted transactions. + */ + class YsqlTransactionDdl { + struct PgColumnFields { + // Order determines the order in which the columns were created. This is equal to the + // 'attnum' field in the pg_attribute table in PG catalog.
+ int order; + std::string attname; + + PgColumnFields(int attnum, std::string name) : order(attnum), attname(name) {} + }; + public: YsqlTransactionDdl( const SysCatalogTable* sys_catalog, std::shared_future client_future, @@ -49,15 +87,38 @@ class YsqlTransactionDdl { } void VerifyTransaction(const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback); + Result PgEntryExists(TableId tableId, Result entry_oid, TableId relfilenode_oid); + Result PgSchemaChecker(const scoped_refptr& table); + protected: void TransactionReceived(const TransactionMetadata& transaction, + scoped_refptr table, + bool has_ysql_ddl_txn_state, std::function complete_callback, Status txn_status, const tserver::GetTransactionStatusResponsePB& response); + bool MatchPgDocDBSchemaColumns(const scoped_refptr& table, + const Schema& schema, + const std::vector& pg_cols); + + Result> ReadPgAttribute(scoped_refptr table); + + // Scan table 'pg_catalog_table_id' for all rows that satisfy the SQL filter + // 'WHERE oid_col_name = oid_value'. Each returned row contains the columns specified in + // 'col_names'.
+ Result> GetPgCatalogTableScanIterator( + const TableId& pg_catalog_table_id, + const std::string& oid_col_name, + uint32_t oid_value, + std::vector col_names, + Schema *projection); + const SysCatalogTable* sys_catalog_; std::shared_future client_future_; ThreadPool* thread_pool_; diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 901a700eaf40..eaf0e907c5db 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -54,6 +54,7 @@ using std::string; DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); +DECLARE_bool(ysql_ddl_rollback_enabled); namespace yb { namespace tserver { @@ -430,7 +431,13 @@ Status PgClientSession::DropTable( return Status::OK(); } - RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline())); + const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata( + true /* use_transaction */, context->GetClientDeadline())); + // If ddl rollback is enabled, the table will not be deleted now, so we cannot wait for the + // table deletion to complete. The table will be deleted in the background only after the + // transaction has been determined to be a success. 
+ RETURN_NOT_OK(client().DeleteTable(yb_table_id, !FLAGS_ysql_ddl_rollback_enabled, metadata, + context->GetClientDeadline())); table_cache_.Invalidate(yb_table_id); return Status::OK(); } diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index e37209a00ccc..c89020a7f2df 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1053,6 +1053,14 @@ Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) { return STATUS(InvalidArgument, "Invalid statement handle"); } +Status PgApiImpl::ExecDropTable(PgStatement *handle) { + if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLE)) { + return STATUS(InvalidArgument, "Invalid statement handle"); + } + + return down_cast(handle)->Exec(); +} + Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) { tserver::PgBackfillIndexRequestPB req; table_id.ToPB(req.mutable_table_id()); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index ffea9892859e..9762c8c27f7d 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -353,6 +353,8 @@ class PgApiImpl { Status ExecPostponedDdlStmt(PgStatement *handle); + Status ExecDropTable(PgStatement *handle); + Status BackfillIndex(const PgObjectId& table_id); //------------------------------------------------------------------------------------------------ diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index c8ea7c25b969..4956bc241baf 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -343,6 +343,7 @@ typedef struct PgGFlagsAccessor { const uint64_t* ysql_session_max_batch_size; const bool* ysql_sleep_before_retry_on_txn_conflict; const bool* ysql_colocate_database_by_default; + const bool* ysql_ddl_rollback_enabled; } YBCPgGFlagsAccessor; typedef struct YbTablePropertiesData { diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 5eb4242b5071..ccd7b6b84b3d 100644 --- 
a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -71,6 +71,8 @@ DECLARE_int32(delay_alter_sequence_sec); DECLARE_int32(client_read_write_timeout_ms); +DECLARE_bool(ysql_ddl_rollback_enabled); + DEFINE_UNKNOWN_bool(ysql_enable_reindex, false, "Enable REINDEX INDEX statement."); TAG_FLAG(ysql_enable_reindex, advanced); @@ -736,6 +738,10 @@ YBCStatus YBCPgExecPostponedDdlStmt(YBCPgStatement handle) { return ToYBCStatus(pgapi->ExecPostponedDdlStmt(handle)); } +YBCStatus YBCPgExecDropTable(YBCPgStatement handle) { + return ToYBCStatus(pgapi->ExecDropTable(handle)); +} + YBCStatus YBCPgBackfillIndex( const YBCPgOid database_oid, const YBCPgOid index_oid) { @@ -1279,7 +1285,8 @@ const YBCPgGFlagsAccessor* YBCGetGFlags() { .ysql_sequence_cache_minval = &FLAGS_ysql_sequence_cache_minval, .ysql_session_max_batch_size = &FLAGS_ysql_session_max_batch_size, .ysql_sleep_before_retry_on_txn_conflict = &FLAGS_ysql_sleep_before_retry_on_txn_conflict, - .ysql_colocate_database_by_default = &FLAGS_ysql_colocate_database_by_default + .ysql_colocate_database_by_default = &FLAGS_ysql_colocate_database_by_default, + .ysql_ddl_rollback_enabled = &FLAGS_ysql_ddl_rollback_enabled }; return &accessor; } diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 71e0da8bf324..bb752b56e883 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -303,6 +303,8 @@ YBCStatus YBCPgNewDropIndex(YBCPgOid database_oid, YBCStatus YBCPgExecPostponedDdlStmt(YBCPgStatement handle); +YBCStatus YBCPgExecDropTable(YBCPgStatement handle); + YBCStatus YBCPgBackfillIndex( const YBCPgOid database_oid, const YBCPgOid index_oid); diff --git a/src/yb/yql/pgwrapper/CMakeLists.txt b/src/yb/yql/pgwrapper/CMakeLists.txt index cf3a213bc356..9fea4ebb6877 100644 --- a/src/yb/yql/pgwrapper/CMakeLists.txt +++ b/src/yb/yql/pgwrapper/CMakeLists.txt @@ -60,7 +60,8 @@ set(PG_WRAPPER_TEST_BASE_SRCS pg_tablet_split_test_base.cc pg_wrapper_test_base.cc) 
set(PG_WRAPPER_TEST_BASE_DEPS - pq_utils) + pq_utils + yb_client_test_util) ADD_YB_LIBRARY(pg_wrapper_test_base SRCS ${PG_WRAPPER_TEST_BASE_SRCS} DEPS ${PG_WRAPPER_TEST_BASE_DEPS}) @@ -112,6 +113,7 @@ ADD_YB_TEST(geo_transactions_promotion-test) ADD_YB_TEST(pg_explicit_lock-test) ADD_YB_TEST(alter_schema_abort_txn-test) ADD_YB_TEST(pg_cache_refresh-test) +ADD_YB_TEST(pg_ddl_atomicity-test) # This is really a tool, not a test, but uses a lot of existing test infrastructure. ADD_YB_TEST(create_initial_sys_catalog_snapshot) diff --git a/src/yb/yql/pgwrapper/libpq_utils.cc b/src/yb/yql/pgwrapper/libpq_utils.cc index 2fde0c7bcfe7..cd9bbab93da8 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.cc +++ b/src/yb/yql/pgwrapper/libpq_utils.cc @@ -364,6 +364,11 @@ Status PGConn::RollbackTransaction() { return Execute("ROLLBACK"); } +Status PGConn::TestFailDdl(const std::string& ddl_to_fail) { + RETURN_NOT_OK(Execute("SET yb_test_fail_next_ddl=true")); + return Execute(ddl_to_fail); +} + Result PGConn::HasIndexScan(const std::string& query) { return VERIFY_RESULT(HasScanType(query, "Index")) || VERIFY_RESULT(HasScanType(query, "Index Only")); diff --git a/src/yb/yql/pgwrapper/libpq_utils.h b/src/yb/yql/pgwrapper/libpq_utils.h index b8a90a5deadc..1e1f0962ef38 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.h +++ b/src/yb/yql/pgwrapper/libpq_utils.h @@ -134,6 +134,8 @@ class PGConn { Status CommitTransaction(); Status RollbackTransaction(); + Status TestFailDdl(const std::string& ddl_to_fail); + // Would this query use an index [only] scan? Result HasIndexScan(const std::string& query); Result HasScanType(const std::string& query, const std::string expected_scan_type); diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc new file mode 100644 index 000000000000..66791faaf08a --- /dev/null +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc @@ -0,0 +1,890 @@ +// Copyright (c) YugaByte, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#include "yb/client/client_fwd.h" +#include "yb/client/snapshot_test_util.h" +#include "yb/client/table_info.h" +#include "yb/client/yb_table_name.h" +#include "yb/client/client-test-util.h" + +#include "yb/common/common.pb.h" +#include "yb/common/pgsql_error.h" +#include "yb/common/schema.h" + +#include "yb/master/master_client.pb.h" + +#include "yb/util/async_util.h" +#include "yb/util/backoff_waiter.h" +#include "yb/util/monotime.h" +#include "yb/util/test_thread_holder.h" +#include "yb/util/timestamp.h" +#include "yb/util/tsan_util.h" + +#include "yb/yql/pgwrapper/libpq_test_base.h" +#include "yb/yql/pgwrapper/libpq_utils.h" + +DECLARE_bool(ysql_ddl_rollback_enabled); + +using std::string; +using std::vector; + +namespace yb { +namespace pgwrapper { + +class PgDdlAtomicityTest : public LibPqTestBase { + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); + } + + protected: + void CreateTable(const string& tablename) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(CreateTableStmt(tablename))); + // Sleep until the transaction ddl state is cleared from this table. + // TODO (deepthi): This will no longer be needed after Phase-II. 
+ sleep(2); + } + + string CreateTableStmt(const string& tablename) { + return "CREATE TABLE " + tablename + " (key INT PRIMARY KEY)"; + } + + string RenameTableStmt(const string& tablename) { + return RenameTableStmt(tablename, "foobar"); + } + + string RenameTableStmt(const string& tablename, const string& table_new_name) { + return Format("ALTER TABLE $0 RENAME TO $1", tablename, table_new_name); + } + + string AddColumnStmt(const string& tablename) { + return AddColumnStmt(tablename, "value"); + } + + string AddColumnStmt(const string& tablename, const string& col_name_to_add) { + return Format("ALTER TABLE $0 ADD COLUMN $1 TEXT", tablename, col_name_to_add); + } + + string RenameColumnStmt(const string& tablename) { + return RenameColumnStmt(tablename, "key", "key2"); + } + + string RenameColumnStmt(const string& tablename, + const string& col_to_rename, + const string& col_new_name) { + return Format("ALTER TABLE $0 RENAME COLUMN $1 TO $2", tablename, col_to_rename, col_new_name); + } + + string DropTableStmt(const string& tablename) { + return "DROP TABLE " + tablename; + } + + string db() { + return "yugabyte"; + } + + void RestartMaster() { + LOG(INFO) << "Restarting Master"; + auto master = cluster_->GetLeaderMaster(); + master->Shutdown(); + ASSERT_OK(master->Restart()); + ASSERT_OK(LoggedWaitFor([&]() -> Result { + auto s = cluster_->GetIsMasterLeaderServiceReady(master); + return s.ok(); + }, MonoDelta::FromSeconds(60), "Wait for Master to be ready.")); + } + + void SetFlagOnAllProcessesWithRollingRestart(const string& flag) { + LOG(INFO) << "Restart the cluster and set " << flag; + for (size_t i = 0; i != cluster_->num_masters(); ++i) { + cluster_->master(i)->mutable_flags()->push_back(flag); + } + + RestartMaster(); + + for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { + cluster_->tablet_server(i)->mutable_flags()->push_back(flag); + } + for (auto* tserver : cluster_->tserver_daemons()) { + tserver->Shutdown(); + 
ASSERT_OK(tserver->Restart()); + sleep(5); + } + } + + void VerifySchema(client::YBClient* client, + const string& database_name, + const string& table_name, + const vector& expected_column_names) { + ASSERT_TRUE(ASSERT_RESULT( + CheckIfSchemaMatches(client, database_name, table_name, expected_column_names))); + } + + Result CheckIfSchemaMatches(client::YBClient* client, + const string& database_name, + const string& table_name, + const vector& expected_column_names) { + std::string table_id = VERIFY_RESULT(GetTableIdByTableName(client, database_name, table_name)); + + std::shared_ptr table_info = std::make_shared(); + Synchronizer sync; + RETURN_NOT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback())); + RETURN_NOT_OK(sync.Wait()); + + const auto& columns = table_info->schema.columns(); + if (expected_column_names.size() != columns.size()) { + LOG(INFO) << "Expected " << expected_column_names.size() << " for " << table_name << " but " + << "found " << columns.size() << " columns"; + return false; + } + for (size_t i = 0; i < expected_column_names.size(); ++i) { + if (columns[i].name().compare(expected_column_names[i]) != 0) { + LOG(INFO) << "Expected column " << expected_column_names[i] << " but found " + << columns[i].name(); + return false; + } + } + return true; + } +}; + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseGC)) { + TableName test_name = "test_pgsql"; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl("CREATE DATABASE " + test_name)); + + // Verify DocDB Database creation, even though it failed in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyNamespaceExists(client.get(), test_name); + + // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts. + // Confirm that DocDB async deletes the namespace. 
+ VerifyNamespaceNotExists(client.get(), test_name); +} + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestCreateDbFailureAndRestartGC)) { + NamespaceName test_name = "test_pgsql"; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl("CREATE DATABASE " + test_name)); + + // Verify DocDB Database creation, even though it fails in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyNamespaceExists(client.get(), test_name); + + // Restart the master before the BG task can kick in and GC the failed transaction. + RestartMaster(); + + // Re-init client after restart. + client = ASSERT_RESULT(cluster_->CreateClient()); + + // Confirm that Catalog Loader deletes the namespace on master restart. + VerifyNamespaceNotExists(client.get(), test_name); +} + +TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableGC)) { + TableName test_name = "test_pgsql_table"; + TableName test_name_idx = test_name + "_idx"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + // Lower the delays so we successfully create this first table. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10")); + + // Create Table that Index will be set on. + auto conn = ASSERT_RESULT(Connect()); + + ASSERT_OK(conn.Execute(CreateTableStmt(test_name))); + + // After successfully creating the first table, set flags to delay the background task. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000")); + + ASSERT_NOK(conn.TestFailDdl("CREATE INDEX " + test_name_idx + " ON " + test_name + "(key)")); + + // Wait for DocDB index creation, even though it will fail in PG layer. + // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. + VerifyTableExists(client.get(), db(), test_name_idx, 10); + + // DocDB will notice the PG layer failure because the transaction aborts. 
+ // Confirm that DocDB async deletes the index. + VerifyTableNotExists(client.get(), db(), test_name_idx, 40); +} + +// Class for sanity test. +class PgDdlAtomicitySanityTest : public PgDdlAtomicityTest { + protected: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.push_back("--ysql_ddl_rollback_enabled=true"); + options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); + } +}; + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTableRollback)) { + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_table_test"; + + vector tables = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + auto client = ASSERT_RESULT(cluster_->CreateClient()); + + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables[ii]))); + } + + // Wait for the transaction state left by create table statement to clear. + sleep(5); + + // Deliberately cause failure of the following Alter Table statements. + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Wait for rollback. + sleep(5); + for (size_t ii = 0; ii < tables.size(); ++ii) { + VerifySchema(client.get(), db(), tables[ii], {"key"}); + } + + // Verify that DDL succeeds after rollback is complete. + // TODO: Need to start a different connection here for rename to work as expected until #14395 + // is fixed. 
+ conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test))); + ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test))); + ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(PrimaryKeyRollback)) { + TableName add_pk_table = "add_pk_table"; + TableName drop_pk_table = "drop_pk_table"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (id int)", add_pk_table)); + ASSERT_OK(conn.Execute(CreateTableStmt(drop_pk_table))); + + // Wait for transaction verification on the tables to complete. + sleep(1); + + // Delay rollback. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + // Fail Alter operation that adds/drops primary key. + ASSERT_NOK(conn.TestFailDdl(Format("ALTER TABLE $0 ADD PRIMARY KEY(id)", add_pk_table))); + ASSERT_NOK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP CONSTRAINT $0_pkey;", drop_pk_table))); + + // Verify presence of temp table created for adding a primary key. + VerifyTableExists(client.get(), db(), add_pk_table + "_temp_old", 10); + VerifyTableExists(client.get(), db(), drop_pk_table + "_temp_old", 10); + + // Verify that the background task detected the failure of the DDL operation and removed the + // temp table. + VerifyTableNotExists(client.get(), db(), add_pk_table + "_temp_old", 40); + VerifyTableNotExists(client.get(), db(), drop_pk_table + "_temp_old", 40); + + // Verify that PK constraint is not present on the table. + ASSERT_OK(conn.Execute("INSERT INTO " + add_pk_table + " VALUES (1), (1)")); + + // Verify that PK constraint is still present on the table. 
+ ASSERT_NOK(conn.Execute("INSERT INTO " + drop_pk_table + " VALUES (1), (1)")); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DdlRollbackMasterRestart)) { + TableName create_table_test = "create_test"; + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly + // disabled. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); + ASSERT_NOK(conn.TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Verify that table was created on DocDB. + VerifyTableExists(client.get(), db(), create_table_test, 10); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + + // Verify that the tables were altered on DocDB. + VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Restart the master before the BG task can kick in and GC the failed transaction. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "0")); + RestartMaster(); + + // Verify that rollback reflected on all the tables after restart. 
+ client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. + + VerifyTableNotExists(client.get(), db(), create_table_test, 20); + VerifyTableExists(client.get(), db(), rename_table_test, 20); + // Verify all the other tables are unchanged by all of the DDLs. + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(30), Format("Wait for rollback for table $0", table))); + } +} + +// TODO (deepthi) : This test is flaky because of #14995. Re-enable it back after #14995 is fixed. +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST(FailureRecoveryTest)) { + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly + // disabled. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Verify that table was created on DocDB. + VerifyTableExists(client.get(), db(), drop_table_test, 10); + + // Verify that the tables were altered on DocDB. 
+ VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Disable DDL rollback. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_ddl_rollback_enabled", "false")); + ASSERT_OK(cluster_->SetFlagOnTServers("ysql_ddl_rollback_enabled", "false")); + + // Verify that best effort rollback works when ysql_ddl_rollback is disabled. + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1)", add_col_test)); + // The following DDL fails because the table already contains data, so it is not possible to + // add a new column with a not null constraint. + ASSERT_NOK(conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN value2 TEXT NOT NULL", add_col_test)); + // Verify that the above DDL was rolled back by YSQL. + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Restart the cluster with DDL rollback disabled. + SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=false"); + // Verify that rollback did not occur even after the restart. + client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifySchema(client.get(), db(), "foobar", {"key"}); + VerifySchema(client.get(), db(), rename_col_test, {"key2"}); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Verify that it is still possible to run DDL on an affected table. + // Tables having unverified transaction state on them can still be altered if the DDL rollback + // is not enabled. + // TODO: Need to start a different connection here for rename to work as expected until #14395 + // is fixed. + conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test, "foobar2"))); + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value2"))); + + // Re-enable DDL rollback properly with restart.
+ SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=true"); + + client = ASSERT_RESULT(cluster_->CreateClient()); + conn = ASSERT_RESULT(Connect()); + // The tables with the transaction state on them must have had their state cleared upon restart + // since the flag is now enabled again. + ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); + ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); + + // However add_col_test will be corrupted because we never performed transaction rollback on it. + // It should ideally have only one added column "value2", but we never rolled back the addition of + // "value". + VerifySchema(client.get(), db(), add_col_test, {"key", "value", "value2"}); + + // Add a column to add_col_test which is now corrupted. + ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value3"))); + // Wait for transaction verification to run. + sleep(2); + // However since the PG schema and DocDB schema have become out-of-sync, the above DDL cannot + // be verified. All future DDL operations will now fail. 
+ ASSERT_NOK(conn.Execute(RenameTableStmt(add_col_test, "foobar3"))); + ASSERT_NOK(conn.Execute(RenameColumnStmt(add_col_test))); + ASSERT_NOK(conn.Execute(DropTableStmt(add_col_test))); +} + +class PgDdlAtomicityConcurrentDdlTest : public PgDdlAtomicitySanityTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgDdlAtomicitySanityTest::UpdateMiniClusterOptions(options); + } + + protected: + bool testFailedDueToTxnVerification(PGConn *conn, const string& cmd) { + Status s = conn->Execute(cmd); + if (s.ok()) { + LOG(ERROR) << "Command " << cmd << " executed successfully when failure expected"; + return false; + } + return s.ToString().find("Table is undergoing DDL transaction verification") != string::npos; + } + + void testConcurrentDDL(const string& stmt1, const string& stmt2) { + // Test that until transaction verification is complete, another DDL operating on + // the same object cannot go through. + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(stmt1)); + // The first DDL was successful, but the second should fail because rollback has + // been delayed using 'ysql_transaction_bg_task_wait_ms'. + auto conn2 = ASSERT_RESULT(Connect()); + ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); + } + + void testConcurrentFailedDDL(const string& stmt1, const string& stmt2) { + // Same as 'testConcurrentDDL' but here the first DDL statement is a failure. However + // other DDLs still cannot happen unless rollback is complete. 
+ auto conn = ASSERT_RESULT(Connect()); + ASSERT_NOK(conn.TestFailDdl(stmt1)); + auto conn2 = ASSERT_RESULT(Connect()); + ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); + } +}; + +TEST_F(PgDdlAtomicityConcurrentDdlTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDdl)) { + const string kCreateAndAlter = "create_and_alter_test"; + const string kCreateAndDrop = "create_and_drop_test"; + const string kDropAndAlter = "drop_and_alter_test"; + const string kAlterAndAlter = "alter_and_alter_test"; + const string kAlterAndDrop = "alter_and_drop_test"; + + const vector tables_to_create = {kDropAndAlter, kAlterAndAlter, kAlterAndDrop}; + + auto conn = ASSERT_RESULT(Connect()); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); + } + + // Wait for YsqlDdlTxnVerifierState to be cleaned up from the tables. + sleep(5); + + // Test that we can't run a second DDL on a table until the YsqlTxnVerifierState on the first + // table is cleared. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "60000")); + testConcurrentDDL(CreateTableStmt(kCreateAndAlter), AddColumnStmt(kCreateAndAlter)); + testConcurrentDDL(CreateTableStmt(kCreateAndDrop), DropTableStmt(kCreateAndDrop)); + testConcurrentDDL(AddColumnStmt(kAlterAndDrop), DropTableStmt(kAlterAndDrop)); + testConcurrentDDL(RenameColumnStmt(kAlterAndAlter), RenameTableStmt(kAlterAndAlter)); + + // Test that we can't run a second DDL on a table until the YsqlTxnVerifierState on the first + // table is cleared even if the first DDL was a failure. + testConcurrentFailedDDL(DropTableStmt(kDropAndAlter), RenameColumnStmt(kDropAndAlter)); +} + +class PgDdlAtomicityTxnTest : public PgDdlAtomicitySanityTest { + public: + void runFailingDdlTransaction(const string& ddl_statements) { + // Normally every DDL auto-commits, and thus we usually have only one DDL statement in a + // transaction. 
However, when DDLs are invoked in a function, all the DDLs will be executed + // in a single atomic transaction if the function itself was invoked as part of a DDL statement. + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE FUNCTION ddl_txn_func() RETURNS TABLE (k INT) AS $$ " + "BEGIN " + ddl_statements + + " CREATE TABLE t AS SELECT (1) AS k;" + // The following statement will fail due to NOT NULL constraint. + " ALTER TABLE t ADD COLUMN v3 int not null;" + " RETURN QUERY SELECT k FROM t;" + " END $$ LANGUAGE plpgsql")); + ASSERT_NOK(conn.Execute("CREATE TABLE txntest AS SELECT k FROM ddl_txn_func()")); + // Sleep 1s for the rollback to complete. + sleep(1); + } + + string table() { + return "test_table"; + } +}; + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterDropTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + AddColumnStmt(table()) + "; " + + RenameColumnStmt(table()) + "; " + + RenameTableStmt(table()) + "; " + + DropTableStmt("foobar") + ";"; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateDropTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + DropTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterTest)) { + string ddl_statements = CreateTableStmt(table()) + "; " + + AddColumnStmt(table()) + "; " + + RenameColumnStmt(table()) + "; " + + RenameTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + // Table should not exist. 
+ auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableNotExists(client.get(), db(), table(), 10); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTest)) { + CreateTable(table()); + string ddl_statements = RenameColumnStmt(table()) + "; " + DropTableStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + + // Table should exist with old schema intact. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifySchema(client.get(), db(), table(), {"key"}); +} + +TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AddColRenameColTest)) { + CreateTable(table()); + string ddl_statements = AddColumnStmt(table()) + "; " + RenameColumnStmt(table()) + "; "; + runFailingDdlTransaction(ddl_statements); + + // Table should exist with old schema intact. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifySchema(client.get(), db(), table(), {"key"}); +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithAddColTest)) { + auto client = ASSERT_RESULT(cluster_->CreateClient()); + const string& table = "dml_with_add_col_test"; + CreateTable(table); + auto conn1 = ASSERT_RESULT(Connect()); + auto conn2 = ASSERT_RESULT(Connect()); + + // Conn1: Begin write to the table. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn2: Initiate rollback of the alter. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + ASSERT_NOK(conn2.TestFailDdl(AddColumnStmt(table))); + + // Conn1: Since we parallely added a column to the table, the add-column operation would have + // detected the distributed transaction locks acquired by this transaction on its tablets and + // aborted it. Add-column operation aborts all ongoing transactions on the table without + // exception as the transaction could now be in an erroneous state having used the schema + // without the newly added column. 
+ ASSERT_NOK(conn1.Execute("COMMIT")); + + // Conn1: Non-transactional insert retry due to Schema version mismatch, + // refreshing the table cache. + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn1: Start new transaction. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (2)")); + + // Rollback happens while the transaction is in progress. + // Wait for the rollback to complete. + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(30), "Wait for Add Column to be rolled back.")); + + // Transaction at conn1 succeeds. Normally, drop-column operation would also have aborted all + // ongoing transactions on this table. However this is a drop-column operation initiated as part + // of rollback. The column being dropped by this operation was added as part of an uncommitted + // transaction, so this column would not have been visible to any transaction. It is safe for + // this drop-column operation to operate silently without aborting any transaction. Therefore this + // transaction succeeds. + ASSERT_OK(conn1.Execute("COMMIT")); +} + +// Test that DML transactions concurrent with an aborted DROP TABLE transaction +// can commit successfully (both before and after the rollback is complete).` +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithDropTableTest)) { + auto client = ASSERT_RESULT(cluster_->CreateClient()); + const string& table = "dml_with_drop_test"; + CreateTable(table); + auto conn1 = ASSERT_RESULT(Connect()); + auto conn2 = ASSERT_RESULT(Connect()); + auto conn3 = ASSERT_RESULT(Connect()); + + // Conn1: Begin write to the table. + ASSERT_OK(conn1.Execute("BEGIN")); + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); + + // Conn2: Also begin write to the table. 
+ ASSERT_OK(conn2.Execute("BEGIN")); + ASSERT_OK(conn2.Execute("INSERT INTO " + table + " VALUES (2)")); + + // Conn3: Initiate rollback of DROP table. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "5000")); + ASSERT_NOK(conn3.TestFailDdl(DropTableStmt(table))); + + // Conn1: This should succeed. + ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (3)")); + ASSERT_OK(conn1.Execute("COMMIT")); + + // Wait for rollback to complete. + sleep(5); + + // Conn2 must also commit successfully. + ASSERT_OK(conn2.Execute("INSERT INTO " + table + " VALUES (4)")); + ASSERT_OK(conn2.Execute("COMMIT")); +} + +class PgDdlAtomicityNegativeTestBase : public PgDdlAtomicitySanityTest { + protected: + void SetUp() override { + LibPqTestBase::SetUp(); + + conn_ = std::make_unique(ASSERT_RESULT(ConnectToDB(kDatabaseName))); + } + + virtual string CreateTableStmt(const string& tablename) { + return PgDdlAtomicitySanityTest::CreateTableStmt(tablename); + } + + virtual string CreateRollbackEnabledTableStmt(const string& tablename) = 0; + + virtual string CreateIndexStmt(const string& tablename, const string& indexname) { + return Format("CREATE INDEX $0 ON $1(key)", indexname, tablename); + } + + // Test function. 
+ void negativeTest(); + void negativeDropTableTxnTest(); + + std::unique_ptr conn_; + string kDatabaseName = "yugabyte"; +}; + +void PgDdlAtomicityNegativeTestBase::negativeTest() { + TableName create_table_test = "create_test"; + TableName create_table_idx = "create_test_idx"; + TableName rename_table_test = "rename_table_test"; + TableName rename_col_test = "rename_col_test"; + TableName add_col_test = "add_col_test"; + + vector tables_to_create = { + rename_table_test, rename_col_test, add_col_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn_->Execute(CreateTableStmt(tables_to_create[ii]))); + } + + ASSERT_NOK(conn_->TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn_->TestFailDdl(CreateIndexStmt(rename_table_test, create_table_idx))); + ASSERT_NOK(conn_->TestFailDdl(RenameTableStmt(rename_table_test))); + ASSERT_NOK(conn_->TestFailDdl(RenameColumnStmt(rename_col_test))); + ASSERT_NOK(conn_->TestFailDdl(AddColumnStmt(add_col_test))); + + // Wait for rollback to complete. + sleep(5); + + // Create is rolled back using existing transaction GC infrastructure. + VerifyTableNotExists(client.get(), kDatabaseName, create_table_test, 15); + VerifyTableNotExists(client.get(), kDatabaseName, create_table_idx, 15); + + // Verify that Alter table transactions are not rolled back because rollback is not enabled yet + // for certain cases like colocation and tablegroups. + VerifySchema(client.get(), kDatabaseName, "foobar", {"key"}); + VerifySchema(client.get(), kDatabaseName, rename_col_test, {"key2"}); + VerifySchema(client.get(), kDatabaseName, add_col_test, {"key", "value"}); +} + +void PgDdlAtomicityNegativeTestBase::negativeDropTableTxnTest() { + // Verify that dropping a table with rollback enabled and another table with rollback disabled + // in the same transaction works as expected. 
+ TableName rollback_enabled_table = "rollback_disabled_table"; + TableName rollback_disabled_table = "rollback_enabled_table"; + + ASSERT_OK(conn_->Execute(CreateTableStmt(rollback_disabled_table))); + ASSERT_OK(conn_->Execute(CreateRollbackEnabledTableStmt(rollback_enabled_table))); + + // Wait for rollback state to clear from the created tables. + sleep(1); + + ASSERT_NOK(conn_->TestFailDdl(Format("DROP TABLE $0, $1", + rollback_enabled_table, rollback_disabled_table))); + // The YB-Master background task ensures that 'rollback_enabled_table' is not deleted. For + // 'rollback_disabled_table', PG layer delays sending the delete request until the transaction + // completes. Thus, verify that the tables do not get deleted even after some time. + sleep(5); + auto client = ASSERT_RESULT(cluster_->CreateClient()); + VerifyTableExists(client.get(), kDatabaseName, rollback_disabled_table, 15); + VerifyTableExists(client.get(), kDatabaseName, rollback_enabled_table, 15); + + // DDLs succeed after rollback. 
+ ASSERT_OK(conn_->Execute(RenameColumnStmt(rollback_disabled_table))); + ASSERT_OK(conn_->Execute(AddColumnStmt(rollback_enabled_table))); +} + +class PgDdlAtomicityNegativeTestColocated : public PgDdlAtomicityNegativeTestBase { + void SetUp() override { + LibPqTestBase::SetUp(); + + kDatabaseName = "colocateddbtest"; + PGConn conn_init = ASSERT_RESULT(Connect()); + ASSERT_OK(conn_init.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName)); + + conn_ = std::make_unique(ASSERT_RESULT(ConnectToDB(kDatabaseName))); + } + + string CreateRollbackEnabledTableStmt(const string& tablename) override { + return Format("CREATE TABLE $0 (a INT) WITH (colocated = false);", tablename); + } +}; + +TEST_F(PgDdlAtomicityNegativeTestColocated, YB_DISABLE_TEST_IN_TSAN(ColocatedTest)) { + negativeTest(); + negativeDropTableTxnTest(); +} + +class PgDdlAtomicityNegativeTestTablegroup : public PgDdlAtomicityNegativeTestBase { + void SetUp() override { + LibPqTestBase::SetUp(); + + conn_ = std::make_unique(ASSERT_RESULT(Connect())); + ASSERT_OK(conn_->ExecuteFormat("CREATE TABLEGROUP $0", kTablegroup)); + } + + string CreateTableStmt(const string& tablename) override { + return (PgDdlAtomicitySanityTest::CreateTableStmt(tablename) + " TABLEGROUP " + kTablegroup); + } + + string CreateRollbackEnabledTableStmt(const string& tablename) override { + return PgDdlAtomicitySanityTest::CreateTableStmt(tablename); + } + + const string kTablegroup = "test_tgroup"; +}; + +TEST_F(PgDdlAtomicityNegativeTestTablegroup, YB_DISABLE_TEST_IN_TSAN(TablegroupTest)) { + negativeTest(); + negativeDropTableTxnTest(); +} + +class PgDdlAtomicitySnapshotTest : public PgDdlAtomicitySanityTest { + void SetUp() override { + LibPqTestBase::SetUp(); + + snapshot_util_ = std::make_unique(); + client_ = ASSERT_RESULT(cluster_->CreateClient()); + snapshot_util_->SetProxy(&client_->proxy_cache()); + snapshot_util_->SetCluster(cluster_.get()); + } + + public: + std::unique_ptr snapshot_util_; + 
std::unique_ptr client_; +}; + +TEST_F(PgDdlAtomicitySnapshotTest, YB_DISABLE_TEST_IN_TSAN(SnapshotTest)) { + // Create requisite tables. + TableName create_table_test = "create_test"; + TableName add_col_test = "add_col_test"; + TableName drop_table_test = "drop_test"; + + vector tables_to_create = {add_col_test, drop_table_test}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { + ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + } + + const int snapshot_interval_secs = 10; + // Create a snapshot schedule before running all the failed DDLs. + auto schedule_id = ASSERT_RESULT( + snapshot_util_->CreateSchedule(db(), + client::WaitSnapshot::kTrue, + MonoDelta::FromSeconds(snapshot_interval_secs))); + + // Run all the failed DDLs. + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "35000")); + ASSERT_NOK(conn.TestFailDdl(CreateTableStmt(create_table_test))); + ASSERT_NOK(conn.TestFailDdl(AddColumnStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + + // Wait 10s to ensure that a snapshot is taken right after these DDLs failed. + sleep(snapshot_interval_secs); + + // Get the hybrid time before the rollback can happen. + Timestamp current_time(ASSERT_RESULT(WallClock()->Now()).time_point); + HybridTime hybrid_time_before_rollback = HybridTime::FromMicros(current_time.ToInt64()); + + // Verify that rollback for Alter and Create has indeed not happened yet. + VerifyTableExists(client.get(), db(), create_table_test, 10); + VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + + // Verify that rollback happens eventually. 
+ VerifyTableNotExists(client.get(), db(), create_table_test, 60); + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + } + + /* + TODO (deepthi): Uncomment the following code after #14679 is fixed. + // Run different failing DDL operations on the tables. + ASSERT_NOK(conn.TestFailDdl(RenameTableStmt(add_col_test))); + ASSERT_NOK(conn.TestFailDdl(RenameColumnStmt(drop_table_test))); + */ + + // Restore to before rollback. + LOG(INFO) << "Start restoration to timestamp " << hybrid_time_before_rollback; + auto snapshot_id = + ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(schedule_id, hybrid_time_before_rollback)); + ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, hybrid_time_before_rollback)); + + LOG(INFO) << "Restoration complete"; + + // We restored to a point before the first rollback occurred. That DDL transaction should have + // been re-detected to be a failure and rolled back again. 
+ VerifyTableNotExists(client.get(), db(), create_table_test, 60); + for (const string& table : tables_to_create) { + ASSERT_OK(LoggedWaitFor([&]() -> Result { + return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); + }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + } +} + +} // namespace pgwrapper +} // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_gin_index-test.cc b/src/yb/yql/pgwrapper/pg_gin_index-test.cc index 4e37b4a7adf9..6d2263148770 100644 --- a/src/yb/yql/pgwrapper/pg_gin_index-test.cc +++ b/src/yb/yql/pgwrapper/pg_gin_index-test.cc @@ -13,6 +13,7 @@ #include #include "yb/client/client.h" +#include "yb/client/client-test-util.h" #include "yb/client/table_info.h" #include "yb/client/yb_table_name.h" @@ -51,23 +52,6 @@ class PgGinIndexTest : public LibPqTestBase { std::unique_ptr conn_; }; -namespace { - -// A copy of the same function in pg_libpq-test.cc. Eventually, issue #6868 should provide a way to -// do this easily for both this file and that. -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - -} // namespace - // Test creating a ybgin index on an array whose element type is unsupported for primary key. 
TEST_F(PgGinIndexTest, YB_DISABLE_TEST_IN_TSAN(UnsupportedArrayElementType)) { ASSERT_OK(conn_->ExecuteFormat("CREATE TABLE $0 (a tsvector[])", kTableName)); diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index f75930c4eb2d..ee595aba3e54 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -15,6 +15,7 @@ #include #include +#include "yb/client/client-test-util.h" #include "yb/client/table_info.h" #include "yb/common/schema.h" @@ -146,19 +147,6 @@ class PgIndexBackfillTest : public LibPqTestBase { namespace { -// A copy of the same function in pg_libpq-test.cc. Eventually, issue #6868 should provide a way to -// do this easily for both this file and that. -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - Result TotalBackfillRpcMetric(ExternalMiniCluster* cluster, const char* type) { int total_rpc_calls = 0; constexpr auto metric_name = "handler_latency_yb_tserver_TabletServerAdminService_BackfillIndex"; diff --git a/src/yb/yql/pgwrapper/pg_libpq-test.cc b/src/yb/yql/pgwrapper/pg_libpq-test.cc index 99581a653740..52d19321aa10 100644 --- a/src/yb/yql/pgwrapper/pg_libpq-test.cc +++ b/src/yb/yql/pgwrapper/pg_libpq-test.cc @@ -28,6 +28,7 @@ #include "yb/client/client_fwd.h" #include "yb/client/table_info.h" #include "yb/client/yb_table_name.h" +#include "yb/client/client-test-util.h" #include "yb/common/common.pb.h" #include "yb/common/pgsql_error.h" @@ -886,32 +887,6 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(InTxnDelete)) { ASSERT_NO_FATALS(AssertRows(&conn, 1)); } -namespace { - -Result GetNamespaceIdByNamespaceName( - 
client::YBClient* client, const string& namespace_name) { - const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL)); - for (const auto& ns : namespaces) { - if (ns.name() == namespace_name) { - return ns.id(); - } - } - return STATUS(NotFound, "The namespace does not exist"); -} - -Result GetTableIdByTableName( - client::YBClient* client, const string& namespace_name, const string& table_name) { - const auto tables = VERIFY_RESULT(client->ListTables()); - for (const auto& t : tables) { - if (t.namespace_name() == namespace_name && t.table_name() == table_name) { - return t.table_id(); - } - } - return STATUS(NotFound, "The table does not exist"); -} - -} // namespace - TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CompoundKeyColumnOrder)) { const string namespace_name = "yugabyte"; const string table_name = "test"; @@ -2411,215 +2386,6 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryEnabled)) { TestCacheRefreshRetry(false /* is_retry_disabled */); } -class PgLibPqDatabaseTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); - options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); - } -}; - -TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutGC)) { - NamespaceName test_name = "test_pgsql"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Database: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); - } - - // Verify DocDB Database creation, even though it failed in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. 
- ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), "" /* prefix */); - return ret.ok() && ret.get(); - }, MonoDelta::FromSeconds(60), - "Verify Namespace was created in DocDB")); - - // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), "ret"); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); -} - -TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutAndRestartGC)) { - NamespaceName test_name = "test_pgsql"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Database: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); - } - - // Verify DocDB Database creation, even though it fails in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(60), - "Verify Namespace was created in DocDB")); - - LOG(INFO) << "Restarting Master."; - - // Restart the master before the BG task can kick in and GC the failed transaction. 
- auto master = cluster_->GetLeaderMaster(); - master->Shutdown(); - ASSERT_OK(master->Restart()); - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto s = cluster_->GetIsMasterLeaderServiceReady(master); - return s.ok(); - }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); - - // Confirm that Catalog Loader deletes the namespace on master restart. - client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - Result ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); -} - -class PgLibPqTableTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - // Use small clock skew, to decrease number of read restarts. - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); - options->extra_master_flags.push_back("--TEST_simulate_slow_table_create_secs=2"); - options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=3000"); - } -}; - -TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Table: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. 
- ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); - - // DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); -} - -TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutAndRestartGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Create Table: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); - - LOG(INFO) << "Restarting Master."; - - // Restart the master before the BG task can kick in and GC the failed transaction. 
- auto master = cluster_->GetLeaderMaster(); - master->Shutdown(); - ASSERT_OK(master->Restart()); - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto s = cluster_->GetIsMasterLeaderServiceReady(master); - return s.ok(); - }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); - - // Confirm that Catalog Loader deletes the namespace on master restart. - client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); -} - -class PgLibPqIndexTableTimeoutTest : public PgLibPqTest { - void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { - options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=10"); - } -}; - -TEST_F(PgLibPqIndexTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableTimeoutGC)) { - const string kDatabaseName ="yugabyte"; - NamespaceName test_name = "test_pgsql_table"; - NamespaceName test_name_idx = test_name + "_idx"; - - auto client = ASSERT_RESULT(cluster_->CreateClient()); - - // Lower the delays so we successfully create this first table. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10")); - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "0")); - - // Create Table that Index will be set on. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_OK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); - } - - // After successfully creating the first table, set to flags similar to: PgLibPqTableTimeoutTest. 
- ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000")); - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "12")); - - // Create Index: will timeout because the admin setting is lower than the DB create latency. - { - auto conn = ASSERT_RESULT(Connect()); - ASSERT_NOK(conn.Execute("CREATE INDEX " + test_name_idx + " ON " + test_name + "(key)")); - } - - // Wait for DocDB Table creation, even though it will fail in PG layer. - // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - LOG(INFO) << "Requesting TableExists"; - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == true; - }, MonoDelta::FromSeconds(40), "Verify Index Table was created in DocDB")); - - // DocDB will notice the PG layer failure because the transaction aborts. - // Confirm that DocDB async deletes the namespace. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); - WARN_NOT_OK(ResultToStatus(ret), ""); - return ret.ok() && ret.get() == false; - }, MonoDelta::FromSeconds(40), "Verify Index Table was removed by Transaction GC")); -} - class PgLibPqTestEnumType: public PgLibPqTest { public: void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {