Skip to content

Commit

Permalink
[#13358] YSQL: DDL Atomicity Part 2 - YSQL reports DDL transaction st…
Browse files Browse the repository at this point in the history
…atus to YB-Master

Summary:
In Part-1, for DDL operations that change the DocDB schema, YB-Master
asynchronously polls the transaction status tablet to fetch the status of those
DDL transactions.
Once the transaction is determined to have completed, YB-Master performs
the relatively expensive operation of comparing the DocDB schema and the PG
schema to determine whether the transaction succeeded, and therefore
whether to roll back the DocDB changes.

This behavior is optimized in this patch, wherein at the end of the
transaction, YSQL sends the status of the transaction to YB-Master. This
way YB-Master can stop polling the transaction status tablet and also
skip the relatively expensive schema comparison.

If the YSQL backend crashes or is unable to send the status of the transaction
to YB-Master, YB-Master will still fall back to the behavior of Part-1 and compare
the DocDB and PG schema to know whether the transaction succeeded or failed.

**Flags**
The above behavior can be controlled by the runtime-safe TServer flag **report_ysql_ddl_txn_status_to_master**.
It is turned off for now because ysql_ddl_rollback_enabled is false.

Test Plan: ybd --cxx-test pg_ddl_atomicity

Reviewers: nicolas, myang, dmitry

Reviewed By: myang, dmitry

Subscribers: lnguyen, yql, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D18915
  • Loading branch information
deeps1991 committed Jan 21, 2023
1 parent f9ca331 commit c1ef8c3
Show file tree
Hide file tree
Showing 30 changed files with 633 additions and 243 deletions.
22 changes: 18 additions & 4 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2386,11 +2386,9 @@ Status YBClient::Data::ValidateReplicationInfo(
ValidateReplicationInfoResponsePB resp;
auto new_ri = req.mutable_replication_info();
new_ri->CopyFrom(replication_info);
Status status = SyncLeaderMasterRpc(
RETURN_NOT_OK(SyncLeaderMasterRpc(
deadline, req, &resp, "ValidateReplicationInfo",
&master::MasterReplicationProxy::ValidateReplicationInfoAsync);
RETURN_NOT_OK(status);

&master::MasterReplicationProxy::ValidateReplicationInfoAsync));
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
Expand All @@ -2415,6 +2413,22 @@ Result<TableSizeInfo> YBClient::Data::GetTableDiskSize(
return TableSizeInfo{resp.size(), resp.num_missing_tablets()};
}

// Reports the outcome of the YSQL DDL transaction 'txn' to the master through
// the synchronous ReportYsqlDdlTxnStatus RPC.
//
// 'is_committed' tells the master whether the transaction committed (true) or
// aborted (false). 'deadline' is forwarded to SyncLeaderMasterRpc.
//
// Returns the RPC failure if the call itself fails, the error carried in the
// response if the master reported one, and OK otherwise.
Status YBClient::Data::ReportYsqlDdlTxnStatus(
    const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline) {
  // Build the request: raw transaction id bytes plus the final outcome.
  master::ReportYsqlDdlTxnStatusRequestPB request;
  request.set_transaction_id(txn.transaction_id.data(), txn.transaction_id.size());
  request.set_is_committed(is_committed);

  master::ReportYsqlDdlTxnStatusResponsePB response;
  RETURN_NOT_OK(SyncLeaderMasterRpc(
      deadline, request, &response, "ReportYsqlDdlTxnStatus",
      &master::MasterDdlProxy::ReportYsqlDdlTxnStatusAsync));

  // The RPC went through; surface any application-level error from the master.
  if (!response.has_error()) {
    return Status::OK();
  }
  return StatusFromPB(response.error().status());
}

Result<bool> YBClient::Data::CheckIfPitrActive(CoarseTimePoint deadline) {
CheckIfPitrActiveRequestPB req;
CheckIfPitrActiveResponsePB resp;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ class YBClient::Data {
// It does not take replication factor into account
Result<TableSizeInfo> GetTableDiskSize(const TableId& table_id, CoarseTimePoint deadline);

// Provide the status of the YSQL DDL transaction 'txn' to YB-Master using the
// ReportYsqlDdlTxnStatus RPC.
// 'is_committed' is true if the transaction committed and false if it aborted.
// 'deadline' is the deadline passed to the underlying master RPC.
// Returns a non-OK status if the RPC fails or if the response carries an error.
Status ReportYsqlDdlTxnStatus(
const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline);

Result<bool> CheckIfPitrActive(CoarseTimePoint deadline);

template <class ProxyClass, class ReqClass, class RespClass>
Expand Down
5 changes: 5 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,11 @@ Result<TableSizeInfo> YBClient::GetTableDiskSize(const TableId& table_id) {
return data_->GetTableDiskSize(table_id, deadline);
}

// Reports whether the YSQL DDL transaction 'txn' committed or aborted to the
// master. The deadline is the current time plus the client's default RPC timeout.
Status YBClient::ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed) {
  return data_->ReportYsqlDdlTxnStatus(
      txn, is_committed, CoarseMonoClock::Now() + default_rpc_timeout());
}

Result<bool> YBClient::CheckIfPitrActive() {
auto deadline = CoarseMonoClock::Now() + default_rpc_timeout();
return data_->CheckIfPitrActive(deadline);
Expand Down
3 changes: 3 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,9 @@ class YBClient {
// Get the disk size of a table (calculated as SST file size + WAL file size)
Result<TableSizeInfo> GetTableDiskSize(const TableId& table_id);

// Provide the completion status of 'txn' to the YB-Master.
// 'is_committed' is true if the transaction committed and false if it aborted.
// The request is bounded by the client's default RPC timeout.
Status ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed);

Result<bool> CheckIfPitrActive();

void LookupTabletByKey(const std::shared_ptr<YBTable>& table,
Expand Down
6 changes: 5 additions & 1 deletion src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

#include "yb/common/doc_hybrid_time.h"
#include "yb/common/partition.h"
#include "yb/common/transaction.h"
#include "yb/common/wire_protocol.h"

#include "yb/master/master_client.pb.h"
Expand Down Expand Up @@ -970,6 +969,11 @@ const std::string& PersistentTableInfo::indexed_table_id() const {
: pb.has_indexed_table_id() ? pb.indexed_table_id() : kEmptyString;
}

// Tells whether this table currently carries YSQL DDL verifier state that
// belongs to transaction 'txn'.
//
// Returns false without decoding anything when the table has no verifier
// state. Otherwise decodes the transaction id stored in the table entry
// (propagating a decode failure to the caller) and compares it with 'txn'.
Result<bool> PersistentTableInfo::is_being_modified_by_ddl_transaction(
    const TransactionId& txn) const {
  if (!has_ysql_ddl_txn_verifier_state()) {
    return false;
  }
  const auto stored_txn = VERIFY_RESULT(FullyDecodeTransactionId(pb_transaction_id()));
  return txn == stored_txn;
}

// ================================================================================================
// DeletedTableInfo
Expand Down
11 changes: 6 additions & 5 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "yb/common/entity_ids.h"
#include "yb/common/index.h"
#include "yb/common/partition.h"
#include "yb/common/transaction.h"

#include "yb/master/master_client.fwd.h"
#include "yb/master/master_fwd.h"
Expand Down Expand Up @@ -386,11 +387,9 @@ struct PersistentTableInfo : public Persistent<SysTablesEntryPB, SysRowEntryType
return pb.mutable_schema();
}

std::string pb_transaction_id() const {
if (!pb.has_transaction()) {
return {};
}
return pb.transaction().transaction_id();
const std::string& pb_transaction_id() const {
static std::string kEmptyString;
return pb.has_transaction() ? pb.transaction().transaction_id() : kEmptyString;
}

bool has_ysql_ddl_txn_verifier_state() const {
Expand All @@ -413,6 +412,8 @@ struct PersistentTableInfo : public Persistent<SysTablesEntryPB, SysRowEntryType
ysql_ddl_txn_verifier_state().contains_create_table_op();
}

// Returns true if this table has YSQL DDL transaction verifier state and the
// transaction id stored in the table entry decodes to 'txn'. Returns an error
// if the stored transaction id cannot be decoded.
Result<bool> is_being_modified_by_ddl_transaction(const TransactionId& txn) const;

// Helper to set the state of the tablet with a custom message.
void set_state(SysTablesEntryPB::State state, const std::string& msg);
};
Expand Down
130 changes: 113 additions & 17 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ TAG_FLAG(enable_transactional_ddl_gc, hidden);

DECLARE_bool(ysql_ddl_rollback_enabled);

DEFINE_test_flag(bool, disable_ysql_ddl_txn_verification, false,
"Simulates a condition where the background process that checks whether the YSQL transaction "
"was a success or a failure is indefinitely delayed");

// TODO: should this be a test flag?
DEFINE_RUNTIME_bool(hide_pg_catalog_table_creation_logs, false,
"Whether to hide detailed log messages for PostgreSQL catalog table creation. "
Expand Down Expand Up @@ -541,6 +545,11 @@ METRIC_DEFINE_gauge_uint32(cluster, num_tablet_servers_dead,
"heartbeat in the time interval defined by the gflag "
"FLAGS_tserver_unresponsive_timeout_ms.");

// Test-only knob: when set to a positive value, the DDL transaction completion
// callback sleeps for this many seconds before processing the table.
DEFINE_test_flag(int32, delay_ysql_ddl_rollback_secs, 0,
"Number of seconds to sleep before rolling back a failed ddl transaction");

// NOTE(review): ysql_ddl_rollback_enabled already appears to be declared earlier
// in this file, so this second declaration looks redundant -- confirm and drop.
DECLARE_bool(ysql_ddl_rollback_enabled);

namespace yb {
namespace master {

Expand Down Expand Up @@ -6609,6 +6618,20 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,

void CatalogManager::ScheduleYsqlTxnVerification(const scoped_refptr<TableInfo>& table,
const TransactionMetadata& txn) {
// Add this transaction to the map containing all the transactions yet to be
// verified.
{
LockGuard lock(ddl_txn_verifier_mutex_);
ddl_txn_id_to_table_map_[txn.transaction_id].push_back(table);
}

if (FLAGS_TEST_disable_ysql_ddl_txn_verification) {
LOG(INFO) << "Skip scheduling table " << table->ToString() << " for transaction verification "
<< "as TEST_disable_ysql_ddl_txn_verification is set";
return;
}

// Schedule transaction verification.
auto l = table->LockForRead();
LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name()
<< " id: " << table->id() << " schema version: " << l->pb.version();
Expand Down Expand Up @@ -6639,28 +6662,59 @@ Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr<TableInfo> table
bool success) {
DCHECK(!txn_id_pb.empty());
DCHECK(table);
const string& id = "table id: " + table->id();
auto txn = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb));
const auto& table_id = table->id();
const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb));
bool table_present = false;
{
LockGuard lock(ddl_txn_verifier_mutex_);
const auto iter = ddl_txn_id_to_table_map_.find(txn_id);
if (iter == ddl_txn_id_to_table_map_.end()) {
LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString()
<< " is already verified, ignoring";
return Status::OK();
}

auto& tables = iter->second;
auto removed_elements_iter = std::remove_if(tables.begin(), tables.end(),
[&table_id](const scoped_refptr<TableInfo>& table) {
return table->id() == table_id;
});
if (removed_elements_iter != tables.end()) {
tables.erase(removed_elements_iter, tables.end());
table_present = true;
if (tables.empty()) {
ddl_txn_id_to_table_map_.erase(iter);
}
}
}
if (!table_present) {
LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString()
<< " is already verified, ignoring";
return Status::OK();
}
return YsqlDdlTxnCompleteCallbackInternal(table.get(), txn_id, success);
}

Status CatalogManager::YsqlDdlTxnCompleteCallbackInternal(TableInfo *table,
const TransactionId& txn_id,
bool success) {
if (FLAGS_TEST_delay_ysql_ddl_rollback_secs > 0) {
LOG(INFO) << "YsqlDdlTxnCompleteCallbackInternal: Sleep for "
<< FLAGS_TEST_delay_ysql_ddl_rollback_secs << " seconds";
SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_delay_ysql_ddl_rollback_secs));
}
const auto id = "table id: " + table->id();
auto l = table->LockForWrite();
LOG(INFO) << "YsqlDdlTxnCompleteCallback for " << id
<< " for transaction " << txn
<< " for transaction " << txn_id
<< ": Success: " << (success ? "true" : "false")
<< " ysql_ddl_txn_verifier_state: "
<< l->ysql_ddl_txn_verifier_state().DebugString();

if (!l->has_ysql_ddl_txn_verifier_state()) {
// The table no longer has any DDL state to clean up. It was probably cleared by
// another thread, nothing to do.
LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked but no ysql transaction "
<< "verification state found for " << id << " , ignoring";
return Status::OK();
}

if (txn_id_pb != l->pb_transaction_id()) {
// Now the table has transaction state for a different transaction.
// Do nothing.
LOG(INFO) << "YsqlDdlTxnCompleteCallback was invoked for transaction " << txn << " but the "
<< "schema contains state for a different transaction";
if (!VERIFY_RESULT(l->is_being_modified_by_ddl_transaction(txn_id))) {
// Transaction verification completed for this table.
LOG(INFO) << "Verification of transaction " << txn_id << " for " << id
<< " is already complete, ignoring";
return Status::OK();
}

Expand Down Expand Up @@ -6693,7 +6747,8 @@ Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr<TableInfo> table

auto& table_pb = l.mutable_data()->pb;
if (!success && l->ysql_ddl_txn_verifier_state().contains_alter_table_op()) {
LOG(INFO) << "Alter transaction " << txn << " failed, rolling back its schema changes";
LOG(INFO) << "Alter transaction " << txn_id << " for table " << table->ToString()
<< " failed, rolling back its schema changes";
std::vector<DdlLogEntry> ddl_log_entries;
ddl_log_entries.emplace_back(
master_->clock()->Now(),
Expand Down Expand Up @@ -12560,7 +12615,48 @@ Status CatalogManager::PromoteAutoFlags(

resp->set_new_config_version(new_config_version);
resp->set_non_runtime_flags_promoted(non_runtime_flags_promoted);
return Status::OK();
}

// Handles a report describing how a YSQL DDL transaction ended.
//
// The request must carry a non-empty, decodable transaction id; otherwise an
// error status is returned. If the transaction is registered in
// ddl_txn_id_to_table_map_, a YsqlDdlTxnCompleteCallback task is submitted to
// the background thread pool for every table recorded for it. Failures to
// submit or to run a task are only logged as warnings, so the call still
// returns OK. 'resp' and 'rpc' are not used by this function.
Status CatalogManager::ReportYsqlDdlTxnStatus(const ReportYsqlDdlTxnStatusRequestPB* req,
                                              ReportYsqlDdlTxnStatusResponsePB* resp,
                                              rpc::RpcContext* rpc) {
  DCHECK(req);
  const auto& txn_id_pb = req->transaction_id();
  SCHECK(!txn_id_pb.empty(), IllegalState,
         "Received ReportYsqlDdlTxnStatus request without transaction id");
  auto txn = VERIFY_RESULT(FullyDecodeTransactionId(txn_id_pb));

  const auto is_committed = req->is_committed();
  LOG(INFO) << "Received ReportYsqlDdlTxnStatus request for transaction " << txn
            << ". Status: " << (is_committed ? "Success" : "Aborted");

  // The map is only read here, so a shared lock is enough. It is held until
  // every task for this transaction has been handed to the thread pool.
  SharedLock lock(ddl_txn_verifier_mutex_);
  const auto entry = ddl_txn_id_to_table_map_.find(txn);
  if (entry == ddl_txn_id_to_table_map_.end()) {
    // The transaction is not awaiting verification. Usually the master's
    // background task got to it before the PG backend sent this report. The
    // report can also arrive BEFORE the transaction is added to the map when:
    //   1. The transaction failed before making any DocDB schema change.
    //   2. The transaction failed and the report landed in the small window
    //      between schema change initiation and scheduling of verification.
    // Nothing needs to be done for (1). Doing nothing is also safe for (2),
    // because the background task will handle it; that is not optimal, but
    // (2) is expected to be very rare.
    LOG(INFO) << "DDL transaction " << txn << " not found in list of transactions to be "
              << "verified, nothing to do";
    return Status::OK();
  }

  for (const auto& table_info : entry->second) {
    // Hand each affected table to the background pool for verification.
    LOG(INFO) << "Enqueuing table " << table_info->ToString()
              << " for verification of DDL transaction: " << txn;
    WARN_NOT_OK(
        background_tasks_thread_pool_->SubmitFunc(
            [this, table_info, txn_id_pb, is_committed]() {
              WARN_NOT_OK(YsqlDdlTxnCompleteCallback(table_info, txn_id_pb, is_committed),
                          "Transaction verification failed for table " + table_info->ToString());
            }),
        "Could not submit YsqlDdlTxnCompleteCallback to thread pool");
  }
  return Status::OK();
}

Expand Down
22 changes: 21 additions & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ typedef std::unordered_map<NamespaceId, HybridTime> XClusterNamespaceToSafeTimeM

constexpr int32_t kInvalidClusterConfigVersion = 0;

using DdlTxnIdToTablesMap =
std::unordered_map<TransactionId, std::vector<scoped_refptr<TableInfo>>, TransactionIdHash>;

// The component of the master which tracks the state and location
// of tables/tablets in the cluster.
//
Expand Down Expand Up @@ -363,7 +366,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
Result<NamespaceId> GetTableNamespaceId(TableId table_id) EXCLUDES(mutex_);

void ScheduleYsqlTxnVerification(const scoped_refptr<TableInfo>& table,
const TransactionMetadata& txn);
const TransactionMetadata& txn)
EXCLUDES(ddl_txn_verifier_mutex_);

Status YsqlTableSchemaChecker(scoped_refptr<TableInfo> table,
const std::string& txn_id_pb,
Expand All @@ -373,6 +377,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const std::string& txn_id_pb,
bool success);

Status YsqlDdlTxnCompleteCallbackInternal(
TableInfo *table, const TransactionId& txn_id, bool success);

// Get the information about the specified table.
Status GetTableSchema(const GetTableSchemaRequestPB* req,
GetTableSchemaResponsePB* resp) override;
Expand Down Expand Up @@ -1025,6 +1032,11 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

Status PromoteAutoFlags(const PromoteAutoFlagsRequestPB* req, PromoteAutoFlagsResponsePB* resp);

Status ReportYsqlDdlTxnStatus(
const ReportYsqlDdlTxnStatusRequestPB* req,
ReportYsqlDdlTxnStatusResponsePB* resp,
rpc::RpcContext* rpc);

protected:
// TODO Get rid of these friend classes and introduce formal interface.
friend class TableLoader;
Expand Down Expand Up @@ -2032,6 +2044,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

rpc::ScheduledTaskTracker refresh_ysql_tablespace_info_task_;

// Guards ddl_txn_id_to_table_map_ below.
mutable MutexType ddl_txn_verifier_mutex_;

// This map stores the transaction ids of all the DDL transactions undergoing verification.
// For each transaction, it also stores pointers to the table info objects of the tables affected
// by that transaction.
DdlTxnIdToTablesMap ddl_txn_id_to_table_map_ GUARDED_BY(ddl_txn_verifier_mutex_);

ServerRegistrationPB server_registration_;

TabletSplitManager tablet_split_manager_;
Expand Down
15 changes: 15 additions & 0 deletions src/yb/master/master_ddl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,18 @@ message GetUDTypeInfoResponsePB {
optional UDTypeInfoPB udtype = 2;
}

// Request sent to YB-Master to report how a YSQL DDL transaction ended.
message ReportYsqlDdlTxnStatusRequestPB {
// The transaction whose status is being reported, as raw transaction id bytes.
// The master rejects a request in which this field is empty.
optional bytes transaction_id = 1;

// Whether the above transaction is committed or aborted: true means committed,
// false means aborted.
optional bool is_committed = 2;
}

// Response to ReportYsqlDdlTxnStatusRequestPB.
message ReportYsqlDdlTxnStatusResponsePB {
// Error information, if any. The client converts this into a non-OK status
// when the field is present.
optional MasterErrorPB error = 1;
}

service MasterDdl {
option (yb.rpc.custom_service_name) = "yb.master.MasterService";

Expand Down Expand Up @@ -681,4 +693,7 @@ service MasterDdl {
rpc DeleteUDType(DeleteUDTypeRequestPB) returns (DeleteUDTypeResponsePB);
rpc ListUDTypes(ListUDTypesRequestPB) returns (ListUDTypesResponsePB);
rpc GetUDTypeInfo(GetUDTypeInfoRequestPB) returns (GetUDTypeInfoResponsePB);

rpc ReportYsqlDdlTxnStatus(ReportYsqlDdlTxnStatusRequestPB) returns
(ReportYsqlDdlTxnStatusResponsePB);
}
1 change: 1 addition & 0 deletions src/yb/master/master_ddl_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class MasterDdlServiceImpl : public MasterServiceBase, public MasterDdlIf {
(ListTablegroups)
(ListTables)
(ListUDTypes)
(ReportYsqlDdlTxnStatus)
(TruncateTable)
)
};
Expand Down
Loading

0 comments on commit c1ef8c3

Please sign in to comment.