diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc
index 1ad459ca90d7..82548d8d8488 100644
--- a/src/server/command_registry.cc
+++ b/src/server/command_registry.cc
@@ -40,9 +40,6 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
     : facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
   if (mask & CO::ADMIN)
     opt_mask_ |= CO::NOSCRIPT;
-
-  if (mask & CO::BLOCKING)
-    opt_mask_ |= CO::REVERSE_MAPPING;
 }
 
 bool CommandId::IsTransactional() const {
@@ -173,8 +170,6 @@ const char* OptName(CO::CommandOpt fl) {
       return "readonly";
     case DENYOOM:
       return "denyoom";
-    case REVERSE_MAPPING:
-      return "reverse-mapping";
     case FAST:
       return "fast";
     case LOADING:
diff --git a/src/server/command_registry.h b/src/server/command_registry.h
index 2b3ccb0c487a..d6ba0092132b 100644
--- a/src/server/command_registry.h
+++ b/src/server/command_registry.h
@@ -27,16 +27,13 @@ enum CommandOpt : uint32_t {
   LOADING = 1U << 3,   // Command allowed during LOADING state.
   DENYOOM = 1U << 4,   // use-memory in redis.
 
-  // marked commands that demand preserve the order of keys to work correctly.
-  // For example, MGET needs to know the order of keys to return the values in the same order.
-  // BLPOP needs to know the order of keys to return the first non-empty list from the left.
-  REVERSE_MAPPING = 1U << 5,
+  // UNUSED = 1U << 5,
 
   VARIADIC_KEYS = 1U << 6,  // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
 
   ADMIN = 1U << 7,  // implies NOSCRIPT,
   NOSCRIPT = 1U << 8,
-  BLOCKING = 1U << 9,  // implies REVERSE_MAPPING
+  BLOCKING = 1U << 9,
   HIDDEN = 1U << 10,            // does not show in COMMAND command output
   INTERLEAVED_KEYS = 1U << 11,  // keys are interleaved with arguments
   GLOBAL_TRANS = 1U << 12,
diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc
index 3b0402c7bd74..ce79da6c86c0 100644
--- a/src/server/container_utils.cc
+++ b/src/server/container_utils.cc
@@ -40,14 +40,12 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
                                                                         int req_obj_type) {
   DCHECK(!args.Empty());
 
-  unsigned i = 0;
-  for (string_view key : args) {
-    OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
+  for (auto it = args.begin(); it != args.end(); ++it) {
+    OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
     if (res)
-      return make_pair(res.value(), i);
+      return make_pair(res.value(), unsigned(it.index()));
     if (res.status() != OpStatus::KEY_NOTFOUND)
       return res.status();
-    ++i;
   }
 
   VLOG(2) << "FindFirst not found";
@@ -119,8 +117,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
   auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
     if (!lhs || !rhs)
       return lhs.ok();
-    size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
-    size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
+    size_t i1 = std::get<1>(*lhs);
+    size_t i2 = std::get<1>(*rhs);
     return i1 < i2;
   };
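
The container_utils change shows the pattern the rest of the patch repeats: instead of carrying a parallel `reverse_index_` array, the `ShardArgs` iterator itself reports each key's absolute position in the original command, because the view is defined by `[start, end)` index slices into the full argument list. Below is a minimal, self-contained sketch of such an iterator; the types are simplified stand-ins, not Dragonfly's real `ShardArgs`/`IndexSlice`, and `IndexSlice` is assumed to be a pair of `[start, end)` offsets, per the `emplace_back(b, e)` calls later in the patch:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using IndexSlice = std::pair<uint32_t, uint32_t>;  // [start, end) into the full argument list

// Simplified stand-in for ShardArgs: a view over the full argument list
// restricted to the index slices owned by one shard.
class ShardView {
  const std::vector<std::string>* full_args_;
  std::vector<IndexSlice> slices_;

 public:
  ShardView(const std::vector<std::string>* args, std::vector<IndexSlice> slices)
      : full_args_(args), slices_(std::move(slices)) {
  }

  struct Iterator {
    const ShardView* owner;
    size_t slice;  // which slice we are inside
    uint32_t pos;  // absolute position inside the full argument list

    const std::string& operator*() const {
      return (*owner->full_args_)[pos];
    }
    uint32_t index() const {  // original argument index - no reverse map needed
      return pos;
    }
    Iterator& operator++() {
      if (++pos >= owner->slices_[slice].second) {  // slice exhausted, jump to the next one
        ++slice;
        pos = slice < owner->slices_.size() ? owner->slices_[slice].first : 0;
      }
      return *this;
    }
    bool operator!=(const Iterator& o) const {
      return slice != o.slice || pos != o.pos;
    }
  };

  Iterator begin() const {
    return slices_.empty() ? end() : Iterator{this, 0, slices_[0].first};
  }
  Iterator end() const {
    return Iterator{this, slices_.size(), 0};
  }
};

int main() {
  // MGET k0 k1 k2 k3: this shard owns k0 and k3.
  std::vector<std::string> args = {"k0", "k1", "k2", "k3"};
  ShardView shard(&args, {{0, 1}, {3, 4}});

  for (auto it = shard.begin(); it != shard.end(); ++it)
    std::cout << *it << " has original index " << it.index() << '\n';
}
```

Because `pos` is already an absolute offset, `index()` is O(1) and the per-key reverse-index bookkeeping disappears entirely.
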
diff --git a/src/server/journal/types.h b/src/server/journal/types.h
index 4d86e3325f1a..aeb0286ca65f 100644
--- a/src/server/journal/types.h
+++ b/src/server/journal/types.h
@@ -42,8 +42,8 @@ struct Entry : public EntryBase {
   struct Payload {
     std::string_view cmd;
     std::variant<CmdArgList,  // Parts of a full command.
-                 ShardArgs>   // Shard parts.
+                 ShardArgs,   // Shard parts.
+                 ArgSlice>
         args;
 
     Payload() = default;
@@ -51,6 +51,8 @@ struct Entry : public EntryBase {
     }
     Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
    }
+    Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
+    }
   };
 
   Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
diff --git a/src/server/json_family.cc b/src/server/json_family.cc
index da5ff13ed43c..0617ebc556d0 100644
--- a/src/server/json_family.cc
+++ b/src/server/json_family.cc
@@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
       continue;
 
     vector<OptString>& res = mget_resp[sid];
-    for (size_t j = 0; j < res.size(); ++j) {
-      if (!res[j])
+    ShardArgs shard_args = transaction->GetShardArgs(sid);
+    unsigned src_index = 0;
+    for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
+      if (!res[src_index])
         continue;
 
-      uint32_t indx = transaction->ReverseArgIndex(sid, j);
-      results[indx] = std::move(res[j]);
+      uint32_t dst_indx = it.index();
+      results[dst_indx] = std::move(res[src_index]);
     }
   }
 
@@ -2091,8 +2093,7 @@ void JsonFamily::Register(CommandRegistry* registry) {
   constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
   registry->StartFamily();
   *registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
-  *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
-                   .HFUNC(MGet);
+  *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
   *registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
   *registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
   *registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);
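
Adding `ArgSlice` as a third alternative means every consumer of `Payload::args` must now handle three shapes of argument list. Since all three are just iterable sequences of argument strings, a `std::visit` with a generic lambda keeps the handling single-sourced. A toy model of that dispatch, with stand-in types rather than the real journal serializer:

```cpp
#include <cstddef>
#include <iostream>
#include <string_view>
#include <variant>
#include <vector>

// Stand-ins: the real CmdArgList, ShardArgs and ArgSlice differ in layout,
// but all three behave as iterable ranges of argument strings.
struct CmdArgList { std::vector<std::string_view> v; };
struct ShardArgs  { std::vector<std::string_view> v; };
struct ArgSlice   { std::vector<std::string_view> v; };

struct Payload {
  std::string_view cmd;
  std::variant<CmdArgList, ShardArgs, ArgSlice> args;
};

// One code path covering all three alternatives.
size_t NumArgs(const Payload& p) {
  return std::visit([](const auto& a) { return a.v.size(); }, p.args);
}

int main() {
  Payload p{"MSET", ArgSlice{{"k1", "v1", "k2", "v2"}}};
  std::cout << p.cmd << " carries " << NumArgs(p) << " args\n";  // prints 4
}
```
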
diff --git a/src/server/list_family.cc b/src/server/list_family.cc
index 79adbf336c09..9515c7239036 100644
--- a/src/server/list_family.cc
+++ b/src/server/list_family.cc
@@ -158,6 +158,22 @@ struct CircularMessages {
 // Used to recover logs for BLPOP failures. See OpBPop.
 thread_local CircularMessages debugMessages{50};
 
+// A bit awkward translation from a single key to ShardArgs.
+// We create a mutable slice (which will never be mutated) from the key, then we create
+// a CmdArgList of size 1 that references mslice and finally
+// we reference the first element in the CmdArgList via islice.
+struct SingleArg {
+  MutableSlice mslice;
+  IndexSlice islice{0, 1};
+
+  SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
+  }
+
+  ShardArgs Get() {
+    return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
+  }
+};
+
 class BPopPusher {
  public:
  BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
@@ -448,7 +464,9 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
       // hack, again. since we hacked which queue we are waiting on (see RunPair)
       // we must clean-up src key here manually. See RunPair why we do this.
       // in short- we suspended on "src" on both shards.
-      shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
+
+      SingleArg single_arg{src};
+      shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
     }
   } else {
     DVLOG(1) << "Popping value from list: " << key;
@@ -873,7 +891,8 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
     return op_res;
   }
 
-  auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
+  SingleArg single_arg{pop_key_};
+  auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
 
   const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
                               std::string_view key) -> bool {
@@ -900,11 +919,13 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
     return op_res;
   }
 
+  SingleArg single_arg(this->pop_key_);
+
   // a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
   // Therefore we follow the regular flow of watching the key but for the destination shard it
   // will never be triggerred.
   // This allows us to run Transaction::Execute on watched transactions in both shards.
-  auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
+  auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
 
   const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
                               std::string_view key) -> bool {
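
`SingleArg` works because `ShardArgs` is a non-owning view: the returned view points at the `MutableSlice` stored inside the wrapper, so the wrapper (and the key it references) must outlive every `ShardArgs` obtained from `Get()`. Note how `RunSingle` and `RunPair` declare `single_arg` before the `wcb` lambda that captures it by reference. A reduced model of the same lifetime contract, using stand-in types:

```cpp
#include <iostream>
#include <string>
#include <string_view>

// Stand-in for the non-owning one-key view that SingleArg::Get() produces.
struct OneKeyView {
  std::string_view key;
};

// Like the patch's SingleArg: wraps borrowed memory, never owns it.
struct SingleArg {
  std::string_view arg;
  explicit SingleArg(std::string_view a) : arg(a) {
  }
  OneKeyView Get() const {
    return OneKeyView{arg};
  }
};

int main() {
  std::string pop_key = "mylist";

  SingleArg single_arg{pop_key};
  // Safe: single_arg (and pop_key) outlive every invocation of the callback,
  // matching how RunSingle/RunPair declare single_arg before wcb.
  auto wcb = [&]() { return single_arg.Get(); };
  std::cout << wcb().key << '\n';

  // Unsafe variant (do not do this): SingleArg{std::string("tmp")}.Get()
  // would return a view into a temporary destroyed at the end of the statement.
}
```
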
diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc
index 3274c439cc7d..675871ac6a61 100644
--- a/src/server/stream_family.cc
+++ b/src/server/stream_family.cc
@@ -2989,17 +2989,19 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
       continue;
 
     vector<RecordVec>& results = xread_resp[sid];
+    unsigned src_index = 0;
+    ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);
 
-    for (size_t i = 0; i < results.size(); ++i) {
-      if (results[i].size() == 0) {
+    for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
+      if (results[src_index].size() == 0) {
         continue;
      }
 
       resolved_streams++;
 
       // Add the stream records ordered by the original stream arguments.
-      size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
-      res[indx - opts->streams_arg] = std::move(results[i]);
+      size_t dst_indx = it.index();
+      res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
     }
   }
 
@@ -3323,7 +3325,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
 void StreamFamily::Register(CommandRegistry* registry) {
   using CI = CommandId;
   registry->StartFamily();
-  constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
+  constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
   *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
             << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
             << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
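
JSON.MGET, XREAD and MGET all end with the same scatter step: each shard hands back hits in shard-local order (`src_index`), and `it.index()` names the slot in the client's original key order. XREAD additionally rebases the absolute index by `streams_arg`, because its response array covers only the keys section of the command. A condensed model of that step, with fabricated data and hypothetical names:

```cpp
#include <iostream>
#include <string>
#include <vector>

int main() {
  // XREAD COUNT 10 STREAMS s1 s2 s3 -> stream keys start at absolute index 3.
  const unsigned streams_arg = 3;

  // Per shard: the absolute index of each key (what it.index() yields) and
  // the shard's result for it, both in the same shard-local order.
  std::vector<std::vector<unsigned>> shard_indices = {{3, 5}, {4}};
  std::vector<std::vector<std::string>> shard_results = {{"res-s1", "res-s3"}, {"res-s2"}};

  std::vector<std::string> res(3);  // one slot per stream key, in client order
  for (size_t sid = 0; sid < shard_indices.size(); ++sid) {
    unsigned src_index = 0;  // shard-local position, like the loops in the patch
    for (unsigned dst_indx : shard_indices[sid])
      res[dst_indx - streams_arg] = std::move(shard_results[sid][src_index++]);
  }

  for (const auto& r : res)
    std::cout << r << '\n';  // res-s1, res-s2, res-s3
}
```
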
diff --git a/src/server/string_family.cc b/src/server/string_family.cc
index d10de123ebcc..96db64f88afe 100644
--- a/src/server/string_family.cc
+++ b/src/server/string_family.cc
@@ -271,6 +271,7 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
   SetCmd sg(op_args, false);
 
   size_t index = 0;
+  bool partial = false;
   for (auto it = args.begin(); it != args.end(); ++it) {
     string_view key = *it;
     ++it;
@@ -278,6 +279,7 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
     DVLOG(1) << "MSet " << key << ":" << value;
     if (sg.Set(params, key, value) != OpStatus::OK) {  // OOM for example.
       success->store(false);
+      partial = true;
       break;
     }
     index += 2;
@@ -286,18 +288,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
 
   if (auto journal = op_args.shard->journal(); journal) {
     // We write a custom journal because an OOM in the above loop could lead to partial success, so
     // we replicate only what was changed.
-    string_view cmd;
-    ArgSlice cmd_args;
-    if (index == 0) {
-      // All shards must record the tx was executed for the replica to execute it, so we send a PING
-      // in case nothing was changed
-      cmd = "PING";
+    if (partial) {
+      string_view cmd;
+      ArgSlice cmd_args;
+      vector<string_view> store_args(index);
+      if (index == 0) {
+        // All shards must record the tx was executed for the replica to execute it, so we send a
+        // PING in case nothing was changed
+        cmd = "PING";
+      } else {
+        // journal [0, index)
+        cmd = "MSET";
+        unsigned i = 0;
+        for (string_view arg : args) {
+          store_args[i++] = arg;
+          if (i >= store_args.size())
+            break;
+        }
+        cmd_args = absl::MakeSpan(store_args);
+      }
+      RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
     } else {
-      // journal [0, i)
-      cmd = "MSET";
-      cmd_args = ArgSlice(args.begin(), index);
+      RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
     }
-    RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
   }
 }
@@ -1161,16 +1174,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
     src.storage_list->next = res.storage_list;
     res.storage_list = src.storage_list;
     src.storage_list = nullptr;
-
-    for (size_t j = 0; j < src.resp_arr.size(); ++j) {
-      if (!src.resp_arr[j])
+    ShardArgs shard_args = transaction->GetShardArgs(sid);
+    unsigned src_indx = 0;
+    for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
+      if (!src.resp_arr[src_indx])
         continue;
 
-      uint32_t indx = transaction->ReverseArgIndex(sid, j);
+      uint32_t indx = it.index();
 
-      res.resp_arr[indx] = std::move(src.resp_arr[j]);
+      res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
       if (cntx->protocol() == Protocol::MEMCACHE) {
-        res.resp_arr[indx]->key = ArgS(args, indx);
+        res.resp_arr[indx]->key = *it;
       }
     }
   }
@@ -1486,9 +1500,7 @@ void StringFamily::Register(CommandRegistry* registry) {
       << CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
             .HFUNC(GetEx)
       << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
-      << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
-            acl::kMGet}
-             .HFUNC(MGet)
+      << CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
       << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
       << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
       << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
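
The rewritten OpMSet journaling now has three outcomes: nothing applied (journal a PING so replicas still account for the transaction), a partial prefix applied (copy out the first `index` arguments and journal them as one MSET), and full success (journal the shard's `args` view directly, with no copy at all, which is the common case this patch optimizes). A sketch of the prefix extraction, with the bounds check hoisted so a zero-length prefix is also safe; `BuildPartialMsetArgs` is a hypothetical helper, not part of the PR:

```cpp
#include <iostream>
#include <string_view>
#include <vector>

// Hypothetical helper: copy the first `index` arguments (the applied prefix)
// so they can be journaled as a single MSET, as in the OpMSet loop above.
std::vector<std::string_view> BuildPartialMsetArgs(const std::vector<std::string_view>& args,
                                                   size_t index) {
  std::vector<std::string_view> store_args(index);
  size_t i = 0;
  for (std::string_view arg : args) {
    if (i >= store_args.size())
      break;
    store_args[i++] = arg;
  }
  return store_args;
}

int main() {
  // MSET k1 v1 k2 v2 k3 v3, but OOM hit after the second pair: index == 4.
  std::vector<std::string_view> args = {"k1", "v1", "k2", "v2", "k3", "v3"};
  auto prefix = BuildPartialMsetArgs(args, 4);

  std::cout << "MSET";
  for (std::string_view a : prefix)
    std::cout << ' ' << a;
  std::cout << '\n';  // MSET k1 v1 k2 v2
}
```
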
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 65795a52ea14..b09969478c8b 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -184,12 +184,13 @@ void Transaction::InitGlobal() {
 void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
   auto& shard_index = *out;
 
-  auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid,
-                                                                               uint32_t i) {
-    string_view val = ArgS(full_args_, i);
-    shard_index[sid].args.push_back(val);
-    if (rev_mapping)
-      shard_index[sid].original_index.push_back(i);
+  auto add = [this, &shard_index](uint32_t sid, uint32_t b, uint32_t e) {
+    auto& slices = shard_index[sid].slices;
+    if (!slices.empty() && slices.back().second == b) {
+      slices.back().second = e;
+    } else {
+      slices.emplace_back(b, e);
+    }
   };
 
   if (key_index.bonus) {
@@ -197,47 +198,39 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
   }
 }
 
-void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
-                                bool rev_mapping) {
-  kv_args_.reserve(num_args);
+void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args) {
+  args_slices_.reserve(num_args);
   DCHECK(kv_fp_.empty());
   kv_fp_.reserve(num_args);
 
-  if (rev_mapping)
-    reverse_index_.reserve(num_args);
-
   // Store the concatenated per-shard arguments from the shard index inside kv_args_
   // and make each shard data point to its own sub-span inside kv_args_.
   for (size_t i = 0; i < shard_data_.size(); ++i) {
     auto& sd = shard_data_[i];
-    const auto& si = shard_index[i];
+    const auto& src = shard_index[i];
 
-    sd.arg_count = si.args.size();
-    sd.arg_start = kv_args_.size();
+    sd.slice_count = src.slices.size();
+    sd.slice_start = args_slices_.size();
     sd.fp_start = kv_fp_.size();
     sd.fp_count = 0;
 
     // Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
     DCHECK_EQ(sd.local_mask & ACTIVE, 0);
-    if (sd.arg_count == 0)
+    if (sd.slice_count == 0)
       continue;
 
     sd.local_mask |= ACTIVE;
@@ -245,19 +238,16 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
     unique_shard_cnt_++;
     unique_shard_id_ = i;
 
-    for (size_t j = 0; j < si.args.size(); ++j) {
-      string_view arg = si.args[j];
-      kv_args_.push_back(arg);
-      if (si.key_step == 1 || j % si.key_step == 0) {
-        kv_fp_.push_back(LockTag(arg).Fingerprint());
+    for (size_t j = 0; j < src.slices.size(); ++j) {
+      IndexSlice slice = src.slices[j];
+      args_slices_.push_back(slice);
+      for (uint32_t k = slice.first; k < slice.second; k += src.key_step) {
+        string_view key = ArgS(full_args_, k);
+        kv_fp_.push_back(LockTag(key).Fingerprint());
         sd.fp_count++;
       }
-      if (rev_mapping)
-        reverse_index_.push_back(si.original_index[j]);
     }
   }
-
-  DCHECK_EQ(kv_args_.size(), num_args);
 }
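
The new `add` lambda in BuildShardIndex merges adjacent ranges, so consecutive arguments that land on the same shard collapse into a single `[b, e)` slice; for a single-shard MSET this yields exactly one slice regardless of key count. Its behavior in isolation, with `IndexSlice` again modeled as a pair:

```cpp
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using IndexSlice = std::pair<uint32_t, uint32_t>;  // [b, e)

// Mirrors the `add` lambda above: extend the last slice when ranges touch.
void Add(std::vector<IndexSlice>* slices, uint32_t b, uint32_t e) {
  if (!slices->empty() && slices->back().second == b)
    slices->back().second = e;
  else
    slices->emplace_back(b, e);
}

int main() {
  std::vector<IndexSlice> slices;
  Add(&slices, 0, 1);  // key at 0
  Add(&slices, 1, 2);  // key at 1 -> merged into [0, 2)
  Add(&slices, 4, 5);  // gap -> a new slice

  for (auto [b, e] : slices)
    std::cout << '[' << b << ", " << e << ") ";
  std::cout << '\n';  // [0, 2) [4, 5)
}
```
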
@@ -277,22 +267,13 @@ void Transaction::PrepareMultiFps(CmdArgList keys) {
 void Transaction::StoreKeysInArgs(const KeyIndex& key_index) {
   DCHECK(!key_index.bonus);
   DCHECK(kv_fp_.empty());
+  DCHECK(args_slices_.empty());
 
   // even for a single key we may have multiple arguments per key (MSET).
+  args_slices_.emplace_back(key_index.start, key_index.end);
   for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) {
-    string_view arg = ArgS(full_args_, j);
-    kv_args_.push_back(arg);
-    kv_fp_.push_back(LockTag(arg).Fingerprint());
-
-    for (unsigned k = j + 1; k < j + key_index.step; ++k)
-      kv_args_.push_back(ArgS(full_args_, k));
-  }
-
-  if (key_index.has_reverse_mapping) {
-    reverse_index_.resize(kv_args_.size());
-    for (unsigned j = 0; j < reverse_index_.size(); ++j) {
-      reverse_index_[j] = j + key_index.start;
-    }
+    string_view key = ArgS(full_args_, j);
+    kv_fp_.push_back(LockTag(key).Fingerprint());
   }
 }
@@ -314,7 +295,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
     StoreKeysInArgs(key_index);
 
     unique_shard_cnt_ = 1;
-    string_view akey = kv_args_.front();
+    string_view akey = ArgS(full_args_, key_index.start);
     if (is_stub)  // stub transactions don't migrate
       DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size()));
     else {
@@ -340,11 +321,11 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   BuildShardIndex(key_index, &shard_index);
 
   // Initialize shard data based on distributed arguments.
-  InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping);
+  InitShardData(shard_index, key_index.num_args());
 
   DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty());
 
-  DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front();
+  DVLOG(1) << "InitByArgs " << DebugId() << " " << facade::ToSV(full_args_.front());
 
   // Compress shard data, if we occupy only one shard.
   if (unique_shard_cnt_ == 1) {
@@ -357,15 +338,8 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
       sd = &shard_data_.front();
       sd->local_mask |= ACTIVE;
     }
-    sd->arg_count = -1;
-    sd->arg_start = -1;
-  }
-
-  // Validation. Check reverse mapping was built correctly.
-  if (key_index.has_reverse_mapping) {
-    for (size_t i = 0; i < kv_args_.size(); ++i) {
-      DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
-    }
+    sd->slice_count = -1;
+    sd->slice_start = -1;
   }
 
   // Validation.
@@ -396,7 +370,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
   }
 
   DCHECK_EQ(unique_shard_cnt_, 0u);
-  DCHECK(kv_args_.empty());
+  DCHECK(args_slices_.empty());
   DCHECK(kv_fp_.empty());
 
   OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
@@ -427,8 +401,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
     } else {
       shard_data_[i].local_mask &= ~ACTIVE;
     }
-    shard_data_[i].arg_start = 0;
-    shard_data_[i].arg_count = 0;
+    shard_data_[i].slice_start = 0;
+    shard_data_[i].slice_count = 0;
   }
 
   MultiBecomeSquasher();
@@ -485,15 +459,14 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
   unique_shard_id_ = 0;
   unique_shard_cnt_ = 0;
 
-  kv_args_.clear();
+  args_slices_.clear();
   kv_fp_.clear();
-  reverse_index_.clear();
 
   cid_ = cid;
   cb_ptr_ = nullptr;
 
   for (auto& sd : shard_data_) {
-    sd.arg_count = sd.arg_start = 0;
+    sd.slice_count = sd.slice_start = 0;
 
     if (multi_->mode == NON_ATOMIC) {
       sd.local_mask = 0;  // Non atomic transactions schedule each time, so remove all flags
@@ -555,7 +528,6 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
   EnableShard(sid);
   OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
   CHECK(key_index);
-  DCHECK(!key_index->has_reverse_mapping);
   StoreKeysInArgs(*key_index);
 }
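
With the slices stored centrally in `args_slices_`, the per-shard state in `shard_data_` shrinks to a `(slice_start, slice_count)` window, mirroring the old `(arg_start, arg_count)` pair but counting index slices instead of copied strings; `GetShardArgs` below then rebuilds the per-shard view on demand. A stand-in model of that window arithmetic (simplified types, not the real `shard_data_` layout):

```cpp
#include <cstdint>
#include <iostream>
#include <span>
#include <utility>
#include <vector>

using IndexSlice = std::pair<uint32_t, uint32_t>;

// Stand-in for the per-shard bookkeeping left in shard_data_: a window
// [slice_start, slice_start + slice_count) into one shared slice vector.
struct ShardData {
  uint32_t slice_start = 0;
  uint32_t slice_count = 0;
};

int main() {
  // Two shards sharing one args_slices_ vector, as in Transaction.
  std::vector<IndexSlice> args_slices = {{0, 2}, {4, 5}, {2, 4}};
  ShardData sd0{0, 2};  // owns slices {0,2} and {4,5}
  ShardData sd1{2, 1};  // owns slice {2,4}

  for (const ShardData& sd : {sd0, sd1}) {
    std::span<const IndexSlice> view(args_slices.data() + sd.slice_start, sd.slice_count);
    std::cout << "shard sees";
    for (const auto& [b, e] : view)
      std::cout << " [" << b << ", " << e << ")";
    std::cout << '\n';
  }
}
```
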
@@ -1181,23 +1153,12 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const {
   // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
   // barrier.
   if (unique_shard_cnt_ == 1) {
-    return kv_args_;
+    return ShardArgs{full_args_, absl::MakeSpan(args_slices_)};
   }
 
   const auto& sd = shard_data_[sid];
-  return ShardArgs{kv_args_.data() + sd.arg_start, sd.arg_count};
-}
-
-// from local index back to original arg index skipping the command.
-// i.e. returns (first_key_pos -1) or bigger.
-size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
-  DCHECK_LT(arg_index, reverse_index_.size());
-
-  if (unique_shard_cnt_ == 1)
-    return reverse_index_[arg_index];
-
-  const auto& sd = shard_data_[shard_id];
-  return reverse_index_[sd.arg_start + arg_index];
+  return ShardArgs{full_args_,
+                   absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)};
 }
 
 OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
@@ -1373,7 +1334,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view
   // Change state to awaked and store index of awakened key
   sd.local_mask &= ~SUSPENDED_Q;
   sd.local_mask |= AWAKED_Q;
-  sd.wake_key_pos = it - args.begin();
+  sd.wake_key_pos = it.index();
 
   blocking_barrier_.Close();
   return true;
@@ -1384,8 +1345,8 @@ optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
   if ((sd.local_mask & AWAKED_Q) == 0)
     return nullopt;
 
-  CHECK_NE(sd.wake_key_pos, UINT16_MAX);
-  return GetShardArgs(sid).at(sd.wake_key_pos);
+  CHECK_LT(sd.wake_key_pos, full_args_.size());
+  return ArgS(full_args_, sd.wake_key_pos);
 }
 
 void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) {
@@ -1421,10 +1382,11 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
   journal::Entry::Payload entry_payload;
 
   string_view cmd{cid_->name()};
-  if (unique_shard_cnt_ == 1 || kv_args_.empty()) {
+  if (unique_shard_cnt_ == 1 || args_slices_.empty()) {
     entry_payload = journal::Entry::Payload(cmd, full_args_);
   } else {
-    entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice());
+    ShardArgs shard_args = GetShardArgs(shard->shard_id());
+    entry_payload = journal::Entry::Payload(cmd, shard_args);
   }
   LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
 }
@@ -1511,10 +1473,6 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
 
   int num_custom_keys = -1;
 
-  if (cid->opt_mask() & CO::REVERSE_MAPPING) {
-    key_index.has_reverse_mapping = true;
-  }
-
   if (cid->opt_mask() & CO::VARIADIC_KEYS) {
     // ZUNION/INTER <num_keys> <key1> [<key2> ...]
     // EVAL <script> <num_keys>