Skip to content

Commit

Permalink
[#3979] Add Transaction Cleanup to Catalog Manager Create DDLs
Browse files Browse the repository at this point in the history
Summary:
Postgres now creates a Transaction ID for the DDL operation and sends it to yb-master on CREATE.   This enables the following creation flow:

1. Create Transaction ID in Postgres
2. Create in yb-master
3. Postgres Writes its system table data & Commits/Aborts
4. Async thread in yb-master observes the Transaction result and performs a rollback on abort.

We query the Transaction Coordinator for this information because yb-master may start its query in Step #4 before Postgres flushes any actual system table writes to the Transaction Participant. This is a best-effort cleanup, so we only roll back the entry if we know that the Transaction failed, and we keep it around if the async job encounters unknown errors. The feature can be toggled with 'FLAGS_enable_transactional_ddl_gc'.

Test Plan: ybd --cxx-test pg_libpq-test --gtest_filter PgLibPq*TimeoutTest.* --test-timeout-sec 60

Reviewers: mihnea, bogdan, hector, amitanand, sergei, mikhail

Reviewed By: mikhail

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8821
  • Loading branch information
nspiegelberg committed Nov 5, 2020
1 parent 220bd5e commit 88e9d59
Show file tree
Hide file tree
Showing 28 changed files with 716 additions and 44 deletions.
8 changes: 7 additions & 1 deletion src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,15 @@ YBDecrementDdlNestingLevel(bool success,
/*
* At this point we have already applied the DDL in the YSQL layer and
* executing the postponed DocDB statement is not strictly required.
* Ignore 'NotFound' because DocDB might have already noticed the applied DDL.
* See comment for YBGetDdlHandles in xact.h for more details.
*/
HandleYBStatusAtErrorLevel(YBCPgExecPostponedDdlStmt(handle), WARNING);
YBCStatus status = YBCPgExecPostponedDdlStmt(handle);
if (YBCStatusIsNotFound(status)) {
YBCFreeStatus(status);
} else {
HandleYBStatusAtErrorLevel(status, WARNING);
}
}
YBClearDdlHandles();
}
Expand Down
9 changes: 4 additions & 5 deletions src/postgres/src/include/access/xact.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ extern void YBMarkDataNotSent(void);
extern bool YBIsDataSent(void);

/*
* Utilities for postponed pggate DDL statement handles, that should be
* Utilities for postponed pggate DDL statement handles, that can be
* executed after the YSQL DDL transaction has committed. To qualify for this
* the DDL must have the following properties:
* 1. It cannot be rolled back by abort (so we wait for commit to succeed).
Expand All @@ -456,10 +456,9 @@ extern bool YBIsDataSent(void);
* which we cannot roll back, and also does not cause inconsistency if it fails
* after YSQL-layer deleted the metadata entry for that object (because we do
* not reuse oids/uuids -- so objects remain simply orphaned/unused).
* Note: The expectation is that orphaned DocDB objects will be cleaned
* up by a DocDB (catalog manager) background-cleanup job. This would
* eventually also roll back failed (online) alter operations. Though as
* of 08/2020 this is not yet done (#3979).
* Note: Orphaned DocDB objects will be best-effort cleaned by a DocDB (catalog
* manager) background-cleanup job. This would eventually also roll back
* failed (online) alter operations (#3979).
*/
extern void YBSaveDdlHandle(YBCPgStatement handle);
extern List* YBGetDdlHandles(void);
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,7 @@ TEST_F(ClientTest, GetNamespaceInfo) {
kPgsqlKeyspaceID,
"" /* source_namespace_id */,
boost::none /* next_pg_oid */,
boost::none /* txn */,
true /* colocated */));

