chore(tiering): Faster small bins serialization #2 #3396

Draft · wants to merge 3 commits into base: main

Changes from 1 commit
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
@@ -985,7 +985,7 @@ std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
}

void CompactObj::Materialize(std::string_view blob, bool is_raw) {
- CHECK(IsExternal()) << int(taglen_);
// CHECK(IsExternal()) << int(taglen_);

DCHECK_GT(blob.size(), kInlineLen);

9 changes: 9 additions & 0 deletions src/core/compact_object.h
@@ -405,6 +405,15 @@ class CompactObj {

bool HasAllocated() const;

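// Accessors for the encoding bits (kEncMask) of the object mask; used by the tiered
// small-bin serialization path to restore a value's encoding before Materialize().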
uint8_t GetEncodingMask() const {
return mask_ & kEncMask;
}

void SetEncodingMask(uint8_t mask) {
mask_ &= ~kEncMask;
mask_ |= (mask & kEncMask);
}

private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;
5 changes: 4 additions & 1 deletion src/server/rdb_extensions.h
@@ -14,11 +14,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;
constexpr uint8_t RDB_TYPE_TIERED_SEGMENT = 34;

constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
- (type == RDB_TYPE_SBF);
(type == RDB_TYPE_SBF) || (type == RDB_TYPE_TIERED_SEGMENT);
}

// Opcodes: Range 200-240 is used by DF extensions.
@@ -40,6 +41,8 @@ constexpr uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
// so it is always sent at the end of the RDB stream.
constexpr uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211;

constexpr uint8_t RDB_OPCODE_TIERED_PAGE = 212;

constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */

// RDB_OPCODE_DF_MASK define 4byte field with next flags
135 changes: 130 additions & 5 deletions src/server/rdb_load.cc
@@ -32,6 +32,7 @@ extern "C" {
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/overloaded.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
@@ -48,6 +49,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/tiering/common.h" // for _KB literal
#include "server/tiering/disk_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"

@@ -387,6 +389,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);
void operator()(const RdbTieredSegment& segment);

std::error_code ec() const {
return ec_;
@@ -481,6 +484,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
pv_->SetSBF(sbf);
}

void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbTieredSegment& src) {
CHECK(false) << "unreachable";

Collaborator: LOG(FATAL)

}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();

@@ -1385,6 +1392,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
case RDB_TYPE_TIERED_SEGMENT:
iores = ReadTieredSegment();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

@@ -1878,6 +1888,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

auto RdbLoaderBase::ReadTieredSegment() -> io::Result<OpaqueObj> {
RdbTieredSegment segment;
SET_OR_UNEXPECT(LoadLen(nullptr), segment.offset);
SET_OR_UNEXPECT(LoadLen(nullptr), segment.length);
SET_OR_UNEXPECT(LoadLen(nullptr), segment.enc_mask);
return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
@@ -1924,6 +1942,18 @@ RdbLoader::RdbLoader(Service* service)
}

RdbLoader::~RdbLoader() {
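// Free disk segments that were stashed for received small-bin pages and delete any
// items that were grouped for materialization but never flushed.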
for (auto& [_, page] : small_items_pages_) {
if (!holds_alternative<tiering::DiskSegment>(page))
continue;
auto segment = get<tiering::DiskSegment>(page);
EngineShard::tlocal()->tiered_storage()->BorrowStorage().MarkAsFree(segment);
}

for (auto& [_, items] : small_items_) {
for (Item* item : items)
delete item;
}

while (true) {
Item* item = item_queue_.Pop();
if (item == nullptr)
@@ -2117,6 +2147,11 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}

if (type == RDB_OPCODE_TIERED_PAGE) {
RETURN_ON_ERR(LoadTieredPage());
continue;
}

if (!rdbIsObjectTypeDF(type)) {
return RdbError(errc::invalid_rdb_type);
}
@@ -2126,6 +2161,11 @@
settings.Reset();
} // main load loop

// Flush all small items
HandleSmallItems(true);

FlushAllShards();

DVLOG(1) << "RdbLoad loop finished";

if (stop_early_) {
@@ -2348,6 +2388,38 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
return std::error_code{};
}

error_code RdbLoader::LoadTieredPage() {
size_t offset;
SET_OR_RETURN(LoadLen(nullptr), offset);

std::string page;
SET_OR_RETURN(FetchGenericString(), page);

// If tiering is enabled, try saving the received page on disk
// Fall back to memory in case of errors
if (EngineShard::tlocal() && EngineShard::tlocal()->tiered_storage()) {
auto& storage = EngineShard::tlocal()->tiered_storage()->BorrowStorage();

util::fb2::Done done;
std::error_code ec;
auto cb = [this, offset, &ec, &done](io::Result<tiering::DiskSegment> res) {
if (res.has_value())
small_items_pages_[offset] = res.value();
else
ec = res.error();
done.Notify();
};
ec = storage.Stash(io::Buffer(page), {}, cb);

done.Wait();
if (!ec)
return {};
}

small_items_pages_[offset] = page;
return {};
}
Comment on lines +2399 to +2422

Contributor Author: I really wish to use "virtual" keys to store pages. This would not require custom code for their serialization, nor custom code for loading and stashing them. We could simply rely on the tiered storage to do this.

Contributor Author: Second, a missing optimization is that we don't re-use the offloaded page; we only read from it.

Contributor Author: The idea would be to create a mapping table between new and old offsets and re-create those offsets in small bins (see the sketch after this thread).

Collaborator: You can add TODOs to detail your ideas. Regarding virtual keys, I think it's fine to use a predefined prefix like __df__ for Dragonfly-specific keys if it helps you. One thing to note: how does this work with multiple databases? Pages are shared between entities from different databases, imho.

Contributor Author (quoting "if it helps you"): I want to use the existing code for serialization and offloading, so pages will be "just values" and managed automatically.
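A minimal sketch of the mapping-table idea from this thread, added for illustration only: PageOffsetRemapper, RecordPage, Remap, and RemappedSegment are hypothetical names, and RemappedSegment merely stands in for tiering::DiskSegment; none of this is part of the PR. The table maps a page's offset in the source RDB stream to the segment it was re-stashed into locally, so a small-bin item could be rewired to its new offset instead of being materialized as an RDB_TYPE_STRING.

// Hypothetical sketch, not part of this PR: remap old (source-stream) page offsets
// to the segments those pages were re-stashed into on the loading side.
#include <cstddef>
#include <optional>

#include <absl/container/flat_hash_map.h>

namespace dfly {

struct RemappedSegment {  // stand-in for tiering::DiskSegment
  size_t offset = 0;
  size_t length = 0;
};

class PageOffsetRemapper {
 public:
  // Record that the page starting at `old_page_offset` in the source stream
  // now lives at `new_segment` in local tiered storage.
  void RecordPage(size_t old_page_offset, RemappedSegment new_segment) {
    pages_[old_page_offset] = new_segment;
  }

  // Translate an item's old segment into its location inside the re-stashed page.
  // Returns nullopt if the page stayed in memory, in which case the caller would
  // materialize the value just like the current code does.
  std::optional<RemappedSegment> Remap(size_t old_offset, size_t length,
                                       size_t page_size) const {
    size_t page_start = old_offset / page_size * page_size;
    auto it = pages_.find(page_start);
    if (it == pages_.end())
      return std::nullopt;
    return RemappedSegment{it->second.offset + (old_offset - page_start), length};
  }

 private:
  absl::flat_hash_map<size_t, RemappedSegment> pages_;  // old page offset -> new segment
};

}  // namespace dfly

With such a table, HandleSmallItems could convert an RdbTieredSegment directly into an external entry at the remapped offset instead of reading the page back and re-encoding every value as a string.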

error_code RdbLoader::HandleAux() {
/* AUX: generic string-string fields. Use to add state to RDB
* which is backward compatible. Implementations of RDB loading
@@ -2531,20 +2603,37 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {

item->is_sticky = settings->is_sticky;

- ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

- auto& out_buf = shard_buf_[sid];
std::move(cleanup).Cancel();

if (item->val.rdb_type == RDB_TYPE_TIERED_SEGMENT) {
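// The value lives in a tiered small-bin page: group the item under its page-aligned
// offset and defer materialization to HandleSmallItems().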
auto segment = get<RdbTieredSegment>(item->val.obj);
{
size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize;
auto& items = small_items_[offset];
small_items_sizes_.erase({items.size(), offset});
items.push_back(item);
small_items_sizes_.insert({items.size(), offset});
}
HandleSmallItems(false); // don't force flush
return kOk;
}

Add(item);
return kOk;
}

void RdbLoader::Add(Item* item) {
ShardId sid = Shard(item->key, shard_set->size());

auto& out_buf = shard_buf_[sid];
out_buf.emplace_back(item);
- std::move(cleanup).Cancel();

constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {
FlushShardAsync(sid);
}

- return kOk;
}

void RdbLoader::LoadScriptFromAux(string&& body) {
@@ -2559,6 +2648,42 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}
}

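// Materialize pending small-bin items by reading back their page (from memory or from
// the re-stashed disk segment). Pages with the most pending items are drained first;
// unless `flush` is set, draining only happens while more than 1000 pages are pending.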
void RdbLoader::HandleSmallItems(bool flush) {
while (!small_items_.empty() && (flush || small_items_.size() > 1000)) {
auto [_, offset] = small_items_sizes_.extract(small_items_sizes_.begin()).value();
auto node = small_items_.extract(offset);

auto page_reader = [](tiering::DiskSegment segment) {
auto& store = EngineShard::tlocal()->tiered_storage()->BorrowStorage();
util::fb2::Future<std::string> f;
store.Read(segment, [f](io::Result<std::string_view> result) mutable {
CHECK(result.has_value()); // TODO
f.Resolve(string{result.value()});
});
return f.Get();
};
string page = visit(Overloaded{[](const string& s) { return s; }, page_reader},
small_items_pages_[offset]);

for (Item* item : node.mapped()) {
RdbTieredSegment segment = get<RdbTieredSegment>(item->val.obj);

CompactObj co;
co.SetEncodingMask(segment.enc_mask);
co.Materialize({page.data() + (segment.offset - offset), segment.length}, true);

VLOG(0) << "Loaded " << co.ToString();

base::PODArray<char> arr(co.Size(), nullptr);
co.GetString(arr.data());

item->val.rdb_type = RDB_TYPE_STRING;
item->val.obj = std::move(arr);
Add(item);
}
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
32 changes: 30 additions & 2 deletions src/server/rdb_load.h
@@ -3,6 +3,9 @@
//
#pragma once

#include <absl/container/btree_set.h>
#include <absl/container/flat_hash_map.h>

#include <system_error>

extern "C" {
@@ -15,6 +18,7 @@ extern "C" {
#include "io/io_buf.h"
#include "server/common.h"
#include "server/journal/serializer.h"
#include "server/tiering/common.h"

namespace dfly {

@@ -54,8 +58,18 @@ class RdbLoaderBase {
std::vector<Filter> filters;
};

- using RdbVariant =
-     std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>, RdbSBF>;
struct RdbTieredSegment {
size_t offset, length;
uint8_t enc_mask;
};

struct RdbTieredPage {
size_t offset;
std::string blob;
};

using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>, RdbSBF, RdbTieredSegment>;

struct OpaqueObj {
RdbVariant obj;
@@ -148,6 +162,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
::io::Result<OpaqueObj> ReadTieredSegment();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
@@ -168,10 +183,13 @@

size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;

base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_;

JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;

RdbVersion rdb_version_ = RDB_VERSION;
};

@@ -259,10 +277,14 @@ class RdbLoader : protected RdbLoaderBase {
void FlushShardAsync(ShardId sid);
void FlushAllShards();

void Add(Item* item);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);

void LoadScriptFromAux(std::string&& value);

void HandleSmallItems(bool flush);
std::error_code LoadTieredPage();

// Load index definition from RESP string describing it in FT.CREATE format,
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);
Expand All @@ -285,6 +307,12 @@ class RdbLoader : protected RdbLoaderBase {
std::function<void()> full_sync_cut_cb;

base::MPSCIntrusiveQueue<Item> item_queue_;

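// Tiered small-bin loading state: items grouped by the page-aligned offset of their
// segment, page offsets ordered by pending item count, and the received pages
// themselves (kept in memory or re-stashed as a disk segment).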
absl::flat_hash_map<size_t /* offset */, std::vector<Item*>> small_items_;
absl::btree_set<std::pair<size_t /* num entries*/, size_t /* offset */>, std::greater<>>
small_items_sizes_;
absl::flat_hash_map<size_t /* offset */, std::variant<std::string, tiering::DiskSegment>>
small_items_pages_;
};

} // namespace dfly