diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index eff94bb5351d..d28cb13187ee 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -2386,11 +2386,9 @@ Status YBClient::Data::ValidateReplicationInfo( ValidateReplicationInfoResponsePB resp; auto new_ri = req.mutable_replication_info(); new_ri->CopyFrom(replication_info); - Status status = SyncLeaderMasterRpc( + RETURN_NOT_OK(SyncLeaderMasterRpc( deadline, req, &resp, "ValidateReplicationInfo", - &master::MasterReplicationProxy::ValidateReplicationInfoAsync); - RETURN_NOT_OK(status); - + &master::MasterReplicationProxy::ValidateReplicationInfoAsync)); if (resp.has_error()) { return StatusFromPB(resp.error().status()); } @@ -2415,6 +2413,22 @@ Result YBClient::Data::GetTableDiskSize( return TableSizeInfo{resp.size(), resp.num_missing_tablets()}; } +Status YBClient::Data::ReportYsqlDdlTxnStatus( + const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline) { + master::ReportYsqlDdlTxnStatusRequestPB req; + master::ReportYsqlDdlTxnStatusResponsePB resp; + + req.set_transaction_id(txn.transaction_id.data(), txn.transaction_id.size()); + req.set_is_committed(is_committed); + RETURN_NOT_OK(SyncLeaderMasterRpc( + deadline, req, &resp, "ReportYsqlDdlTxnStatus", + &master::MasterDdlProxy::ReportYsqlDdlTxnStatusAsync)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + Result YBClient::Data::CheckIfPitrActive(CoarseTimePoint deadline) { CheckIfPitrActiveRequestPB req; CheckIfPitrActiveResponsePB resp; diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index 183b8fadebbd..4c7651fd05f1 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -406,6 +406,10 @@ class YBClient::Data { // It does not take replication factor into account Result GetTableDiskSize(const TableId& table_id, CoarseTimePoint deadline); + // Provide the 
status of the transaction to YB-Master. + Status ReportYsqlDdlTxnStatus( + const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline); + Result CheckIfPitrActive(CoarseTimePoint deadline); template diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 16d629c81878..35f5f225d0ed 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -2056,6 +2056,11 @@ Result YBClient::GetTableDiskSize(const TableId& table_id) { return data_->GetTableDiskSize(table_id, deadline); } +Status YBClient::ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed) { + auto deadline = CoarseMonoClock::Now() + default_rpc_timeout(); + return data_->ReportYsqlDdlTxnStatus(txn, is_committed, deadline); +} + Result YBClient::CheckIfPitrActive() { auto deadline = CoarseMonoClock::Now() + default_rpc_timeout(); return data_->CheckIfPitrActive(deadline); diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 9e8dd3a209f5..b5f6fcf1931a 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -750,6 +750,9 @@ class YBClient { // Get the disk size of a table (calculated as SST file size + WAL file size) Result GetTableDiskSize(const TableId& table_id); + // Provide the completion status of 'txn' to the YB-Master. + Status ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed); + Result CheckIfPitrActive(); void LookupTabletByKey(const std::shared_ptr& table, diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index 8d04c5bef0d9..d5deaa9aa39f 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -36,7 +36,6 @@ #include "yb/common/doc_hybrid_time.h" #include "yb/common/partition.h" -#include "yb/common/transaction.h" #include "yb/common/wire_protocol.h" #include "yb/master/master_client.pb.h" @@ -970,6 +969,11 @@ const std::string& PersistentTableInfo::indexed_table_id() const { : pb.has_indexed_table_id() ? 
pb.indexed_table_id() : kEmptyString; } +Result PersistentTableInfo::is_being_modified_by_ddl_transaction( + const TransactionId& txn) const { + return has_ysql_ddl_txn_verifier_state() && + txn == VERIFY_RESULT(FullyDecodeTransactionId(pb_transaction_id())); +} // ================================================================================================ // DeletedTableInfo diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index e5625d2eee94..f060187af497 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -41,6 +41,7 @@ #include "yb/common/entity_ids.h" #include "yb/common/index.h" #include "yb/common/partition.h" +#include "yb/common/transaction.h" #include "yb/master/master_client.fwd.h" #include "yb/master/master_fwd.h" @@ -386,11 +387,9 @@ struct PersistentTableInfo : public Persistent is_being_modified_by_ddl_transaction(const TransactionId& txn) const; + // Helper to set the state of the tablet with a custom message. void set_state(SysTablesEntryPB::State state, const std::string& msg); }; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 962c78c6044b..d1ab37ac7acd 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -347,6 +347,10 @@ TAG_FLAG(enable_transactional_ddl_gc, hidden); DECLARE_bool(ysql_ddl_rollback_enabled); +DEFINE_test_flag(bool, disable_ysql_ddl_txn_verification, false, + "Simulates a condition where the background process that checks whether the YSQL transaction " + "was a success or a failure is indefinitely delayed"); + // TODO: should this be a test flag? DEFINE_RUNTIME_bool(hide_pg_catalog_table_creation_logs, false, "Whether to hide detailed log messages for PostgreSQL catalog table creation. 
" @@ -541,6 +545,11 @@ METRIC_DEFINE_gauge_uint32(cluster, num_tablet_servers_dead, "heartbeat in the time interval defined by the gflag " "FLAGS_tserver_unresponsive_timeout_ms."); +DEFINE_test_flag(int32, delay_ysql_ddl_rollback_secs, 0, + "Number of seconds to sleep before rolling back a failed ddl transaction"); + +DECLARE_bool(ysql_ddl_rollback_enabled); + namespace yb { namespace master { @@ -6609,6 +6618,20 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req, void CatalogManager::ScheduleYsqlTxnVerification(const scoped_refptr& table, const TransactionMetadata& txn) { + // Add this transaction to the map containing all the transactions yet to be + // verified. + { + LockGuard lock(ddl_txn_verifier_mutex_); + ddl_txn_id_to_table_map_[txn.transaction_id].push_back(table); + } + + if (FLAGS_TEST_disable_ysql_ddl_txn_verification) { + LOG(INFO) << "Skip scheduling table " << table->ToString() << " for transaction verification " + << "as TEST_disable_ysql_ddl_txn_verification is set"; + return; + } + + // Schedule transaction verification. 
auto l = table->LockForRead(); LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name() << " id: " << table->id() << " schema version: " << l->pb.version(); @@ -6639,28 +6662,59 @@ Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr table bool success) { DCHECK(!txn_id_pb.empty()); DCHECK(table); - const string& id = "table id: " + table->id(); - auto txn = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb)); + const auto& table_id = table->id(); + const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb)); + bool table_present = false; + { + LockGuard lock(ddl_txn_verifier_mutex_); + const auto iter = ddl_txn_id_to_table_map_.find(txn_id); + if (iter == ddl_txn_id_to_table_map_.end()) { + LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString() + << " is already verified, ignoring"; + return Status::OK(); + } + + auto& tables = iter->second; + auto removed_elements_iter = std::remove_if(tables.begin(), tables.end(), + [&table_id](const scoped_refptr& table) { + return table->id() == table_id; + }); + if (removed_elements_iter != tables.end()) { + tables.erase(removed_elements_iter, tables.end()); + table_present = true; + if (tables.empty()) { + ddl_txn_id_to_table_map_.erase(iter); + } + } + } + if (!table_present) { + LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString() + << " is already verified, ignoring"; + return Status::OK(); + } + return YsqlDdlTxnCompleteCallbackInternal(table.get(), txn_id, success); +} + +Status CatalogManager::YsqlDdlTxnCompleteCallbackInternal(TableInfo *table, + const TransactionId& txn_id, + bool success) { + if (FLAGS_TEST_delay_ysql_ddl_rollback_secs > 0) { + LOG(INFO) << "YsqlDdlTxnCompleteCallbackInternal: Sleep for " + << FLAGS_TEST_delay_ysql_ddl_rollback_secs << " seconds"; + SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_delay_ysql_ddl_rollback_secs)); + } + const auto id = "table id: " + table->id(); auto l = 
table->LockForWrite(); LOG(INFO) << "YsqlDdlTxnCompleteCallback for " << id - << " for transaction " << txn + << " for transaction " << txn_id << ": Success: " << (success ? "true" : "false") << " ysql_ddl_txn_verifier_state: " << l->ysql_ddl_txn_verifier_state().DebugString(); - if (!l->has_ysql_ddl_txn_verifier_state()) { - // The table no longer has any DDL state to clean up. It was probably cleared by - // another thread, nothing to do. - LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked but no ysql transaction " - << "verification state found for " << id << " , ignoring"; - return Status::OK(); - } - - if (txn_id_pb != l->pb_transaction_id()) { - // Now the table has transaction state for a different transaction. - // Do nothing. - LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked for transaction " << txn << " but the " - << "schema contains state for a different transaction"; + if (!VERIFY_RESULT(l->is_being_modified_by_ddl_transaction(txn_id))) { + // Transaction verification completed for this table. 
+ LOG(INFO) << "Verification of transaction " << txn_id << " for " << id + << " is already complete, ignoring"; return Status::OK(); } @@ -6693,7 +6747,8 @@ Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr table auto& table_pb = l.mutable_data()->pb; if (!success && l->ysql_ddl_txn_verifier_state().contains_alter_table_op()) { - LOG(INFO) << "Alter transaction " << txn << " failed, rolling back its schema changes"; + LOG(INFO) << "Alter transaction " << txn_id << " for table " << table->ToString() + << " failed, rolling back its schema changes"; std::vector ddl_log_entries; ddl_log_entries.emplace_back( master_->clock()->Now(), @@ -12560,7 +12615,48 @@ Status CatalogManager::PromoteAutoFlags( resp->set_new_config_version(new_config_version); resp->set_non_runtime_flags_promoted(non_runtime_flags_promoted); + return Status::OK(); +} +Status CatalogManager::ReportYsqlDdlTxnStatus(const ReportYsqlDdlTxnStatusRequestPB* req, + ReportYsqlDdlTxnStatusResponsePB* resp, + rpc::RpcContext* rpc) { + DCHECK(req); + const auto& req_txn = req->transaction_id(); + SCHECK(!req_txn.empty(), IllegalState, + "Received ReportYsqlDdlTxnStatus request without transaction id"); + auto txn = VERIFY_RESULT(FullyDecodeTransactionId(req_txn)); + + const auto is_committed = req->is_committed(); + LOG(INFO) << "Received ReportYsqlDdlTxnStatus request for transaction " << txn + << ". Status: " << (is_committed ? "Success" : "Aborted"); + { + SharedLock lock(ddl_txn_verifier_mutex_); + const auto iter = ddl_txn_id_to_table_map_.find(txn); + if (iter == ddl_txn_id_to_table_map_.end()) { + // Transaction not found in the list of transactions to be verified. Ideally this means that + // the YB-Master background task somehow got to it before PG backend sent this report. However + // it is possible to receive this report BEFORE we added the transaction to the map if: + // 1. The transaction failed before performing any DocDB schema change. + // 2. 
Transaction failed and this report arrived in the small window between schema change + // initiation and scheduling the verification task. + // We have to do nothing in case of (1). In case of (2), it is safe to do nothing as the + // background task will take care of it. This is not optimal but (2) is expected to be very + // rare. + LOG(INFO) << "DDL transaction " << txn << " not found in list of transactions to be " + << "verified, nothing to do"; + return Status::OK(); + } + for (const auto& table : iter->second) { + // Submit this table for transaction verification. + LOG(INFO) << "Enqueuing table " << table->ToString() + << " for verification of DDL transaction: " << txn; + WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc([this, table, req_txn, is_committed]() { + WARN_NOT_OK(YsqlDdlTxnCompleteCallback(table, req_txn, is_committed), + "Transaction verification failed for table " + table->ToString()); + }), "Could not submit YsqlDdlTxnCompleteCallback to thread pool"); + } + } return Status::OK(); } diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 3da1fddada4c..bd3b2bf3aa84 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -149,6 +149,9 @@ typedef std::unordered_map XClusterNamespaceToSafeTimeM constexpr int32_t kInvalidClusterConfigVersion = 0; +using DdlTxnIdToTablesMap = + std::unordered_map>, TransactionIdHash>; + // The component of the master which tracks the state and location // of tables/tablets in the cluster. 
// @@ -363,7 +366,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Result GetTableNamespaceId(TableId table_id) EXCLUDES(mutex_); void ScheduleYsqlTxnVerification(const scoped_refptr& table, - const TransactionMetadata& txn); + const TransactionMetadata& txn) + EXCLUDES(ddl_txn_verifier_mutex_); Status YsqlTableSchemaChecker(scoped_refptr table, const std::string& txn_id_pb, @@ -373,6 +377,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf, const std::string& txn_id_pb, bool success); + Status YsqlDdlTxnCompleteCallbackInternal( + TableInfo *table, const TransactionId& txn_id, bool success); + // Get the information about the specified table. Status GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp) override; @@ -1025,6 +1032,11 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status PromoteAutoFlags(const PromoteAutoFlagsRequestPB* req, PromoteAutoFlagsResponsePB* resp); + Status ReportYsqlDdlTxnStatus( + const ReportYsqlDdlTxnStatusRequestPB* req, + ReportYsqlDdlTxnStatusResponsePB* resp, + rpc::RpcContext* rpc); + protected: // TODO Get rid of these friend classes and introduce formal interface. friend class TableLoader; @@ -2032,6 +2044,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf, rpc::ScheduledTaskTracker refresh_ysql_tablespace_info_task_; + // Guards ddl_txn_id_to_table_map_ below. + mutable MutexType ddl_txn_verifier_mutex_; + + // This map stores the transaction ids of all the DDL transactions undergoing verification. + // For each transaction, it also stores pointers to the table info objects of the tables affected + // by that transaction. 
+ DdlTxnIdToTablesMap ddl_txn_id_to_table_map_ GUARDED_BY(ddl_txn_verifier_mutex_); + ServerRegistrationPB server_registration_; TabletSplitManager tablet_split_manager_; diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index 96e809730340..288ca6f8e5b9 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -633,6 +633,18 @@ message GetUDTypeInfoResponsePB { optional UDTypeInfoPB udtype = 2; } +message ReportYsqlDdlTxnStatusRequestPB { + // The transaction whose status is being reported. + optional bytes transaction_id = 1; + + // Whether the above transaction is committed or aborted. + optional bool is_committed = 2; +} + +message ReportYsqlDdlTxnStatusResponsePB { + optional MasterErrorPB error = 1; +} + service MasterDdl { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -681,4 +693,7 @@ service MasterDdl { rpc DeleteUDType(DeleteUDTypeRequestPB) returns (DeleteUDTypeResponsePB); rpc ListUDTypes(ListUDTypesRequestPB) returns (ListUDTypesResponsePB); rpc GetUDTypeInfo(GetUDTypeInfoRequestPB) returns (GetUDTypeInfoResponsePB); + + rpc ReportYsqlDdlTxnStatus(ReportYsqlDdlTxnStatusRequestPB) returns + (ReportYsqlDdlTxnStatusResponsePB); } diff --git a/src/yb/master/master_ddl_service.cc b/src/yb/master/master_ddl_service.cc index 011de9689690..f26b2c97b75e 100644 --- a/src/yb/master/master_ddl_service.cc +++ b/src/yb/master/master_ddl_service.cc @@ -57,6 +57,7 @@ class MasterDdlServiceImpl : public MasterServiceBase, public MasterDdlIf { (ListTablegroups) (ListTables) (ListUDTypes) + (ReportYsqlDdlTxnStatus) (TruncateTable) ) }; diff --git a/src/yb/master/ysql_transaction_ddl.cc b/src/yb/master/ysql_transaction_ddl.cc index 6557d50e53ab..6c23bc03a300 100644 --- a/src/yb/master/ysql_transaction_ddl.cc +++ b/src/yb/master/ysql_transaction_ddl.cc @@ -43,6 +43,27 @@ using std::vector; namespace yb { namespace master { +namespace { + +bool IsTableModifiedByTransaction(TableInfo* table, + const 
TransactionMetadata& transaction) { + auto l = table->LockForRead(); + const auto& txn = transaction.transaction_id; + auto result = l->is_being_modified_by_ddl_transaction(txn); + if (!result.ok()) { + LOG(ERROR) << "Failed to parse transaction for table " << table->id() + << " skipping transaction verification"; + return false; + } + if (!result.get()) { + LOG(INFO) << "Verification of DDL transaction " << txn << " already completed for table " + << table->id(); + return false; + } + return true; +} + +} // namespace YsqlTransactionDdl::~YsqlTransactionDdl() { // Shutdown any outstanding RPCs. @@ -124,6 +145,13 @@ void YsqlTransactionDdl::VerifyTransaction( SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); + if (has_ysql_ddl_txn_state && !IsTableModifiedByTransaction(table.get(), transaction_metadata)) { + // The table no longer has any DDL transaction verification state pertaining to + // 'transaction_metadata'. Verification was completed concurrently by another thread, so + // there is nothing to do. + return; + } + YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata; tserver::GetTransactionStatusRequestPB req; @@ -164,10 +192,11 @@ void YsqlTransactionDdl::TransactionReceived( bool has_ysql_ddl_txn_state, std::function complete_callback, Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { - if (has_ysql_ddl_txn_state) { - // This was invoked for a table for which YSQL DDL rollback is enabled. Verify that it - // contains ysql_ddl_txn_state even now. - DCHECK(table->LockForRead()->has_ysql_ddl_txn_verifier_state()); + if (has_ysql_ddl_txn_state && !IsTableModifiedByTransaction(table.get(), transaction)) { + // The table no longer has any DDL transaction verification state pertaining to + // 'transaction'. Verification was completed concurrently by another thread, so there is + // nothing to do. 
+ return; } if (!txn_status.ok()) { diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index 12e2cf542a75..f841046a662a 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -256,6 +256,8 @@ message PgFinishTransactionRequestPB { uint64 session_id = 1; bool commit = 2; bool ddl_mode = 3; + // This transaction contains changes to the DocDB schema protobufs. + bool has_docdb_schema_changes = 4; } message PgFinishTransactionResponsePB { diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 0e8a7d531b73..ed74f5ef8ea2 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -54,6 +54,10 @@ using std::string; +DEFINE_RUNTIME_bool(report_ysql_ddl_txn_status_to_master, false, + "If set, at the end of DDL operation, the TServer will notify the YB-Master " + "whether the DDL operation was committed or aborted"); + DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); DECLARE_bool(ysql_ddl_rollback_enabled); @@ -676,6 +680,12 @@ Status PgClientSession::FinishTransaction( VLOG_WITH_PREFIX_AND_FUNC(2) << "ddl: " << req.ddl_mode() << ", no running transaction"; return Status::OK(); } + + const TransactionMetadata* metadata = nullptr; + if (FLAGS_report_ysql_ddl_txn_status_to_master && req.has_docdb_schema_changes()) { + metadata = VERIFY_RESULT(GetDdlTransactionMetadata(true, context->GetClientDeadline())); + LOG_IF(DFATAL, !metadata) << "metadata is required"; + } const auto txn_value = std::move(txn); Session(kind)->SetTransaction(nullptr); @@ -684,12 +694,26 @@ Status PgClientSession::FinishTransaction( VLOG_WITH_PREFIX_AND_FUNC(2) << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", commit: " << commit_status; - return commit_status; + // If commit_status is not ok, we cannot be sure whether the commit was successful or not. 
It + // is possible that the commit succeeded at the transaction coordinator but we failed to get + // the response back. Thus we will not report any status to the YB-Master in this case. It + // will run its background task to figure out whether the transaction succeeded or failed. + if (!commit_status.ok()) { + return commit_status; + } + } else { + VLOG_WITH_PREFIX_AND_FUNC(2) + << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort"; + txn_value->Abort(); } - VLOG_WITH_PREFIX_AND_FUNC(2) - << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort"; - txn_value->Abort(); + if (metadata) { + // If we failed to report the status of this DDL transaction, we can just log and ignore it, + // as the poller in the YB-Master will figure out the status of this transaction using the + // transaction status tablet and PG catalog. + ERROR_NOT_OK(client().ReportYsqlDdlTxnStatus(*metadata, req.commit()), + "Sending ReportYsqlDdlTxnStatus call failed"); + } return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index f62714a3fd29..8926e6c99ee7 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -200,8 +200,8 @@ class PgClient::Impl { partitions->keys = {PartitionKey(), keys[keys.size() / 2]}; static auto key_printer = [](const auto& key) { return Slice(key).ToDebugHexString(); }; LOG(INFO) << "Partitions for " << table_id << " are joined." 
- << " source: " << ToString(keys, key_printer) - << " result: " << ToString(partitions->keys, key_printer); + << " source: " << yb::ToString(keys, key_printer) + << " result: " << yb::ToString(partitions->keys, key_printer); } else { partitions->keys.assign(keys.begin(), keys.end()); } @@ -212,11 +212,13 @@ class PgClient::Impl { return result; } - Status FinishTransaction(Commit commit, DdlMode ddl_mode) { + Status FinishTransaction(Commit commit, DdlType ddl_type) { tserver::PgFinishTransactionRequestPB req; req.set_session_id(session_id_); req.set_commit(commit); - req.set_ddl_mode(ddl_mode); + req.set_ddl_mode(ddl_type != DdlType::NonDdl); + req.set_has_docdb_schema_changes(ddl_type == DdlType::DdlWithDocdbSchemaChanges); + tserver::PgFinishTransactionResponsePB resp; RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController())); @@ -638,8 +640,8 @@ Result PgClient::OpenTable( return impl_->OpenTable(table_id, reopen, invalidate_cache_time); } -Status PgClient::FinishTransaction(Commit commit, DdlMode ddl_mode) { - return impl_->FinishTransaction(commit, ddl_mode); +Status PgClient::FinishTransaction(Commit commit, DdlType ddl_type) { + return impl_->FinishTransaction(commit, ddl_type); } Result PgClient::GetDatabaseInfo(uint32_t oid) { diff --git a/src/yb/yql/pggate/pg_client.h b/src/yb/yql/pggate/pg_client.h index 25c0e5ec40f1..1ff9da7a8cd4 100644 --- a/src/yb/yql/pggate/pg_client.h +++ b/src/yb/yql/pggate/pg_client.h @@ -34,6 +34,7 @@ #include "yb/tserver/tserver_util_fwd.h" #include "yb/tserver/pg_client.fwd.h" +#include "yb/util/enums.h" #include "yb/util/monotime.h" #include "yb/yql/pggate/pg_gate_fwd.h" @@ -41,7 +42,15 @@ namespace yb { namespace pggate { -YB_STRONGLY_TYPED_BOOL(DdlMode); +YB_DEFINE_ENUM( + DdlType, + // Not a DDL operation. + ((NonDdl, 0)) + // DDL operation that does not modify the DocDB schema protobufs. + ((DdlWithoutDocdbSchemaChanges, 1)) + // DDL operation that modifies the DocDB schema protobufs. 
+ ((DdlWithDocdbSchemaChanges, 2)) +); #define YB_PG_CLIENT_SIMPLE_METHODS \ (AlterDatabase)(AlterTable)(CreateDatabase)(CreateTable)(CreateTablegroup) \ @@ -74,7 +83,7 @@ class PgClient { Result OpenTable( const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time); - Status FinishTransaction(Commit commit, DdlMode ddl_mode); + Status FinishTransaction(Commit commit, DdlType ddl_type); Result GetDatabaseInfo(PgOid oid); diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index bf644d67bbfb..59890b50bc48 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -735,6 +735,10 @@ bool PgSession::HasWriteOperationsInDdlMode() const { return has_write_ops_in_ddl_mode_ && pg_txn_manager_->IsDdlMode(); } +void PgSession::SetDdlHasSyscatalogChanges() { + pg_txn_manager_->SetDdlHasSyscatalogChanges(); +} + Status PgSession::ValidatePlacement(const std::string& placement_info) { tserver::PgValidatePlacementRequestPB req; diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 8cfbe221d309..afb7cfd174a3 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -326,6 +326,8 @@ class PgSession : public RefCountedThreadSafe { void ResetHasWriteOperationsInDdlMode(); bool HasWriteOperationsInDdlMode() const; + void SetDdlHasSyscatalogChanges(); + Result CheckIfPitrActive(); void GetAndResetOperationFlushRpcStats(uint64_t* count, uint64_t* wait_time); diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index ef0ce5bd0111..d97250c4291b 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -31,7 +31,6 @@ #include "yb/util/status.h" #include "yb/util/status_format.h" -#include "yb/yql/pggate/pg_client.h" #include "yb/yql/pggate/pggate_flags.h" #include "yb/yql/pggate/ybc_pggate.h" #include "yb/util/flags.h" @@ -234,7 +233,7 @@ uint64_t 
PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requireme Status PgTxnManager::CalculateIsolation(bool read_only_op, TxnPriorityRequirement txn_priority_requirement, uint64_t* in_txn_limit) { - if (ddl_mode_) { + if (IsDdlMode()) { VLOG_TXN_STATE(2); return Status::OK(); } @@ -313,7 +312,7 @@ Status PgTxnManager::RestartTransaction() { /* This is called at the start of each statement in READ COMMITTED isolation level */ Status PgTxnManager::ResetTransactionReadPoint() { - RSTATUS_DCHECK(!ddl_mode_, IllegalState, + RSTATUS_DCHECK(!IsDdlMode(), IllegalState, "READ COMMITTED semantics don't apply to DDL transactions"); read_time_manipulation_ = tserver::ReadTimeManipulation::RESET; read_time_for_follower_reads_ = HybridTime(); @@ -339,7 +338,7 @@ Status PgTxnManager::FinishTransaction(Commit commit) { // If a DDL operation during a DDL txn fails the txn will be aborted before we get here. // However if there are failures afterwards (i.e. during COMMIT or catalog version increment), // then we might get here with a ddl_txn_. Clean it up in that case. - if (ddl_mode_ && !commit) { + if (IsDdlMode() && !commit) { RETURN_NOT_OK(ExitSeparateDdlTxnMode(commit)); } @@ -355,7 +354,7 @@ Status PgTxnManager::FinishTransaction(Commit commit) { } VLOG_TXN_STATE(2) << (commit ? "Committing" : "Aborting") << " transaction."; - Status status = client_->FinishTransaction(commit, DdlMode::kFalse); + Status status = client_->FinishTransaction(commit, DdlType::NonDdl); VLOG_TXN_STATE(2) << "Transaction " << (commit ? 
"commit" : "abort") << " status: " << status; ResetTxnAndSession(); return status; @@ -380,28 +379,43 @@ void PgTxnManager::ResetTxnAndSession() { } Status PgTxnManager::EnterSeparateDdlTxnMode() { - RSTATUS_DCHECK(!ddl_mode_, IllegalState, + RSTATUS_DCHECK(!IsDdlMode(), IllegalState, "EnterSeparateDdlTxnMode called when already in a DDL transaction"); VLOG_TXN_STATE(2); - ddl_mode_ = true; + ddl_type_ = DdlType::DdlWithoutDocdbSchemaChanges; VLOG_TXN_STATE(2); return Status::OK(); } Status PgTxnManager::ExitSeparateDdlTxnMode(Commit commit) { VLOG_TXN_STATE(2); - if (!ddl_mode_) { + if (!IsDdlMode()) { RSTATUS_DCHECK(!commit, IllegalState, "Commit ddl txn called when not in a DDL transaction"); return Status::OK(); } - RETURN_NOT_OK(client_->FinishTransaction(commit, DdlMode::kTrue)); - ddl_mode_ = false; + RETURN_NOT_OK(client_->FinishTransaction(commit, ddl_type_)); + ddl_type_ = DdlType::NonDdl; return Status::OK(); } +void PgTxnManager::SetDdlHasSyscatalogChanges() { + if (IsDdlMode()) { + ddl_type_ = DdlType::DdlWithDocdbSchemaChanges; + return; + } + // There are only 2 cases where we may be performing DocDB schema changes outside of DDL mode: + // 1. During initdb, when we do not use a transaction at all. + // 2. When yb_non_ddl_txn_for_sys_tables_allowed is set. Here we would use a regular transaction. + // DdlWithDocdbSchemaChanges is mainly used for DDL atomicity, which is disabled for the PG + // system catalog tables. Both cases above are primarily used for modifying the system catalog, + // so there is no need to set this flag here. 
+ DCHECK(YBCIsInitDbModeEnvVarSet() || + (IsTxnInProgress() && yb_non_ddl_txn_for_sys_tables_allowed)); +} + std::string PgTxnManager::TxnStateDebugStr() const { return YB_CLASS_TO_STRING( - ddl_mode, + ddl_type, read_only, deferrable, txn_in_progress, @@ -410,12 +424,12 @@ std::string PgTxnManager::TxnStateDebugStr() const { } uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) { - if (!ddl_mode_ && !txn_in_progress_) { + if (!IsDdlMode() && !txn_in_progress_) { ++txn_serial_no_; active_sub_transaction_id_ = 0; } options->set_isolation(isolation_level_); - options->set_ddl_mode(ddl_mode_); + options->set_ddl_mode(IsDdlMode()); options->set_txn_serial_no(txn_serial_no_); options->set_active_sub_transaction_id(active_sub_transaction_id_); @@ -435,7 +449,7 @@ uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) options->set_defer_read_point(true); need_defer_read_point_ = false; } - if (!ddl_mode_) { + if (!IsDdlMode()) { // The state in read_time_manipulation_ is only for kPlain transactions. And if YSQL switches to // kDdl mode for sometime, we should keep read_time_manipulation_ as is so that once YSQL // switches back to kDdl mode, the read_time_manipulation_ is not lost. 
diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index dcf0710d1426..e3b2dc0c9447 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -27,6 +27,7 @@ #include "yb/util/enums.h" +#include "yb/yql/pggate/pg_client.h" #include "yb/yql/pggate/pg_gate_fwd.h" #include "yb/yql/pggate/pg_callbacks.h" @@ -67,11 +68,12 @@ class PgTxnManager : public RefCountedThreadSafe { Status SetDeferrable(bool deferrable); Status EnterSeparateDdlTxnMode(); Status ExitSeparateDdlTxnMode(Commit commit); + void SetDdlHasSyscatalogChanges(); - bool IsDdlMode() const { return ddl_mode_; } bool IsTxnInProgress() const { return txn_in_progress_; } IsolationLevel GetIsolationLevel() const { return isolation_level_; } bool ShouldUseFollowerReads() const { return read_time_for_follower_reads_.is_valid(); } + bool IsDdlMode() const { return ddl_type_ != DdlType::NonDdl; } uint64_t SetupPerformOptions(tserver::PgPerformOptionsPB* options); @@ -114,7 +116,7 @@ class PgTxnManager : public RefCountedThreadSafe { HybridTime read_time_for_follower_reads_; bool deferrable_ = false; - bool ddl_mode_ = false; + DdlType ddl_type_ = DdlType::NonDdl; // On a transaction conflict error we want to recreate the transaction with the same priority as // the last transaction. This avoids the case where the current transaction gets a higher priority diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 85b5681abbc4..9d3d6758b90d 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -799,6 +799,7 @@ Status PgApiImpl::ExecCreateTable(PgStatement *handle) { // Invalid handle. return STATUS(InvalidArgument, "Invalid statement handle"); } + pg_session_->SetDdlHasSyscatalogChanges(); return down_cast(handle)->Exec(); } @@ -870,6 +871,7 @@ Status PgApiImpl::ExecAlterTable(PgStatement *handle) { // Invalid handle. 
return STATUS(InvalidArgument, "Invalid statement handle"); } + pg_session_->SetDdlHasSyscatalogChanges(); PgAlterTable *pg_stmt = down_cast(handle); return pg_stmt->Exec(); } @@ -1068,7 +1070,7 @@ Status PgApiImpl::ExecDropTable(PgStatement *handle) { if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLE)) { return STATUS(InvalidArgument, "Invalid statement handle"); } - + pg_session_->SetDdlHasSyscatalogChanges(); return down_cast(handle)->Exec(); } diff --git a/src/yb/yql/pggate/test/pggate_test.cc b/src/yb/yql/pggate/test/pggate_test.cc index fd151dd52b10..aa0498a642aa 100644 --- a/src/yb/yql/pggate/test/pggate_test.cc +++ b/src/yb/yql/pggate/test/pggate_test.cc @@ -216,6 +216,12 @@ void PggateTest::CommitTransaction() { CHECK_YBC_STATUS(YBCPgCommitTransaction()); } +void PggateTest::ExecCreateTableTransaction(YBCPgStatement pg_stmt) { + BeginDDLTransaction(); + CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + CommitDDLTransaction(); +} + // ------------------------------------------------------------------------------------------------ // Make sure that DataType in common.proto matches the YBCPgDataType enum // TODO: find a better way to generate these enums. diff --git a/src/yb/yql/pggate/test/pggate_test.h b/src/yb/yql/pggate/test/pggate_test.h index e84b11dca32d..c7795985d221 100644 --- a/src/yb/yql/pggate/test/pggate_test.h +++ b/src/yb/yql/pggate/test/pggate_test.h @@ -71,8 +71,10 @@ class PggateTest : public YBTest { //------------------------------------------------------------------------------------------------ // Setup the database for testing. 
- void SetupDB(const std::string& db_name = kDefaultDatabase, YBCPgOid db_oid = kDefaultDatabaseOid); - void CreateDB(const std::string& db_name = kDefaultDatabase, YBCPgOid db_oid = kDefaultDatabaseOid); + void SetupDB(const std::string& db_name = kDefaultDatabase, + YBCPgOid db_oid = kDefaultDatabaseOid); + void CreateDB(const std::string& db_name = kDefaultDatabase, + YBCPgOid db_oid = kDefaultDatabaseOid); void ConnectDB(const std::string& db_name = kDefaultDatabase); protected: @@ -80,6 +82,7 @@ class PggateTest : public YBTest { void CommitDDLTransaction(); void BeginTransaction(); void CommitTransaction(); + void ExecCreateTableTransaction(YBCPgStatement pg_stmt); //------------------------------------------------------------------------------------------------ // Simulated cluster. diff --git a/src/yb/yql/pggate/test/pggate_test_catalog.cc b/src/yb/yql/pggate/test/pggate_test_catalog.cc index cadd558cab3e..0531e1752004 100644 --- a/src/yb/yql/pggate/test/pggate_test_catalog.cc +++ b/src/yb/yql/pggate/test/pggate_test_catalog.cc @@ -68,7 +68,7 @@ TEST_F(PggateTestCatalog, TestDml) { DataType::FLOAT, false, false)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "job", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; // INSERT ---------------------------------------------------------------------------------------- @@ -410,7 +410,7 @@ TEST_F(PggateTestCatalog, TestCopydb) { &pg_stmt)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "key", 1, DataType::INT32, false, true)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "value", 2, DataType::INT32, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; CHECK_YBC_STATUS(YBCPgNewInsert(kDefaultDatabaseOid, tab_oid, false /* is_single_row_txn */, diff --git a/src/yb/yql/pggate/test/pggate_test_delete.cc 
b/src/yb/yql/pggate/test/pggate_test_delete.cc index ab2d2bfae06d..853de32f4d77 100644 --- a/src/yb/yql/pggate/test/pggate_test_delete.cc +++ b/src/yb/yql/pggate/test/pggate_test_delete.cc @@ -62,8 +62,7 @@ TEST_F(PggateTestDelete, TestDelete) { DataType::FLOAT, false, false)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "job", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); - + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; // INSERT ---------------------------------------------------------------------------------------- diff --git a/src/yb/yql/pggate/test/pggate_test_select.cc b/src/yb/yql/pggate/test/pggate_test_select.cc index b8be2d280de8..8a3f70a394a5 100644 --- a/src/yb/yql/pggate/test/pggate_test_select.cc +++ b/src/yb/yql/pggate/test/pggate_test_select.cc @@ -67,7 +67,7 @@ TEST_F(PggateTestSelect, TestSelectOneTablet) { CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "oid", -2, DataType::INT32, false, false)); ++col_count; - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; diff --git a/src/yb/yql/pggate/test/pggate_test_select_inequality.cc b/src/yb/yql/pggate/test/pggate_test_select_inequality.cc index 267bc9e18b9a..8ccca126fe17 100644 --- a/src/yb/yql/pggate/test/pggate_test_select_inequality.cc +++ b/src/yb/yql/pggate/test/pggate_test_select_inequality.cc @@ -56,7 +56,7 @@ TEST_F(PggateTestSelectInequality, TestSelectInequality) { DataType::INT64, false, true)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "val", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; diff --git a/src/yb/yql/pggate/test/pggate_test_select_multi_tablets.cc b/src/yb/yql/pggate/test/pggate_test_select_multi_tablets.cc index baaf58363e46..4dc59a9be709 100644 --- a/src/yb/yql/pggate/test/pggate_test_select_multi_tablets.cc +++ 
b/src/yb/yql/pggate/test/pggate_test_select_multi_tablets.cc @@ -62,7 +62,7 @@ TEST_F(PggateTestSelectMultiTablets, TestSelectMultiTablets) { DataType::FLOAT, false, false)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "job", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; // SELECT: Empty Table --------------------------------------------------------------------------- diff --git a/src/yb/yql/pggate/test/pggate_test_table_size.cc b/src/yb/yql/pggate/test/pggate_test_table_size.cc index 3a89d2523820..0afc676c3fc1 100644 --- a/src/yb/yql/pggate/test/pggate_test_table_size.cc +++ b/src/yb/yql/pggate/test/pggate_test_table_size.cc @@ -67,7 +67,7 @@ TEST_F(PggateTestTableSize, YB_DISABLE_TEST_IN_TSAN(TestSimpleTable)) { DataType::FLOAT, false, false)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "job", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); YBCPgDeleteStatement(pg_stmt); @@ -181,7 +181,9 @@ TEST_F(PggateTestTableSize, TestMissingTablets) { DataType::INT64, true, true)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "id", ++col_count, DataType::INT32, false, true)); + BeginDDLTransaction(); CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + CommitDDLTransaction(); YBCPgDeleteStatement(pg_stmt); diff --git a/src/yb/yql/pggate/test/pggate_test_update.cc b/src/yb/yql/pggate/test/pggate_test_update.cc index 759a4f3ab0bc..9779b9a45753 100644 --- a/src/yb/yql/pggate/test/pggate_test_update.cc +++ b/src/yb/yql/pggate/test/pggate_test_update.cc @@ -28,11 +28,11 @@ using std::string; namespace yb { namespace pggate { -class PggateTestDelete : public PggateTest { +class PggateTestUpdate : public PggateTest { }; -TEST_F(PggateTestDelete, TestDelete) { - CHECK_OK(Init("TestDelete")); +TEST_F(PggateTestUpdate, TestUpdate) { + 
CHECK_OK(Init("TestUpdate")); const char *tabname = "basic_table"; const YBCPgOid tab_oid = 3; @@ -64,7 +64,7 @@ TEST_F(PggateTestDelete, TestDelete) { DataType::FLOAT, false, false)); CHECK_YBC_STATUS(YBCTestCreateTableAddColumn(pg_stmt, "job", ++col_count, DataType::STRING, false, false)); - CHECK_YBC_STATUS(YBCPgExecCreateTable(pg_stmt)); + ExecCreateTableTransaction(pg_stmt); pg_stmt = nullptr; // INSERT ---------------------------------------------------------------------------------------- diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc index fc1df08430cf..7a15368088cb 100644 --- a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc @@ -40,6 +40,15 @@ using std::vector; namespace yb { namespace pgwrapper { +const auto kCreateTable = "create_test"s; +const auto kRenameTable = "rename_table_test"s; +const auto kRenameCol = "rename_col_test"s; +const auto kAddCol = "add_col_test"s; +const auto kDropTable = "drop_test"s; + +const auto kDatabase = "yugabyte"s; +const auto kDdlVerificationError = "Table is undergoing DDL transaction verification"s; + class PgDdlAtomicityTest : public LibPqTestBase { void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); @@ -88,8 +97,20 @@ class PgDdlAtomicityTest : public LibPqTestBase { return "DROP TABLE " + tablename; } - string db() { - return "yugabyte"; + bool IsDdlVerificationError(const string& error) { + return error.find(kDdlVerificationError) != string::npos; + } + + Status ExecuteWithRetry(PGConn *conn, const string& ddl) { + for (size_t num_retries = 0; num_retries < 5; ++num_retries) { + auto s = conn->Execute(ddl); + if (s.ok() || !IsDdlVerificationError(s.ToString())) { + return s; + } + // Sleep before retrying again. 
+ sleep(1); + } + return STATUS_FORMAT(IllegalState, "Failed to execute DDL statement $0", ddl); } void RestartMaster() { @@ -121,12 +142,13 @@ class PgDdlAtomicityTest : public LibPqTestBase { } } - void VerifySchema(client::YBClient* client, + Status VerifySchema(client::YBClient* client, const string& database_name, const string& table_name, const vector& expected_column_names) { - ASSERT_TRUE(ASSERT_RESULT( - CheckIfSchemaMatches(client, database_name, table_name, expected_column_names))); + return LoggedWaitFor([&] { + return CheckIfSchemaMatches(client, database_name, table_name, expected_column_names); + }, MonoDelta::FromSeconds(60), "Wait for schema to match"); } Result CheckIfSchemaMatches(client::YBClient* client, @@ -160,7 +182,6 @@ class PgDdlAtomicityTest : public LibPqTestBase { TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseGC)) { TableName test_name = "test_pgsql"; auto client = ASSERT_RESULT(cluster_->CreateClient()); - auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(conn.TestFailDdl("CREATE DATABASE " + test_name)); @@ -214,11 +235,11 @@ TEST_F(PgDdlAtomicityTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableGC)) { // Wait for DocDB index creation, even though it will fail in PG layer. // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. - VerifyTableExists(client.get(), db(), test_name_idx, 10); + VerifyTableExists(client.get(), kDatabase, test_name_idx, 10); // DocDB will notice the PG layer failure because the transaction aborts. // Confirm that DocDB async deletes the index. - VerifyTableNotExists(client.get(), db(), test_name_idx, 40); + VerifyTableNotExists(client.get(), kDatabase, test_name_idx, 40); } // Class for sanity test. 
@@ -227,44 +248,37 @@ class PgDdlAtomicitySanityTest : public PgDdlAtomicityTest { void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { options->extra_master_flags.push_back("--ysql_ddl_rollback_enabled=true"); options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); + options->extra_tserver_flags.push_back("--report_ysql_ddl_txn_status_to_master=true"); } }; TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTableRollback)) { - TableName rename_table_test = "rename_table_test"; - TableName rename_col_test = "rename_col_test"; - TableName add_col_test = "add_col_test"; - TableName drop_table_test = "drop_table_test"; - - vector tables = { - rename_table_test, rename_col_test, add_col_test, drop_table_test}; + const auto tables = {kRenameTable, kRenameCol, kAddCol, kDropTable}; auto client = ASSERT_RESULT(cluster_->CreateClient()); - auto conn = ASSERT_RESULT(Connect()); - for (size_t ii = 0; ii < tables.size(); ++ii) { - ASSERT_OK(conn.Execute(CreateTableStmt(tables[ii]))); + for (const auto& table : tables) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); } // Wait for the transaction state left by create table statement to clear. sleep(5); // Deliberately cause failure of the following Alter Table statements. - ASSERT_OK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); - ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn.TestFailDdl(AddColumnStmt(add_col_test))); - ASSERT_OK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + ASSERT_OK(conn.TestFailDdl(DropTableStmt(kDropTable))); + ASSERT_OK(conn.TestFailDdl(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.TestFailDdl(AddColumnStmt(kAddCol))); - // Wait for rollback. 
- sleep(5); - for (size_t ii = 0; ii < tables.size(); ++ii) { - VerifySchema(client.get(), db(), tables[ii], {"key"}); + for (const auto& table : tables) { + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); } // Verify that DDL succeeds after rollback is complete. - ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test))); - ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test))); - ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); + conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.Execute(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.Execute(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.Execute(DropTableStmt(kDropTable))); } TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(PrimaryKeyRollback)) { @@ -274,25 +288,22 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(PrimaryKeyRollback)) { auto client = ASSERT_RESULT(cluster_->CreateClient()); auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (id int)", add_pk_table)); - ASSERT_OK(conn.Execute(CreateTableStmt(drop_pk_table))); - - // Wait for transaction verification on the tables to complete. - sleep(1); + CreateTable(drop_pk_table); // Delay rollback. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "15")); // Fail Alter operation that adds/drops primary key. ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 ADD PRIMARY KEY(id)", add_pk_table))); ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP CONSTRAINT $0_pkey;", drop_pk_table))); // Verify presence of temp table created for adding a primary key. 
- VerifyTableExists(client.get(), db(), add_pk_table + "_temp_old", 10); - VerifyTableExists(client.get(), db(), drop_pk_table + "_temp_old", 10); + VerifyTableExists(client.get(), kDatabase, add_pk_table + "_temp_old", 10); + VerifyTableExists(client.get(), kDatabase, drop_pk_table + "_temp_old", 10); // Verify that the background task detected the failure of the DDL operation and removed the // temp table. - VerifyTableNotExists(client.get(), db(), add_pk_table + "_temp_old", 40); - VerifyTableNotExists(client.get(), db(), drop_pk_table + "_temp_old", 40); + VerifyTableNotExists(client.get(), kDatabase, add_pk_table + "_temp_old", 40); + VerifyTableNotExists(client.get(), kDatabase, drop_pk_table + "_temp_old", 40); // Verify that PK constraint is not present on the table. ASSERT_OK(conn.Execute("INSERT INTO " + add_pk_table + " VALUES (1), (1)")); @@ -302,114 +313,232 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(PrimaryKeyRollback)) { } TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DdlRollbackMasterRestart)) { - TableName create_table_test = "create_test"; - TableName rename_table_test = "rename_table_test"; - TableName rename_col_test = "rename_col_test"; - TableName add_col_test = "add_col_test"; - TableName drop_table_test = "drop_test"; - - vector tables_to_create = { - rename_table_test, rename_col_test, add_col_test, drop_table_test}; - + const auto tables_to_create = {kRenameTable, kRenameCol, kAddCol, kDropTable}; auto client = ASSERT_RESULT(cluster_->CreateClient()); auto conn = ASSERT_RESULT(Connect()); - for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { - ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); } - // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly + // Set TEST_delay_ysql_ddl_rollback_secs so high that table rollback is nearly // disabled. 
- ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); - ASSERT_OK(conn.TestFailDdl(CreateTableStmt(create_table_test))); - ASSERT_OK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); - ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn.TestFailDdl(AddColumnStmt(add_col_test))); - ASSERT_OK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "100")); + ASSERT_OK(conn.TestFailDdl(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn.TestFailDdl(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.TestFailDdl(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.TestFailDdl(DropTableStmt(kDropTable))); // Verify that table was created on DocDB. - VerifyTableExists(client.get(), db(), create_table_test, 10); - VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifyTableExists(client.get(), kDatabase, kCreateTable, 10); + VerifyTableExists(client.get(), kDatabase, kDropTable, 10); // Verify that the tables were altered on DocDB. - VerifySchema(client.get(), db(), "foobar", {"key"}); - VerifySchema(client.get(), db(), rename_col_test, {"key2"}); - VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, "foobar", {"key"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kRenameCol, {"key2"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); // Restart the master before the BG task can kick in and GC the failed transaction. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "0")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "0")); RestartMaster(); // Verify that rollback reflected on all the tables after restart. client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. 
- VerifyTableNotExists(client.get(), db(), create_table_test, 20); - VerifyTableExists(client.get(), db(), rename_table_test, 20); + VerifyTableNotExists(client.get(), kDatabase, kCreateTable, 20); + VerifyTableExists(client.get(), kDatabase, kRenameTable, 20); // Verify all the other tables are unchanged by all of the DDLs. for (const string& table : tables_to_create) { - ASSERT_OK(LoggedWaitFor([&]() -> Result { - return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); - }, MonoDelta::FromSeconds(30), Format("Wait for rollback for table $0", table))); + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); + } +} + +TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(TestYsqlTxnStatusReporting)) { + const auto tables_to_create = {kRenameTable, kRenameCol, kAddCol, kDropTable}; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + { + auto conn = ASSERT_RESULT(Connect()); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); + } + + // Disable YB-Master's background task. + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_disable_ysql_ddl_txn_verification", "true")); + // Run some failing Ddl transactions + ASSERT_OK(conn.TestFailDdl(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn.TestFailDdl(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.TestFailDdl(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.TestFailDdl(DropTableStmt(kDropTable))); + } + + // The rollback should have succeeded anyway because YSQL would have reported the + // status. + VerifyTableNotExists(client.get(), kDatabase, kCreateTable, 10); + + // Verify all the other tables are unchanged by all of the DDLs. + for (const auto& table : tables_to_create) { + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); + } + + // Now test successful DDLs. 
+ auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn.Execute(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.Execute(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.Execute(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.Execute(DropTableStmt(kDropTable))); + + VerifyTableExists(client.get(), kDatabase, kCreateTable, 10); + VerifyTableNotExists(client.get(), kDatabase, kDropTable, 10); + + // Verify that the tables were altered on DocDB. + ASSERT_OK(VerifySchema(client.get(), kDatabase, "foobar", {"key"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kRenameCol, {"key2"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); +} + +class PgDdlAtomicityParallelDdlTest : public PgDdlAtomicitySanityTest { + public: + template + Result RunDdlHelper(const std::string& format, const Args&... args) { + return DoRunDdlHelper(Format(format, args...)); + } + private: + Result DoRunDdlHelper(const string& ddl) { + auto conn = VERIFY_RESULT(Connect()); + auto s = conn.Execute(ddl); + if (s.ok()) { + return true; + } + + const auto msg = s.message().ToBuffer(); + static const auto allowed_msgs = { + "Catalog Version Mismatch"s, + "Conflicts with higher priority transaction"s, + "Restart read required"s, + "Transaction aborted"s, + "Transaction metadata missing"s, + "Unknown transaction, could be recently aborted"s, + "Flush: Value write after transaction start"s, + kDdlVerificationError + }; + if (std::find_if( + std::begin(allowed_msgs), + std::end(allowed_msgs), + [&msg] (const string& allowed_msg) { + return msg.find(allowed_msg) != string::npos; + }) != std::end(allowed_msgs)) { + LOG(ERROR) << "Unexpected failure status " << s; + return false; + } + return true; + } +}; + +TEST_F(PgDdlAtomicityParallelDdlTest, YB_DISABLE_TEST_IN_TSAN(TestParallelDdl)) { + constexpr size_t kNumIterations = 10; + const auto tablename = "test_table"s; + auto conn = ASSERT_RESULT(Connect()); + 
ASSERT_OK(conn.Execute(CreateTableStmt(tablename))); + ASSERT_OK(conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series(1, $1))", + tablename, + kNumIterations)); + + // Add some columns. + for (size_t i = 0; i < kNumIterations; ++i) { + ASSERT_OK(ExecuteWithRetry(&conn, + Format("ALTER TABLE $0 ADD COLUMN col_$1 TEXT", tablename, i))); } + + ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "0")); + // Add columns in the first thread. + TestThreadHolder thread_holder; + thread_holder.AddThreadFunctor([this, &tablename] { + auto conn = ASSERT_RESULT(Connect()); + for (size_t i = kNumIterations; i < kNumIterations * 2;) { + if (ASSERT_RESULT(RunDdlHelper("ALTER TABLE $0 ADD COLUMN col_$1 TEXT", tablename, i))) { + ++i; + } + } + }); + + // Rename columns in the second thread. + thread_holder.AddThreadFunctor([this, &tablename] { + auto conn = ASSERT_RESULT(Connect()); + for (size_t i = 0; i < kNumIterations;) { + if (ASSERT_RESULT(RunDdlHelper("ALTER TABLE $0 RENAME COLUMN col_$1 TO renamedcol_$2", + tablename, i, i))) { + ++i; + } + } + }); + + thread_holder.JoinAll(); + auto client = ASSERT_RESULT(cluster_->CreateClient()); + vector expected_cols; + expected_cols.reserve(kNumIterations * 2); + expected_cols.emplace_back("key"); + for (size_t i = 0; i < kNumIterations * 2; ++i) { + expected_cols.push_back(Format(i < kNumIterations ? "renamedcol_$0" : "col_$0", i)); + } + ASSERT_OK(VerifySchema(client.get(), kDatabase, tablename, expected_cols)); } // TODO (deepthi) : This test is flaky because of #14995. Re-enable it back after #14995 is fixed. 
TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST(FailureRecoveryTest)) { - TableName rename_table_test = "rename_table_test"; - TableName rename_col_test = "rename_col_test"; - TableName add_col_test = "add_col_test"; - TableName drop_table_test = "drop_test"; - - vector tables_to_create = { - rename_table_test, rename_col_test, add_col_test, drop_table_test}; + const auto tables_to_create = {kRenameTable, kRenameCol, kAddCol, kDropTable}; auto client = ASSERT_RESULT(cluster_->CreateClient()); auto conn = ASSERT_RESULT(Connect()); - for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { - ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); } // Set ysql_transaction_bg_task_wait_ms so high that table rollback is nearly // disabled. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "100000")); - ASSERT_OK(conn.TestFailDdl(RenameTableStmt(rename_table_test))); - ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn.TestFailDdl(AddColumnStmt(add_col_test))); - ASSERT_OK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "100")); + ASSERT_OK(conn.TestFailDdl(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.TestFailDdl(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.TestFailDdl(DropTableStmt(kDropTable))); // Verify that table was created on DocDB. - VerifyTableExists(client.get(), db(), drop_table_test, 10); + VerifyTableExists(client.get(), kDatabase, kDropTable, 10); // Verify that the tables were altered on DocDB. 
- VerifySchema(client.get(), db(), "foobar", {"key"}); - VerifySchema(client.get(), db(), rename_col_test, {"key2"}); - VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, "foobar", {"key"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kRenameCol, {"key2"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); // Disable DDL rollback. ASSERT_OK(cluster_->SetFlagOnMasters("ysql_ddl_rollback_enabled", "false")); ASSERT_OK(cluster_->SetFlagOnTServers("ysql_ddl_rollback_enabled", "false")); // Verify that best effort rollback works when ysql_ddl_rollback is disabled. - ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1)", add_col_test)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1)", kAddCol)); // The following DDL fails because the table already contains data, so it is not possible to // add a new column with a not null constraint. ASSERT_NOK(conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN value2 TEXT NOT NULL")); // Verify that the above DDL was rolled back by YSQL. - VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); // Restart the cluster with DDL rollback disabled. SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=false"); // Verify that rollback did not occur even after the restart. 
client = ASSERT_RESULT(cluster_->CreateClient()); - VerifyTableExists(client.get(), db(), drop_table_test, 10); - VerifySchema(client.get(), db(), "foobar", {"key"}); - VerifySchema(client.get(), db(), rename_col_test, {"key2"}); - VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + VerifyTableExists(client.get(), kDatabase, kDropTable, 10); + ASSERT_OK(VerifySchema(client.get(), kDatabase, "foobar", {"key"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kRenameCol, {"key2"})); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); // Verify that it is still possible to run DDL on an affected table. // Tables having unverified transaction state on them can still be altered if the DDL rollback // is not enabled. - ASSERT_OK(conn.Execute(RenameTableStmt(rename_table_test, "foobar2"))); - ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value2"))); + ASSERT_OK(conn.Execute(RenameTableStmt(kRenameTable, "foobar2"))); + ASSERT_OK(conn.Execute(AddColumnStmt(kAddCol, "value2"))); // Re-enable DDL rollback properly with restart. SetFlagOnAllProcessesWithRollingRestart("--ysql_ddl_rollback_enabled=true"); @@ -418,23 +547,32 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST(FailureRecoveryTest)) { conn = ASSERT_RESULT(Connect()); // The tables with the transaction state on them must have had their state cleared upon restart // since the flag is now enabled again. - ASSERT_OK(conn.Execute(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn.Execute(DropTableStmt(drop_table_test))); + ASSERT_OK(conn.Execute(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn.Execute(DropTableStmt(kDropTable))); // However add_col_test will be corrupted because we never performed transaction rollback on it. // It should ideally have only one added column "value2", but we never rolled back the addition of // "value". 
- VerifySchema(client.get(), db(), add_col_test, {"key", "value", "value2"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value", "value2"})); // Add a column to add_col_test which is now corrupted. - ASSERT_OK(conn.Execute(AddColumnStmt(add_col_test, "value3"))); + ASSERT_OK(conn.Execute(AddColumnStmt(kAddCol, "value3"))); // Wait for transaction verification to run. - sleep(2); + + // Future DDLs still succeed even though the schema is corrupted. This is because we do not need + // to compare schemas to determine whether the transaction is a success. PG backend tells the + // YB-Master the status of the transaction. + ASSERT_OK(ExecuteWithRetry(&conn, AddColumnStmt(kAddCol, "value4"))); + + // Disable PG reporting to Yb-Master. + ASSERT_OK(cluster_->SetFlagOnTServers("report_ysql_ddl_txn_status_to_master", "false")); + + // Run a DDL on the corrupted table. + ASSERT_OK(conn.Execute(RenameTableStmt(kAddCol, "foobar3"))); // However since the PG schema and DocDB schema have become out-of-sync, the above DDL cannot // be verified. All future DDL operations will now fail. 
- ASSERT_NOK(conn.Execute(RenameTableStmt(add_col_test, "foobar3"))); - ASSERT_NOK(conn.Execute(RenameColumnStmt(add_col_test))); - ASSERT_NOK(conn.Execute(DropTableStmt(add_col_test))); + ASSERT_NOK(conn.Execute(RenameColumnStmt(kAddCol))); + ASSERT_NOK(conn.Execute(DropTableStmt(kAddCol))); } class PgDdlAtomicityConcurrentDdlTest : public PgDdlAtomicitySanityTest { @@ -450,16 +588,16 @@ class PgDdlAtomicityConcurrentDdlTest : public PgDdlAtomicitySanityTest { LOG(ERROR) << "Command " << cmd << " executed successfully when failure expected"; return false; } - return s.ToString().find("Table is undergoing DDL transaction verification") != string::npos; + return IsDdlVerificationError(s.ToString()); } void testConcurrentDDL(const string& stmt1, const string& stmt2) { // Test that until transaction verification is complete, another DDL operating on // the same object cannot go through. auto conn = ASSERT_RESULT(Connect()); - ASSERT_OK(conn.Execute(stmt1)); + ASSERT_OK(ExecuteWithRetry(&conn, stmt1)); // The first DDL was successful, but the second should fail because rollback has - // been delayed using 'ysql_transaction_bg_task_wait_ms'. + // been delayed using 'TEST_delay_ysql_ddl_rollback_secs'. 
auto conn2 = ASSERT_RESULT(Connect()); ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); } @@ -472,6 +610,14 @@ class PgDdlAtomicityConcurrentDdlTest : public PgDdlAtomicitySanityTest { auto conn2 = ASSERT_RESULT(Connect()); ASSERT_TRUE(testFailedDueToTxnVerification(&conn2, stmt2)); } + + const string& table() const { + return table_; + } + + private: + const string database_name_ = "yugabyte"; + const string table_ = "test"; }; TEST_F(PgDdlAtomicityConcurrentDdlTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDdl)) { @@ -481,19 +627,16 @@ TEST_F(PgDdlAtomicityConcurrentDdlTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDdl)) const string kAlterAndAlter = "alter_and_alter_test"; const string kAlterAndDrop = "alter_and_drop_test"; - const vector tables_to_create = {kDropAndAlter, kAlterAndAlter, kAlterAndDrop}; + const auto tables_to_create = {kDropAndAlter, kAlterAndAlter, kAlterAndDrop}; auto conn = ASSERT_RESULT(Connect()); for (const auto& table : tables_to_create) { ASSERT_OK(conn.Execute(CreateTableStmt(table))); } - // Wait for YsqlDdlTxnVerifierState to be cleaned up from the tables. - sleep(5); - // Test that we can't run a second DDL on a table until the YsqlTxnVerifierState on the first // table is cleared. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "60000")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "6")); testConcurrentDDL(CreateTableStmt(kCreateAndAlter), AddColumnStmt(kCreateAndAlter)); testConcurrentDDL(CreateTableStmt(kCreateAndDrop), DropTableStmt(kCreateAndDrop)); testConcurrentDDL(AddColumnStmt(kAlterAndDrop), DropTableStmt(kAlterAndDrop)); @@ -538,7 +681,7 @@ TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterDropTest)) { runFailingDdlTransaction(ddl_statements); // Table should not exist. 
auto client = ASSERT_RESULT(cluster_->CreateClient()); - VerifyTableNotExists(client.get(), db(), table(), 10); + VerifyTableNotExists(client.get(), kDatabase, table(), 10); } TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateDropTest)) { @@ -547,7 +690,7 @@ TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateDropTest)) { runFailingDdlTransaction(ddl_statements); // Table should not exist. auto client = ASSERT_RESULT(cluster_->CreateClient()); - VerifyTableNotExists(client.get(), db(), table(), 10); + VerifyTableNotExists(client.get(), kDatabase, table(), 10); } TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterTest)) { @@ -558,7 +701,7 @@ TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(CreateAlterTest)) { runFailingDdlTransaction(ddl_statements); // Table should not exist. auto client = ASSERT_RESULT(cluster_->CreateClient()); - VerifyTableNotExists(client.get(), db(), table(), 10); + VerifyTableNotExists(client.get(), kDatabase, table(), 10); } TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTest)) { @@ -568,7 +711,7 @@ TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AlterDropTest)) { // Table should exist with old schema intact. auto client = ASSERT_RESULT(cluster_->CreateClient()); - VerifySchema(client.get(), db(), table(), {"key"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, table(), {"key"})); } TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AddColRenameColTest)) { @@ -578,7 +721,7 @@ TEST_F(PgDdlAtomicityTxnTest, YB_DISABLE_TEST_IN_TSAN(AddColRenameColTest)) { // Table should exist with old schema intact. 
auto client = ASSERT_RESULT(cluster_->CreateClient()); - VerifySchema(client.get(), db(), table(), {"key"}); + ASSERT_OK(VerifySchema(client.get(), kDatabase, table(), {"key"})); } TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithAddColTest)) { @@ -593,7 +736,7 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithAddColTest)) { ASSERT_OK(conn1.Execute("INSERT INTO " + table + " VALUES (1)")); // Conn2: Initiate rollback of the alter. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10000")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "10")); ASSERT_OK(conn2.TestFailDdl(AddColumnStmt(table))); // Conn1: Since we parallely added a column to the table, the add-column operation would have @@ -613,9 +756,7 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithAddColTest)) { // Rollback happens while the transaction is in progress. // Wait for the rollback to complete. - ASSERT_OK(LoggedWaitFor([&]() -> Result { - return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); - }, MonoDelta::FromSeconds(30), "Wait for Add Column to be rolled back.")); + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); // Transaction at conn1 succeeds. Normally, drop-column operation would also have aborted all // ongoing transactions on this table. However this is a drop-column operation initiated as part @@ -645,7 +786,7 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST_IN_TSAN(DmlWithDropTableTest)) ASSERT_OK(conn2.Execute("INSERT INTO " + table + " VALUES (2)")); // Conn3: Initiate rollback of DROP table. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "5000")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "10")); ASSERT_OK(conn3.TestFailDdl(DropTableStmt(table))); // Conn1: This should succeed. 
@@ -687,38 +828,30 @@ class PgDdlAtomicityNegativeTestBase : public PgDdlAtomicitySanityTest { }; void PgDdlAtomicityNegativeTestBase::negativeTest() { - TableName create_table_test = "create_test"; - TableName create_table_idx = "create_test_idx"; - TableName rename_table_test = "rename_table_test"; - TableName rename_col_test = "rename_col_test"; - TableName add_col_test = "add_col_test"; - - vector tables_to_create = { - rename_table_test, rename_col_test, add_col_test}; + const auto create_table_idx = "create_table_idx"s; + const auto tables_to_create = {kRenameTable, kRenameCol, kAddCol}; auto client = ASSERT_RESULT(cluster_->CreateClient()); - for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { - ASSERT_OK(conn_->Execute(CreateTableStmt(tables_to_create[ii]))); + + for (const auto& table : tables_to_create) { + ASSERT_OK(conn_->Execute(CreateTableStmt(table))); } - ASSERT_OK(conn_->TestFailDdl(CreateTableStmt(create_table_test))); - ASSERT_OK(conn_->TestFailDdl(CreateIndexStmt(rename_table_test, create_table_idx))); - ASSERT_OK(conn_->TestFailDdl(RenameTableStmt(rename_table_test))); - ASSERT_OK(conn_->TestFailDdl(RenameColumnStmt(rename_col_test))); - ASSERT_OK(conn_->TestFailDdl(AddColumnStmt(add_col_test))); - - // Wait for rollback to complete. - sleep(5); + ASSERT_OK(conn_->TestFailDdl(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn_->TestFailDdl(CreateIndexStmt(kRenameTable, create_table_idx))); + ASSERT_OK(conn_->TestFailDdl(RenameTableStmt(kRenameTable))); + ASSERT_OK(conn_->TestFailDdl(RenameColumnStmt(kRenameCol))); + ASSERT_OK(conn_->TestFailDdl(AddColumnStmt(kAddCol))); // Create is rolled back using existing transaction GC infrastructure. 
- VerifyTableNotExists(client.get(), kDatabaseName, create_table_test, 15); - VerifyTableNotExists(client.get(), kDatabaseName, create_table_idx, 15); + VerifyTableNotExists(client.get(), kDatabaseName, kCreateTable, 30); + VerifyTableNotExists(client.get(), kDatabaseName, create_table_idx, 30); // Verify that Alter table transactions are not rolled back because rollback is not enabled yet // for certain cases like colocation and tablegroups. - VerifySchema(client.get(), kDatabaseName, "foobar", {"key"}); - VerifySchema(client.get(), kDatabaseName, rename_col_test, {"key2"}); - VerifySchema(client.get(), kDatabaseName, add_col_test, {"key", "value"}); + ASSERT_OK(VerifySchema(client.get(), kDatabaseName, "foobar", {"key"})); + ASSERT_OK(VerifySchema(client.get(), kDatabaseName, kRenameCol, {"key2"})); + ASSERT_OK(VerifySchema(client.get(), kDatabaseName, kAddCol, {"key", "value"})); } void PgDdlAtomicityNegativeTestBase::negativeDropTableTxnTest() { @@ -810,30 +943,26 @@ class PgDdlAtomicitySnapshotTest : public PgDdlAtomicitySanityTest { TEST_F(PgDdlAtomicitySnapshotTest, YB_DISABLE_TEST_IN_TSAN(SnapshotTest)) { // Create requisite tables. - TableName create_table_test = "create_test"; - TableName add_col_test = "add_col_test"; - TableName drop_table_test = "drop_test"; - - vector tables_to_create = {add_col_test, drop_table_test}; + const auto tables_to_create = {kAddCol, kDropTable}; auto client = ASSERT_RESULT(cluster_->CreateClient()); auto conn = ASSERT_RESULT(Connect()); - for (size_t ii = 0; ii < tables_to_create.size(); ++ii) { - ASSERT_OK(conn.Execute(CreateTableStmt(tables_to_create[ii]))); + for (const auto& table : tables_to_create) { + ASSERT_OK(conn.Execute(CreateTableStmt(table))); } const int snapshot_interval_secs = 10; // Create a snapshot schedule before running all the failed DDLs. 
auto schedule_id = ASSERT_RESULT( - snapshot_util_->CreateSchedule(db(), + snapshot_util_->CreateSchedule(kDatabase, client::WaitSnapshot::kTrue, MonoDelta::FromSeconds(snapshot_interval_secs))); // Run all the failed DDLs. - ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "35000")); - ASSERT_OK(conn.TestFailDdl(CreateTableStmt(create_table_test))); - ASSERT_OK(conn.TestFailDdl(AddColumnStmt(add_col_test))); - ASSERT_OK(conn.TestFailDdl(DropTableStmt(drop_table_test))); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_delay_ysql_ddl_rollback_secs", "35")); + ASSERT_OK(conn.TestFailDdl(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn.TestFailDdl(AddColumnStmt(kAddCol))); + ASSERT_OK(conn.TestFailDdl(DropTableStmt(kDropTable))); // Wait 10s to ensure that a snapshot is taken right after these DDLs failed. sleep(snapshot_interval_secs); @@ -843,16 +972,16 @@ TEST_F(PgDdlAtomicitySnapshotTest, YB_DISABLE_TEST_IN_TSAN(SnapshotTest)) { HybridTime hybrid_time_before_rollback = HybridTime::FromMicros(current_time.ToInt64()); // Verify that rollback for Alter and Create has indeed not happened yet. - VerifyTableExists(client.get(), db(), create_table_test, 10); - VerifyTableExists(client.get(), db(), drop_table_test, 10); - VerifySchema(client.get(), db(), add_col_test, {"key", "value"}); + VerifyTableExists(client.get(), kDatabase, kCreateTable, 10); + VerifyTableExists(client.get(), kDatabase, kDropTable, 10); + ASSERT_OK(VerifySchema(client.get(), kDatabase, kAddCol, {"key", "value"})); - // Verify that rollback happens eventually. - VerifyTableNotExists(client.get(), db(), create_table_test, 60); - for (const string& table : tables_to_create) { - ASSERT_OK(LoggedWaitFor([&]() -> Result { - return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); - }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + // Verify that rollback indeed happened. 
+ VerifyTableNotExists(client.get(), kDatabase, kCreateTable, 60); + + // Verify all the other tables are unchanged by all of the DDLs. + for (const auto& table : tables_to_create) { + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); } /* @@ -872,11 +1001,9 @@ TEST_F(PgDdlAtomicitySnapshotTest, YB_DISABLE_TEST_IN_TSAN(SnapshotTest)) { // We restored to a point before the first rollback occurred. That DDL transaction should have // been re-detected to be a failure and rolled back again. - VerifyTableNotExists(client.get(), db(), create_table_test, 60); + VerifyTableNotExists(client.get(), kDatabase, kCreateTable, 60); for (const string& table : tables_to_create) { - ASSERT_OK(LoggedWaitFor([&]() -> Result { - return CheckIfSchemaMatches(client.get(), db(), table, {"key"}); - }, MonoDelta::FromSeconds(60), "Wait for rollback to complete")); + ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key"})); } }