// CQL non-colocated.
Expand Down
6 changes: 5 additions & 1 deletion src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,
const std::string& namespace_id,
const std::string& source_namespace_id,
const boost::optional<uint32_t>& next_pg_oid,
const boost::optional<TransactionMetadata>& txn,
const bool colocated) {
CreateNamespaceRequestPB req;
CreateNamespaceResponsePB resp;
Expand All @@ -728,6 +729,9 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,
if (next_pg_oid) {
req.set_next_pg_oid(*next_pg_oid);
}
if (txn) {
txn->ToPB(req.mutable_transaction());
}
req.set_colocated(colocated);
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
Status s = data_->SyncLeaderMasterRpc<CreateNamespaceRequestPB, CreateNamespaceResponsePB>(
Expand Down Expand Up @@ -763,7 +767,7 @@ Status YBClient::CreateNamespaceIfNotExists(const std::string& namespace_name,
}

Status s = CreateNamespace(namespace_name, database_type, creator_role_name, namespace_id,
source_namespace_id, next_pg_oid, colocated);
source_namespace_id, next_pg_oid, boost::none /* txn */, colocated);
if (s.IsAlreadyPresent() && database_type && *database_type == YQLDatabase::YQL_DATABASE_CQL) {
return Status::OK();
}
Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "yb/client/client_fwd.h"
#include "yb/client/schema.h"
#include "yb/common/common.pb.h"
#include "yb/common/transaction.h"
#include "yb/common/wire_protocol.h"

#ifdef YB_HEADERS_NO_STUBS
Expand Down Expand Up @@ -372,6 +373,7 @@ class YBClient {
const std::string& namespace_id = "",
const std::string& source_namespace_id = "",
const boost::optional<uint32_t>& next_pg_oid = boost::none,
const boost::optional<TransactionMetadata>& txn = boost::none,
const bool colocated = false);

// It calls CreateNamespace(), but before it checks that the namespace has NOT been yet
Expand Down
6 changes: 3 additions & 3 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ TEST_F_EX(QLTransactionTest, CorrectStatusRequestBatching, QLTransactionBigLogSe

struct TransactionState {
YBTransactionPtr transaction;
std::shared_future<TransactionMetadata> metadata_future;
std::shared_future<Result<TransactionMetadata>> metadata_future;
std::future<Status> commit_future;
std::future<Result<tserver::GetTransactionStatusResponsePB>> status_future;
TransactionMetadata metadata;
Expand Down Expand Up @@ -1195,7 +1195,7 @@ TEST_F(QLTransactionTest, StatusEvolution) {
// Insert using different keys to avoid conflicts.
ASSERT_OK(WriteRow(session, states.size(), states.size()));
}
states.push_back({ txn, txn->TEST_GetMetadata() });
states.push_back({ txn, txn->GetMetadata() });
++active_transactions;
--transactions_to_create;
}
Expand All @@ -1221,7 +1221,7 @@ TEST_F(QLTransactionTest, StatusEvolution) {
if (!IsReady(state.metadata_future)) {
continue;
}
state.metadata = state.metadata_future.get();
state.metadata = ASSERT_RESULT(state.metadata_future.get());
}
tserver::GetTransactionStatusRequestPB req;
req.set_tablet_id(state.metadata.status_tablet);
Expand Down
9 changes: 9 additions & 0 deletions src/yb/client/table_creator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ YBTableCreator& YBTableCreator::schema(const YBSchema* schema) {
return *this;
}

// Ties the creation of this table to the given higher-level transaction.
// Only the pointer is stored (no copy is made), so the pointed-to metadata must remain valid
// until Create() has run; Create() serializes it into the request only when it is non-null.
// Returns *this so that builder calls can be chained.
YBTableCreator& YBTableCreator::part_of_transaction(const TransactionMetadata* txn) {
txn_ = txn;
return *this;
}

YBTableCreator& YBTableCreator::add_hash_partitions(const std::vector<std::string>& columns,
int32_t num_buckets) {
return add_hash_partitions(columns, num_buckets, 0);
Expand Down Expand Up @@ -237,6 +242,10 @@ Status YBTableCreator::Create() {

SchemaToPB(internal::GetSchema(*schema_), req.mutable_schema());

if (txn_) {
txn_->ToPB(req.mutable_transaction());
}

// Set up the number of splits (i.e. number of tablets).
if (num_tablets_ > 0) {
VLOG(1) << "num_tablets: number of tablets explicitly specified: " << num_tablets_;
Expand Down
7 changes: 7 additions & 0 deletions src/yb/client/table_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "yb/master/master.pb.h"

namespace yb {
struct TransactionMetadata;

namespace client {

// Creates a new table with the desired options.
Expand Down Expand Up @@ -58,6 +60,9 @@ class YBTableCreator {
// the lifetime of the builder. Required.
YBTableCreator& schema(const YBSchema* schema);

// The creation of this table is dependent upon the success of this higher-level transaction.
YBTableCreator& part_of_transaction(const TransactionMetadata* txn);

// Adds a set of hash partitions to the table.
//
// For each set of hash partitions added to the table, the total number of
Expand Down Expand Up @@ -181,6 +186,8 @@ class YBTableCreator {

bool colocated_ = true;

const TransactionMetadata * txn_ = nullptr;

// The tablegroup id to assign (if a table is in a tablegroup).
std::string tablegroup_id_;

Expand Down
25 changes: 15 additions & 10 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,23 +406,28 @@ class YBTransaction::Impl final {
return read_point_.IsRestartRequired();
}

std::shared_future<TransactionMetadata> TEST_GetMetadata() {
std::shared_future<Result<TransactionMetadata>> GetMetadata() {
std::unique_lock<std::mutex> lock(mutex_);
if (metadata_future_.valid()) {
return metadata_future_;
}
metadata_future_ = std::shared_future<TransactionMetadata>(metadata_promise_.get_future());
metadata_future_ = std::shared_future<Result<TransactionMetadata>>(
metadata_promise_.get_future());
if (!ready_) {
auto transaction = transaction_->shared_from_this();
waiters_.push_back([this, transaction](const Status& status) {
// OK to crash here, because we are in test
CHECK_OK(status);
metadata_promise_.set_value(metadata_);
WARN_NOT_OK(status, "Transaction request failed");
if (status.ok()) {
metadata_promise_.set_value(metadata_);
} else {
metadata_promise_.set_value(status);
}
});
lock.unlock();
RequestStatusTablet(TransactionRpcDeadline());
} else {
metadata_promise_.set_value(metadata_);
}
metadata_promise_.set_value(metadata_);
return metadata_future_;
}

Expand Down Expand Up @@ -1010,8 +1015,8 @@ class YBTransaction::Impl final {
std::mutex mutex_;
TabletStates tablets_;
std::vector<Waiter> waiters_;
std::promise<TransactionMetadata> metadata_promise_;
std::shared_future<TransactionMetadata> metadata_future_;
std::promise<Result<TransactionMetadata>> metadata_promise_;
std::shared_future<Result<TransactionMetadata>> metadata_future_;
size_t running_requests_ = 0;
// Set to true after commit record is replicated. Used only during transaction sealing.
bool commit_replicated_ = false;
Expand Down Expand Up @@ -1139,8 +1144,8 @@ Result<ChildTransactionResultPB> YBTransaction::FinishChild() {
return impl_->FinishChild();
}

std::shared_future<TransactionMetadata> YBTransaction::TEST_GetMetadata() const {
return impl_->TEST_GetMetadata();
std::shared_future<Result<TransactionMetadata>> YBTransaction::GetMetadata() const {
return impl_->GetMetadata();
}

Status YBTransaction::ApplyChildResult(const ChildTransactionResultPB& result) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class YBTransaction : public std::enable_shared_from_this<YBTransaction> {
// `result` should be prepared with FinishChild of child transaction.
CHECKED_STATUS ApplyChildResult(const ChildTransactionResultPB& result);

std::shared_future<TransactionMetadata> TEST_GetMetadata() const;
std::shared_future<Result<TransactionMetadata>> GetMetadata() const;

std::string ToString() const;

Expand Down
2 changes: 2 additions & 0 deletions src/yb/common/entity_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ static const uint32_t kPgSequencesDataTableOid = 0xFFFF;
static const uint32_t kPgSequencesDataDatabaseOid = 0xFFFF;

static const uint32_t kPgIndexTableOid = 2610; // Hardcoded for pg_index. (in pg_index.h)
static const uint32_t kPgClassTableOid = 1259; // Hardcoded for pg_class. (in pg_class.h)
static const uint32_t kPgDatabaseTableOid = 1262; // Hardcoded for pg_database. (in pg_database.h)

extern const TableId kPgProcTableId;
extern const TableId kPgYbCatalogVersionTableId;
Expand Down
9 changes: 6 additions & 3 deletions src/yb/integration-tests/create-table-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ TEST_F(CreateTableITest, TableColocationRemoteBootstrapTest) {
ts_flags.push_back("--follower_unavailable_considered_failed_sec=3");
ASSERT_NO_FATALS(StartCluster(ts_flags, master_flags, kNumReplicas));
ASSERT_OK(
client_->CreateNamespace("colocation_test", boost::none, "", "", "", boost::none, true));
client_->CreateNamespace("colocation_test", boost::none /* db */, "" /* creator */,
"" /* ns_id */, "" /* src_ns_id */,
boost::none /* next_pg_oid */, boost::none /* txn */, true));

{
string ns_id;
Expand Down Expand Up @@ -426,8 +428,9 @@ TEST_F(CreateTableITest, TablegroupRemoteBootstrapTest) {
ts_flags.push_back("--ysql_beta_feature_tablegroup=true");
ASSERT_NO_FATALS(StartCluster(ts_flags, master_flags, kNumReplicas));

ASSERT_OK(
client_->CreateNamespace(namespace_name, YQL_DATABASE_PGSQL, "", "", "", boost::none, false));
ASSERT_OK(client_->CreateNamespace(namespace_name, YQL_DATABASE_PGSQL, "" /* creator */,
"" /* ns_id */, "" /* src_ns_id */,
boost::none /* next_pg_oid */, boost::none /* txn */, false));

{
auto namespaces = ASSERT_RESULT(client_->ListNamespaces(boost::none));
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ set(MASTER_SRCS
yql_types_vtable.cc
yql_views_vtable.cc
yql_partitions_vtable.cc
ysql_transaction_ddl.cc
${MASTER_SRCS_EXTENSIONS})

add_library(master ${MASTER_SRCS})
Expand Down
30 changes: 30 additions & 0 deletions src/yb/master/catalog_loaders.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ DEFINE_bool(master_ignore_deleted_on_load, true,
namespace yb {
namespace master {

using namespace std::placeholders;

////////////////////////////////////////////////////////////
// Table Loader
////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -74,6 +76,19 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad
l->Commit();
catalog_manager_->HandleNewTableId(table->id());

// Tables created as part of a Transaction should check transaction status and be deleted
// if the transaction is aborted.
if (metadata.has_transaction()) {
LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString();
TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction()));
std::function<Status(bool)> when_done =
std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1);
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&YsqlTransactionDdl::VerifyTransaction, &catalog_manager_->ysql_transaction_,
txn, when_done)),
"Could not submit VerifyTransaction to thread pool");
}

LOG(INFO) << "Loaded metadata for table " << table->ToString();
VLOG(1) << "Metadata for table " << table->ToString() << ": " << metadata.ShortDebugString();

Expand Down Expand Up @@ -254,6 +269,20 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP
}
l->Commit();
LOG(INFO) << "Loaded metadata for namespace " << ns->ToString();

// Namespaces created as part of a Transaction should check transaction status and be deleted
// if the transaction is aborted.
if (metadata.has_transaction()) {
LOG(INFO) << "Enqueuing keyspace for Transaction Verification: " << ns->ToString();
TransactionMetadata txn = VERIFY_RESULT(
TransactionMetadata::FromPB(metadata.transaction()));
std::function<Status(bool)> when_done =
std::bind(&CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1);
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&YsqlTransactionDdl::VerifyTransaction, &catalog_manager_->ysql_transaction_,
txn, when_done)),
"Could not submit VerifyTransaction to thread pool");
}
break;
case SysNamespaceEntryPB::PREPARING:
// PREPARING means the server restarted before completing NS creation.
Expand All @@ -273,6 +302,7 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP
}
l->Commit();
LOG(INFO) << "Loaded metadata to DELETE namespace " << ns->ToString();
LOG_IF(DFATAL, ns->database_type() != YQL_DATABASE_PGSQL) << "PGSQL Databases only";
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::DeleteYsqlDatabaseAsync, catalog_manager_, ns)),
"Could not submit DeleteYsqlDatabaseAsync to thread pool");
Expand Down
Loading

0 comments on commit 88e9d59

Please sign in to comment.