Skip to content

Commit

Permalink
refactor(simple_kv): use rocksdb API to read/write file
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 21, 2023
1 parent c8793d2 commit 124a44e
Show file tree
Hide file tree
Showing 57 changed files with 339 additions and 81 deletions.
2 changes: 1 addition & 1 deletion src/replica/storage/simple_kv/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ set(MY_PROJ_SRC ${SIMPLE_KV_THRIFT_SRCS})
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS dsn_replica_server dsn_meta_server dsn_client dsn_runtime hashtable)
set(MY_PROJ_LIBS dsn_replica_server dsn_meta_server dsn_client dsn_runtime hashtable rocksdb)

set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

Expand Down
122 changes: 83 additions & 39 deletions src/replica/storage/simple_kv/simple_kv.server.impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,36 @@

#include "simple_kv.server.impl.h"

#include <fcntl.h>
#include <fmt/core.h>
#include <inttypes.h>
#include <rocksdb/slice.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <atomic>
#include <fstream>
#include <memory>
#include <utility>
#include <vector>

#include "aio/aio_task.h"
#include "aio/file_io.h"
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "replica/storage/simple_kv/simple_kv.server.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "runtime/serverlet.h"
#include "simple_kv_types.h"
#include "utils/autoref_ptr.h"
#include "utils/binary_reader.h"
#include "utils/blob.h"
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/utils.h"

namespace dsn {
class blob;

namespace replication {
class replica;
Expand Down Expand Up @@ -175,37 +187,55 @@ void simple_kv_service_impl::recover(const std::string &name, int64_t version)
{
zauto_lock l(_lock);

std::ifstream is(name.c_str(), std::ios::binary);
if (!is.is_open())
return;
std::unique_ptr<rocksdb::SequentialFile> rfile;
auto s = rocksdb::Env::Default()->NewSequentialFile(name, &rfile, rocksdb::EnvOptions());
CHECK(s.ok(), "open log file '{}' failed, err = {}", name, s.ToString());

_store.clear();

uint64_t count;
int magic;

is.read((char *)&count, sizeof(count));
is.read((char *)&magic, sizeof(magic));
// Read header.
uint64_t count = 0;
int magic = 0;
rocksdb::Slice result;
static const uint64_t kHeaderSize = sizeof(count) + sizeof(magic);
char buff[kHeaderSize] = {0};
s = rfile->Read(kHeaderSize, &result, buff);
CHECK(s.ok(), "read header failed, err = {}", s.ToString());
CHECK(!result.empty(), "read EOF of file '{}'", name);

binary_reader reader(blob(buff, 0, kHeaderSize));
CHECK_EQ(sizeof(count), reader.read(count));
CHECK_EQ(sizeof(magic), reader.read(magic));
CHECK_EQ_MSG(magic, 0xdeadbeef, "invalid checkpoint");

// Read kv pairs.
for (uint64_t i = 0; i < count; i++) {
std::string key;
std::string value;

uint32_t sz;
is.read((char *)&sz, (uint32_t)sizeof(sz));
key.resize(sz);

is.read((char *)&key[0], sz);

is.read((char *)&sz, (uint32_t)sizeof(sz));
value.resize(sz);

is.read((char *)&value[0], sz);

// Read key.
uint32_t sz = 0;
s = rfile->Read(sizeof(sz), &result, (char *)&sz);
CHECK(s.ok(), "read key size failed, err = {}", s.ToString());
CHECK(!result.empty(), "read EOF of file '{}'", name);

std::shared_ptr<char> key_buffer(dsn::utils::make_shared_array<char>(sz));
s = rfile->Read(sz, &result, key_buffer.get());
CHECK(s.ok(), "read key failed, err = {}", s.ToString());
CHECK(!result.empty(), "read EOF of file '{}'", name);
std::string key = result.ToString();

// Read value.
s = rfile->Read(sizeof(sz), &result, (char *)&sz);
CHECK(s.ok(), "read value size failed, err = {}", s.ToString());
CHECK(!result.empty(), "read EOF of file '{}'", name);

std::shared_ptr<char> value_buffer(dsn::utils::make_shared_array<char>(sz));
s = rfile->Read(sz, &result, value_buffer.get());
CHECK(s.ok(), "read value failed, err = {}", s.ToString());
CHECK(!result.empty(), "read EOF of file '{}'", name);
std::string value = result.ToString();

// Store the kv pair.
_store[key] = value;
}
is.close();
}

::dsn::error_code simple_kv_service_impl::sync_checkpoint()
Expand All @@ -221,29 +251,43 @@ ::dsn::error_code simple_kv_service_impl::sync_checkpoint()
return ERR_OK;
}

std::ofstream os(name, std::ios::binary);
std::string fname = fmt::format("{}/checkpoint.{}", data_dir(), last_commit);
auto wfile = file::open(fname.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
CHECK_NOTNULL(wfile, "");

#define WRITE_DATA_SIZE(data, size) \
do { \
auto tsk = ::dsn::file::write( \
wfile, (char *)&data, size, offset, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr); \
tsk->wait(); \
offset += size; \
} while (false)

#define WRITE_DATA(data) WRITE_DATA_SIZE(data, sizeof(data))

uint64_t offset = 0;
uint64_t count = (uint64_t)_store.size();
int magic = 0xdeadbeef;
WRITE_DATA(count);

os.write((const char *)&count, (uint32_t)sizeof(count));
os.write((const char *)&magic, (uint32_t)sizeof(magic));
int magic = 0xdeadbeef;
WRITE_DATA(magic);

for (auto it = _store.begin(); it != _store.end(); ++it) {
const std::string &k = it->first;
for (const auto &kv : _store) {
const std::string &k = kv.first;
uint32_t sz = (uint32_t)k.length();
WRITE_DATA(sz);
WRITE_DATA_SIZE(k[0], sz);

os.write((const char *)&sz, (uint32_t)sizeof(sz));
os.write((const char *)&k[0], sz);

const std::string &v = it->second;
const std::string &v = kv.second;
sz = (uint32_t)v.length();

os.write((const char *)&sz, (uint32_t)sizeof(sz));
os.write((const char *)&v[0], sz);
WRITE_DATA(sz);
WRITE_DATA_SIZE(v[0], sz);
}
#undef WRITE_DATA
#undef WRITE_DATA_SIZE

os.close();
CHECK_EQ(ERR_OK, file::flush(wfile));
CHECK_EQ(ERR_OK, file::close(wfile));

// TODO: gc checkpoints
set_last_durable_decree(last_commit);
Expand Down
3 changes: 2 additions & 1 deletion src/replica/storage/simple_kv/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ set(MY_PROJ_LIBS dsn_replica_server
zookeeper
hashtable
gtest
)
dsn_utils
rocksdb)

set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-000.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-001.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-002.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-003.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-004.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-005.ini
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-006.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-100.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-101.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-102.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-103.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-104.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-105.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-106.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-107.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-108.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-109.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-200.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-201.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
3 changes: 3 additions & 0 deletions src/replica/storage/simple_kv/test/case-202-0.ini
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ arguments = localhost:34601
[meta_server]
server_list = localhost:34601

[pegasus.server]
encrypt_data_at_rest = false

[replication.app]
app_name = simple_kv.instance0
app_type = simple_kv
Expand Down
Loading

0 comments on commit 124a44e

Please sign in to comment.