Skip to content

Commit

Permalink
chore(tiering): Faster smallbins serialization #2
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jul 30, 2024
1 parent e2d65a0 commit 0b4a673
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
}

void CompactObj::Materialize(std::string_view blob, bool is_raw) {
CHECK(IsExternal()) << int(taglen_);
// CHECK(IsExternal()) << int(taglen_);

DCHECK_GT(blob.size(), kInlineLen);

Expand Down
9 changes: 9 additions & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ class CompactObj {

bool HasAllocated() const;

// Returns the bits of mask_ selected by kEncMask (the encoding bits); all other bits are cleared.
uint8_t GetEncodingMask() const {
return mask_ & kEncMask;
}

// Overwrites the kEncMask bits of mask_ with the matching bits of `mask`.
// Bits of mask_ outside kEncMask are preserved, and bits of `mask` outside
// kEncMask are ignored.
void SetEncodingMask(uint8_t mask) {
  mask_ = (mask_ & ~kEncMask) | (mask & kEncMask);
}

private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;
Expand Down
5 changes: 4 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;
constexpr uint8_t RDB_TYPE_TIERED_SEGMENT = 34;

// Returns true if `type` is a loadable RDB object type: either one accepted by
// __rdbIsObjectType or one of the extension types declared in this header
// (JSON, hash/set with expiry, SBF, tiered segment).
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
  return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
         (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
         (type == RDB_TYPE_SBF) || (type == RDB_TYPE_TIERED_SEGMENT);
}

// Opcodes: Range 200-240 is used by DF extensions.
Expand All @@ -40,6 +41,8 @@ constexpr uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
// so it is always sent at the end of the RDB stream.
constexpr uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211;

constexpr uint8_t RDB_OPCODE_TIERED_PAGE = 212;

constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */

// RDB_OPCODE_DF_MASK define 4byte field with next flags
Expand Down
135 changes: 130 additions & 5 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern "C" {
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/overloaded.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
Expand All @@ -48,6 +49,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/tiering/common.h" // for _KB literal
#include "server/tiering/disk_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"

Expand Down Expand Up @@ -387,6 +389,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);
// Tiered segments are not expected to reach this visitor; the definition aborts with CHECK(false).
void operator()(const RdbTieredSegment& src);

std::error_code ec() const {
return ec_;
Expand Down Expand Up @@ -481,6 +484,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
pv_->SetSBF(sbf);
}

// Visitor overload for tiered segments. Always aborts the process via CHECK(false).
// NOTE(review): presumably unreachable because RdbLoader::HandleSmallItems rewrites tiered
// items to RDB_TYPE_STRING before they are added to a shard buffer - confirm no other path
// hands a RdbTieredSegment to this visitor.
void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbTieredSegment& src) {
CHECK(false) << "unreachable";
}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();

Expand Down Expand Up @@ -1385,6 +1392,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
case RDB_TYPE_TIERED_SEGMENT:
iores = ReadTieredSegment();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

Expand Down Expand Up @@ -1878,6 +1888,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

// Reads a tiered segment descriptor from the stream: three length-encoded integers that are
// stored, in order, into the segment's offset, length and enc_mask fields.
// Returns the descriptor wrapped in an OpaqueObj tagged RDB_TYPE_TIERED_SEGMENT. A failed
// LoadLen is handled by SET_OR_UNEXPECT (presumably by returning the error - confirm
// against the macro definition).
// NOTE(review): enc_mask is a uint8_t, so a larger decoded value is narrowed on assignment.
auto RdbLoaderBase::ReadTieredSegment() -> io::Result<OpaqueObj> {
  RdbTieredSegment segment{};  // value-initialized so no field is ever left indeterminate
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.offset);
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.length);
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.enc_mask);
  return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
Expand Down Expand Up @@ -1924,6 +1942,18 @@ RdbLoader::RdbLoader(Service* service)
}

