Skip to content

Commit

Permalink
[#22519] YSQL: Simplify API of the ExplicitRowLockBuffer class
Browse files Browse the repository at this point in the history
Summary:
This diff simplifies API of the `ExplicitRowLockBuffer` class by omitting `ybctid_reader` argument from `Add` and `Flush` methods.
The `ybctid_reader` is stored as an object field instead.

**Note:**
In context of this class several formatting changes are performed.
Jira: DB-11445

Test Plan: Jenkins

Reviewers: pjain, kramanathan, telgersma, myang

Reviewed By: kramanathan

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37467
  • Loading branch information
d-uspenskiy committed Sep 5, 2024
1 parent 1655e69 commit 16262f7
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 104 deletions.
6 changes: 3 additions & 3 deletions src/yb/util/lw_function.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -27,7 +27,7 @@ template<class Ret, class... Args>
class LightweightFunction<Ret(Args...)> {
public:
Ret operator()(Args... args) const {
return Call(std::move(args)...);
return Call(std::forward<decltype(args)>(args)...);
}

protected:
Expand Down Expand Up @@ -71,7 +71,7 @@ class LightweightFunctionImpl : public LWFunction<Ret(Args...)> {

private:
Ret Call(Args... args) const override {
return Trans::Transform(functor_)(std::move(args)...);
return Trans::Transform(functor_)(std::forward<decltype(args)>(args)...);
}

const Func& functor_;
Expand Down
4 changes: 2 additions & 2 deletions src/yb/yql/pggate/pg_doc_metrics.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -68,7 +68,7 @@ PgDocMetrics::DurationWatcher::~DurationWatcher() {
*duration_ = GetNow(use_zero_duration_, use_high_res_timer_) - start_;
}

PgDocMetrics::PgDocMetrics(YBCPgExecStatsState* state) : state_(*state) {}
PgDocMetrics::PgDocMetrics(YBCPgExecStatsState& state) : state_(state) {}

void PgDocMetrics::ReadRequest(TableType relation, uint64_t wait_time) {
IncRead(&GetStat(&state_, relation), wait_time);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/yql/pggate/pg_doc_metrics.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -33,7 +33,7 @@ YB_DEFINE_ENUM(TableType, (SYSTEM)(USER)(INDEX));

class PgDocMetrics {
public:
explicit PgDocMetrics(YBCPgExecStatsState* state);
explicit PgDocMetrics(YBCPgExecStatsState& state);

void ReadRequest(TableType relation, uint64_t wait_time);
void WriteRequest(TableType relation);
Expand Down
8 changes: 4 additions & 4 deletions src/yb/yql/pggate/pg_function.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -74,12 +74,12 @@ PgFunctionParams::GetValueAndType(const std::string& name) const {
//--------------------------------------------------------------------------------------------------

Status PgFunction::AddParam(
const std::string name, const YBCPgTypeEntity* type_entity, uint64_t datum, bool is_null) {
const std::string& name, const YBCPgTypeEntity* type_entity, uint64_t datum, bool is_null) {
return params_.AddParam(name, type_entity, datum, is_null);
}

Status PgFunction::AddTarget(
const std::string name, const YBCPgTypeEntity* type_entity, const YBCPgTypeAttrs type_attrs) {
const std::string& name, const YBCPgTypeEntity* type_entity, const YBCPgTypeAttrs type_attrs) {
RETURN_NOT_OK(schema_builder_.AddColumn(name, ToLW(PersistentDataType(type_entity->yb_type))));
RETURN_NOT_OK(schema_builder_.SetColumnPGType(name, type_entity->type_oid));
return schema_builder_.SetColumnPGTypmod(name, type_attrs.typmod);
Expand Down Expand Up @@ -240,7 +240,7 @@ Result<std::vector<TransactionId>> GetDecodedBlockerTransactionIds(

Result<std::list<PgTableRow>> PgLockStatusRequestor(
const PgFunctionParams& params, const Schema& schema, const ReaderProjection& projection,
const scoped_refptr<PgSession> pg_session) {
const scoped_refptr<PgSession>& pg_session) {
std::string table_id;
const auto [relation, rel_null] = VERIFY_RESULT(params.GetParamValue<PgOid>("relation"));
if (!rel_null) {
Expand Down
8 changes: 4 additions & 4 deletions src/yb/yql/pggate/pg_function.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -82,10 +82,10 @@ class PgFunction : public PgMemctx::Registrable {
virtual ~PgFunction() = default;

Status AddParam(
const std::string name, const YBCPgTypeEntity* type_entity, uint64_t datum, bool is_null);
const std::string& name, const YBCPgTypeEntity* type_entity, uint64_t datum, bool is_null);

Status AddTarget(
const std::string name, const YBCPgTypeEntity* type_entity, const YBCPgTypeAttrs type_attrs);
const std::string& name, const YBCPgTypeEntity* type_entity, const YBCPgTypeAttrs type_attrs);

Status FinalizeTargets();

Expand All @@ -109,7 +109,7 @@ class PgFunction : public PgMemctx::Registrable {

Result<std::list<dockv::PgTableRow>> PgLockStatusRequestor(
const PgFunctionParams& params, const Schema& schema,
const dockv::ReaderProjection& reader_projection, const scoped_refptr<PgSession> pg_session);
const dockv::ReaderProjection& reader_projection, const scoped_refptr<PgSession>& pg_session);

} // namespace pggate
} // namespace yb
53 changes: 33 additions & 20 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -49,7 +49,6 @@
#include "yb/yql/pggate/pg_client.h"
#include "yb/yql/pggate/pg_expr.h"
#include "yb/yql/pggate/pg_op.h"
#include "yb/yql/pggate/pg_txn_manager.h"
#include "yb/yql/pggate/pggate_flags.h"
#include "yb/yql/pggate/ybc_pggate.h"

Expand Down Expand Up @@ -100,7 +99,7 @@ bool IsNeedTransaction(const PgsqlOp& op, bool non_ddl_txn_for_sys_tables_allowe
// Last statement inserts row with k = 1 and this is a single row transaction.
// But row with k = 1 already exists in the t table. As a result the
// 'duplicate key value violates unique constraint "t_pkey"' will be raised.
// But this error contains contraints name which is read from sys table pg_class (in case it
// But this error contains constraints name which is read from sys table pg_class (in case it
// is not yet in the postgres's cache). And this read from sys table will be performed in context
// of currently running transaction (single row) because the yb_non_ddl_txn_for_sys_tables_allowed
// GUC variable is true. As a result there will be 2 operations in context of single row
Expand Down Expand Up @@ -404,15 +403,16 @@ size_t TableYbctidHasher::operator()(const TableYbctid& value) const {
return (*this)(static_cast<LightweightTableYbctid>(value));
}

ExplicitRowLockBuffer::ExplicitRowLockBuffer(TableYbctidVectorProvider* ybctid_container_provider)
: ybctid_container_provider_(*ybctid_container_provider) {
ExplicitRowLockBuffer::ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader)
: ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) {
}

Status ExplicitRowLockBuffer::Add(
Info&& info, const LightweightTableYbctid& key, bool is_region_local,
const YbctidReader& reader) {
Info&& info, const LightweightTableYbctid& key, bool is_region_local) {
if (info_ && *info_ != info) {
RETURN_NOT_OK(DoFlush(reader));
RETURN_NOT_OK(DoFlush());
}
if (!info_) {
info_.emplace(std::move(info));
Expand All @@ -426,14 +426,14 @@ Status ExplicitRowLockBuffer::Add(
DCHECK(is_region_local || !region_local_tables_.contains(key.table_id));
intents_.emplace(key.table_id, std::string(key.ybctid));
return narrow_cast<int>(intents_.size()) >= yb_explicit_row_locking_batch_size
? DoFlush(reader) : Status::OK();
? DoFlush() : Status::OK();
}

Status ExplicitRowLockBuffer::Flush(const YbctidReader& reader) {
return IsEmpty() ? Status::OK() : DoFlush(reader);
Status ExplicitRowLockBuffer::Flush() {
return IsEmpty() ? Status::OK() : DoFlush();
}

Status ExplicitRowLockBuffer::DoFlush(const YbctidReader& reader) {
Status ExplicitRowLockBuffer::DoFlush() {
DCHECK(!IsEmpty());
auto scope = ScopeExit([this] { Clear(); });
auto ybctids = ybctid_container_provider_.Get();
Expand All @@ -443,7 +443,14 @@ Status ExplicitRowLockBuffer::DoFlush(const YbctidReader& reader) {
auto node = intents_.extract(it++);
ybctids->push_back(std::move(node.value()));
}
RETURN_NOT_OK(reader(&*ybctids, *info_, region_local_tables_));
RETURN_NOT_OK(ybctid_reader_(
info_->database_id, ybctids, region_local_tables_,
make_lw_function(
[&info = *info_](PgExecParameters& params) {
params.rowmark = info.rowmark;
params.pg_wait_policy = info.pg_wait_policy;
params.docdb_wait_policy = info.docdb_wait_policy;
})));
SCHECK(initial_intents_size == ybctids->size(), NotFound,
"Some of the requested ybctids are missing");
return Status::OK();
Expand All @@ -464,13 +471,15 @@ bool ExplicitRowLockBuffer::IsEmpty() const {
//--------------------------------------------------------------------------------------------------

PgSession::PgSession(
PgClient* pg_client,
PgClient& pg_client,
scoped_refptr<PgTxnManager> pg_txn_manager,
const YBCPgCallbacks& pg_callbacks,
YBCPgExecStatsState* stats_state)
: pg_client_(*pg_client),
YBCPgExecStatsState& stats_state,
YbctidReader&& ybctid_reader)
: pg_client_(pg_client),
pg_txn_manager_(std::move(pg_txn_manager)),
explicit_row_lock_buffer_(&aux_ybctid_container_provider_),
ybctid_reader_(std::move(ybctid_reader)),
explicit_row_lock_buffer_(aux_ybctid_container_provider_, ybctid_reader_),
metrics_(stats_state),
pg_callbacks_(pg_callbacks),
buffer_(
Expand Down Expand Up @@ -856,8 +865,8 @@ Result<PerformFuture> PgSession::Perform(BufferableOperations&& ops, PerformOpti
return PerformFuture(std::move(future), std::move(ops.relations));
}

Result<bool> PgSession::ForeignKeyReferenceExists(const LightweightTableYbctid& key,
const YbctidReader& reader) {
Result<bool> PgSession::ForeignKeyReferenceExists(
PgOid database_id, const LightweightTableYbctid& key) {
if (fk_reference_cache_.find(key) != fk_reference_cache_.end()) {
return true;
}
Expand Down Expand Up @@ -889,7 +898,11 @@ Result<bool> PgSession::ForeignKeyReferenceExists(const LightweightTableYbctid&
}

// Add the keys found in docdb to the FK cache.
RETURN_NOT_OK(reader(&ybctids, fk_intent_region_local_tables_));
RETURN_NOT_OK(ybctid_reader_(
database_id, ybctids, fk_intent_region_local_tables_,
make_lw_function([](PgExecParameters& params) {
params.rowmark = ROW_MARK_KEYSHARE;
})));
for (auto& ybctid : ybctids) {
fk_reference_cache_.insert(std::move(ybctid));
}
Expand Down
37 changes: 20 additions & 17 deletions src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) YugaByte, Inc.
// Copyright (c) YugaByteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,6 +13,7 @@

#pragma once

#include <functional>
#include <optional>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -50,9 +51,6 @@ YB_STRONGLY_TYPED_BOOL(InvalidateOnPgClient);
YB_STRONGLY_TYPED_BOOL(UseCatalogSession);
YB_STRONGLY_TYPED_BOOL(ForceNonBufferable);

class PgTxnManager;
class PgSession;

struct LightweightTableYbctid {
LightweightTableYbctid(PgOid table_id_, const std::string_view& ybctid_)
: table_id(table_id_), ybctid(ybctid_) {}
Expand Down Expand Up @@ -134,6 +132,11 @@ class TableYbctidVectorProvider {
TableYbctidVector container_;
};

using ExecParametersMutator = LWFunction<void(PgExecParameters&)>;

using YbctidReader =
std::function<Status(PgOid, TableYbctidVector&, const OidSet&, const ExecParametersMutator&)>;

class ExplicitRowLockBuffer {
public:
struct Info {
Expand All @@ -145,20 +148,20 @@ class ExplicitRowLockBuffer {
friend bool operator==(const Info&, const Info&) = default;
};

using YbctidReader = LWFunction<Status(TableYbctidVector*, const Info&, const OidSet&)>;

explicit ExplicitRowLockBuffer(TableYbctidVectorProvider* ybctid_container_provider);
ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader);
Status Add(
Info&& info, const LightweightTableYbctid& key, bool is_region_local,
const YbctidReader& reader);
Status Flush(const YbctidReader& reader);
Info&& info, const LightweightTableYbctid& key, bool is_region_local);
Status Flush();
void Clear();
bool IsEmpty() const;

private:
Status DoFlush(const YbctidReader& reader);
Status DoFlush();

TableYbctidVectorProvider& ybctid_container_provider_;
const YbctidReader& ybctid_reader_;
TableYbctidSet intents_;
OidSet region_local_tables_;
std::optional<Info> info_;
Expand All @@ -173,11 +176,12 @@ class PgSession : public RefCountedThreadSafe<PgSession> {

// Constructors.
PgSession(
PgClient* pg_client,
PgClient& pg_client,
scoped_refptr<PgTxnManager> pg_txn_manager,
const YBCPgCallbacks& pg_callbacks,
YBCPgExecStatsState* stats_state);
virtual ~PgSession();
YBCPgExecStatsState& stats_state,
YbctidReader&& ybctid_reader);
~PgSession();

// Resets the read point for catalog tables.
// Next catalog read operation will read the very latest catalog's state.
Expand Down Expand Up @@ -341,9 +345,7 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
// Check if initdb has already been run before. Needed to make initdb idempotent.
Result<bool> IsInitDbDone();

using YbctidReader = LWFunction<Status(TableYbctidVector*, const OidSet&)>;
Result<bool> ForeignKeyReferenceExists(
const LightweightTableYbctid& key, const YbctidReader& reader);
Result<bool> ForeignKeyReferenceExists(PgOid database_id, const LightweightTableYbctid& key);
void AddForeignKeyReferenceIntent(const LightweightTableYbctid& key, bool is_region_local);
void AddForeignKeyReference(const LightweightTableYbctid& key);

Expand Down Expand Up @@ -445,6 +447,7 @@ class PgSession : public RefCountedThreadSafe<PgSession> {

CoarseTimePoint invalidate_table_cache_time_;
std::unordered_map<PgObjectId, PgTableDescPtr, PgObjectIdHash> table_cache_;
const YbctidReader ybctid_reader_;
TableYbctidSet fk_reference_cache_;
TableYbctidSet fk_reference_intent_;
OidSet fk_intent_region_local_tables_;
Expand Down
Loading

0 comments on commit 16262f7

Please sign in to comment.