
formatting
rystsov committed Jul 27, 2023
1 parent ec3c7a2 commit b67a2ad
Showing 5 changed files with 69 additions and 51 deletions.
4 changes: 1 addition & 3 deletions src/v/cluster/tm_stm.h
@@ -206,9 +206,7 @@ class tm_stm final : public persisted_stm<> {
 
     ss::future<std::optional<size_t>> tx_cache_size();
 
-    mutex& get_cache_control_lock() {
-        return _cache_control_lock;
-    }
+    mutex& get_cache_control_lock() { return _cache_control_lock; }
 
     std::optional<tm_transaction> oldest_log_transaction();
 
19 changes: 9 additions & 10 deletions src/v/cluster/tm_stm_cache.cc
@@ -134,7 +134,7 @@ void tm_stm_cache::set_log(tm_transaction tx) {
     auto [tx_it, inserted] = _log_txes.try_emplace(tx.id, tx);
     if (!inserted) {
         tx_it->second.tx = tx;
-     }
+    }
     unlink_lru_tx(tx_it->second);
     lru_txes.push_back(tx_it->second);

@@ -158,7 +158,7 @@ void tm_stm_cache::erase_log(kafka::transactional_id tx_id) {
       tx.etag,
       tx.pid,
       tx.tx_seq);
-
+
     unlink_lru_tx(tx_it->second);
     _log_txes.erase(tx_it);
 }
@@ -171,7 +171,8 @@ fragmented_vector<tm_transaction> tm_stm_cache::get_log_transactions() {
     return txes;
 }
 
-void tm_stm_cache::set_mem(model::term_id term, kafka::transactional_id tx_id, tm_transaction tx) {
+void tm_stm_cache::set_mem(
+  model::term_id term, kafka::transactional_id tx_id, tm_transaction tx) {
     auto entry_it = _state.find(term);
     if (entry_it == _state.end()) {
         _state[term] = tm_stm_cache_entry{.term = term};
@@ -203,9 +204,9 @@ void tm_stm_cache::clear_log() {
     }
     _log_txes.clear();
     vassert(
-        lru_txes.size() == 0,
-        "Unexpected entries in the lru tx list {}",
-        lru_txes.size());
+      lru_txes.size() == 0,
+      "Unexpected entries in the lru tx list {}",
+      lru_txes.size());
 }
 
 void tm_stm_cache::erase_mem(kafka::transactional_id tx_id) {
@@ -267,7 +268,7 @@ std::deque<tm_transaction> tm_stm_cache::checkpoint() {
 
     auto can_transfer = [](const tm_transaction& tx) {
         return !tx.transferring
-          && (tx.status == tm_transaction::ready || tx.status == tm_transaction::ongoing);
+               && (tx.status == tm_transaction::ready || tx.status == tm_transaction::ongoing);
     };
     // Loop through all ongoing/pending txns in memory and checkpoint.
 
@@ -288,8 +289,6 @@ std::optional<tm_transaction> tm_stm_cache::oldest_log_transaction() {
     return lru_txes.front().tx;
 }
 
-size_t tm_stm_cache::tx_cache_size() {
-    return lru_txes.size();
-}
+size_t tm_stm_cache::tx_cache_size() { return lru_txes.size(); }
 
 } // namespace cluster
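The pattern running through this file: _log_txes owns the transactions in a hash map while lru_txes threads an intrusive list through the same nodes, so set_log relinks an entry to the back on every touch, oldest_log_transaction is just front(), and tx_cache_size is the list size. Below is a minimal, self-contained sketch of that bookkeeping, assuming boost::intrusive as a stand-in for the repo's safe_intrusive_list_hook; tx_wrapper here is a pared-down hypothetical holding only an id:

    #include <boost/intrusive/list.hpp>

    #include <iostream>
    #include <string>
    #include <unordered_map>

    namespace bi = boost::intrusive;

    // Hypothetical stand-in for tm_transaction; only what the sketch needs.
    struct tx_wrapper : bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
        std::string id;
        explicit tx_wrapper(std::string tx_id)
          : id(std::move(tx_id)) {}
    };

    // auto_unlink hooks require constant_time_size<false>.
    using lru_list = bi::list<tx_wrapper, bi::constant_time_size<false>>;

    int main() {
        // The map owns the nodes; the intrusive list only threads through
        // them. A node-based map keeps element addresses stable across
        // rehashes, so the hooks stay valid.
        std::unordered_map<std::string, tx_wrapper> log_txes;
        lru_list lru_txes;

        auto set_log = [&](const std::string& id) {
            auto [it, inserted] = log_txes.try_emplace(id, id);
            // Relinking moves the entry to the back, so front() is always
            // the least recently touched transaction.
            if (it->second.is_linked()) {
                it->second.unlink();
            }
            lru_txes.push_back(it->second);
        };

        set_log("tx-a");
        set_log("tx-b");
        set_log("tx-a"); // tx-a becomes the most recently used
        std::cout << "oldest: " << lru_txes.front().id << "\n"; // tx-b
    }

The auto_unlink mode also mirrors why an intrusive hook suits this cache: when the map erases a node, the hook removes itself from the list, so the two structures cannot drift apart.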
6 changes: 2 additions & 4 deletions src/v/cluster/tm_stm_cache.h
@@ -26,8 +26,8 @@
 #include "raft/types.h"
 #include "storage/snapshot.h"
 #include "utils/expiring_promise.h"
-#include "utils/mutex.h"
 #include "utils/intrusive_list_helpers.h"
+#include "utils/mutex.h"
 
 #include <absl/container/btree_set.h>
 #include <absl/container/flat_hash_map.h>
@@ -245,9 +245,7 @@ class tm_stm_cache {
     struct tx_wrapper {
         tx_wrapper() {}
 
-        tx_wrapper(const tm_transaction& tx) {
-            this->tx = tx;
-        }
+        tx_wrapper(const tm_transaction& tx) { this->tx = tx; }
 
         tm_transaction tx;
         safe_intrusive_list_hook _hook;
88 changes: 55 additions & 33 deletions src/v/cluster/tx_gateway_frontend.cc
@@ -217,8 +217,7 @@ tx_gateway_frontend::tx_gateway_frontend(
       config::shard_local_cfg().metadata_dissemination_retry_delay_ms.value())
   , _transactional_id_expiration(
       config::shard_local_cfg().transactional_id_expiration_ms.value())
-  , _transactions_enabled(
-      config::shard_local_cfg().enable_transactions.value())
+  , _transactions_enabled(config::shard_local_cfg().enable_transactions.value())
   , _max_transactions_per_coordinator(max_transactions_per_coordinator) {
     /**
      * do not start expiry timer when transactions are disabled
@@ -1096,12 +1095,14 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::init_tm_tx_locally(
       transaction_timeout_ms,
       expected_pid,
       timeout](ss::basic_rwlock<>::holder unit) {
-        return self.limit_init_tm_tx(
-          stm,
-          tx_id,
-          transaction_timeout_ms,
-          timeout,
-          expected_pid).finally([u = std::move(unit)] {});
+        return self
+          .limit_init_tm_tx(
+            stm,
+            tx_id,
+            transaction_timeout_ms,
+            timeout,
+            expected_pid)
+          .finally([u = std::move(unit)] {});
     });
   });
 });
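The hunk above keeps the rwlock holder alive by moving it into an empty finally() continuation, so the lock is held across the whole asynchronous call and released only when the inner future resolves. A minimal sketch of that pattern, assuming a Seastar build; do_init and guarded_init are hypothetical stand-ins for limit_init_tm_tx and its caller:

    #include <seastar/core/app-template.hh>
    #include <seastar/core/do_with.hh>
    #include <seastar/core/rwlock.hh>
    #include <seastar/core/sleep.hh>

    using namespace std::chrono_literals;

    // Hypothetical stand-in for the guarded async operation.
    seastar::future<> do_init() { return seastar::sleep(10ms); }

    seastar::future<> guarded_init(seastar::rwlock& lock) {
        return lock.hold_read_lock().then([](seastar::rwlock::holder unit) {
            // finally() takes ownership of the holder, so the lock is
            // released only after do_init()'s future resolves, even if it
            // fails with an exception.
            return do_init().finally([u = std::move(unit)] {});
        });
    }

    int main(int argc, char** argv) {
        seastar::app_template app;
        return app.run(argc, argv, [] {
            return seastar::do_with(seastar::rwlock{}, [](seastar::rwlock& l) {
                return guarded_init(l);
            });
        });
    }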
@@ -1210,67 +1211,88 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::limit_init_tm_tx(
   std::chrono::milliseconds transaction_timeout_ms,
   model::timeout_clock::duration timeout,
   model::producer_identity expected_pid) {
-
     size_t tx_cache_size;
     if (auto opt = co_await stm->tx_cache_size(); opt) {
         tx_cache_size = opt.value();
     } else {
-        vlog(txlog.trace, "can't get tx_cache_size on initializing tx.id={}", tx_id);
+        vlog(
+          txlog.trace,
+          "can't get tx_cache_size on initializing tx.id={}",
+          tx_id);
         co_return init_tm_tx_reply{tx_errc::not_coordinator};
     }
 
     if (tx_cache_size > _max_transactions_per_coordinator()) {
         // lock is sloppy and doesn't guarantee that tx_cache_size
         // never exceeds _max_transactions_per_coordinator
         auto init_units = co_await stm->get_cache_control_lock().get_units();
 
         if (auto opt = co_await stm->tx_cache_size(); opt) {
             tx_cache_size = opt.value();
         } else {
-            vlog(txlog.trace, "can't get tx_cache_size on initializing tx.id={}", tx_id);
+            vlog(
+              txlog.trace,
+              "can't get tx_cache_size on initializing tx.id={}",
+              tx_id);
             co_return init_tm_tx_reply{tx_errc::not_coordinator};
         }
         // similar to double-checked locking pattern
         // it protects concurrent access to oldest_log_transaction
         if (tx_cache_size > _max_transactions_per_coordinator()) {
             if (auto tx_opt = stm->oldest_log_transaction(); tx_opt) {
                 auto tx = tx_opt.value();
-                vlog(txlog.info, "tx cache is at capacity; expiring oldest tx with id:{}", tx.id);
+                vlog(
+                  txlog.info,
+                  "tx cache is at capacity; expiring oldest tx with id:{}",
+                  tx.id);
                 auto tx_lock = stm->get_tx_lock(tx.id);
                 auto tx_units = co_await tx_lock->get_units();
                 vlog(txlog.trace, "got_lock name:init_tm_tx, tx_id:{}", tx.id);
-                auto ec = co_await do_expire_old_tx(stm, tx.id, config::shard_local_cfg().create_topic_timeout_ms(), true);
+                auto ec = co_await do_expire_old_tx(
+                  stm,
+                  tx.id,
+                  config::shard_local_cfg().create_topic_timeout_ms(),
+                  true);
                 if (ec != tx_errc::none) {
-                    vlog(txlog.trace, "do_expire_old_tx with tx_id={} returned ec={}", tx.id, ec);
+                    vlog(
+                      txlog.trace,
+                      "do_expire_old_tx with tx_id={} returned ec={}",
+                      tx.id,
+                      ec);
                     tx_units.return_all();
-                    vlog(txlog.trace, "released_lock name:init_tm_tx, tx_id:{}", tx.id);
+                    vlog(
+                      txlog.trace,
+                      "released_lock name:init_tm_tx, tx_id:{}",
+                      tx.id);
                     stm->try_rm_lock(tx.id);
                     co_return init_tm_tx_reply{tx_errc::not_coordinator};
                 }
                 tx_units.return_all();
-                vlog(txlog.trace, "released_lock name:init_tm_tx, tx_id:{}", tx.id);
+                vlog(
+                  txlog.trace,
+                  "released_lock name:init_tm_tx, tx_id:{}",
+                  tx.id);
                 stm->try_rm_lock(tx.id);
             } else {
-                vlog(txlog.warn, "oldest_log_transaction shouldn't return empty when tx cache is at capacity");
+                vlog(
+                  txlog.warn,
+                  "oldest_log_transaction shouldn't return empty when tx cache "
+                  "is at capacity");
                 co_return init_tm_tx_reply{tx_errc::not_coordinator};
            }
         }
 
         init_units.return_all();
     }
 
     auto lock = stm->get_tx_lock(tx_id);
     auto units = co_await lock->get_units();
     vlog(txlog.trace, "got_lock name:init_tm_tx, tx_id:{}", tx_id);
     init_tm_tx_reply r;
     try {
         r = co_await do_init_tm_tx(
-          stm,
-          tx_id,
-          transaction_timeout_ms,
-          timeout,
-          expected_pid);
-    } catch(...) {
+          stm, tx_id, transaction_timeout_ms, timeout, expected_pid);
+    } catch (...) {
         units.return_all();
         vlog(txlog.trace, "released_lock name:init_tm_tx, tx_id:{}", tx_id);
         stm->try_rm_lock(tx_id);
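As the comments in this hunk note, the capacity check is a variant of double-checked locking: the first tx_cache_size read is lock-free and deliberately sloppy, and only callers that see the cache over capacity take the cache-control lock, re-check, and expire the oldest transaction. A stripped-down sketch of that control flow; the names are hypothetical, std::mutex stands in for the coordinator's mutex, and eviction here is synchronous rather than co_await-ed:

    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <string>

    struct tx_cache {
        // front() is the oldest (least recently started) transaction id.
        std::deque<std::string> lru;
        std::mutex cache_control_lock;
        size_t max_txes_per_coordinator = 2;

        void init_tx(const std::string& id) {
            if (lru.size() > max_txes_per_coordinator) { // sloppy, lock-free
                std::lock_guard guard(cache_control_lock);
                // re-check under the lock; a concurrent caller
                // may have evicted the oldest entry already
                if (lru.size() > max_txes_per_coordinator) {
                    std::cout << "expiring oldest tx: " << lru.front() << "\n";
                    lru.pop_front();
                }
            }
            lru.push_back(id);
        }
    };

    int main() {
        tx_cache cache;
        cache.init_tx("tx-a");
        cache.init_tx("tx-b");
        cache.init_tx("tx-c");
        cache.init_tx("tx-d"); // size 3 > 2, so tx-a is expired
    }

The first check is allowed to be stale; the lock plus re-check only guarantee one evictor runs at a time, matching the "sloppy" caveat in the hunk above.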
@@ -3169,7 +3191,11 @@ ss::future<> tx_gateway_frontend::expire_old_tx(
   ss::shared_ptr<tm_stm> stm, kafka::transactional_id tx_id) {
     return with(stm, tx_id, "expire_old_tx", [this, stm, tx_id]() {
         return do_expire_old_tx(
-          stm, tx_id, config::shard_local_cfg().create_topic_timeout_ms(), false).discard_result();
+                 stm,
+                 tx_id,
+                 config::shard_local_cfg().create_topic_timeout_ms(),
+                 false)
+          .discard_result();
     });
 }

@@ -3222,12 +3248,8 @@ ss::future<tx_errc> tx_gateway_frontend::do_expire_old_tx(
         r = co_await do_abort_tm_tx(term, stm, tx, timeout);
     }
     if (!r.has_value()) {
-        vlog(
-          txlog.warn,
-          "got error {} on aborting tx.id={}",
-          r.error(),
-          tx_id);
-
+        vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx_id);
+
         co_return r.error();
     }
 
3 changes: 2 additions & 1 deletion src/v/redpanda/application.cc
@@ -1513,7 +1513,8 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       std::ref(feature_table),
       std::ref(tm_stm_cache_manager),
       ss::sharded_parameter([] {
-          return config::shard_local_cfg().max_transactions_per_coordinator.bind();
+          return config::shard_local_cfg()
+            .max_transactions_per_coordinator.bind();
       }))
      .get();
     _kafka_conn_quotas
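The argument wired up here is a binding rather than a snapshot: .bind() hands the frontend a live view of max_transactions_per_coordinator, which is why limit_init_tm_tx above calls _max_transactions_per_coordinator() as a function and observes config updates made after startup. A toy sketch of the difference; property here is a hypothetical stand-in, not Redpanda's config::property implementation:

    #include <cstddef>
    #include <functional>
    #include <iostream>

    template <typename T>
    class property {
    public:
        explicit property(T v)
          : _value(std::move(v)) {}
        void set(T v) { _value = std::move(v); }
        // Snapshot: fixed at the moment value() is called.
        T value() const { return _value; }
        // Live view: re-reads the property on every invocation.
        // (The binding must not outlive the property it captures.)
        std::function<T()> bind() const {
            return [this] { return _value; };
        }

    private:
        T _value;
    };

    int main() {
        property<size_t> max_txes(100);
        auto snapshot = max_txes.value();
        auto bound = max_txes.bind();
        max_txes.set(50); // e.g. a config hot-reload
        std::cout << snapshot << " vs " << bound() << "\n"; // 100 vs 50
    }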
