Skip to content

Commit

Permalink
chore: get rid of kv_args and replace it with slices to full_args (#2942
Browse files Browse the repository at this point in the history
)

The main change is in tx_base.*, where we introduce the ShardArgs slice, which is
forward-iterable only. It allows us to iterate over sub-ranges of the full argument
slice and to read the index of any of its elements.

Since ShardArgs now provides indices into the original argument list, we no longer need to build the reverse index in transactions.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored May 6, 2024
1 parent 135af96 commit de0e5cb
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 185 deletions.
5 changes: 0 additions & 5 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
: facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
if (mask & CO::ADMIN)
opt_mask_ |= CO::NOSCRIPT;

if (mask & CO::BLOCKING)
opt_mask_ |= CO::REVERSE_MAPPING;
}

bool CommandId::IsTransactional() const {
Expand Down Expand Up @@ -173,8 +170,6 @@ const char* OptName(CO::CommandOpt fl) {
return "readonly";
case DENYOOM:
return "denyoom";
case REVERSE_MAPPING:
return "reverse-mapping";
case FAST:
return "fast";
case LOADING:
Expand Down
7 changes: 2 additions & 5 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@ enum CommandOpt : uint32_t {
LOADING = 1U << 3, // Command allowed during LOADING state.
DENYOOM = 1U << 4, // use-memory in redis.

// marked commands that demand preserve the order of keys to work correctly.
// For example, MGET needs to know the order of keys to return the values in the same order.
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
REVERSE_MAPPING = 1U << 5,
// UNUSED = 1U << 5,

VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.

ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
BLOCKING = 1U << 9,
HIDDEN = 1U << 10, // does not show in COMMAND command output
INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments
GLOBAL_TRANS = 1U << 12,
Expand Down
12 changes: 5 additions & 7 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
int req_obj_type) {
DCHECK(!args.Empty());

unsigned i = 0;
for (string_view key : args) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
for (auto it = args.begin(); it != args.end(); ++it) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
if (res)
return make_pair(res.value(), i);
return make_pair(res.value(), unsigned(it.index()));
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
++i;
}

VLOG(2) << "FindFirst not found";
Expand Down Expand Up @@ -119,8 +117,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
if (!lhs || !rhs)
return lhs.ok();
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
size_t i1 = std::get<1>(*lhs);
size_t i2 = std::get<1>(*rhs);
return i1 < i2;
};

Expand Down
6 changes: 4 additions & 2 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ struct Entry : public EntryBase {
struct Payload {
std::string_view cmd;
std::variant<CmdArgList, // Parts of a full command.
ShardArgs // Command and its shard parts.
>
ShardArgs, // Shard parts.
ArgSlice>
args;

Payload() = default;
Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) {
}
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
}
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
}
};

Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
Expand Down
13 changes: 7 additions & 6 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;

vector<OptString>& res = mget_resp[sid];
for (size_t j = 0; j < res.size(); ++j) {
if (!res[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
results[indx] = std::move(res[j]);
uint32_t dst_indx = it.index();
results[dst_indx] = std::move(res[src_index]);
}
}

Expand Down Expand Up @@ -2091,8 +2093,7 @@ void JsonFamily::Register(CommandRegistry* registry) {
constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
registry->StartFamily();
*registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
.HFUNC(MGet);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
*registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
*registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
*registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);
Expand Down
27 changes: 24 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ struct CircularMessages {
// Used to recover logs for BLPOP failures. See OpBPop.
thread_local CircularMessages debugMessages{50};

// Adapts a single key to the ShardArgs interface.
//
// The translation is a bit awkward because ShardArgs is constructed from a CmdArgList plus
// a span of IndexSlice entries:
//   1. the key bytes are wrapped in a MutableSlice (const is cast away only to satisfy the
//      slice type - the bytes are never mutated through it),
//   2. that slice is exposed as a CmdArgList of size 1,
//   3. islice{0, 1} references the first (and only) element of that CmdArgList.
//
// Lifetime: Get() hands out pointers to this object's own members, and mslice in turn points
// at the bytes of the key given to the constructor. Both this object and the key's storage
// must outlive every ShardArgs obtained from Get().
struct SingleArg {
  MutableSlice mslice;
  IndexSlice islice{0, 1};

  // explicit: this type only borrows the key, so an implicit conversion from string_view
  // could silently create a temporary SingleArg and leave the resulting ShardArgs dangling.
  explicit SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
  }

  // Returns a ShardArgs view over the single wrapped key. Not const because CmdArgList
  // is built from a non-const pointer to mslice.
  ShardArgs Get() {
    return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
  }
};

class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
Expand Down Expand Up @@ -448,7 +464,9 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// hack, again. since we hacked which queue we are waiting on (see RunPair)
// we must clean-up src key here manually. See RunPair why we do this.
// in short- we suspended on "src" on both shards.
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);

SingleArg single_arg{src};
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
Expand Down Expand Up @@ -873,7 +891,8 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}

auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
SingleArg single_arg{pop_key_};
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand All @@ -900,11 +919,13 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
return op_res;
}

SingleArg single_arg(this->pop_key_);

// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggered.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down
12 changes: 7 additions & 5 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2989,17 +2989,19 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
continue;

vector<RecordVec>& results = xread_resp[sid];
unsigned src_index = 0;
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);

for (size_t i = 0; i < results.size(); ++i) {
if (results[i].size() == 0) {
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (results[src_index].size() == 0) {
continue;
}

resolved_streams++;

// Add the stream records ordered by the original stream arguments.
size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
res[indx - opts->streams_arg] = std::move(results[i]);
size_t dst_indx = it.index();
res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
}
}

Expand Down Expand Up @@ -3323,7 +3325,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
Expand Down
50 changes: 31 additions & 19 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,15 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);

size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
string_view value = *it;
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
Expand All @@ -286,18 +288,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
string_view cmd;
ArgSlice cmd_args;
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
} else {
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(args.begin(), index);
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}

Expand Down Expand Up @@ -1161,16 +1174,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;

for (size_t j = 0; j < src.resp_arr.size(); ++j) {
if (!src.resp_arr[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
if (!src.resp_arr[src_indx])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
uint32_t indx = it.index();

res.resp_arr[indx] = std::move(src.resp_arr[j]);
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
if (cntx->protocol() == Protocol::MEMCACHE) {
res.resp_arr[indx]->key = ArgS(args, indx);
res.resp_arr[indx]->key = *it;
}
}
}
Expand Down Expand Up @@ -1486,9 +1500,7 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
acl::kMGet}
.HFUNC(MGet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
Expand Down
Loading

0 comments on commit de0e5cb

Please sign in to comment.