RdbLoader::~RdbLoader() {
for (auto& [_, page] : small_items_pages_) {
if (!holds_alternative<tiering::DiskSegment>(page))
continue;
auto segment = get<tiering::DiskSegment>(page);
EngineShard::tlocal()->tiered_storage()->BorrowStorage().MarkAsFree(segment);
}

for (auto& [_, items] : small_items_) {
for (Item* item : items)
delete item;
}

while (true) {
Item* item = item_queue_.Pop();
if (item == nullptr)
Expand Down Expand Up @@ -2117,6 +2147,11 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}

if (type == RDB_OPCODE_TIERED_PAGE) {
RETURN_ON_ERR(LoadTieredPage());
continue;
}

if (!rdbIsObjectTypeDF(type)) {
return RdbError(errc::invalid_rdb_type);
}
Expand All @@ -2126,6 +2161,11 @@ error_code RdbLoader::Load(io::Source* src) {
settings.Reset();
} // main load loop

// Flush all small items
HandleSmallItems(true);

FlushAllShards();

DVLOG(1) << "RdbLoad loop finished";

if (stop_early_) {
Expand Down Expand Up @@ -2348,6 +2388,38 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
return std::error_code{};
}

// Loads one tiered page record: a length-encoded page offset followed by the page contents
// read as a generic string. The page is registered in small_items_pages_ under its offset,
// as a DiskSegment when it was successfully stashed to the shard's tiered storage, otherwise
// as an in-memory string.
// Returns an error only if reading the offset or the page contents fails; stash failures
// fall back to keeping the page in memory.
error_code RdbLoader::LoadTieredPage() {
  size_t offset = 0;
  SET_OR_RETURN(LoadLen(nullptr), offset);

  std::string page;
  SET_OR_RETURN(FetchGenericString(), page);

  // If tiering is enabled, try saving the received page on disk.
  // Fall back to memory in case of errors.
  auto* shard = EngineShard::tlocal();
  if (shard && shard->tiered_storage()) {
    auto& storage = shard->tiered_storage()->BorrowStorage();

    util::fb2::Done done;
    std::error_code io_ec;  // written only by the completion callback
    auto cb = [this, offset, &io_ec, &done](io::Result<tiering::DiskSegment> res) {
      if (res.has_value())
        small_items_pages_[offset] = res.value();
      else
        io_ec = res.error();
      done.Notify();
    };

    // The submission result is kept apart from the completion result: assigning both to one
    // variable lets a successful return value overwrite an error reported by the callback.
    // Wait only when the request was accepted - assumes Stash does not invoke the callback
    // when it returns an error, in which case waiting would never finish.
    std::error_code submit_ec = storage.Stash(io::Buffer(page), {}, cb);
    if (!submit_ec) {
      done.Wait();
      if (!io_ec)
        return {};
    }
  }

  // `page` is still intact here: Stash was only given a view over it.
  small_items_pages_[offset] = std::move(page);
  return {};
}

error_code RdbLoader::HandleAux() {
/* AUX: generic string-string fields. Use to add state to RDB
* which is backward compatible. Implementations of RDB loading
Expand Down Expand Up @@ -2531,20 +2603,37 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {

item->is_sticky = settings->is_sticky;

ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];
std::move(cleanup).Cancel();

if (item->val.rdb_type == RDB_TYPE_TIERED_SEGMENT) {
auto segment = get<RdbTieredSegment>(item->val.obj);
{
size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize;
auto& items = small_items_[offset];
small_items_sizes_.erase({items.size(), offset});
items.push_back(item);
small_items_sizes_.insert({items.size(), offset});
}
HandleSmallItems(false); // don't force flush
return kOk;
}

Add(item);
return kOk;
}

// Appends `item` to the pending buffer of the shard selected by Shard(item->key, ...) and
// calls FlushShardAsync for that shard once its buffer holds at least kBufSize items.
void RdbLoader::Add(Item* item) {
  constexpr size_t kBufSize = 128;

  ShardId sid = Shard(item->key, shard_set->size());
  auto& out_buf = shard_buf_[sid];
  out_buf.emplace_back(item);

  if (out_buf.size() >= kBufSize) {
    FlushShardAsync(sid);
  }
}

void RdbLoader::LoadScriptFromAux(string&& body) {
Expand All @@ -2559,6 +2648,42 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}
}

void RdbLoader::HandleSmallItems(bool flush) {
while (!small_items_.empty() && (flush || small_items_.size() > 1000)) {
auto [_, offset] = small_items_sizes_.extract(small_items_sizes_.begin()).value();
auto node = small_items_.extract(offset);

auto page_reader = [](tiering::DiskSegment segment) {
auto& store = EngineShard::tlocal()->tiered_storage()->BorrowStorage();
util::fb2::Future<std::string> f;
store.Read(segment, [f](io::Result<std::string_view> result) mutable {
CHECK(result.has_value()); // TODO
f.Resolve(string{result.value()});
});
return f.Get();
};
string page = visit(Overloaded{[](const string& s) { return s; }, page_reader},
small_items_pages_[offset]);

for (Item* item : node.mapped()) {
RdbTieredSegment segment = get<RdbTieredSegment>(item->val.obj);

CompactObj co;
co.SetEncodingMask(segment.enc_mask);
co.Materialize({page.data() + (segment.offset - offset), segment.length}, true);

VLOG(0) << "Loaded " << co.ToString();

base::PODArray<char> arr(co.Size(), nullptr);
co.GetString(arr.data());

item->val.rdb_type = RDB_TYPE_STRING;
item->val.obj = std::move(arr);
Add(item);
}
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
Expand Down
32 changes: 30 additions & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//
#pragma once

#include <absl/container/btree_set.h>
#include <absl/container/flat_hash_map.h>

#include <system_error>

extern "C" {
Expand All @@ -15,6 +18,7 @@ extern "C" {
#include "io/io_buf.h"
#include "server/common.h"
#include "server/journal/serializer.h"
#include "server/tiering/common.h"

namespace dfly {

Expand Down Expand Up @@ -54,8 +58,18 @@ class RdbLoaderBase {
std::vector<Filter> filters;
};

// Location of a small value inside a tiered page, as filled in by ReadTieredSegment.
struct RdbTieredSegment {
  size_t offset = 0;     // offset of the value; rounded down to the page size to find its page
  size_t length = 0;     // length of the value in bytes
  uint8_t enc_mask = 0;  // passed to CompactObj::SetEncodingMask before materializing
};

// A page offset together with the raw page contents.
struct RdbTieredPage {
  size_t offset = 0;
  std::string blob;
};

// All value representations an OpaqueObj can hold. Declared once: a second alias with a
// different alternative list would be a conflicting redeclaration.
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
                                std::unique_ptr<LoadTrace>, RdbSBF, RdbTieredSegment>;

struct OpaqueObj {
RdbVariant obj;
Expand Down Expand Up @@ -148,6 +162,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
::io::Result<OpaqueObj> ReadTieredSegment();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
Expand All @@ -168,10 +183,13 @@ class RdbLoaderBase {

size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;

base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_;

JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;

RdbVersion rdb_version_ = RDB_VERSION;
};

Expand Down Expand Up @@ -259,10 +277,14 @@ class RdbLoader : protected RdbLoaderBase {
void FlushShardAsync(ShardId sid);
void FlushAllShards();

void Add(Item* item);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);

void LoadScriptFromAux(std::string&& value);

void HandleSmallItems(bool flush);
std::error_code LoadTieredPage();

// Load index definition from RESP string describing it in FT.CREATE format,
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);
Expand All @@ -285,6 +307,12 @@ class RdbLoader : protected RdbLoaderBase {
std::function<void()> full_sync_cut_cb;

base::MPSCIntrusiveQueue<Item> item_queue_;

// Tiered items waiting to be materialized, grouped by the page-aligned offset of the page
// that holds their value.
absl::flat_hash_map<size_t /* offset */, std::vector<Item*>> small_items_;
// (number of pending items, page offset) pairs in descending order, so begin() refers to the
// page with the most pending items.
absl::btree_set<std::pair<size_t /* num entries*/, size_t /* offset */>, std::greater<>>
small_items_sizes_;
// Loaded pages by offset: either the page contents kept in memory or the DiskSegment the
// page was stashed to.
absl::flat_hash_map<size_t /* offset */, std::variant<std::string, tiering::DiskSegment>>
small_items_pages_;
};

} // namespace dfly
Loading

0 comments on commit 0b4a673

Please sign in to comment.