Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rocksdb): Adapt prefix bloom filter to speedup scans by hashkey #438

Merged
merged 5 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ function run_build()
echo "INFO: start build rocksdb..."
ROCKSDB_BUILD_DIR="$ROOT/rocksdb/build"
ROCKSDB_BUILD_OUTPUT="$ROCKSDB_BUILD_DIR/output"
CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF"
CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF -DCMAKE_CXX_FLAGS=-g"
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
if [ "$WARNING_ALL" == "YES" ]
then
echo "WARNING_ALL=YES"
Expand Down
4 changes: 2 additions & 2 deletions src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void pegasus_generate_next_blob(::dsn::blob &next, const T &hash_key, const T &s
next = buf.range(0, p - (unsigned char *)(buf.data()) + 1);
}

// restore hash_key and sort_key from rocksdb value.
// restore hash_key and sort_key from rocksdb key.
// no data copied.
inline void
pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob &sort_key)
Expand All @@ -106,7 +106,7 @@ pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob &
}
}

// restore hash_key and sort_key from rocksdb value.
// restore hash_key and sort_key from rocksdb key.
// data is copied into output 'hash_key' and 'sort_key'.
inline void
pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &sort_key)
Expand Down
2 changes: 2 additions & 0 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@
rocksdb_block_cache_capacity = 10737418240
rocksdb_block_cache_num_shard_bits = -1
rocksdb_disable_bloom_filter = false
# Bloom filter type, should be either 'common' or 'prefix'
rocksdb_filter_type = prefix
hycdong marked this conversation as resolved.
Show resolved Hide resolved

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800
Expand Down
51 changes: 51 additions & 0 deletions src/server/hashkey_transform.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include <cstdint>
#include <cstring>

#include <rocksdb/slice.h>
#include <rocksdb/slice_transform.h>

#include <dsn/c/api_utilities.h>
#include <dsn/utility/blob.h>

namespace pegasus {
namespace server {

// SliceTransform that maps a pegasus rocksdb key to its hashkey prefix
// (2-byte big-endian hashkey length followed by the hashkey bytes), used to
// build the prefix bloom filter that speeds up scans within one hashkey.
class HashkeyTransform : public rocksdb::SliceTransform
{
public:
    HashkeyTransform() = default;

    // NOTE: You must change the name if Transform() algorithm changed.
    const char *Name() const override { return "pegasus.HashkeyTransform"; }

    // Returns the prefix of 'src': the 2-byte length header plus the hashkey.
    rocksdb::Slice Transform(const rocksdb::Slice &src) const override
    {
        // TODO(yingchun): There is a bug in rocksdb 5.9.2, it has been fixed by
        // cca141ecf8634a42b5eb548cb0ac3a6b77d783c1, we can remove this judgement after upgrading
        // rocksdb.
        if (src.size() < 2) {
            return src;
        }

        // hash_key_len is stored in the first 2 bytes in big endian.
        // Use memcpy instead of dereferencing a casted pointer: src.data() has no
        // alignment guarantee, and the old `*(int16_t *)` read was both an unaligned
        // access and a strict-aliasing violation (UB). An unsigned type is used so
        // lengths >= 0x8000 are not sign-extended.
        uint16_t hash_key_len = 0;
        std::memcpy(&hash_key_len, src.data(), sizeof(hash_key_len));
        hash_key_len = be16toh(hash_key_len);
        dassert(src.size() >= 2 + hash_key_len,
                "key length must be no less than (2 + hash_key_len)");
        return rocksdb::Slice(src.data(), 2 + hash_key_len);
    }

    bool InDomain(const rocksdb::Slice &src) const override
    {
        // Empty put keys are not in domain.
        return src.size() >= 2;
    }

    bool InRange(const rocksdb::Slice &dst) const override { return true; }

    bool SameResultWhenAppended(const rocksdb::Slice &prefix) const override { return false; }
};
} // namespace server
} // namespace pegasus
57 changes: 51 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "capacity_unit_calculator.h"
#include "hashkey_transform.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"

Expand Down Expand Up @@ -210,11 +211,25 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_tbl_opts.block_cache = _block_cache;
}

if (!dsn_config_get_value_bool("pegasus.server",
"rocksdb_disable_bloom_filter",
false,
"rocksdb tbl_opts.filter_policy")) {
// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
if (!disable_bloom_filter) {
_tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));

std::string filter_type =
dsn_config_get_value_string("pegasus.server",
"rocksdb_filter_type",
"prefix",
"Bloom filter type, should be either 'common' or 'prefix'");
dassert(filter_type == "common" || filter_type == "prefix",
"[pegasus.server]rocksdb_filter_type should be either 'common' or 'prefix'.");
if (filter_type == "prefix") {
_db_opts.prefix_extractor.reset(new HashkeyTransform());
_db_opts.memtable_prefix_bloom_size_ratio = 0.1;

_rd_opts.prefix_same_as_start = true;
}
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}

_db_opts.table_factory.reset(NewBlockBasedTableFactory(_tbl_opts));
Expand Down Expand Up @@ -708,9 +723,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
return;
}

