Skip to content

Commit

Permalink
Add the ApplyBatch command for data migration scenario (#2010)
Browse files Browse the repository at this point in the history
Co-authored-by: git-hulk <[email protected]>
  • Loading branch information
caipengbo and git-hulk authored Jan 14, 2024
1 parent 7cbf0a5 commit 9d656f1
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
36 changes: 35 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,39 @@ class CommandReset : public Commander {
}
};

class CommandApplyBatch : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
raw_batch_ = args[1];
if (args.size() > 2) {
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
if (!util::EqualICase(args[2], "lowpri")) {
return {Status::RedisParseErr, "only support LOWPRI option"};
}
low_pri_ = true;
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
size_t size = raw_batch_.size();
auto options = svr->storage->DefaultWriteOptions();
options.low_pri = low_pri_;
auto s = svr->storage->ApplyWriteBatch(options, std::move(raw_batch_));
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::Integer(size);
return Status::OK();
}

private:
std::string raw_batch_;
bool low_pri_ = false;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0, 0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only", 0, 0, 0),
Expand Down Expand Up @@ -1291,5 +1324,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loadin
MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 0, 0),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 0, 0, 0),
MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 0),
MakeCmdAttr<CommandReset>("reset", -1, "multi pub-sub", 0, 0, 0), )
MakeCmdAttr<CommandReset>("reset", -1, "multi pub-sub", 0, 0, 0),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", 0, 0, 0), )
} // namespace redis
8 changes: 5 additions & 3 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,16 +661,18 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(write_opts_, std::move(raw_batch));
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}

auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(write_opts_, &batch);
auto s = db_->Write(options, &batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class Storage {
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

[[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value);
Expand Down
58 changes: 58 additions & 0 deletions tests/gocase/unit/applybatch/applybatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package applybatch

import (
"context"
"encoding/hex"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
)

func TestApplyBatch_Basic(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("Make sure the apply batch command works", func(t *testing.T) {
// SET a 1
batch, err := hex.DecodeString("04000000000000000100000003013105010D0B5F5F6E616D6573706163656106010000000031")
require.NoError(t, err)
r := rdb.Do(ctx, "ApplyBatch", string(batch))
val, err := r.Int64()
require.NoError(t, err)
require.EqualValues(t, len(batch), val)
require.Equal(t, "1", rdb.Get(ctx, "a").Val())

// HSET hash field value
batch, err = hex.DecodeString("05000000000000000200000003013201210B5F5F6E616D65737061636500000004686173683076F331696342A76669656C640576616C75650501100B5F5F6E616D657370616365686173681102000000003076F331696342A700000002")
require.NoError(t, err)
r = rdb.Do(ctx, "ApplyBatch", string(batch))
val, err = r.Int64()
require.NoError(t, err)
require.EqualValues(t, len(batch), val)
require.Equal(t, "value", rdb.HGet(ctx, "hash", "field").Val())
})
}

0 comments on commit 9d656f1

Please sign in to comment.