Skip to content

Commit

Permalink
Revert "chore: get rid of kv_args and replace it with slices to full_… (
Browse files Browse the repository at this point in the history
#3024)

Revert "chore: get rid of kv_args and replace it with slices to full_args (#2942)"

This reverts commit de0e5cb.
  • Loading branch information
romange authored May 8, 2024
1 parent 5c4279c commit 25e6930
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 216 deletions.
5 changes: 5 additions & 0 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
: facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
if (mask & CO::ADMIN)
opt_mask_ |= CO::NOSCRIPT;

if (mask & CO::BLOCKING)
opt_mask_ |= CO::REVERSE_MAPPING;
}

bool CommandId::IsTransactional() const {
Expand Down Expand Up @@ -170,6 +173,8 @@ const char* OptName(CO::CommandOpt fl) {
return "readonly";
case DENYOOM:
return "denyoom";
case REVERSE_MAPPING:
return "reverse-mapping";
case FAST:
return "fast";
case LOADING:
Expand Down
7 changes: 5 additions & 2 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ enum CommandOpt : uint32_t {
LOADING = 1U << 3, // Command allowed during LOADING state.
DENYOOM = 1U << 4, // use-memory in redis.

// UNUSED = 1U << 5,
// marked commands that demand preserve the order of keys to work correctly.
// For example, MGET needs to know the order of keys to return the values in the same order.
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
REVERSE_MAPPING = 1U << 5,

VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.

ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
HIDDEN = 1U << 10, // does not show in COMMAND command output
INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments
GLOBAL_TRANS = 1U << 12,
Expand Down
12 changes: 7 additions & 5 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
int req_obj_type) {
DCHECK(!args.Empty());

for (auto it = args.begin(); it != args.end(); ++it) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
unsigned i = 0;
for (string_view key : args) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
if (res)
return make_pair(res.value(), unsigned(it.index()));
return make_pair(res.value(), i);
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
++i;
}

VLOG(2) << "FindFirst not found";
Expand Down Expand Up @@ -117,8 +119,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
if (!lhs || !rhs)
return lhs.ok();
size_t i1 = std::get<1>(*lhs);
size_t i2 = std::get<1>(*rhs);
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
return i1 < i2;
};

Expand Down
6 changes: 2 additions & 4 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,15 @@ struct Entry : public EntryBase {
struct Payload {
std::string_view cmd;
std::variant<CmdArgList, // Parts of a full command.
ShardArgs, // Shard parts.
ArgSlice>
ShardArgs // Command and its shard parts.
>
args;

Payload() = default;
Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) {
}
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
}
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
}
};

Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
Expand Down
13 changes: 6 additions & 7 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,14 +1543,12 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;

vector<OptString>& res = mget_resp[sid];
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
for (size_t j = 0; j < res.size(); ++j) {
if (!res[j])
continue;

uint32_t dst_indx = it.index();
results[dst_indx] = std::move(res[src_index]);
uint32_t indx = transaction->ReverseArgIndex(sid, j);
results[indx] = std::move(res[j]);
}
}

Expand Down Expand Up @@ -2093,7 +2091,8 @@ void JsonFamily::Register(CommandRegistry* registry) {
constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
registry->StartFamily();
*registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
.HFUNC(MGet);
*registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
*registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
*registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);
Expand Down
27 changes: 3 additions & 24 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,6 @@ struct CircularMessages {
// Used to recover logs for BLPOP failures. See OpBPop.
thread_local CircularMessages debugMessages{50};

// A bit awkward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally
// we reference the first element in the CmdArgList via islice.
struct SingleArg {
MutableSlice mslice;
IndexSlice islice{0, 1};

SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
}

ShardArgs Get() {
return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
}
};

class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
Expand Down Expand Up @@ -464,9 +448,7 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// hack, again. since we hacked which queue we are waiting on (see RunPair)
// we must clean-up src key here manually. See RunPair why we do this.
// in short- we suspended on "src" on both shards.

SingleArg single_arg{src};
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
Expand Down Expand Up @@ -891,8 +873,7 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}

SingleArg single_arg{pop_key_};
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand All @@ -919,13 +900,11 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
return op_res;
}

SingleArg single_arg(this->pop_key_);

// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down
12 changes: 5 additions & 7 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2989,19 +2989,17 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
continue;

vector<RecordVec>& results = xread_resp[sid];
unsigned src_index = 0;
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);

for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (results[src_index].size() == 0) {
for (size_t i = 0; i < results.size(); ++i) {
if (results[i].size() == 0) {
continue;
}

resolved_streams++;

// Add the stream records ordered by the original stream arguments.
size_t dst_indx = it.index();
res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
res[indx - opts->streams_arg] = std::move(results[i]);
}
}

Expand Down Expand Up @@ -3325,7 +3323,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
Expand Down
50 changes: 19 additions & 31 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,13 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);

size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
string_view value = *it;
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
Expand All @@ -287,29 +285,18 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
string_view cmd;
ArgSlice cmd_args;
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
} else {
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(args.begin(), index);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}

Expand Down Expand Up @@ -1179,17 +1166,16 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
if (!src.resp_arr[src_indx])

for (size_t j = 0; j < src.resp_arr.size(); ++j) {
if (!src.resp_arr[j])
continue;

uint32_t indx = it.index();
uint32_t indx = transaction->ReverseArgIndex(sid, j);

res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
res.resp_arr[indx] = std::move(src.resp_arr[j]);
if (cntx->protocol() == Protocol::MEMCACHE) {
res.resp_arr[indx]->key = *it;
res.resp_arr[indx]->key = ArgS(args, indx);
}
}
}
Expand Down Expand Up @@ -1505,7 +1491,9 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
acl::kMGet}
.HFUNC(MGet)
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
Expand Down
Loading

0 comments on commit 25e6930

Please sign in to comment.