Skip to content

Commit

Permalink
feat(server): remove multi shard sync from replication (#3085)
Browse files Browse the repository at this point in the history
feat server: remove multi shard sync from replication

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored May 27, 2024
1 parent 3474eeb commit c45f7bf
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 164 deletions.
28 changes: 12 additions & 16 deletions .github/actions/regression-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,21 @@ runs:
# used by PyTests
export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
run_pytest_with_args() {
timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \
--json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \
--df alsologtostderr $1 $2 || code=$?
# timeout returns 124 if we exceeded the timeout duration
if [[ $code -eq 124 ]]; then
echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"
exit 1
fi
timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \
--json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \
--df alsologtostderr $1 $2 || code=$?
# when a test fails in pytest it returns 1 but there are other return codes as well so we just check if the code is non zero
if [[ $code -ne 0 ]]; then
exit 1
fi
}
# timeout returns 124 if we exceeded the timeout duration
if [[ $code -eq 124 ]]; then
echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"
exit 1
fi
(run_pytest_with_args --df enable_multi_shard_sync=true)
(run_pytest_with_args --df enable_multi_shard_sync=false)
# when a test fails in pytest it returns 1 but there are other return codes as well so we just check if the code is non zero
if [[ $code -ne 0 ]]; then
exit 1
fi
- name: Print last log on timeout
if: failure()
Expand Down
6 changes: 3 additions & 3 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ClusterShardMigration {
socket_ = nullptr;
});
JournalReader reader{source, 0};
TransactionReader tx_reader{false};
TransactionReader tx_reader;

while (!cntx->IsCancelled()) {
auto tx_data = tx_reader.NextTxData(&reader, cntx);
Expand Down Expand Up @@ -89,10 +89,10 @@ class ClusterShardMigration {
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_.Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
executor_.Execute(tx_data.dbid, tx_data.command);
} else {
// TODO check which global commands should be supported
CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0])
CHECK(false) << "We don't support command: " << ToSV(tx_data.command.cmd_args[0])
<< "in cluster migration process.";
}
}
Expand Down
47 changes: 9 additions & 38 deletions src/server/journal/tx_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
using namespace std;
using namespace facade;

ABSL_DECLARE_FLAG(bool, enable_multi_shard_sync);

namespace dfly {

bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) {
Expand Down Expand Up @@ -50,7 +48,6 @@ void MultiShardExecution::CancelAllBlockingEntities() {
}

void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
opcode = entry.opcode;

switch (entry.opcode) {
Expand All @@ -63,7 +60,7 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
command = std::move(entry.cmd);
[[fallthrough]];
case journal::Op::EXEC:
shard_cnt = entry.shard_cnt;
Expand All @@ -76,11 +73,6 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
}

bool TransactionData::IsGlobalCmd() const {
if (commands.size() > 1) {
return false;
}

auto& command = commands.front();
if (command.cmd_args.empty()) {
return false;
}
Expand All @@ -96,7 +88,7 @@ bool TransactionData::IsGlobalCmd() const {
return false;
}

TransactionData TransactionData::FromSingle(journal::ParsedEntry&& entry) {
TransactionData TransactionData::FromEntry(journal::ParsedEntry&& entry) {
TransactionData data;
data.AddEntry(std::move(entry));
return data;
Expand All @@ -116,35 +108,14 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
VLOG(2) << "read lsn: " << *lsn_;
}

// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN ||
res->opcode == journal::Op::LSN ||
(res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) {
TransactionData tx_data = TransactionData::FromSingle(std::move(res.value()));
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
DCHECK_NE(tx_data.lsn, 0u);
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
DCHECK_EQ(tx_data.lsn, *lsn_);
}
return tx_data;
}

// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0 || res->shard_cnt == 1);

auto txid = res->txid;
auto& txdata = current_[txid];
txdata.AddEntry(std::move(res.value()));
// accumulate multi until we get exec opcode.
if (txdata.opcode == journal::Op::EXEC) {
auto out = std::move(txdata);
current_.erase(txid);
return out;
TransactionData tx_data = TransactionData::FromEntry(std::move(res.value()));
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
DCHECK_NE(tx_data.lsn, 0u);
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
DCHECK_EQ(tx_data.lsn, *lsn_);
}
return tx_data;
}

return std::nullopt;
Expand Down
12 changes: 4 additions & 8 deletions src/server/journal/tx_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ struct TransactionData {

bool IsGlobalCmd() const;

static TransactionData FromSingle(journal::ParsedEntry&& entry);
static TransactionData FromEntry(journal::ParsedEntry&& entry);

TxId txid{0};
DbIndex dbid{0};
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
journal::ParsedEntry::CmdData command;

journal::Op opcode = journal::Op::NOOP;
uint64_t lsn = 0;
};
Expand All @@ -58,15 +58,11 @@ struct TransactionData {
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
TransactionReader(bool accumulate_multi, std::optional<uint64_t> lsn = std::nullopt)
: accumulate_multi_(accumulate_multi), lsn_(lsn) {
TransactionReader(std::optional<uint64_t> lsn = std::nullopt) : lsn_(lsn) {
}
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);

private:
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
bool accumulate_multi_ = false;
std::optional<uint64_t> lsn_ = 0;
};

Expand Down
115 changes: 25 additions & 90 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ extern "C" {
#include "strings/human_readable.h"

ABSL_FLAG(int, replication_acks_interval, 3000, "Interval between acks in milliseconds.");
ABSL_FLAG(bool, enable_multi_shard_sync, false,
"Execute multi shards commands on replica synchronized");
ABSL_FLAG(int, master_connect_timeout_ms, 20000,
"Timeout for establishing connection to a replication master");
ABSL_FLAG(int, master_reconnect_timeout_ms, 1000,
Expand Down Expand Up @@ -746,10 +744,6 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) {

sync_fb_ =
fb2::Fiber("shard_stable_sync_read", &DflyShardReplica::StableSyncDflyReadFb, this, cntx);
if (use_multi_shard_exe_sync_) {
execution_fb_ =
fb2::Fiber("shard_stable_sync_exec", &DflyShardReplica::StableSyncDflyExecFb, this, cntx);
}

return std::error_code{};
}
Expand Down Expand Up @@ -816,20 +810,13 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {

JournalReader reader{&ps, 0};
DCHECK_GE(journal_rec_executed_, 1u);
TransactionReader tx_reader{use_multi_shard_exe_sync_,
journal_rec_executed_.load(std::memory_order_relaxed) - 1};
TransactionReader tx_reader{journal_rec_executed_.load(std::memory_order_relaxed) - 1};

if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
}

while (!cntx->IsCancelled()) {
shard_replica_waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled())
break;

auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data)
break;
Expand All @@ -841,20 +828,10 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else if (tx_data->opcode == journal::Op::EXEC) {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {
// On no shard sync mode we execute multi commands once they are recieved, therefor when
// receiving exec opcode, we only increase the journal counting.
DCHECK_EQ(tx_data->commands.size(), 0u);
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
ExecuteTx(std::move(*tx_data), cntx);
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}
shard_replica_waker_.notifyAll();
}
Expand Down Expand Up @@ -923,7 +900,6 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
master_context_(master_context),
multi_shard_exe_(multi_shard_exe),
flow_id_(flow_id) {
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
}
Expand All @@ -932,53 +908,19 @@ DflyShardReplica::~DflyShardReplica() {
JoinFlow();
}

void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}

