Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support count min sketch data structure and most commands #2524

Open
wants to merge 39 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c9626df
CountMinSketch Additions
jonathanc-n Sep 5, 2024
bb48c5b
adding redis_cms
jonathanc-n Sep 7, 2024
b4359b5
Merge branch 'unstable' into cms
jonathanc-n Sep 7, 2024
a1b904a
small changes
jonathanc-n Sep 7, 2024
8cab9c9
Merge branch 'cms' of https://github.com/jonathanc-n/kvrocks into cms
jonathanc-n Sep 7, 2024
ecbaa44
Resolved changes
jonathanc-n Sep 8, 2024
bf41c60
Merge branch 'unstable' into cms
jonathanc-n Sep 9, 2024
3fb9f1f
small tweaks
jonathanc-n Sep 10, 2024
a92dc08
parse change
jonathanc-n Sep 10, 2024
e59e2b7
Merge branch 'unstable' into cms
jonathanc-n Sep 10, 2024
b8b0bf2
format changes
jonathanc-n Sep 12, 2024
dd1c4f5
tweaks + lint
jonathanc-n Sep 14, 2024
509d5ab
max memory checks for initbyprob command
jonathanc-n Sep 14, 2024
f19e364
Merge branch 'unstable' into cms
jonathanc-n Sep 14, 2024
c1c1e7f
Merge branch 'unstable' into cms
jonathanc-n Sep 16, 2024
f21792c
Merge branch 'unstable' into cms
jonathanc-n Sep 17, 2024
1e4e747
Update src/types/redis_cms.h
jonathanc-n Sep 18, 2024
a71d7f7
all fixes + go test case
jonathanc-n Sep 19, 2024
202a2f5
Merge branch 'unstable' into cms
jonathanc-n Sep 20, 2024
7743f7b
Merge branch 'unstable' into cms
jonathanc-n Sep 21, 2024
ede5481
Merge branch 'unstable' into cms
jonathanc-n Sep 22, 2024
551f3c8
Added additional test cases
jonathanc-n Sep 23, 2024
a0222d7
lint fix
jonathanc-n Sep 23, 2024
9a2fe26
Merge branch 'unstable' into cms
jonathanc-n Sep 27, 2024
de0c2ad
Update src/types/redis_cms.h
jonathanc-n Sep 29, 2024
69d6ea4
Fixes
jonathanc-n Sep 29, 2024
db73151
Merge branch 'cms' of https://github.com/jonathanc-n/kvrocks into cms
jonathanc-n Sep 29, 2024
0cdfb07
Small Changes
jonathanc-n Sep 29, 2024
02347df
logic fix
jonathanc-n Sep 29, 2024
606f9b5
lint
jonathanc-n Sep 29, 2024
0445b3d
Quick Fixes
jonathanc-n Sep 29, 2024
48e9bc2
lint fix
jonathanc-n Sep 29, 2024
f577146
typing fixes
jonathanc-n Sep 30, 2024
ab277e9
one mb limit update per key
jonathanc-n Sep 30, 2024
f2f1288
Update src/types/redis_cms.cc
jonathanc-n Sep 30, 2024
ca60e13
Update src/types/redis_cms.cc
jonathanc-n Sep 30, 2024
df7a7b2
[WIP] Some codereview check
mapleFU Sep 30, 2024
e64056f
Merge branch 'unstable' into cms
aleksraiden Oct 5, 2024
bc81447
Update: making content pass test
mapleFU Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 277 additions & 0 deletions src/commands/cmd_cms.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include <types/cms.h>
#include <types/redis_cms.h>

#include "commander.h"
#include "commands/command_parser.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"

namespace redis {

/// CMS.INCRBY key item increment [item increment ...]
///
/// The `key` should be an existing Count-Min Sketch key,
/// otherwise, the command will return an error.
///
/// The output should be an array of integers, each integer
/// means the counter value after the increment. If the increment
/// overflows, the return value will be `"CMS: INCRBY overflow"`.
class CommandCMSIncrBy final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if ((args_.size() - 2) % 2 != 0) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
rocksdb::Status s;
// <item, increment> pairs
std::vector<redis::CMS::IncrByPair> elements;
elements.reserve((args_.size() - 2) / 2);
for (size_t i = 2; i < args_.size(); i += 2) {
std::string_view key = args_[i];
auto parse_result = ParseInt<int64_t>(args_[i + 1]);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
int64_t value = *parse_result;
elements.emplace_back(redis::CMS::IncrByPair{key, value});
}
std::vector<uint32_t> counters;
s = cms.IncrBy(ctx, args_[1], elements, &counters);
if (s.IsNotFound()) {
return {Status::RedisExecErr, "Key not found"};
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
}
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
std::vector<std::string> strs;
for (uint32_t counter : counters) {
if (counter == std::numeric_limits<uint32_t>::max()) {
strs.push_back(redis::Error({Status::RedisExecErr, "CMS: INCRBY overflow"}));
} else {
strs.push_back(redis::Integer(counter));
}
}
*output = redis::Array(strs);
return Status::OK();
}
};