std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
std::unique_ptr<rocksdb::Iterator> it;
bool complete = false;
if (!request.reverse) {
it.reset(_db->NewIterator(_rd_opts));
it->Seek(start);
bool first_exclusive = !start_inclusive;
while (count < max_kv_count && size < max_kv_size && it->Valid()) {
Expand Down Expand Up @@ -761,6 +777,18 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
it->Next();
}
} else { // reverse
rocksdb::ReadOptions rd_opts(_rd_opts);
if (_db_opts.prefix_extractor) {
// NOTE: Prefix bloom filter is not supported in reverse seek mode (see
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for
// more details), so we have to do a total order seek on rocksdb, which might have
// worse performance. However, we consider reverse scan a rare use case, and if
// your workload has many reverse scans you'd better use the 'common' bloom filter
// (by setting [pegasus.server]rocksdb_filter_type to 'common').
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
it.reset(_db->NewIterator(rd_opts));
it->SeekForPrev(stop);
bool first_exclusive = !stop_inclusive;
std::vector<::dsn::apps::key_value> reverse_kvs;
Expand Down Expand Up @@ -1140,6 +1168,17 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
return;
}

rocksdb::ReadOptions rd_opts(_rd_opts);
if (_db_opts.prefix_extractor) {
::dsn::blob start_hash_key, tmp;
pegasus_restore_key(request.start_key, start_hash_key, tmp);
if (start_hash_key.size() == 0) {
// hash_key is not passed; this only happens when doing a full scan (scanners got by
// get_unordered_scanners) on a partition, so we have to do a total order seek on rocksdb.
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
}
bool start_inclusive = request.start_inclusive;
bool stop_inclusive = request.stop_inclusive;
rocksdb::Slice start(request.start_key.data(), request.start_key.length());
Expand All @@ -1156,6 +1195,12 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
if (prefix_start.compare(start) > 0) {
start = prefix_start;
start_inclusive = true;
// Now 'start' is generated from 'request.hash_key_filter_pattern', so it may not be a
// real hashkey and we should not seek this prefix via the prefix bloom filter. However,
// this only happens when doing a full scan (scanners got by get_unordered_scanners), in
// which case the following flags have already been updated.
dassert(!_db_opts.prefix_extractor || rd_opts.total_order_seek, "Invalid option");
dassert(!_db_opts.prefix_extractor || !rd_opts.prefix_same_as_start, "Invalid option");
}
}

Expand All @@ -1180,7 +1225,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
return;
}

std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rd_opts));
it->Seek(start);
bool complete = false;
bool first_exclusive = !start_inclusive;
Expand Down
64 changes: 64 additions & 0 deletions src/server/test/hashkey_transform_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "server/hashkey_transform.h"

#include <gtest/gtest.h>
#include <rocksdb/comparator.h>

#include "base/pegasus_key_schema.h"

// A user-defined SliceTransform must obey the 4 rules of ColumnFamilyOptions.prefix_extractor:
// 1) key.starts_with(prefix(key))
// 2) Compare(prefix(key), key) <= 0.
// 3) If Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0
// 4) prefix(prefix(key)) == prefix(key)
// Verify HashkeyTransform against the 4 contract rules of
// ColumnFamilyOptions.prefix_extractor documented above.
TEST(HashkeyTransformTest, Basic)
{
    pegasus::server::HashkeyTransform extractor;
    const rocksdb::Comparator *cmp = rocksdb::BytewiseComparator();

    // Build four rocksdb keys from (hashkey, sortkey) pairs.
    dsn::blob b1, b2, b3, b4;
    pegasus::pegasus_generate_key(b1, std::string("h1"), std::string("s1"));
    pegasus::pegasus_generate_key(b2, std::string("h2"), std::string("s1"));
    pegasus::pegasus_generate_key(b3, std::string("h1"), std::string("s2"));
    pegasus::pegasus_generate_key(b4, std::string("h1"), std::string(""));
    const rocksdb::Slice keys[] = {rocksdb::Slice(b1.data(), b1.size()),
                                   rocksdb::Slice(b2.data(), b2.size()),
                                   rocksdb::Slice(b3.data(), b3.size()),
                                   rocksdb::Slice(b4.data(), b4.size())};

    // 1) key.starts_with(prefix(key))
    for (const rocksdb::Slice &key : keys) {
        ASSERT_TRUE(key.starts_with(extractor.Transform(key)));
    }

    // 2) Compare(prefix(key), key) <= 0.
    ASSERT_LT(cmp->Compare(extractor.Transform(keys[0]), keys[0]), 0); // h1 < h1s1
    ASSERT_LT(cmp->Compare(extractor.Transform(keys[1]), keys[1]), 0); // h2 < h2s1
    ASSERT_LT(cmp->Compare(extractor.Transform(keys[2]), keys[2]), 0); // h1 < h1s2
    ASSERT_EQ(cmp->Compare(extractor.Transform(keys[3]), keys[3]), 0); // h1 == h1

    // 3) If Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0
    ASSERT_LT(cmp->Compare(keys[0], keys[1]), 0); // h1s1 < h2s1
    ASSERT_LT(cmp->Compare(extractor.Transform(keys[0]), extractor.Transform(keys[1])),
              0); // h1 < h2
    ASSERT_LT(cmp->Compare(keys[0], keys[2]), 0); // h1s1 < h1s2
    ASSERT_EQ(cmp->Compare(extractor.Transform(keys[0]), extractor.Transform(keys[2])),
              0); // h1 == h1
    ASSERT_GT(cmp->Compare(keys[0], keys[3]), 0); // h1s1 > h1
    ASSERT_EQ(cmp->Compare(extractor.Transform(keys[0]), extractor.Transform(keys[3])),
              0); // h1 == h1

    // 4) prefix(prefix(key)) == prefix(key)
    for (const rocksdb::Slice &key : keys) {
        ASSERT_EQ(extractor.Transform(extractor.Transform(key)), extractor.Transform(key));
    }
}
Loading