bool was_insert = tx_data.IsGlobalCmd() &&
multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);

ExecuteTx(std::move(tx_data), was_insert, cntx);
}

void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) {
bool was_insert = false;
if (tx_data.shard_cnt > 1) {
was_insert = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
}

VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
trans_data_queue_.emplace(std::move(tx_data), was_insert);
}

void DflyShardReplica::StableSyncDflyExecFb(Context* cntx) {
while (!cntx->IsCancelled()) {
shard_replica_waker_.await(
[&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); });
if (cntx->IsCancelled()) {
return;
}
DCHECK(!trans_data_queue_.empty());
auto& data = trans_data_queue_.front();
ExecuteTx(std::move(data.first), data.second, cntx);
trans_data_queue_.pop();
shard_replica_waker_.notifyAll();
}
}

void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
if (tx_data.shard_cnt <= 1 || (!use_multi_shard_exe_sync_ && !tx_data.IsGlobalCmd())) {
if (!tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed);
executor_->Execute(tx_data.dbid, tx_data.command);
return;
}

bool inserted_by_me = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);

auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid);

VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards";
Expand All @@ -991,29 +933,23 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me,
return;
VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";

if (tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shards flows get to execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// Global command will be executed only from one flow fiber. This ensure corectness of data in
// replica.
if (inserted_by_me) {
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
}
// Wait until exection is done, to make sure we done execute next commands while the global is
// executed.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
} else { // Non global command will be executed by each flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shards flows get to execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// Global command will be executed only from one flow fiber. This ensure corectness of data in
// replica.
if (inserted_by_me) {
executor_->Execute(tx_data.dbid, tx_data.command);
}
journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed);
// Wait until exection is done, to make sure we done execute next commands while the global is
// executed.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;

// Erase from map can be done only after all flow fibers executed the transaction commands.
// The last fiber which will decrease the counter to 0 will be the one to erase the data from
Expand Down Expand Up @@ -1164,7 +1100,6 @@ uint64_t DflyShardReplica::JournalExecutedCount() const {
void DflyShardReplica::JoinFlow() {
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
execution_fb_.JoinIfNeeded();
}

void DflyShardReplica::Cancel() {
Expand Down
Loading

0 comments on commit c45f7bf

Please sign in to comment.