/// CMS.INFO key
class CommandCMSInfo final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
rocksdb::Status s;
CMSketch::CMSInfo ret{};

s = cms.Info(ctx, args_[1], &ret);

if (s.IsNotFound()) {
return {Status::RedisExecErr, "Key not found"};
}

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Array({redis::BulkString("width"), redis::Integer(ret.width), redis::BulkString("depth"),
redis::Integer(ret.depth), redis::BulkString("count"), redis::Integer(ret.count)});

return Status::OK();
}
};

/// CMS.INITBYDIM key width depth
///
/// If the key already exists, the command will return an error.
class CommandCMSInitByDim final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
rocksdb::Status s;
auto width_result = ParseInt<uint32_t>(this->args_[2]);
if (!width_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
uint32_t width = *width_result;

auto depth_result = ParseInt<uint32_t>(this->args_[3]);
if (!depth_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
uint32_t depth = *depth_result;

s = cms.InitByDim(ctx, args_[1], width, depth);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::SimpleString("OK");
return Status::OK();
}
};

/// CMS.INITBYPROB key error probability
///
/// If the key already exists, the command will return an error.
class CommandCMSInitByProb final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
rocksdb::Status s;

auto error_result = ParseFloat<double>(args_[2]);
if (!error_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
double error = *error_result;

auto delta_result = ParseFloat<double>(args_[3]);
if (!delta_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
double delta = *delta_result;

s = cms.InitByProb(ctx, args_[1], error, delta);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::SimpleString("OK");
return Status::OK();
}
};

/// CMS.MERGE destination numKeys source [source ...] [WEIGHTS weight [weight ...]]
class CommandCMSMerge final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
destination_ = args[1];

StatusOr<int> num_key_result = parser.TakeInt();
if (!num_key_result || *num_key_result <= 0) {
return {Status::RedisParseErr, "invalid number of source keys"};
}
num_keys_ = *num_key_result;

src_keys_.reserve(num_keys_);
for (int i = 0; i < num_keys_; i++) {
auto result = parser.TakeStr();
if (!result) {
return {Status::RedisParseErr, "Error parsing source key"};
}
src_keys_.emplace_back(std::move(*result));
}

bool weights_found = false;
while (parser.Good()) {
// Parse "WEIGHTS" if exists.
if (parser.EatEqICase("WEIGHTS")) {
if (weights_found) {
return {Status::RedisParseErr, "WEIGHTS option cannot be specified multiple times"};
}
src_weights_.reserve(num_keys_);
for (int i = 0; i < num_keys_; i++) {
StatusOr<uint32_t> weight_result = parser.TakeInt<uint32_t>();
if (!weight_result || *weight_result == 0) {
return {Status::RedisParseErr, "invalid weight value"};
}
src_weights_.emplace_back(*weight_result);
}
weights_found = true;
} else {
return {Status::RedisParseErr, "Syntax error: unexpected token"};
}
}

if (!weights_found) {
src_weights_.resize(num_keys_, 1);
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);

rocksdb::Status s = cms.MergeUserKeys(ctx, destination_, src_keys_, src_weights_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::SimpleString("OK");
return Status::OK();
}

private:
Slice destination_;
int num_keys_{0};
std::vector<Slice> src_keys_;
std::vector<uint32_t> src_weights_;
};

/// CMS.QUERY key item [item ...]
class CommandCMSQuery final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);

std::vector<uint32_t> counters{};
std::vector<std::string> elements;

for (size_t i = 2; i < args_.size(); ++i) {
elements.emplace_back(args_[i]);
}

rocksdb::Status s = cms.Query(ctx, args_[1], elements, counters);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

std::vector<std::string> output_values;
output_values.reserve(counters.size());
for (const auto &counter : counters) {
output_values.emplace_back(Integer(counter));
}
*output = redis::Array(output_values);

