diff --git a/rdsn b/rdsn index ca49482a57..9622574877 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit ca49482a57c8470a9f44fc904284e64fdc523a49 +Subproject commit 9622574877c8ed35f2b35e839b29a92de039aa14 diff --git a/run.sh b/run.sh index e7f462b408..64e4669ffd 100755 --- a/run.sh +++ b/run.sh @@ -209,7 +209,7 @@ function run_build() echo "INFO: start build rocksdb..." ROCKSDB_BUILD_DIR="$ROOT/rocksdb/build" ROCKSDB_BUILD_OUTPUT="$ROCKSDB_BUILD_DIR/output" - CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF" + CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF -DCMAKE_CXX_FLAGS=-g" if [ "$WARNING_ALL" == "YES" ] then echo "WARNING_ALL=YES" diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h index 3222fcd0de..0b6183d33f 100644 --- a/src/base/pegasus_key_schema.h +++ b/src/base/pegasus_key_schema.h @@ -81,7 +81,7 @@ void pegasus_generate_next_blob(::dsn::blob &next, const T &hash_key, const T &s next = buf.range(0, p - (unsigned char *)(buf.data()) + 1); } -// restore hash_key and sort_key from rocksdb value. +// restore hash_key and sort_key from rocksdb key. // no data copied. inline void pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob &sort_key) @@ -106,7 +106,7 @@ pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob & } } -// restore hash_key and sort_key from rocksdb value. +// restore hash_key and sort_key from rocksdb key. // data is copied into output 'hash_key' and 'sort_key'. inline void pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &sort_key) diff --git a/src/server/config.ini b/src/server/config.ini index 854efc0f59..b787251b01 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -277,6 +277,8 @@ rocksdb_block_cache_capacity = 10737418240 rocksdb_block_cache_num_shard_bits = -1 rocksdb_disable_bloom_filter = false + # Bloom filter type, should be either 'common' or 'prefix' + rocksdb_filter_type = prefix checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/hashkey_transform.h b/src/server/hashkey_transform.h new file mode 100644 index 0000000000..33a89c5e76 --- /dev/null +++ b/src/server/hashkey_transform.h @@ -0,0 +1,51 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include + +#include +#include + +namespace pegasus { +namespace server { + +class HashkeyTransform : public rocksdb::SliceTransform +{ +public: + HashkeyTransform() = default; + + // NOTE: You must change the name if Transform() algorithm changed. + const char *Name() const override { return "pegasus.HashkeyTransform"; } + + rocksdb::Slice Transform(const rocksdb::Slice &src) const override + { + // TODO(yingchun): There is a bug in rocksdb 5.9.2, it has been fixed by + // cca141ecf8634a42b5eb548cb0ac3a6b77d783c1, we can remove this judgement after upgrading + // rocksdb. + if (src.size() < 2) { + return src; + } + + // hash_key_len is in big endian + uint16_t hash_key_len = be16toh(*(int16_t *)(src.data())); + dassert(src.size() >= 2 + hash_key_len, + "key length must be no less than (2 + hash_key_len)"); + return rocksdb::Slice(src.data(), 2 + hash_key_len); + } + + bool InDomain(const rocksdb::Slice &src) const override + { + // Empty put keys are not in domain. + return src.size() >= 2; + } + + bool InRange(const rocksdb::Slice &dst) const override { return true; } + + bool SameResultWhenAppended(const rocksdb::Slice &prefix) const override { return false; } +}; +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 033462b063..6a71083000 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -20,6 +20,7 @@ #include "base/pegasus_value_schema.h" #include "base/pegasus_utils.h" #include "capacity_unit_calculator.h" +#include "hashkey_transform.h" #include "pegasus_event_listener.h" #include "pegasus_server_write.h" @@ -210,11 +211,25 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _tbl_opts.block_cache = _block_cache; } - if (!dsn_config_get_value_bool("pegasus.server", - "rocksdb_disable_bloom_filter", - false, - "rocksdb tbl_opts.filter_policy")) { + // Bloom filter configurations. + bool disable_bloom_filter = dsn_config_get_value_bool( + "pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter"); + if (!disable_bloom_filter) { _tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false)); + + std::string filter_type = + dsn_config_get_value_string("pegasus.server", + "rocksdb_filter_type", + "prefix", + "Bloom filter type, should be either 'common' or 'prefix'"); + dassert(filter_type == "common" || filter_type == "prefix", + "[pegasus.server]rocksdb_filter_type should be either 'common' or 'prefix'."); + if (filter_type == "prefix") { + _db_opts.prefix_extractor.reset(new HashkeyTransform()); + _db_opts.memtable_prefix_bloom_size_ratio = 0.1; + + _rd_opts.prefix_same_as_start = true; + } } _db_opts.table_factory.reset(NewBlockBasedTableFactory(_tbl_opts)); @@ -708,9 +723,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req return; } - std::unique_ptr it(_db->NewIterator(_rd_opts)); + std::unique_ptr it; bool complete = false; if (!request.reverse) { + it.reset(_db->NewIterator(_rd_opts)); it->Seek(start); bool first_exclusive = !start_inclusive; while (count < max_kv_count && size < max_kv_size && it->Valid()) { @@ -761,6 +777,18 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->Next(); } } else { // reverse + rocksdb::ReadOptions rd_opts(_rd_opts); + if (_db_opts.prefix_extractor) { + // NOTE: Prefix bloom filter is not supported in reverse seek mode (see + // https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for + // more details), and we have to do total order seek on rocksdb which might be worse + // performance. However we consider that reverse scan is a rare use case, and if + // your workload has many reverse scans, you'd better use 'common' bloom filter (by + // set [pegasus.server]rocksdb_filter_type to 'common'). + rd_opts.total_order_seek = true; + rd_opts.prefix_same_as_start = false; + } + it.reset(_db->NewIterator(rd_opts)); it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; @@ -1140,6 +1168,17 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request return; } + rocksdb::ReadOptions rd_opts(_rd_opts); + if (_db_opts.prefix_extractor) { + ::dsn::blob start_hash_key, tmp; + pegasus_restore_key(request.start_key, start_hash_key, tmp); + if (start_hash_key.size() == 0) { + // hash_key is not passed, only happened when do full scan (scanners got by + // get_unordered_scanners) on a partition, we have to do total order seek on rocksDB. + rd_opts.total_order_seek = true; + rd_opts.prefix_same_as_start = false; + } + } bool start_inclusive = request.start_inclusive; bool stop_inclusive = request.stop_inclusive; rocksdb::Slice start(request.start_key.data(), request.start_key.length()); @@ -1156,6 +1195,12 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request if (prefix_start.compare(start) > 0) { start = prefix_start; start_inclusive = true; + // Now 'start' is generated by 'request.hash_key_filter_pattern', it may be not a real + // hashkey, we should not seek this prefix by prefix bloom filter. However, it only + // happen when do full scan (scanners got by get_unordered_scanners), in which case the + // following flags has been updated. + dassert(!_db_opts.prefix_extractor || rd_opts.total_order_seek, "Invalid option"); + dassert(!_db_opts.prefix_extractor || !rd_opts.prefix_same_as_start, "Invalid option"); } } @@ -1180,7 +1225,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request return; } - std::unique_ptr it(_db->NewIterator(_rd_opts)); + std::unique_ptr it(_db->NewIterator(rd_opts)); it->Seek(start); bool complete = false; bool first_exclusive = !start_inclusive; diff --git a/src/server/test/hashkey_transform_test.cpp b/src/server/test/hashkey_transform_test.cpp new file mode 100644 index 0000000000..6b6a82e3ea --- /dev/null +++ b/src/server/test/hashkey_transform_test.cpp @@ -0,0 +1,64 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "server/hashkey_transform.h" + +#include +#include + +#include "base/pegasus_key_schema.h" + +// User define SliceTransform must obey the 4 rules of ColumnFamilyOptions.prefix_extractor: +// 1) key.starts_with(prefix(key)) +// 2) Compare(prefix(key), key) <= 0. +// 3) If Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0 +// 4) prefix(prefix(key)) == prefix(key) +TEST(HashkeyTransformTest, Basic) +{ + pegasus::server::HashkeyTransform prefix_extractor; + const rocksdb::Comparator *comp = rocksdb::BytewiseComparator(); + + dsn::blob bkey1, bkey2, bkey3, bkey4; + pegasus::pegasus_generate_key(bkey1, std::string("h1"), std::string("s1")); + pegasus::pegasus_generate_key(bkey2, std::string("h2"), std::string("s1")); + pegasus::pegasus_generate_key(bkey3, std::string("h1"), std::string("s2")); + pegasus::pegasus_generate_key(bkey4, std::string("h1"), std::string("")); + rocksdb::Slice skey1(bkey1.data(), bkey1.size()); + rocksdb::Slice skey2(bkey2.data(), bkey2.size()); + rocksdb::Slice skey3(bkey3.data(), bkey3.size()); + rocksdb::Slice skey4(bkey4.data(), bkey4.size()); + + // 1) key.starts_with(prefix(key)) + ASSERT_TRUE(skey1.starts_with(prefix_extractor.Transform(skey1))); + ASSERT_TRUE(skey2.starts_with(prefix_extractor.Transform(skey2))); + ASSERT_TRUE(skey3.starts_with(prefix_extractor.Transform(skey3))); + ASSERT_TRUE(skey4.starts_with(prefix_extractor.Transform(skey4))); + + // 2) Compare(prefix(key), key) <= 0. + ASSERT_LT(comp->Compare(prefix_extractor.Transform(skey1), skey1), 0); // h1 < h1s1 + ASSERT_LT(comp->Compare(prefix_extractor.Transform(skey2), skey2), 0); // h2 < h2s1 + ASSERT_LT(comp->Compare(prefix_extractor.Transform(skey3), skey3), 0); // h1 < h1s2 + ASSERT_EQ(comp->Compare(prefix_extractor.Transform(skey4), skey4), 0); // h1 == h1 + + // 3) If Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0 + ASSERT_LT(comp->Compare(skey1, skey2), 0); // h1s1 < h2s1 + ASSERT_LT(comp->Compare(prefix_extractor.Transform(skey1), prefix_extractor.Transform(skey2)), + 0); // h1 < h2 + ASSERT_LT(comp->Compare(skey1, skey3), 0); // h1s1 < h1s2 + ASSERT_EQ(comp->Compare(prefix_extractor.Transform(skey1), prefix_extractor.Transform(skey3)), + 0); // h1 == h1 + ASSERT_GT(comp->Compare(skey1, skey4), 0); // h1s1 > h1 + ASSERT_EQ(comp->Compare(prefix_extractor.Transform(skey1), prefix_extractor.Transform(skey4)), + 0); // h1 == h1 + + // 4) prefix(prefix(key)) == prefix(key) + ASSERT_EQ(prefix_extractor.Transform(prefix_extractor.Transform(skey1)), + prefix_extractor.Transform(skey1)); + ASSERT_EQ(prefix_extractor.Transform(prefix_extractor.Transform(skey2)), + prefix_extractor.Transform(skey2)); + ASSERT_EQ(prefix_extractor.Transform(prefix_extractor.Transform(skey3)), + prefix_extractor.Transform(skey3)); + ASSERT_EQ(prefix_extractor.Transform(prefix_extractor.Transform(skey4)), + prefix_extractor.Transform(skey4)); +} diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp index ba382a5035..c26a300b0a 100644 --- a/src/test/function_test/test_basic.cpp +++ b/src/test/function_test/test_basic.cpp @@ -1627,7 +1627,7 @@ TEST(basic, scan_with_filter) kvs["n_1"] = "b"; kvs["n_2"] = "b"; kvs["n_3"] = "b"; - int ret = client->multi_set("x", kvs); + int ret = client->multi_set("xyz", kvs); ASSERT_EQ(PERR_OK, ret); // scan with batch_size = 10 @@ -1637,7 +1637,7 @@ TEST(basic, scan_with_filter) options.sort_key_filter_pattern = "m"; options.batch_size = 10; pegasus_client::pegasus_scanner *scanner = nullptr; - ret = client->get_scanner("x", "", "", options, scanner); + ret = client->get_scanner("xyz", "", "", options, scanner); ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error=" << client->get_error_string(ret); ASSERT_NE(nullptr, scanner); @@ -1646,7 +1646,7 @@ TEST(basic, scan_with_filter) std::string sort_key; std::string value; while (!(ret = (scanner->next(hash_key, sort_key, value)))) { - ASSERT_EQ("x", hash_key); + ASSERT_EQ("xyz", hash_key); ASSERT_EQ("a", value); data[sort_key] = value; } @@ -1666,7 +1666,7 @@ TEST(basic, scan_with_filter) options.sort_key_filter_pattern = "m"; options.batch_size = 3; pegasus_client::pegasus_scanner *scanner = nullptr; - ret = client->get_scanner("x", "", "", options, scanner); + ret = client->get_scanner("xyz", "", "", options, scanner); ASSERT_EQ(PERR_OK, ret); ASSERT_NE(nullptr, scanner); std::map data; @@ -1674,7 +1674,7 @@ TEST(basic, scan_with_filter) std::string sort_key; std::string value; while (!(ret = (scanner->next(hash_key, sort_key, value)))) { - ASSERT_EQ("x", hash_key); + ASSERT_EQ("xyz", hash_key); ASSERT_EQ("a", value); data[sort_key] = value; } @@ -1687,6 +1687,29 @@ TEST(basic, scan_with_filter) ASSERT_NE(data.end(), data.find("m_5")); } + // scan with batch_size = 10 + { + pegasus_client::scan_options options; + options.hash_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.hash_key_filter_pattern = "xy"; + options.batch_size = 10; + pegasus_client::pegasus_scanner *scanner = nullptr; + ret = client->get_scanner("xyz", "", "", options, scanner); + ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error=" + << client->get_error_string(ret); + ASSERT_NE(nullptr, scanner); + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("xyz", hash_key); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(kvs, data); + } + // multi_del std::set sortkeys; for (auto kv : kvs) { @@ -1710,10 +1733,10 @@ TEST(basic, full_scan_with_filter) kvs["n_1"] = "b"; kvs["n_2"] = "b"; kvs["n_3"] = "b"; - int ret = client->multi_set("x", kvs); + int ret = client->multi_set("xyz", kvs); ASSERT_EQ(PERR_OK, ret); - // scan with batch_size = 10 + // scan with sort key filter and batch_size = 10 { pegasus_client::scan_options options; options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; @@ -1729,7 +1752,7 @@ TEST(basic, full_scan_with_filter) std::string sort_key; std::string value; while (!(ret = (scanner->next(hash_key, sort_key, value)))) { - ASSERT_EQ("x", hash_key); + ASSERT_EQ("xyz", hash_key); ASSERT_EQ("a", value); data[sort_key] = value; } @@ -1742,7 +1765,7 @@ TEST(basic, full_scan_with_filter) ASSERT_NE(data.end(), data.find("m_5")); } - // scan with batch_size = 3 + // scan with sort key filter and batch_size = 3 { pegasus_client::scan_options options; options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; @@ -1758,7 +1781,7 @@ TEST(basic, full_scan_with_filter) std::string sort_key; std::string value; while (!(ret = (scanner->next(hash_key, sort_key, value)))) { - ASSERT_EQ("x", hash_key); + ASSERT_EQ("xyz", hash_key); ASSERT_EQ("a", value); data[sort_key] = value; } @@ -1771,6 +1794,29 @@ TEST(basic, full_scan_with_filter) ASSERT_NE(data.end(), data.find("m_5")); } + // scan with hash key filter and batch_size = 10 + { + pegasus_client::scan_options options; + options.hash_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.hash_key_filter_pattern = "xy"; + options.batch_size = 10; + std::vector scanners; + ret = client->get_unordered_scanners(1, options, scanners); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(1, scanners.size()); + pegasus_client::pegasus_scanner *scanner = scanners[0]; + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("xyz", hash_key); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(kvs, data); + } + // multi_del std::set sortkeys; for (auto kv : kvs) {