return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(CMS, MakeCmdAttr<CommandCMSIncrBy>("cms.incrby", -4, "write", 0, 0, 0),
MakeCmdAttr<CommandCMSInfo>("cms.info", 2, "read-only", 0, 0, 0),
MakeCmdAttr<CommandCMSInitByDim>("cms.initbydim", 4, "write", 0, 0, 0),
MakeCmdAttr<CommandCMSInitByProb>("cms.initbyprob", 4, "write", 0, 0, 0),
MakeCmdAttr<CommandCMSMerge>("cms.merge", -4, "write", 0, 0, 0),
MakeCmdAttr<CommandCMSQuery>("cms.query", -3, "read-only", 0, 0, 0), );
} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CMS,
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
Function,
Geo,
Hash,
Expand Down
42 changes: 41 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ bool Metadata::ExpireAt(uint64_t expired_ts) const {
return expire < expired_ts;
}

bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() == kRedisJson; }
bool Metadata::IsSingleKVType() const {
return Type() == kRedisString || Type() == kRedisJson || Type() == kRedisCountMinSketch;
}

bool Metadata::IsEmptyableType() const {
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog;
Expand Down Expand Up @@ -495,3 +497,41 @@ rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

void CountMinSketchMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);
PutFixed32(dst, width);
PutFixed32(dst, depth);
PutFixed64(dst, counter);
for (const auto &count : array) {
PutFixed32(dst, count);
}
}

rocksdb::Status CountMinSketchMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}
if (!GetFixed32(input, &width)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed32(input, &depth)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed64(input, &counter)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

size_t array_size = width * depth;
array.resize(array_size);

for (size_t i = 0; i < array_size; ++i) {
uint32_t count = 0;
if (!GetFixed32(input, &count)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
array[i] = count;
}

return rocksdb::Status::OK();
}
19 changes: 16 additions & 3 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum RedisType : uint8_t {
kRedisBloomFilter = 9,
kRedisJson = 10,
kRedisHyperLogLog = 11,
kRedisCountMinSketch = 12,
};

struct RedisTypes {
Expand Down Expand Up @@ -91,9 +92,9 @@ enum RedisCommand {
kRedisCmdLMove,
};

const std::vector<std::string> RedisTypeNames = {"none", "string", "hash", "list",
"set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog"};
const std::vector<std::string> RedisTypeNames = {"none", "string", "hash", "list", "set",
"zset", "bitmap", "sortedint", "stream", "MBbloom--",
"ReJSON-RL", "hyperloglog", "countminsketch"};

constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value";
constexpr const char *kErrMsgKeyExpired = "the key was expired";
Expand Down Expand Up @@ -335,3 +336,15 @@ class HyperLogLogMetadata : public Metadata {

EncodeType encode_type = EncodeType::DENSE;
};

class CountMinSketchMetadata : public Metadata {
public:
uint32_t width;
uint32_t depth;
uint64_t counter = 0;
std::vector<uint32_t> array;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@git-hulk @PragmaTwice

I'm thinking about handling the array. Since regarding the cmsketch as a string is ok to me, but parsing it to an std::vector is a bit weird to me. Can we take a Buffer and do zero copying when doing this?

Like:

LoadMetadata from string. When storing, forcing LittleEndian in the array
When reading from metadata, report error if not length enough, and hold a sliced buffer on underlying data
When read/write, using memcpy to avoid unaligned access.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it can be like how we handle the JSON data structure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But here I'm more interested in whether it's better to be a single key or putting the array in subkeys.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But here I'm more interested in whether it's better to be a single key or putting the array in subkeys.

Actually I think this in single key is ok since we merely "only query metadata" if not calling "info"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the limit of the size of such an array? Do we need to consider to split it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it's good to have one rocksdb key with a 50M-bytes value.

The default engine doesn't handle this well. There're some blog engine which could do this, and we can enable the blob in kvrocks here.

Perhaps we can limit the size to 1MB firstly? 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

Also yeah we should not put the array inside the metadata.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may try a round tonight

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll adjust it to 1MB limit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do we want to hold the array in a buffer on storage, and do operations on that?


explicit CountMinSketchMetadata(bool generate_version = true) : Metadata(kRedisCountMinSketch, generate_version) {}
void Encode(std::string *dst) const override;
rocksdb::Status Decode(Slice *input) override;
};
Loading
Loading