From b3f3584713030faf66f71ba1b559fcc86d15288f Mon Sep 17 00:00:00 2001 From: Lai Yingchun <405403881@qq.com> Date: Wed, 11 Jul 2018 17:36:47 +0800 Subject: [PATCH] Geo: geo library support (#74) * add geo library, support set/del/search_radial/distance APIs * integrate the library into redis proxy Former-commit-id: cb41fabdc0711354c7760c0882598ed981a31374 [formerly 24fec41a3218cee22b4722911808ab060349bb97] Former-commit-id: d97b5f76ee0dc10974d0edcbf57a0d35c9a3e3b3 --- scripts/pack_tools.sh | 1 + src/CMakeLists.txt | 2 + src/base/pegasus_utils.h | 33 + src/base/test/CMakeLists.txt | 20 + src/base/test/config.ini | 77 ++ src/base/test/main.cpp | 13 + src/base/test/utils_test.cpp | 51 ++ src/geo/CMakeLists.txt | 3 + src/geo/bench/CMakeLists.txt | 31 + src/geo/bench/bench.cpp | 109 +++ src/geo/bench/config.ini | 81 ++ src/geo/lib/CMakeLists.txt | 23 + src/geo/lib/geo_client.cpp | 853 ++++++++++++++++++ src/geo/lib/geo_client.h | 414 +++++++++ src/geo/lib/latlng_extractor.cpp | 66 ++ src/geo/lib/latlng_extractor.h | 33 + src/geo/test/CMakeLists.txt | 27 + src/geo/test/config.ini | 77 ++ src/geo/test/extractor_test.cpp | 68 ++ src/geo/test/geo_test.cpp | 563 ++++++++++++ src/geo/test/main.cpp | 13 + src/include/pegasus/error_def.h | 2 + src/redis_protocol/proxy/CMakeLists.txt | 7 +- src/redis_protocol/proxy/config.ini | 3 +- src/redis_protocol/proxy/main.cpp | 20 +- src/redis_protocol/proxy_lib/CMakeLists.txt | 2 + src/redis_protocol/proxy_lib/proxy_layer.cpp | 17 +- src/redis_protocol/proxy_lib/proxy_layer.h | 12 +- src/redis_protocol/proxy_lib/redis_parser.cpp | 510 ++++++++++- src/redis_protocol/proxy_lib/redis_parser.h | 88 +- src/redis_protocol/proxy_ut/CMakeLists.txt | 9 +- src/redis_protocol/proxy_ut/config.ini | 2 +- .../proxy_ut/redis_proxy_test.cpp | 204 ++++- src/server/CMakeLists.txt | 2 + src/shell/CMakeLists.txt | 4 + src/shell/command_helper.h | 32 +- src/shell/commands.h | 29 +- src/shell/main.cpp | 2 +- 38 files changed, 3425 insertions(+), 78 deletions(-) create mode 100644 src/base/test/CMakeLists.txt create mode 100644 src/base/test/config.ini create mode 100644 src/base/test/main.cpp create mode 100644 src/base/test/utils_test.cpp create mode 100644 src/geo/CMakeLists.txt create mode 100644 src/geo/bench/CMakeLists.txt create mode 100644 src/geo/bench/bench.cpp create mode 100644 src/geo/bench/config.ini create mode 100644 src/geo/lib/CMakeLists.txt create mode 100644 src/geo/lib/geo_client.cpp create mode 100644 src/geo/lib/geo_client.h create mode 100644 src/geo/lib/latlng_extractor.cpp create mode 100644 src/geo/lib/latlng_extractor.h create mode 100644 src/geo/test/CMakeLists.txt create mode 100644 src/geo/test/config.ini create mode 100644 src/geo/test/extractor_test.cpp create mode 100644 src/geo/test/geo_test.cpp create mode 100644 src/geo/test/main.cpp diff --git a/scripts/pack_tools.sh b/scripts/pack_tools.sh index 38ffd9401c..ef58f11b03 100755 --- a/scripts/pack_tools.sh +++ b/scripts/pack_tools.sh @@ -101,6 +101,7 @@ mkdir -p ${pack}/DSN_ROOT/lib copy_file ./DSN_ROOT/lib/*.so* ${pack}/DSN_ROOT/lib/ copy_file ./rdsn/thirdparty/output/lib/libPoco*.so.48 ${pack}/DSN_ROOT/lib/ copy_file ./rdsn/thirdparty/output/lib/libtcmalloc.so.4 ${pack}/DSN_ROOT/lib/ +copy_file ./rdsn/thirdparty/output/lib/libs2.so ${pack}/DSN_ROOT/lib/ copy_file `get_boost_lib $custom_boost_lib system` ${pack}/DSN_ROOT/lib/ copy_file `get_boost_lib $custom_boost_lib filesystem` ${pack}/DSN_ROOT/lib/ copy_file `get_stdcpp_lib $custom_gcc` ${pack}/DSN_ROOT/lib/ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6c5cfa9018..d704817959 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,10 +20,12 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../rocksdb/include) add_subdirectory(base) +add_subdirectory(base/test) add_subdirectory(client_lib) add_subdirectory(server) add_subdirectory(server/test) add_subdirectory(shell) +add_subdirectory(geo) add_subdirectory(redis_protocol) add_subdirectory(test/function_test) add_subdirectory(test/kill_test) diff --git a/src/base/pegasus_utils.h b/src/base/pegasus_utils.h index 837f38c2ab..f5cee71bee 100644 --- a/src/base/pegasus_utils.h +++ b/src/base/pegasus_utils.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,37 @@ int binary_compare(const T &a, const T &b) return r; } +template > +class top_n +{ +public: + typedef typename std::priority_queue, compare> + data_priority_queue; + + top_n(const std::list &data, int n) + { + for (const auto &r : data) { + _queue.emplace(r); + if (_queue.size() > n) { + _queue.pop(); + } + } + } + + std::list to() + { + std::list result; + while (!_queue.empty()) { + result.emplace_front(_queue.top()); + _queue.pop(); + } + return std::move(result); + } + +protected: + data_priority_queue _queue; +}; + // ---------------------------------------------------------------------- // c_escape_string() // Copies 'src' to 'dest', escaping dangerous characters using @@ -78,3 +110,4 @@ inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s } // namespace utils } // namespace pegasus + diff --git a/src/base/test/CMakeLists.txt b/src/base/test/CMakeLists.txt new file mode 100644 index 0000000000..631aeef4e1 --- /dev/null +++ b/src/base/test/CMakeLists.txt @@ -0,0 +1,20 @@ +set(MY_PROJ_NAME base_test) +project(${MY_PROJ_NAME} C CXX) + +# Source files under CURRENT project directory will be automatically included. +# You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +# Search mode for source files under CURRENT project directory? +# "GLOB_RECURSE" for recursive search +# "GLOB" for non-recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_LIBS + gtest) + +set(MY_BOOST_PACKAGES system filesystem) + +set(MY_BINPLACES "config.ini") + +dsn_add_executable() diff --git a/src/base/test/config.ini b/src/base/test/config.ini new file mode 100644 index 0000000000..05d9396976 --- /dev/null +++ b/src/base/test/config.ini @@ -0,0 +1,77 @@ +[apps..default] +run = true +count = 1 +;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 +;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536 +;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 + +[apps.mimic] +type = dsn.app.mimic +arguments = +pools = THREAD_POOL_DEFAULT +run = true +count = 1 + +[core] +;tool = simulator +;tool = fastrun +tool = nativerun +;toollets = tracer +;toollets = tracer, profiler, fault_injector +pause_on_start = false +cli_local = false +cli_remote = false + +;aio_factory_name = dsn::tools::native_aio_provider +start_nfs = false + +logging_start_level = LOG_LEVEL_DEBUG +logging_factory_name = dsn::tools::simple_logger +;logging_factory_name = dsn::tools::screen_logger +;logging_factory_name = dsn::tools::hpc_logger +logging_flush_on_exit = true + +enable_default_app_mimic = true + +data_dir = ./data + +[tools.simple_logger] +short_header = true +fast_flush = true +max_number_of_log_files_on_disk = 10 +stderr_start_level = LOG_LEVEL_ERROR + +[tools.hpc_logger] +per_thread_buffer_bytes = 8192 +max_number_of_log_files_on_disk = 10 + +[tools.simulator] +random_seed = 0 + +[network] +; how many network threads for network library(used by asio) +io_service_worker_count = 4 + +; specification for each thread pool +[threadpool..default] +worker_count = 4 + +[threadpool.THREAD_POOL_DEFAULT] +name = default +partitioned = false +max_input_queue_length = 1024 +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 4 + +[task..default] +is_trace = false +is_profile = false +allow_inline = false +fast_execution_in_network_thread = false +rpc_call_header_format = NET_HDR_DSN +rpc_call_channel = RPC_CHANNEL_TCP +rpc_timeout_milliseconds = 5000 + +[uri-resolver.dsn://onebox] +factory = partition_resolver_simple +arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 diff --git a/src/base/test/main.cpp b/src/base/test/main.cpp new file mode 100644 index 0000000000..95d26e3bab --- /dev/null +++ b/src/base/test/main.cpp @@ -0,0 +1,13 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include + +GTEST_API_ int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + int ans = RUN_ALL_TESTS(); + dsn_exit(ans); +} diff --git a/src/base/test/utils_test.cpp b/src/base/test/utils_test.cpp new file mode 100644 index 0000000000..75396988cc --- /dev/null +++ b/src/base/test/utils_test.cpp @@ -0,0 +1,51 @@ +#include "../pegasus_utils.h" +#include + +namespace pegasus { +namespace utils { + +TEST(utils_test, top_n) +{ + { + std::list data({2, 3, 7, 8, 9, 0, 1, 5, 4, 6}); + std::list result = top_n(data, 5).to(); + ASSERT_EQ(result, std::list({0, 1, 2, 3, 4})); + } + + { + std::list data({"2", "3", "7", "8", "9", "0", "1", "5", "4", "6"}); + std::list result = top_n(data, 5).to(); + ASSERT_EQ(result, std::list({"0", "1", "2", "3", "4"})); + } + + { + struct longer + { + inline bool operator()(const std::string &l, const std::string &r) + { + return l.length() < r.length(); + } + }; + + std::list data({std::string(2, 'a'), + std::string(3, 'a'), + std::string(7, 'a'), + std::string(8, 'a'), + std::string(9, 'a'), + std::string(0, 'a'), + std::string(1, 'a'), + std::string(5, 'a'), + std::string(4, 'a'), + std::string(6, 'a')}); + std::list result = top_n(data, 5).to(); + ASSERT_EQ(result, + std::list({std::string(0, 'a'), + std::string(1, 'a'), + std::string(2, 'a'), + std::string(3, 'a'), + std::string(4, 'a')})); + } +} + +} // namespace utils +} // namespace pegasus diff --git a/src/geo/CMakeLists.txt b/src/geo/CMakeLists.txt new file mode 100644 index 0000000000..d7d20c2211 --- /dev/null +++ b/src/geo/CMakeLists.txt @@ -0,0 +1,3 @@ +add_subdirectory(lib) +add_subdirectory(test) +add_subdirectory(bench) diff --git a/src/geo/bench/CMakeLists.txt b/src/geo/bench/CMakeLists.txt new file mode 100644 index 0000000000..6fbede8bf5 --- /dev/null +++ b/src/geo/bench/CMakeLists.txt @@ -0,0 +1,31 @@ +set(MY_PROJ_NAME pegasus_geo_bench) +project(${MY_PROJ_NAME} C CXX) + +# Source files under CURRENT project directory will be automatically included. +# You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +# Search mode for source files under CURRENT project directory? +# "GLOB_RECURSE" for recursive search +# "GLOB" for non-recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_INC_PATH "../../../rocksdb") +set(MY_PROJ_LIB_PATH "../../../rocksdb") + +set(MY_PROJ_LIBS + s2 + s2testing + pegasus_geo_lib + pegasus_client_static + fmt + rocksdb + z + bz2 + snappy) + +set(MY_BOOST_PACKAGES system filesystem) + +set(MY_BINPLACES "config.ini") + +dsn_add_executable() diff --git a/src/geo/bench/bench.cpp b/src/geo/bench/bench.cpp new file mode 100644 index 0000000000..fbbd87b088 --- /dev/null +++ b/src/geo/bench/bench.cpp @@ -0,0 +1,109 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "geo/lib/geo_client.h" + +#include +#include +#include +#include "monitoring/histogram.h" +#include +#include +#include +#include + +static const int data_count = 10000; + +int main(int argc, char **argv) +{ + if (argc != 7) { + std::cerr << "USAGE: " << argv[0] + << " " + << std::endl; + return -1; + } + + std::string cluster_name = argv[1]; + std::string app_name = argv[2]; + std::string geo_app_name = argv[3]; + double radius = 0.0; + if (!dsn::buf2double(argv[4], radius)) { + std::cerr << "radius is invalid: " << argv[4] << std::endl; + return -1; + } + int test_count = 2000; + if (!dsn::buf2int32(argv[5], test_count)) { + std::cerr << "test_count is invalid: " << argv[5] << std::endl; + return -1; + } + int max_level = 16; + if (!dsn::buf2int32(argv[6], max_level)) { + std::cerr << "max_level is invalid: " << argv[6] << std::endl; + return -1; + } + pegasus::geo::geo_client my_geo("config.ini", + cluster_name.c_str(), + app_name.c_str(), + geo_app_name.c_str(), + new pegasus::geo::latlng_extractor_for_lbs()); + my_geo.set_max_level(max_level); + + // cover beijing 5th ring road + S2LatLngRect rect(S2LatLng::FromDegrees(39.810151, 116.194511), + S2LatLng::FromDegrees(40.028697, 116.535087)); + + // generate data for test + // for (int i = 0; i < data_count; ++i) { + // S2LatLng latlng(S2Testing::SamplePoint(rect)); + // std::string id = std::to_string(i); + // std::string value = id + "|2018-06-05 12:00:00|2018-06-05 13:00:00|abcdefg|" + + // std::to_string(latlng.lng().degrees()) + "|" + + // std::to_string(latlng.lat().degrees()) + "|123.456|456.789|0|-1"; + // + // int ret = my_geo.set(id, "", value, 1000); + // if (ret != pegasus::PERR_OK) { + // std::cerr << "set data failed. error=" << ret << std::endl; + // } + // } + + rocksdb::HistogramImpl latency_histogram; + rocksdb::HistogramImpl result_count_histogram; + rocksdb::Env *env = rocksdb::Env::Default(); + uint64_t start = env->NowNanos(); + std::atomic count(test_count); + dsn::utils::notify_event get_completed; + // test search_radial by lat & lng + for (int i = 0; i < test_count; ++i) { + S2LatLng latlng(S2Testing::SamplePoint(rect)); + + uint64_t start_nanos = env->NowNanos(); + my_geo.async_search_radial( + latlng.lat().degrees(), + latlng.lng().degrees(), + radius, + -1, + pegasus::geo::geo_client::SortType::random, + 500, + [&, start_nanos](int error_code, std::list &&results) { + latency_histogram.Add(env->NowNanos() - start_nanos); + result_count_histogram.Add(results.size()); + uint64_t left = count.fetch_sub(1); + if (left == 1) { + get_completed.notify(); + } + }); + } + std::cout << "get_completed.wait" << std::endl; + get_completed.wait(); + uint64_t end = env->NowNanos(); + + std::cout << "start time: " << start << ", end time: " << end + << ", QPS: " << test_count / ((end - start) / 1e9) << std::endl; + std::cout << "latency_histogram: " << std::endl; + std::cout << latency_histogram.ToString() << std::endl; + std::cout << "result_count_histogram: " << std::endl; + std::cout << result_count_histogram.ToString() << std::endl; + + return 0; +} diff --git a/src/geo/bench/config.ini b/src/geo/bench/config.ini new file mode 100644 index 0000000000..334c196094 --- /dev/null +++ b/src/geo/bench/config.ini @@ -0,0 +1,81 @@ +[apps..default] +run = true +count = 1 +;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 +;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536 +;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 + +[apps.mimic] +type = dsn.app.mimic +arguments = +pools = THREAD_POOL_DEFAULT +run = true +count = 1 + +[core] +;tool = simulator +;tool = fastrun +tool = nativerun +;toollets = tracer +;toollets = tracer, profiler, fault_injector +pause_on_start = false +cli_local = false +cli_remote = false + +;aio_factory_name = dsn::tools::native_aio_provider +start_nfs = false + +logging_start_level = LOG_LEVEL_DEBUG +logging_factory_name = dsn::tools::simple_logger +;logging_factory_name = dsn::tools::screen_logger +;logging_factory_name = dsn::tools::hpc_logger +logging_flush_on_exit = true + +enable_default_app_mimic = true + +data_dir = ./data + +[tools.simple_logger] +short_header = true +fast_flush = true +max_number_of_log_files_on_disk = 10 +stderr_start_level = LOG_LEVEL_ERROR + +[tools.hpc_logger] +per_thread_buffer_bytes = 8192 +max_number_of_log_files_on_disk = 10 + +[tools.simulator] +random_seed = 0 + +[network] +; how many network threads for network library(used by asio) +io_service_worker_count = 4 + +; specification for each thread pool +[threadpool..default] +worker_count = 4 + +[threadpool.THREAD_POOL_DEFAULT] +name = default +partitioned = false +max_input_queue_length = 1024 +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 4 + +[task..default] +is_trace = false +is_profile = false +allow_inline = false +fast_execution_in_network_thread = false +rpc_call_header_format = NET_HDR_DSN +rpc_call_channel = RPC_CHANNEL_TCP +rpc_timeout_milliseconds = 5000 + +[uri-resolver.dsn://onebox] +factory = partition_resolver_simple +arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 + +[geo_client.lib] +max_level = 16 + diff --git a/src/geo/lib/CMakeLists.txt b/src/geo/lib/CMakeLists.txt new file mode 100644 index 0000000000..3139104d14 --- /dev/null +++ b/src/geo/lib/CMakeLists.txt @@ -0,0 +1,23 @@ +set(MY_PROJ_NAME pegasus_geo_lib) +project(${MY_PROJ_NAME} C CXX) + +# Source files under CURRENT project directory will be automatically included. +# You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +# Search mode for source files under CURRENT project directory? +# "GLOB_RECURSE" for recursive search +# "GLOB" for non-recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_INC_PATH "../../include") + +set(MY_PROJ_LIBS + s2 + s2testing + pegasus_client_static + fmt) + +set(MY_BOOST_PACKAGES system filesystem) + +dsn_add_static_library() diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp new file mode 100644 index 0000000000..12901d1bb8 --- /dev/null +++ b/src/geo/lib/geo_client.cpp @@ -0,0 +1,853 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "geo_client.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace pegasus { +namespace geo { + +struct SearchResultNearer +{ + inline bool operator()(const SearchResult &l, const SearchResult &r) + { + return l.distance < r.distance; + } +}; + +struct SearchResultFarther +{ + inline bool operator()(const SearchResult &l, const SearchResult &r) + { + return l.distance > r.distance; + } +}; + +geo_client::geo_client(const char *config_file, + const char *cluster_name, + const char *common_app_name, + const char *geo_app_name, + latlng_extractor *extractor) +{ + bool ok = pegasus_client_factory::initialize(config_file); + dassert(ok, "init pegasus client factory failed"); + + _common_data_client = pegasus_client_factory::get_client(cluster_name, common_app_name); + dassert(_common_data_client != nullptr, "init pegasus _common_data_client failed"); + + _geo_data_client = pegasus_client_factory::get_client(cluster_name, geo_app_name); + dassert(_geo_data_client != nullptr, "init pegasus _geo_data_client failed"); + + _extractor.reset(extractor); + + // default: 16. edge length at level 16 is about 150m + _max_level = (int32_t)dsn_config_get_value_uint64( + "geo_client.lib", "max_level", 16, "max cell level for scan"); +} + +int geo_client::set(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + int timeout_ms, + int ttl_seconds, + pegasus_client::internal_info *info) +{ + int ret = PERR_OK; + dsn::utils::notify_event set_completed; + auto async_set_callback = [&](int ec_, pegasus_client::internal_info &&info_) { + if (ec_ != PERR_OK) { + derror_f("set data failed. hash_key={}, sort_key={}, error={}", + hash_key, + sort_key, + get_error_string(ec_)); + ret = ec_; + } + if (info != nullptr) { + *info = std::move(info_); + } + set_completed.notify(); + }; + async_set(hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); + set_completed.wait(); + + return ret; +} + +void geo_client::async_set(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + pegasus_client::async_set_callback_t &&callback, + int timeout_ms, + int ttl_seconds) +{ + async_del( + hash_key, + sort_key, + true, + [ this, hash_key, sort_key, value, timeout_ms, ttl_seconds, cb = std::move(callback) ]( + int ec_, pegasus_client::internal_info &&info_) { + if (ec_ != PERR_OK) { + cb(ec_, std::move(info_)); + return; + } + + std::shared_ptr ret = std::make_shared(PERR_OK); + std::shared_ptr> set_count = + std::make_shared>(2); + std::shared_ptr info = + std::make_shared(); + auto async_set_callback = + [=](int ec_, pegasus_client::internal_info &&info_, DataType data_type_) { + if (data_type_ == DataType::common) { + *info = std::move(info_); + } + + if (ec_ != PERR_OK) { + derror_f("set {} data failed. hash_key={}, sort_key={}, error={}", + data_type_ == DataType::common ? "common" : "geo", + hash_key, + sort_key, + get_error_string(ec_)); + *ret = ec_; + } + + if (set_count->fetch_sub(1) == 1) { + if (cb != nullptr) { + cb(*ret, std::move(*info)); + } + } + }; + + async_set_common_data( + hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); + async_set_geo_data( + hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); + }, + timeout_ms); +} + +int geo_client::del(const std::string &hash_key, + const std::string &sort_key, + int timeout_ms, + pegasus_client::internal_info *info) +{ + int ret = PERR_OK; + dsn::utils::notify_event del_completed; + auto async_del_callback = [&](int ec_, pegasus_client::internal_info &&info_) { + if (ec_ != PERR_OK) { + derror_f("del data failed. hash_key={}, sort_key={}, error={}", + hash_key, + sort_key, + get_error_string(ec_)); + ret = ec_; + } + if (info != nullptr) { + *info = std::move(info_); + } + del_completed.notify(); + }; + async_del(hash_key, sort_key, false, async_del_callback, timeout_ms); + del_completed.wait(); + + return ret; +} + +void geo_client::async_del(const std::string &hash_key, + const std::string &sort_key, + bool keep_common_data, + pegasus_client::async_del_callback_t &&callback, + int timeout_ms) +{ + _common_data_client->async_get( + hash_key, + sort_key, + [ this, hash_key, sort_key, keep_common_data, timeout_ms, cb = std::move(callback) ]( + int ec_, std::string &&value_, pegasus::pegasus_client::internal_info &&info_) { + if (ec_ == PERR_NOT_FOUND) { + if (cb != nullptr) { + cb(PERR_OK, std::move(info_)); + } + return; + } + + if (ec_ != PERR_OK) { + if (cb != nullptr) { + cb(ec_, std::move(info_)); + } + return; + } + + std::string geo_hash_key; + std::string geo_sort_key; + if (!generate_geo_keys(hash_key, sort_key, value_, geo_hash_key, geo_sort_key)) { + cb(PERR_GEO_DECODE_VALUE_ERROR, pegasus_client::internal_info()); + return; + } + + std::shared_ptr ret = std::make_shared(PERR_OK); + std::shared_ptr> del_count = + std::make_shared>(keep_common_data ? 1 : 2); + auto async_del_callback = + [=](int ec__, pegasus_client::internal_info &&, DataType data_type_) mutable { + if (ec__ != PERR_OK) { + derror_f("del {} data failed. hash_key={}, sort_key={}, error={}", + data_type_ == DataType::common ? "common" : "geo", + hash_key, + sort_key, + get_error_string(ec_)); + *ret = ec__; + } + + if (del_count->fetch_sub(1) == 1) { + cb(*ret, std::move(info_)); + } + }; + + if (!keep_common_data) { + async_del_common_data(hash_key, sort_key, async_del_callback, timeout_ms); + } + async_del_geo_data(geo_hash_key, geo_sort_key, async_del_callback, timeout_ms); + }, + timeout_ms); +} + +int geo_client::set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + int timeout_ms, + int ttl_seconds) +{ + int ret = PERR_OK; + dsn::utils::notify_event set_completed; + auto async_set_callback = [&](int ec_, pegasus_client::internal_info &&info_) { + if (ec_ != PERR_OK) { + ret = ec_; + derror_f("set geo data failed. hash_key={}, sort_key={}, error={}", + hash_key, + sort_key, + get_error_string(ec_)); + } + set_completed.notify(); + }; + async_set_geo_data(hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); + set_completed.wait(); + return ret; +} + +void geo_client::async_set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + pegasus_client::async_set_callback_t &&callback, + int timeout_ms, + int ttl_seconds) +{ + async_set_geo_data( + hash_key, + sort_key, + value, + [cb = std::move(callback)](int ec_, pegasus_client::internal_info &&info_, DataType) { + if (cb != nullptr) { + cb(ec_, std::move(info_)); + } + }, + timeout_ms, + ttl_seconds); +} + +int geo_client::search_radial(double lat_degrees, + double lng_degrees, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + std::list &result) +{ + int ret = PERR_OK; + S2LatLng latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees); + if (!latlng.is_valid()) { + derror_f("latlng is invalid. lat_degrees={}, lng_degrees={}", lat_degrees, lng_degrees); + return PERR_GEO_INVALID_LATLNG_ERROR; + } + dsn::utils::notify_event search_completed; + async_search_radial(latlng, + radius_m, + count, + sort_type, + timeout_ms, + [&](int ec_, std::list &&result_) { + if (PERR_OK == ec_) { + result = std::move(result_); + } + ret = ec_; + search_completed.notify(); + }); + search_completed.wait(); + return ret; +} + +void geo_client::async_search_radial(double lat_degrees, + double lng_degrees, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback) +{ + S2LatLng latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees); + if (!latlng.is_valid()) { + derror_f("latlng is invalid. lat_degrees={}, lng_degrees={}", lat_degrees, lng_degrees); + callback(PERR_GEO_INVALID_LATLNG_ERROR, {}); + } + + async_search_radial(latlng, radius_m, count, sort_type, timeout_ms, std::move(callback)); +} + +int geo_client::search_radial(const std::string &hash_key, + const std::string &sort_key, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + std::list &result) +{ + int ret = PERR_OK; + dsn::utils::notify_event search_completed; + async_search_radial(hash_key, + sort_key, + radius_m, + count, + sort_type, + timeout_ms, + [&](int ec_, std::list &&result_) { + if (ec_ != PERR_OK) { + ret = ec_; + } + result = std::move(result_); + search_completed.notify(); + }); + search_completed.wait(); + + return ret; +} + +void geo_client::async_search_radial(const std::string &hash_key, + const std::string &sort_key, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback) +{ + _common_data_client->async_get( + hash_key, + sort_key, + [ + this, + hash_key, + sort_key, + radius_m, + count, + sort_type, + timeout_ms, + cb = std::move(callback) + ](int ec_, std::string &&value_, pegasus_client::internal_info &&) mutable { + if (ec_ != PERR_OK) { + derror_f("get failed. hash_key={}, sort_key={}, error={}", + hash_key, + sort_key, + get_error_string(ec_)); + cb(ec_, {}); + return; + } + + S2LatLng latlng; + if (!_extractor->extract_from_value(value_, latlng)) { + derror_f("extract_from_value failed. hash_key={}, sort_key={}, value={}", + hash_key, + sort_key, + value_); + cb(ec_, {}); + return; + } + + async_search_radial( + latlng, radius_m, count, sort_type, (int)ceil(timeout_ms * 0.8), std::move(cb)); + }, + (int)ceil(timeout_ms * 0.2)); +} + +void geo_client::async_search_radial(const S2LatLng &latlng, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback) +{ + // generate a cap + std::shared_ptr cap_ptr = std::make_shared(); + gen_search_cap(latlng, radius_m, *cap_ptr); + + // generate cell ids + S2CellUnion cids; + gen_cells_covered_by_cap(*cap_ptr, cids); + + // search data in the cell ids + async_get_result_from_cells(cids, + cap_ptr, + count, + sort_type, + timeout_ms, + [ this, count, sort_type, cb = std::move(callback) ]( + std::list> && results_) { + std::list result; + normalize_result(std::move(results_), count, sort_type, result); + cb(PERR_OK, std::move(result)); + }); +} + +void geo_client::gen_search_cap(const S2LatLng &latlng, double radius_m, S2Cap &cap) +{ + util::units::Meters radius((float)radius_m); + cap = S2Cap(latlng.ToPoint(), S2Earth::ToAngle(radius)); +} + +void geo_client::gen_cells_covered_by_cap(const S2Cap &cap, S2CellUnion &cids) +{ + S2RegionCoverer rc; + rc.mutable_options()->set_fixed_level(_min_level); + cids = rc.GetCovering(cap); +} + +void geo_client::async_get_result_from_cells(const S2CellUnion &cids, + std::shared_ptr cap_ptr, + int count, + SortType sort_type, + int timeout_ms, + scan_all_area_callback_t &&callback) +{ + int single_scan_count = count; + if (sort_type == SortType::asc || sort_type == SortType::desc) { + single_scan_count = -1; // scan all data to make full sort + } + + // scan all cell ids + std::shared_ptr>> results = + std::make_shared>>(); + std::shared_ptr> send_finish = std::make_shared>(false); + std::shared_ptr> scan_count = std::make_shared>(0); + auto single_scan_finish_callback = + [ send_finish, scan_count, results, cb = std::move(callback) ]() + { + // NOTE: make sure fetch_sub is at first of the if expression to make it always execute + if (scan_count->fetch_sub(1) == 1 && send_finish->load()) { + cb(std::move(*results.get())); + } + }; + + for (const auto &cid : cids) { + if (cap_ptr->Contains(S2Cell(cid))) { + // for the full contained cell, scan all data in this cell(which is at the `_min_level`) + results->emplace_back(std::list()); + scan_count->fetch_add(1); + start_scan(cid.ToString(), + "", + "", + cap_ptr, + single_scan_count, + timeout_ms, + single_scan_finish_callback, + results->back()); + } else { + // for the partial contained cell, scan cells covered by the cap at the `_max_level` + // which is more accurate than the ones at `_min_level`, but it will cost more time on + // calculating here. + std::string hash_key = cid.parent(_min_level).ToString(); + std::pair start_stop_sort_keys; + S2CellId pre; + // traverse all sub cell ids of `cid` on `_max_level` along the Hilbert curve, to find + // the needed ones. + for (S2CellId cur = cid.child_begin(_max_level); cur != cid.child_end(_max_level); + cur = cur.next()) { + if (cap_ptr->MayIntersect(S2Cell(cur))) { + // only cells whose any vertex is contained by the cap is needed + if (!pre.is_valid()) { + // `cur` is the very first cell in Hilbert curve contained by the cap + pre = cur; + start_stop_sort_keys.first = gen_start_sort_key(pre, hash_key); + } else { + if (pre.next() != cur) { + // `pre` is the last cell in Hilbert curve contained by the cap + // `cur` is a new start cell in Hilbert curve contained by the cap + start_stop_sort_keys.second = gen_stop_sort_key(pre, hash_key); + results->emplace_back(std::list()); + scan_count->fetch_add(1); + start_scan(hash_key, + std::move(start_stop_sort_keys.first), + std::move(start_stop_sort_keys.second), + cap_ptr, + single_scan_count, + timeout_ms, + single_scan_finish_callback, + results->back()); + + start_stop_sort_keys.first = gen_start_sort_key(cur, hash_key); + start_stop_sort_keys.second.clear(); + } + pre = cur; + } + } + } + + dassert(!start_stop_sort_keys.first.empty(), ""); + // the last sub slice of current `cid` on `_max_level` in Hilbert curve covered by `cap` + if (start_stop_sort_keys.second.empty()) { + start_stop_sort_keys.second = gen_stop_sort_key(pre, hash_key); + results->emplace_back(std::list()); + scan_count->fetch_add(1); + start_scan(hash_key, + std::move(start_stop_sort_keys.first), + std::move(start_stop_sort_keys.second), + cap_ptr, + single_scan_count, + timeout_ms, + single_scan_finish_callback, + results->back()); + } + } + } + + // when all scan rpc have received before send_finish is set to true, the callback will never be + // called, so we add 2 lines tricky code as follows + scan_count->fetch_add(1); + send_finish->store(true); + single_scan_finish_callback(); +} + +void geo_client::normalize_result(std::list> &&results, + int count, + SortType sort_type, + std::list &result) +{ + result.clear(); + for (auto &r : results) { + result.splice(result.end(), r); + if (sort_type == SortType::random && count > 0 && result.size() >= count) { + break; + } + } + + if (sort_type == SortType::asc) { + result = utils::top_n(result, count).to(); + } else if (sort_type == SortType::desc) { + result = utils::top_n(result, count).to(); + } else if (count > 0 && result.size() > count) { + result.resize((size_t)count); + } +} + +bool geo_client::generate_geo_keys(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + std::string &geo_hash_key, + std::string &geo_sort_key) +{ + // extract latitude and longitude from value + S2LatLng latlng; + if (!_extractor->extract_from_value(value, latlng)) { + derror_f("extract_from_value failed. value={}", value); + return false; + } + + // generate hash key + S2CellId leaf_cell_id = S2Cell(latlng).id(); + S2CellId parent_cell_id = leaf_cell_id.parent(_min_level); + geo_hash_key = parent_cell_id.ToString(); // [0,5]{1}/[0,3]{_min_level} + + // generate sort key + dsn::blob sort_key_postfix; + pegasus_generate_key(sort_key_postfix, hash_key, sort_key); + geo_sort_key = leaf_cell_id.ToString().substr(geo_hash_key.length()) + ":" + + sort_key_postfix.to_string(); // [0,3]{30-_min_level}:combine_keys + + return true; +} + +bool geo_client::restore_origin_keys(const std::string &geo_sort_key, + std::string &origin_hash_key, + std::string &origin_sort_key) +{ + // geo_sort_key: [0,3]{30-_min_level}:combine_keys + int cid_prefix_len = 30 - _min_level + 1; + if (geo_sort_key.length() <= cid_prefix_len) { + return false; + } + + auto origin_keys_len = static_cast(geo_sort_key.length() - cid_prefix_len); + pegasus_restore_key(dsn::blob(geo_sort_key.c_str(), cid_prefix_len, origin_keys_len), + origin_hash_key, + origin_sort_key); + + return true; +} + +std::string geo_client::gen_sort_key(const S2CellId &max_level_cid, const std::string &hash_key) +{ + return max_level_cid.ToString().substr(hash_key.length()); +} + +std::string geo_client::gen_start_sort_key(const S2CellId &max_level_cid, + const std::string &hash_key) +{ + return gen_sort_key(max_level_cid, hash_key); +} + +std::string geo_client::gen_stop_sort_key(const S2CellId &max_level_cid, + const std::string &hash_key) +{ + return gen_sort_key(max_level_cid, hash_key) + "z"; +} + +void geo_client::async_set_common_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + update_callback_t &&callback, + int timeout_ms, + int ttl_seconds) +{ + _common_data_client->async_set( + hash_key, + sort_key, + value, + [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { + cb(error_code, std::move(info), DataType::common); + }, + timeout_ms, + ttl_seconds); +} + +void geo_client::async_set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + update_callback_t &&callback, + int timeout_ms, + int ttl_seconds) +{ + std::string geo_hash_key; + std::string geo_sort_key; + if (!generate_geo_keys(hash_key, sort_key, value, geo_hash_key, geo_sort_key)) { + callback(PERR_GEO_DECODE_VALUE_ERROR, pegasus_client::internal_info(), DataType::geo); + return; + } + + _geo_data_client->async_set( + geo_hash_key, + geo_sort_key, + value, + [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { + cb(error_code, std::move(info), DataType::geo); + }, + timeout_ms, + ttl_seconds); +} + +void geo_client::async_del_common_data(const std::string &hash_key, + const std::string &sort_key, + update_callback_t &&callback, + int timeout_ms) +{ + _common_data_client->async_del( + hash_key, + sort_key, + [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { + cb(error_code, std::move(info), DataType::common); + }, + timeout_ms); +} + +void geo_client::async_del_geo_data(const std::string &geo_hash_key, + const std::string &geo_sort_key, + update_callback_t &&callback, + int timeout_ms) +{ + _geo_data_client->async_del( + geo_hash_key, + geo_sort_key, + [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { + cb(error_code, std::move(info), DataType::geo); + }, + timeout_ms); +} + +void geo_client::start_scan(const std::string &hash_key, + std::string &&start_sort_key, + std::string &&stop_sort_key, + std::shared_ptr cap_ptr, + int count, + int timeout_ms, + scan_one_area_callback &&callback, + std::list &result) +{ + pegasus_client::scan_options options; + options.start_inclusive = true; + options.stop_inclusive = true; + options.batch_size = 1000; + options.timeout_ms = timeout_ms; + + _geo_data_client->async_get_scanner( + hash_key, + start_sort_key, + stop_sort_key, + options, + [ this, cap_ptr, count, cb = std::move(callback), &result ]( + int error_code, pegasus_client::pegasus_scanner *hash_scanner) mutable { + if (error_code == PERR_OK) { + do_scan(hash_scanner->get_smart_wrapper(), cap_ptr, count, std::move(cb), result); + } else { + cb(); + } + }); +} + +void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper, + std::shared_ptr cap_ptr, + int count, + scan_one_area_callback &&callback, + std::list &result) +{ + scanner_wrapper->async_next( + [ this, cap_ptr, count, scanner_wrapper, cb = std::move(callback), &result ]( + int ret, + std::string &&geo_hash_key, + std::string &&geo_sort_key, + std::string &&value, + pegasus_client::internal_info &&info) mutable { + if (ret == PERR_SCAN_COMPLETE) { + cb(); + return; + } + + if (ret != PERR_OK) { + derror_f("async_next failed. error={}", get_error_string(ret)); + cb(); + return; + } + + S2LatLng latlng; + if (!_extractor->extract_from_value(value, latlng)) { + derror_f("extract_from_value failed. value={}", value); + cb(); + return; + } + + double distance = S2Earth::GetDistanceMeters(S2LatLng(cap_ptr->center()), latlng); + if (distance <= S2Earth::ToMeters(cap_ptr->radius())) { + std::string origin_hash_key, origin_sort_key; + if (!restore_origin_keys(geo_sort_key, origin_hash_key, origin_sort_key)) { + derror_f("restore_origin_keys failed. geo_sort_key={}", geo_sort_key); + cb(); + return; + } + + result.emplace_back(SearchResult(latlng.lat().degrees(), + latlng.lng().degrees(), + distance, + std::move(origin_hash_key), + std::move(origin_sort_key), + std::move(value))); + } + + if (count != -1 && result.size() >= count) { + cb(); + return; + } + + do_scan(scanner_wrapper, cap_ptr, count, std::move(cb), result); + }); +} + +int geo_client::distance(const std::string &hash_key1, + const std::string &sort_key1, + const std::string &hash_key2, + const std::string &sort_key2, + int timeout_ms, + double &distance) +{ + int ret = PERR_OK; + dsn::utils::notify_event get_completed; + auto async_calculate_callback = [&](int ec_, double &&distance_) { + if (ec_ != PERR_OK) { + derror_f("get distance failed. hash_key1={}, sort_key1={}, hash_key2={}, sort_key2={}, " + "error={}", + hash_key1, + sort_key1, + hash_key2, + sort_key2, + get_error_string(ec_)); + ret = ec_; + } + distance = distance_; + get_completed.notify(); + }; + async_distance( + hash_key1, sort_key1, hash_key2, sort_key2, timeout_ms, async_calculate_callback); + get_completed.wait(); + + return ret; +} + +void geo_client::async_distance(const std::string &hash_key1, + const std::string &sort_key1, + const std::string &hash_key2, + const std::string &sort_key2, + int timeout_ms, + distance_callback_t &&callback) +{ + std::shared_ptr ret = std::make_shared(PERR_OK); + std::shared_ptr mutex = std::make_shared(); + std::shared_ptr> get_result = std::make_shared>(); + auto async_get_callback = [ =, cb = std::move(callback) ]( + int ec_, std::string &&value_, pegasus_client::internal_info &&) + { + if (ec_ != PERR_OK) { + derror_f("get data failed. hash_key1={}, sort_key1={}, hash_key2={}, sort_key2={}, " + "error={}", + hash_key1, + sort_key1, + hash_key2, + sort_key2, + get_error_string(ec_)); + *ret = ec_; + } + + S2LatLng latlng; + if (!_extractor->extract_from_value(value_, latlng)) { + derror_f("extract_from_value failed. value={}", value_); + *ret = PERR_GEO_DECODE_VALUE_ERROR; + } + + std::lock_guard lock(*mutex); + get_result->push_back(latlng); + if (get_result->size() == 2) { + if (*ret == PERR_OK) { + double distance = S2Earth::GetDistanceMeters((*get_result)[0], (*get_result)[1]); + cb(*ret, distance); + } else { + cb(*ret, std::numeric_limits::max()); + } + } + }; + + _common_data_client->async_get(hash_key1, sort_key1, async_get_callback, timeout_ms); + _common_data_client->async_get(hash_key2, sort_key2, async_get_callback, timeout_ms); +} + +} // namespace geo +} // namespace pegasus diff --git a/src/geo/lib/geo_client.h b/src/geo/lib/geo_client.h new file mode 100644 index 0000000000..ed241b8d2c --- /dev/null +++ b/src/geo/lib/geo_client.h @@ -0,0 +1,414 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "latlng_extractor.h" + +namespace pegasus { +namespace geo { + +struct SearchResult; +using geo_search_callback_t = + std::function &&results)>; +using distance_callback_t = std::function; + +/// the search result structure used by `search_radial` APIs +struct SearchResult +{ + double lat_degrees; // latitude and longitude extract by `latlng_extractor`, in degree + double lng_degrees; + double distance; // distance from the input and the result, in meter + std::string hash_key; // the original hash_key, sort_key, and value when data inserted + std::string sort_key; + std::string value; + std::string cellid; + + explicit SearchResult(double lat = 0.0, + double lng = 0.0, + double dis = 0.0, + std::string &&hk = "", + std::string &&sk = "", + std::string &&v = "", + std::string &&cid = "") + : lat_degrees(lat), + lng_degrees(lng), + distance(dis), + hash_key(std::move(hk)), + sort_key(std::move(sk)), + value(std::move(v)), + cellid(std::move(cid)) + { + } + + std::string to_string() const + { + std::stringstream ss; + ss << "[" << hash_key << " : " << sort_key << " => " << value << ", (" << lat_degrees + << ", " << lng_degrees << "): " << distance << ", " << cellid << "]"; + return ss.str(); + } +}; + +/// geo_client is the class for users to operate geometry data on pegasus +/// geo_client use two separate apps on the same cluster, one for common origin data, the +/// other for geometry data +/// !!!NOTE: API operations on the two separate apps are not atomic!!! +/// we use S2Geometry as the underlying library to calculate geometry data, see more: +/// http://s2geometry.io +class geo_client +{ +public: + enum class SortType + { + random = 0, + asc = 1, + desc = 2, + }; + +public: + /// REQUIRES: app/table `common_app_name` and `geo_app_name` have been created on cluster + /// `cluster_name` + geo_client(const char *config_file, + const char *cluster_name, + const char *common_app_name, + const char *geo_app_name, + latlng_extractor *extractor); + + ~geo_client() { _tracker.wait_outstanding_tasks(); } + + /// + /// \brief set + /// store the k-v to the cluster, both app/table `common_app_name` and `geo_app_name` + /// key is composed of hash_key and sort_key. + /// \param hash_key + /// used to decide which partition to put this k-v + /// \param sort_key + /// all the k-v under hash_key will be sorted by sort_key. + /// \param value + /// the value we want to store. + /// \param timeout_ms + /// if wait longer than this value, will return time out error + /// \param ttl_seconds + /// time to live of this value, if expired, will return not found; 0 means no ttl + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// + /// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_extractor + int set(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + int timeout_ms = 5000, + int ttl_seconds = 0, + pegasus_client::internal_info *info = nullptr); + + void async_set(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + pegasus_client::async_set_callback_t &&callback = nullptr, + int timeout_ms = 5000, + int ttl_seconds = 0); + + /// + /// \brief del + /// remove the k-v from the cluster, both app/table `common_app_name` and `geo_app_name` + /// key is composed of hash_key and sort_key. + /// \param hash_key + /// used to decide which partition to put this k-v + /// \param sort_key + /// all the k-v under hash_key will be sorted by sort_key. + /// \param keep_common_data + /// only delete geo data, keep common data. + /// \param timeout_ms + /// if wait longer than this value, will return time out error + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// + int del(const std::string &hash_key, + const std::string &sort_key, + int timeout_ms = 5000, + pegasus_client::internal_info *info = nullptr); + + void async_del(const std::string &hash_key, + const std::string &sort_key, + bool keep_common_data, + pegasus_client::async_del_callback_t &&callback = nullptr, + int timeout_ms = 5000); + + /// + /// \brief set_geo_data + /// store the k-v to the cluster, only app/table `geo_app_name` + /// key is composed of hash_key and sort_key. + /// \param hash_key + /// used to decide which partition to put this k-v + /// \param sort_key + /// all the k-v under hash_key will be sorted by sort_key. + /// \param value + /// the value we want to store. + /// \param timeout_ms + /// if wait longer than this value, will return time out error + /// \param ttl_seconds + /// time to live of this value, if expired, will return not found; 0 means no ttl + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// + /// REQUIRES: latitude and longitude can be correctly extracted from `value` by latlng_extractor + int set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + int timeout_ms = 5000, + int ttl_seconds = 0); + + void async_set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + pegasus_client::async_set_callback_t &&callback = nullptr, + int timeout_ms = 5000, + int ttl_seconds = 0); + + /// + /// \brief search_radial + /// search data from app/table `geo_app_name`, the results are `radius_m` meters far from + /// the (lat_degrees, lng_degrees). + /// \param lat_degrees + /// latitude in degree, range in [-90.0, 90.0] + /// \param lng_degrees + /// longitude in degree, range in [-180.0, 180.0] + /// \param radius_m + /// the results are limited by its distance from the (lat_degrees, lng_degrees). + /// \param count + /// limit results count + /// \param sort_type + /// results sorted type + /// \param timeout_ms + /// if wait longer than this value, will return time out error + /// \param result + /// results container + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// + int search_radial(double lat_degrees, + double lng_degrees, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + std::list &result); + + void async_search_radial(double lat_degrees, + double lng_degrees, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback); + + /// + /// \brief search_radial + /// search data from app/table `geo_app_name`, the results are `radius_m` meters far from + /// the (lat_degrees, lng_degrees). + /// \param hash_key + /// used to decide which partition to get this k-v + /// \param sort_key + /// all the k-v under hash_key will be sorted by sort_key. + /// \param radius_m + /// the results are limited by its distance from the (lat_degrees, lng_degrees). + /// \param count + /// limit results count + /// \param sort_type + /// results sorted type + /// \param timeout_ms + /// if wait longer than this value, will return time out error + /// \param result + /// results container + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// + /// REQUIRES: latitude and longitude can be correctly extracted by latlng_extractor from the + /// value corresponding to `hash_key` and `sort_key` + int search_radial(const std::string &hash_key, + const std::string &sort_key, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + std::list &result); + + void async_search_radial(const std::string &hash_key, + const std::string &sort_key, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback); + + /// + /// \brief distance + /// get the distance of the two given keys + /// \param hash_key1, hash_key2 + /// used to decide which partition to get this k-v + /// \param sort_key1, sort_key2 + /// all the k-v under hash_key will be sorted by sort_key. + /// \param distance + /// the returned distance of the two given keys. + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + int distance(const std::string &hash_key1, + const std::string &sort_key1, + const std::string &hash_key2, + const std::string &sort_key2, + int timeout_ms, + double &distance); + + void async_distance(const std::string &hash_key1, + const std::string &sort_key1, + const std::string &hash_key2, + const std::string &sort_key2, + int timeout_ms, + distance_callback_t &&callback); + + const char *get_error_string(int error_code) const + { + return _common_data_client->get_error_string(error_code); + } + + void set_max_level(int level) { _max_level = level; } + +private: + friend class geo_client_test; + + enum class DataType + { + common = 0, + geo = 1 + }; + + using update_callback_t = std::function; + using scan_all_area_callback_t = + std::function> &&results)>; + using scan_one_area_callback = std::function; + + // generate hash_key and sort_key in geo database from hash_key and sort_key in common data + // database + // geo hash_key is the prefix of cell id which is calculated from value by `_extractor`, its + // length is associated with `_min_level` + // geo sort_key is composed with the postfix of the same cell id and origin hash_key and + // sort_key + bool generate_geo_keys(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + std::string &geo_hash_key, + std::string &geo_sort_key); + + bool restore_origin_keys(const std::string &geo_sort_key, + std::string &origin_hash_key, + std::string &origin_sort_key); + + void async_set_common_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + update_callback_t &&callback, + int timeout_ms, + int ttl_seconds); + + void async_set_geo_data(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + update_callback_t &&callback = nullptr, + int timeout_ms = 5000, + int ttl_seconds = 0); + + void async_del_common_data(const std::string &hash_key, + const std::string &sort_key, + update_callback_t &&callback, + int timeout_ms); + + void async_del_geo_data(const std::string &geo_hash_key, + const std::string &geo_sort_key, + update_callback_t &&callback, + int timeout_ms); + + void async_search_radial(const S2LatLng &latlng, + double radius_m, + int count, + SortType sort_type, + int timeout_ms, + geo_search_callback_t &&callback); + + // generate a cap by center point and radius + void gen_search_cap(const S2LatLng &latlng, double radius_m, S2Cap &cap); + + // generate cell ids covered by the cap on a pre-defined level + void gen_cells_covered_by_cap(const S2Cap &cap, S2CellUnion &cids); + + // search data covered by `cap` in all `cids` + void async_get_result_from_cells(const S2CellUnion &cids, + std::shared_ptr cap_ptr, + int count, + SortType sort_type, + int timeout_ms, + scan_all_area_callback_t &&callback); + + // normalize the result by count, sort type, ... + void normalize_result(std::list> &&results, + int count, + SortType sort_type, + std::list &result); + + // generate sort key of `max_level_cid` under `hash_key` + std::string gen_sort_key(const S2CellId &max_level_cid, const std::string &hash_key); + // generate start sort key of `max_level_cid` under `hash_key` + std::string gen_start_sort_key(const S2CellId &max_level_cid, const std::string &hash_key); + // generate stop sort key of `max_level_cid` under `hash_key` + std::string gen_stop_sort_key(const S2CellId &max_level_cid, const std::string &hash_key); + + void start_scan(const std::string &hash_key, + std::string &&start_sort_key, + std::string &&stop_sort_key, + std::shared_ptr cap_ptr, + int count, + int timeout_ms, + scan_one_area_callback &&callback, + std::list &result); + + void do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper, + std::shared_ptr cap_ptr, + int count, + scan_one_area_callback &&callback, + std::list &result); + +private: + // cell id at this level is the hash-key in pegasus + // `_min_level` is immutable after geo_client data has been inserted into DB. + const int _min_level = 12; // edge length at level 12 is about 2km + + // cell id at this level is the prefix of sort-key in pegasus, and + // it's convenient for scan operation + // `_max_level` is mutable at any time, and geo_client-lib users can change it to a appropriate + // value + // to improve performance in their scenario. + int _max_level = 16; + + dsn::task_tracker _tracker; + + std::shared_ptr _extractor = nullptr; + pegasus_client *_common_data_client = nullptr; + pegasus_client *_geo_data_client = nullptr; +}; + +} // namespace geo +} // namespace pegasus diff --git a/src/geo/lib/latlng_extractor.cpp b/src/geo/lib/latlng_extractor.cpp new file mode 100644 index 0000000000..28fa65e69d --- /dev/null +++ b/src/geo/lib/latlng_extractor.cpp @@ -0,0 +1,66 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include "latlng_extractor.h" + +namespace pegasus { +namespace geo { + +void extract_indexs(const std::string &text, + const std::vector &indexs, + std::vector &values, + char splitter) +{ + size_t begin_pos = 0; + size_t end_pos = 0; + int cur_index = -1; + for (auto index : indexs) { + while (cur_index < index) { + begin_pos = (cur_index == -1 ? 0 : end_pos + 1); // at first time, seek from 0 + // then, seek from end_pos + 1 + end_pos = text.find(splitter, begin_pos); + if (end_pos == std::string::npos) { + break; + } + cur_index++; + } + + if (end_pos == std::string::npos) { + values.emplace_back(text.substr(begin_pos)); + break; + } else { + values.emplace_back(text.substr(begin_pos, end_pos - begin_pos)); + } + } +} + +const char *latlng_extractor_for_lbs::name() const { return "latlng_extractor_for_lbs"; } + +const char *latlng_extractor_for_lbs::value_sample() const +{ + return "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|160.356396|39.469644|24.0|4.15|0|-1"; +} + +bool latlng_extractor_for_lbs::extract_from_value(const std::string &value, S2LatLng &latlng) const +{ + std::vector data; + extract_indexs(value, {4, 5}, data, '|'); + if (data.size() != 2) { + return false; + } + + std::string lng = data[0]; + std::string lat = data[1]; + double lat_degrees, lng_degrees = 0.0; + if (!dsn::buf2double(lat, lat_degrees) || !dsn::buf2double(lng, lng_degrees)) { + return false; + } + latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees); + + return latlng.is_valid(); +} + +} // namespace geo +} // namespace pegasus diff --git a/src/geo/lib/latlng_extractor.h b/src/geo/lib/latlng_extractor.h new file mode 100644 index 0000000000..b45efa92d7 --- /dev/null +++ b/src/geo/lib/latlng_extractor.h @@ -0,0 +1,33 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include + +namespace pegasus { +namespace geo { + +class latlng_extractor +{ +public: + virtual ~latlng_extractor() = default; + virtual const char *name() const = 0; + virtual const char *value_sample() const = 0; + virtual bool extract_from_value(const std::string &value, S2LatLng &latlng) const = 0; +}; + +class latlng_extractor_for_lbs : public latlng_extractor +{ +public: + const char *name() const final; + const char *value_sample() const final; + bool extract_from_value(const std::string &value, S2LatLng &latlng) const final; +}; + +} // namespace geo +} // namespace pegasus diff --git a/src/geo/test/CMakeLists.txt b/src/geo/test/CMakeLists.txt new file mode 100644 index 0000000000..14b2ceb545 --- /dev/null +++ b/src/geo/test/CMakeLists.txt @@ -0,0 +1,27 @@ +set(MY_PROJ_NAME pegasus_geo_test) +project(${MY_PROJ_NAME} C CXX) + +# Source files under CURRENT project directory will be automatically included. +# You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +# Search mode for source files under CURRENT project directory? +# "GLOB_RECURSE" for recursive search +# "GLOB" for non-recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_LIBS + s2testing + pegasus_geo_lib + s2 + pegasus_client_static + fmt + gtest) + +set(MY_BOOST_PACKAGES system filesystem) + +add_definitions(-Wno-attributes) + +set(MY_BINPLACES "config.ini") + +dsn_add_executable() diff --git a/src/geo/test/config.ini b/src/geo/test/config.ini new file mode 100644 index 0000000000..05d9396976 --- /dev/null +++ b/src/geo/test/config.ini @@ -0,0 +1,77 @@ +[apps..default] +run = true +count = 1 +;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 +;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536 +;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 + +[apps.mimic] +type = dsn.app.mimic +arguments = +pools = THREAD_POOL_DEFAULT +run = true +count = 1 + +[core] +;tool = simulator +;tool = fastrun +tool = nativerun +;toollets = tracer +;toollets = tracer, profiler, fault_injector +pause_on_start = false +cli_local = false +cli_remote = false + +;aio_factory_name = dsn::tools::native_aio_provider +start_nfs = false + +logging_start_level = LOG_LEVEL_DEBUG +logging_factory_name = dsn::tools::simple_logger +;logging_factory_name = dsn::tools::screen_logger +;logging_factory_name = dsn::tools::hpc_logger +logging_flush_on_exit = true + +enable_default_app_mimic = true + +data_dir = ./data + +[tools.simple_logger] +short_header = true +fast_flush = true +max_number_of_log_files_on_disk = 10 +stderr_start_level = LOG_LEVEL_ERROR + +[tools.hpc_logger] +per_thread_buffer_bytes = 8192 +max_number_of_log_files_on_disk = 10 + +[tools.simulator] +random_seed = 0 + +[network] +; how many network threads for network library(used by asio) +io_service_worker_count = 4 + +; specification for each thread pool +[threadpool..default] +worker_count = 4 + +[threadpool.THREAD_POOL_DEFAULT] +name = default +partitioned = false +max_input_queue_length = 1024 +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 4 + +[task..default] +is_trace = false +is_profile = false +allow_inline = false +fast_execution_in_network_thread = false +rpc_call_header_format = NET_HDR_DSN +rpc_call_channel = RPC_CHANNEL_TCP +rpc_timeout_milliseconds = 5000 + +[uri-resolver.dsn://onebox] +factory = partition_resolver_simple +arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 diff --git a/src/geo/test/extractor_test.cpp b/src/geo/test/extractor_test.cpp new file mode 100644 index 0000000000..7f264048c2 --- /dev/null +++ b/src/geo/test/extractor_test.cpp @@ -0,0 +1,68 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include +#include + +namespace pegasus { +namespace geo { + +static std::shared_ptr lbs_extractor = + std::make_shared(); +TEST(latlng_extractor_for_lbs_test, extract_from_value) +{ + ASSERT_EQ(std::string(lbs_extractor->name()), "latlng_extractor_for_lbs"); + + S2LatLng latlng; + ASSERT_TRUE(lbs_extractor->extract_from_value(lbs_extractor->value_sample(), latlng)); + + double lat_degrees = 12.345; + double lng_degrees = 67.890; + std::string test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" + + std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) + + "|24.043028|4.15921|0|-1"; + ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng)); + ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001); + ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001); + + test_value = "|2018-04-26|2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" + + std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1"; + ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng)); + ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001); + ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001); + + test_value = "00:00:00:00:01:5e||2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + "|" + + std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1"; + ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng)); + ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001); + ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001); + + test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" + + std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) + "||4.15921|0|-1"; + ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng)); + ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001); + ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001); + + test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" + + std::to_string(lng_degrees) + "|" + std::to_string(lat_degrees) + + "|24.043028|4.15921|0|"; + ASSERT_TRUE(lbs_extractor->extract_from_value(test_value, latlng)); + ASSERT_LE(std::abs(latlng.lat().degrees() - lat_degrees), 0.000001); + ASSERT_LE(std::abs(latlng.lng().degrees() - lng_degrees), 0.000001); + + test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr||" + + std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1"; + ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng)); + + test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" + + std::to_string(lng_degrees) + "||24.043028|4.15921|0|-1"; + ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng)); + + test_value = "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|||24.043028|4.15921|0|-1"; + ASSERT_FALSE(lbs_extractor->extract_from_value(test_value, latlng)); +} + +} // namespace geo +} // namespace pegasus diff --git a/src/geo/test/geo_test.cpp b/src/geo/test/geo_test.cpp new file mode 100644 index 0000000000..984d15a1fc --- /dev/null +++ b/src/geo/test/geo_test.cpp @@ -0,0 +1,563 @@ +// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "geo/lib/geo_client.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace pegasus { +namespace geo { + +class geo_client_test : public ::testing::Test +{ +public: + geo_client_test() + { + _geo_client.reset(new pegasus::geo::geo_client( + "config.ini", "onebox", "temp", "temp_geo", new latlng_extractor_for_lbs())); + } + + pegasus_client *common_data_client() { return _geo_client->_common_data_client; } + + int min_level() { return _geo_client->_min_level; } + + bool generate_geo_keys(const std::string &hash_key, + const std::string &sort_key, + const std::string &value, + std::string &geo_hash_key, + std::string &geo_sort_key) + { + return _geo_client->generate_geo_keys( + hash_key, sort_key, value, geo_hash_key, geo_sort_key); + } + + bool restore_origin_keys(const std::string &geo_sort_key, + std::string &origin_hash_key, + std::string &origin_sort_key) + { + return _geo_client->restore_origin_keys(geo_sort_key, origin_hash_key, origin_sort_key); + } + + void normalize_result(std::list> &&results, + int count, + geo::geo_client::SortType sort_type, + std::list &result) + { + _geo_client->normalize_result(std::move(results), count, sort_type, result); + } + + void gen_search_cap(const S2LatLng &latlng, double radius_m, S2Cap &cap) + { + _geo_client->gen_search_cap(latlng, radius_m, cap); + } + + std::string gen_value(double lat_degrees, double lng_degrees) + { + return "00:00:00:00:01:5e|2018-04-26|2018-04-28|ezp8xchrr|" + std::to_string(lng_degrees) + + "|" + std::to_string(lat_degrees) + "|24.043028|4.15921|0|-1"; + } + +public: + std::shared_ptr _geo_client; +}; + +inline bool operator==(const SearchResult &l, const SearchResult &r) +{ + return l.lat_degrees == r.lat_degrees && l.lng_degrees == r.lng_degrees && + l.distance == r.distance && l.hash_key == r.hash_key && l.sort_key == r.sort_key && + l.value == r.value && l.cellid == r.cellid; +} + +TEST_F(geo_client_test, set_and_del) +{ + double lat_degrees = 12.345; + double lng_degrees = 67.890; + std::string test_hash_key = "test_hash_key"; + std::string test_sort_key = "test_sort_key"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + + // geo set + int ret = _geo_client->set(test_hash_key, test_sort_key, test_value); + ASSERT_EQ(ret, pegasus::PERR_OK); + + // get from common db + std::string value; + ret = common_data_client()->get(test_hash_key, test_sort_key, value); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(value, test_value); + + // search the inserted data + { + std::list result; + ret = _geo_client->search_radial( + test_hash_key, test_sort_key, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 1); + ASSERT_NEAR(result.front().distance, 0.0, 1e-6); + ASSERT_EQ(result.front().hash_key, test_hash_key); + ASSERT_EQ(result.front().sort_key, test_sort_key); + ASSERT_EQ(result.front().value, test_value); + } + + { + std::list result; + ret = _geo_client->search_radial( + lat_degrees, lng_degrees, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 1); + ASSERT_NEAR(result.front().distance, 0.0, 1e-6); + ASSERT_EQ(result.front().hash_key, test_hash_key); + ASSERT_EQ(result.front().sort_key, test_sort_key); + ASSERT_EQ(result.front().value, test_value); + } + + // del + ret = _geo_client->del(test_hash_key, test_sort_key); + ASSERT_EQ(ret, pegasus::PERR_OK); + + // get from common db + ret = common_data_client()->get(test_hash_key, test_sort_key, value); + ASSERT_EQ(ret, pegasus::PERR_NOT_FOUND); + + // search the inserted data + { + std::list result; + ret = _geo_client->search_radial( + test_hash_key, test_sort_key, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_NOT_FOUND); + ASSERT_TRUE(result.empty()); + } + + { + std::list result; + ret = _geo_client->search_radial( + lat_degrees, lng_degrees, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_TRUE(result.empty()); + } +} + +TEST_F(geo_client_test, set_geo_data) +{ + double lat_degrees = 56.789; + double lng_degrees = 12.345; + std::string test_hash_key = "test_hash_key_set_geo_data"; + std::string test_sort_key = "test_sort_key_set_geo_data"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + + // geo set_geo_data + int ret = _geo_client->set_geo_data(test_hash_key, test_sort_key, test_value); + ASSERT_EQ(ret, pegasus::PERR_OK); + + // get from common db + std::string value; + ret = common_data_client()->get(test_hash_key, test_sort_key, value); + ASSERT_EQ(ret, pegasus::PERR_NOT_FOUND); + + // search the inserted data + std::list result; + ret = _geo_client->search_radial( + test_hash_key, test_sort_key, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_NOT_FOUND); + + ret = _geo_client->search_radial( + lat_degrees, lng_degrees, 1, 1, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 1); + ASSERT_NEAR(result.front().distance, 0.0, 1e-6); + ASSERT_EQ(result.front().hash_key, test_hash_key); + ASSERT_EQ(result.front().sort_key, test_sort_key); + ASSERT_EQ(result.front().value, test_value); +} + +TEST_F(geo_client_test, same_point_diff_hash_key) +{ + double lat_degrees = 22.345; + double lng_degrees = 67.890; + std::string test_hash_key = "test_hash_key"; + std::string test_sort_key = "test_sort_key"; + std::string test_value1 = gen_value(lat_degrees, lng_degrees); + std::string test_value2 = gen_value(lat_degrees, lng_degrees); + + // geo set + int ret = _geo_client->set(test_hash_key + "1", test_sort_key, test_value1); + ASSERT_EQ(ret, pegasus::PERR_OK); + ret = _geo_client->set(test_hash_key + "2", test_sort_key, test_value2); + ASSERT_EQ(ret, pegasus::PERR_OK); + + // get from common db + std::string value; + ret = common_data_client()->get(test_hash_key + "1", test_sort_key, value); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(value, test_value1); + ret = common_data_client()->get(test_hash_key + "2", test_sort_key, value); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(value, test_value2); + + // search the inserted data + { + std::list result; + ret = _geo_client->search_radial(test_hash_key + "1", + test_sort_key, + 1, + 2, + geo::geo_client::SortType::random, + 500, + result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 2); + for (auto &r : result) { + ASSERT_NEAR(r.distance, 0.0, 1e-6); + ASSERT_TRUE(r.hash_key == test_hash_key + "1" || r.hash_key == test_hash_key + "2") + << r.hash_key; + ASSERT_EQ(r.sort_key, test_sort_key); + ASSERT_TRUE(r.value == test_value1 || r.value == test_value2) << r.value; + } + } + + { + std::list result; + ret = _geo_client->search_radial( + lat_degrees, lng_degrees, 1, 2, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 2); + for (auto &r : result) { + ASSERT_NEAR(r.distance, 0.0, 1e-6); + ASSERT_TRUE(r.hash_key == test_hash_key + "1" || r.hash_key == test_hash_key + "2") + << r.hash_key; + ASSERT_EQ(r.sort_key, test_sort_key); + ASSERT_TRUE(r.value == test_value1 || r.value == test_value2) << r.value; + } + } + + // del + ret = _geo_client->del(test_hash_key + "1", test_sort_key); + ASSERT_EQ(ret, pegasus::PERR_OK); + ret = _geo_client->del(test_hash_key + "2", test_sort_key); + ASSERT_EQ(ret, pegasus::PERR_OK); +} + +TEST_F(geo_client_test, same_point_diff_sort_key) +{ + double lat_degrees = 32.345; + double lng_degrees = 67.890; + std::string test_hash_key = "test_hash_key"; + std::string test_sort_key = "test_sort_key"; + std::string test_value1 = gen_value(lat_degrees, lng_degrees); + std::string test_value2 = gen_value(lat_degrees, lng_degrees); + + // geo set + int ret = _geo_client->set(test_hash_key, test_sort_key + "1", test_value1); + ASSERT_EQ(ret, pegasus::PERR_OK); + ret = _geo_client->set(test_hash_key, test_sort_key + "2", test_value2); + ASSERT_EQ(ret, pegasus::PERR_OK); + + // get from common db + std::string value; + ret = common_data_client()->get(test_hash_key, test_sort_key + "1", value); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(value, test_value1); + ret = common_data_client()->get(test_hash_key, test_sort_key + "2", value); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(value, test_value2); + + // search the inserted data + { + std::list result; + ret = _geo_client->search_radial(test_hash_key, + test_sort_key + "1", + 1, + 2, + geo::geo_client::SortType::random, + 500, + result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 2); + for (auto &r : result) { + ASSERT_NEAR(r.distance, 0.0, 1e-6); + ASSERT_EQ(r.hash_key, test_hash_key); + ASSERT_TRUE(r.sort_key == test_sort_key + "1" || r.sort_key == test_sort_key + "2") + << r.sort_key; + ASSERT_TRUE(r.value == test_value1 || r.value == test_value2) << r.value; + } + } + + { + std::list result; + ret = _geo_client->search_radial( + lat_degrees, lng_degrees, 1, 2, geo::geo_client::SortType::random, 500, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_EQ(result.size(), 2); + for (auto &r : result) { + ASSERT_NEAR(r.distance, 0.0, 1e-6); + ASSERT_EQ(r.hash_key, test_hash_key); + ASSERT_TRUE(r.sort_key == test_sort_key + "1" || r.sort_key == test_sort_key + "2") + << r.sort_key; + ASSERT_TRUE(r.value == test_value1 || r.value == test_value2) << r.value; + } + } + + // del + ret = _geo_client->del(test_hash_key, test_sort_key + "1"); + ASSERT_EQ(ret, pegasus::PERR_OK); + ret = _geo_client->del(test_hash_key, test_sort_key + "2"); + ASSERT_EQ(ret, pegasus::PERR_OK); +} + +TEST_F(geo_client_test, generate_and_restore_geo_keys) +{ + std::string geo_hash_key; + std::string geo_sort_key; + ASSERT_FALSE(generate_geo_keys("", "", "", geo_hash_key, geo_sort_key)); + + double lat_degrees = 32.345; + double lng_degrees = 67.890; + std::string test_hash_key = "test_hash_key"; + std::string test_sort_key = "test_sort_key"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + + std::string leaf_cell_id = + S2Cell(S2LatLng::FromDegrees(lat_degrees, lng_degrees)).id().ToString(); + ASSERT_EQ(leaf_cell_id.length(), 32); // 1 width face, 1 width '/' and 30 width level + + ASSERT_TRUE( + generate_geo_keys(test_hash_key, test_sort_key, test_value, geo_hash_key, geo_sort_key)); + ASSERT_EQ(min_level() + 2, geo_hash_key.length()); + ASSERT_EQ(leaf_cell_id.substr(0, geo_hash_key.length()), geo_hash_key); + ASSERT_EQ(leaf_cell_id.substr(geo_hash_key.length()), + geo_sort_key.substr(0, leaf_cell_id.length() - geo_hash_key.length())); + + dsn::blob hash_key_bb, sort_key_bb; + int skip_length = (int)(32 - geo_hash_key.length()) + 1; // postfix of cell id and ':' + pegasus_restore_key(dsn::blob(geo_sort_key.data(), + skip_length, + (unsigned int)(geo_sort_key.length() - skip_length)), + hash_key_bb, + sort_key_bb); + ASSERT_EQ(hash_key_bb.to_string(), test_hash_key); + ASSERT_EQ(sort_key_bb.to_string(), test_sort_key); + + std::string restore_hash_key; + std::string restore_sort_key; + ASSERT_TRUE(restore_origin_keys(geo_sort_key, restore_hash_key, restore_sort_key)); + ASSERT_EQ(test_hash_key, restore_hash_key); + ASSERT_EQ(test_sort_key, restore_sort_key); +} + +TEST_F(geo_client_test, normalize_result_random_order) +{ + geo::SearchResult r1(1.1, 1.1, 1, "test_hash_key_1", "test_sort_key_1", "value_1"); + geo::SearchResult r2(2.2, 2.2, 2, "test_hash_key_2", "test_sort_key_2", "value_2"); + int count = 100; + std::list result; + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + normalize_result(std::move(results), count, geo::geo_client::SortType::random, result); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result.front(), r1); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), 1, geo::geo_client::SortType::random, result); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result.front(), r1); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), count, geo::geo_client::SortType::random, result); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result.front(), r1); + ASSERT_EQ(result.back(), r2); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), -1, geo::geo_client::SortType::random, result); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result.front(), r1); + ASSERT_EQ(result.back(), r2); + } +} + +TEST_F(geo_client_test, normalize_result_distance_order) +{ + geo::SearchResult r1(1.1, 1.1, 1, "test_hash_key_1", "test_sort_key_1", "value_1"); + geo::SearchResult r2(2.2, 2.2, 2, "test_hash_key_2", "test_sort_key_2", "value_2"); + int count = 100; + std::list result; + + { + std::list> results; + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), count, geo::geo_client::SortType::asc, result); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result.front(), r2); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + normalize_result(std::move(results), 1, geo::geo_client::SortType::asc, result); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(result.front(), r1); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), count, geo::geo_client::SortType::asc, result); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result.front(), r1); + ASSERT_EQ(result.back(), r2); + } + + { + std::list> results; + results.push_back({geo::SearchResult(r1)}); + results.push_back({geo::SearchResult(r2)}); + normalize_result(std::move(results), -1, geo::geo_client::SortType::asc, result); + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result.front(), r1); + ASSERT_EQ(result.back(), r2); + } +} + +TEST_F(geo_client_test, distance) +{ + { + double lat_degrees = 80; + double lng_degrees = 27; + std::string test_hash_key = "test_hash_key1"; + std::string test_sort_key = "test_sort_key1"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + int ret = _geo_client->set(test_hash_key, test_sort_key, test_value); + ASSERT_EQ(ret, pegasus::PERR_OK); + } + + { + double lat_degrees = 55; + double lng_degrees = -153; + std::string test_hash_key = "test_hash_key2"; + std::string test_sort_key = "test_sort_key2"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + int ret = _geo_client->set(test_hash_key, test_sort_key, test_value); + ASSERT_EQ(ret, pegasus::PERR_OK); + } + + double distance = 0.0; + int ret = _geo_client->distance( + "test_hash_key1", "test_sort_key1", "test_hash_key2", "test_sort_key2", 2000, distance); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_NEAR(distance, 1000 * S2Earth::RadiusKm() * M_PI / 4, 1e-6); + + ret = _geo_client->distance( + "test_hash_key1", "test_sort_key1", "test_hash_key1", "test_sort_key1", 2000, distance); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_NEAR(distance, 0.0, 1e-6); +} + +TEST_F(geo_client_test, large_cap) +{ + double lat_degrees = 40.039752; + double lng_degrees = 116.332557; + double radius_m = 10000; + int test_data_count = 10000; + + S2Cap cap; + gen_search_cap(S2LatLng::FromDegrees(lat_degrees, lng_degrees), radius_m, cap); + for (int i = 0; i < test_data_count; ++i) { + S2LatLng latlng(S2Testing::SamplePoint(cap)); + ASSERT_TRUE(cap.Contains(latlng.ToPoint())); + std::string id = std::to_string(i); + std::string value = id + "|2018-06-05 12:00:00|2018-06-05 13:00:00|abcdefg|" + + std::to_string(latlng.lng().degrees()) + "|" + + std::to_string(latlng.lat().degrees()) + "|123.456|456.789|0|-1"; + + int ret = _geo_client->set(id, "", value, 1000); + ASSERT_EQ(ret, pegasus::PERR_OK); + } + + { + // search the inserted data + std::list result; + int ret = _geo_client->search_radial( + "0", "", radius_m * 2, -1, geo::geo_client::SortType::asc, 5000, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_GE(result.size(), test_data_count); + geo::SearchResult last; + for (const auto &r : result) { + ASSERT_LE(last.distance, r.distance); + uint64_t val; + ASSERT_TRUE(dsn::buf2uint64(r.hash_key.c_str(), val)) << r.hash_key; + ASSERT_LE(0, val); + ASSERT_LE(val, test_data_count); + ASSERT_NE(last.hash_key, r.hash_key); + ASSERT_EQ(r.sort_key, ""); + + double distance = 0.0; + ret = _geo_client->distance("0", "", r.hash_key, r.sort_key, 2000, distance); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_NEAR(distance, r.distance, 1e-6); + + last = r; + } + } + + { + // search the inserted data + std::list result; + int ret = _geo_client->search_radial( + lat_degrees, lng_degrees, radius_m, -1, geo::geo_client::SortType::asc, 5000, result); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_GE(result.size(), test_data_count); + + std::string test_hash_key = "test_hash_key_large_cap"; + std::string test_sort_key = "test_sort_key_large_cap"; + std::string test_value = gen_value(lat_degrees, lng_degrees); + ret = _geo_client->set(test_hash_key, test_sort_key, test_value); + ASSERT_EQ(ret, pegasus::PERR_OK); + + geo::SearchResult last; + for (const auto &r : result) { + ASSERT_LE(last.distance, r.distance); + uint64_t val; + ASSERT_TRUE(dsn::buf2uint64(r.hash_key.c_str(), val)); + ASSERT_LE(0, val); + ASSERT_LE(val, test_data_count); + ASSERT_NE(last.hash_key, r.hash_key); + ASSERT_EQ(r.sort_key, ""); + + double distance = 0.0; + ret = _geo_client->distance( + test_hash_key, test_sort_key, r.hash_key, r.sort_key, 2000, distance); + ASSERT_EQ(ret, pegasus::PERR_OK); + ASSERT_NEAR(distance, r.distance, 1e-6); + + last = r; + } + + // del + ret = _geo_client->del(test_hash_key, test_sort_key); + ASSERT_EQ(ret, pegasus::PERR_OK); + } +} +} // namespace geo +} // namespace pegasus diff --git a/src/geo/test/main.cpp b/src/geo/test/main.cpp new file mode 100644 index 0000000000..95d26e3bab --- /dev/null +++ b/src/geo/test/main.cpp @@ -0,0 +1,13 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include + +GTEST_API_ int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + int ans = RUN_ALL_TESTS(); + dsn_exit(ans); +} diff --git a/src/include/pegasus/error_def.h b/src/include/pegasus/error_def.h index f4e02a1ea9..774919b0f4 100644 --- a/src/include/pegasus/error_def.h +++ b/src/include/pegasus/error_def.h @@ -26,6 +26,8 @@ PEGASUS_ERR_CODE(PERR_INVALID_VALUE, -203, "value can't be empty"); PEGASUS_ERR_CODE(PERR_INVALID_PAR_COUNT, -204, "partition count must be a power of 2"); PEGASUS_ERR_CODE(PERR_INVALID_REP_COUNT, -205, "replication count must be 3"); PEGASUS_ERR_CODE(PERR_INVALID_SPLIT_COUNT, -206, "split count must be greater than 0"); +PEGASUS_ERR_CODE(PERR_GEO_DECODE_VALUE_ERROR, -207, "decode latitude and longitude from value error"); +PEGASUS_ERR_CODE(PERR_GEO_INVALID_LATLNG_ERROR, -208, "latitude or longitude is invalid"); // SERVER ERROR // start from -301 diff --git a/src/redis_protocol/proxy/CMakeLists.txt b/src/redis_protocol/proxy/CMakeLists.txt index 3078d5821f..f6dfca0906 100644 --- a/src/redis_protocol/proxy/CMakeLists.txt +++ b/src/redis_protocol/proxy/CMakeLists.txt @@ -11,7 +11,12 @@ set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_INC_PATH "../../include" "../../base" "../proxy_lib") -set(MY_PROJ_LIBS pegasus.rproxylib pegasus.base) +set(MY_PROJ_LIBS pegasus.rproxylib + pegasus.base + pegasus_geo_lib + s2 + pegasus_client_static + fmt) set(MY_BINPLACES "config.ini") diff --git a/src/redis_protocol/proxy/config.ini b/src/redis_protocol/proxy/config.ini index 772ce8e72a..39b1b74153 100644 --- a/src/redis_protocol/proxy/config.ini +++ b/src/redis_protocol/proxy/config.ini @@ -8,7 +8,7 @@ count = 1 [apps.proxy] name = proxy type = proxy -arguments = dsn://redis_cluster/temp +arguments = redis_cluster temp temp_geo ports = 6379 pools = THREAD_POOL_DEFAULT run = true @@ -101,3 +101,4 @@ allow_inline = false [uri-resolver.dsn://redis_cluster] factory = partition_resolver_simple arguments = localhost:34601 + diff --git a/src/redis_protocol/proxy/main.cpp b/src/redis_protocol/proxy/main.cpp index 18ff34de0a..1fa46e2ab4 100644 --- a/src/redis_protocol/proxy/main.cpp +++ b/src/redis_protocol/proxy/main.cpp @@ -16,25 +16,29 @@ namespace proxy { class proxy_app : public ::dsn::service_app { public: - proxy_app(const dsn::service_app_info *info) : service_app(info) {} - virtual ~proxy_app() {} + explicit proxy_app(const dsn::service_app_info *info) : service_app(info) {} - virtual ::dsn::error_code start(const std::vector &args) override + ::dsn::error_code start(const std::vector &args) override { - if (args.size() < 2) + if (args.size() < 2) { return ::dsn::ERR_INVALID_PARAMETERS; + } + proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) { return std::make_shared(p, m); }; - _proxy.reset(new proxy_stub(f, args[1].c_str())); + _proxy = dsn::make_unique( + f, args[1].c_str(), args[2].c_str(), args.size() > 3 ? args[3].c_str() : nullptr); return ::dsn::ERR_OK; } - virtual ::dsn::error_code stop(bool) override { return ::dsn::ERR_OK; } + + ::dsn::error_code stop(bool) final { return ::dsn::ERR_OK; } + private: std::unique_ptr _proxy; }; -} -} // namespace +} // namespace proxy +} // namespace pegasus void register_apps() { ::dsn::service_app::register_factory<::pegasus::proxy::proxy_app>("proxy"); } diff --git a/src/redis_protocol/proxy_lib/CMakeLists.txt b/src/redis_protocol/proxy_lib/CMakeLists.txt index c970d5609c..0fdb2af9e7 100644 --- a/src/redis_protocol/proxy_lib/CMakeLists.txt +++ b/src/redis_protocol/proxy_lib/CMakeLists.txt @@ -15,4 +15,6 @@ set(MY_PROJ_LIBS "") set(MY_PROJ_LIB_PATH "") +add_definitions(-Wno-attributes) + dsn_add_static_library() diff --git a/src/redis_protocol/proxy_lib/proxy_layer.cpp b/src/redis_protocol/proxy_lib/proxy_layer.cpp index 59e3eefdd8..fe2d5712f0 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.cpp +++ b/src/redis_protocol/proxy_lib/proxy_layer.cpp @@ -11,8 +11,15 @@ namespace pegasus { namespace proxy { -proxy_stub::proxy_stub(const proxy_session::factory &factory, const char *uri) - : serverlet("proxy_stub"), _factory(factory) +proxy_stub::proxy_stub(const proxy_session::factory &f, + const char *cluster, + const char *app, + const char *geo_app) + : serverlet("proxy_stub"), + _factory(f), + _cluster(cluster), + _app(app), + _geo_app(geo_app) { dsn::task_spec::get(RPC_CALL_RAW_MESSAGE)->allow_inline = true; dsn::task_spec::get(RPC_CALL_RAW_SESSION_DISCONNECT)->allow_inline = true; @@ -29,12 +36,12 @@ proxy_stub::proxy_stub(const proxy_session::factory &factory, const char *uri) dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SCAN_ACK)->allow_inline = true; dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER_ACK)->allow_inline = true; - _uri_address.assign_uri(uri); + _uri_address.assign_uri( + std::string("dsn://").append(_cluster).append("/").append(_app).c_str()); + open_service(); } -proxy_stub::~proxy_stub() {} - void proxy_stub::on_rpc_request(dsn_message_t request) { ::dsn::rpc_address source = dsn_msg_from_address(request); diff --git a/src/redis_protocol/proxy_lib/proxy_layer.h b/src/redis_protocol/proxy_lib/proxy_layer.h index 6f53df2ec7..2ba36f3d35 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.h +++ b/src/redis_protocol/proxy_lib/proxy_layer.h @@ -57,9 +57,14 @@ class proxy_session : public std::enable_shared_from_this class proxy_stub : public ::dsn::serverlet { public: - proxy_stub(const proxy_session::factory &f, const char *uri); - virtual ~proxy_stub() override; + proxy_stub(const proxy_session::factory &f, + const char *cluster, + const char *app, + const char *geo_app = ""); const ::dsn::rpc_address get_service_uri() const { return _uri_address; } + const char *get_cluster() const { return _cluster.c_str(); } + const char *get_app() const { return _app.c_str(); } + const char *get_geo_app() const { return _geo_app.c_str(); } void open_service() { this->register_rpc_handler( @@ -83,6 +88,9 @@ class proxy_stub : public ::dsn::serverlet std::unordered_map<::dsn::rpc_address, std::shared_ptr> _sessions; proxy_session::factory _factory; ::dsn::rpc_address _uri_address; + std::string _cluster; + std::string _app; + std::string _geo_app; }; } } // namespace diff --git a/src/redis_protocol/proxy_lib/redis_parser.cpp b/src/redis_protocol/proxy_lib/redis_parser.cpp index fd280665aa..9273f38046 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.cpp +++ b/src/redis_protocol/proxy_lib/redis_parser.cpp @@ -2,14 +2,16 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include "redis_parser.h" + +#include +#include #include #include #include -#include #include #include -#include "redis_parser.h" #define CR '\015' #define LF '\012' @@ -26,6 +28,9 @@ std::unordered_map redis_parser:: {"SETEX", redis_parser::g_setex}, {"TTL", redis_parser::g_ttl}, {"PTTL", redis_parser::g_ttl}, + {"GEODIST", redis_parser::g_geo_dist}, + {"GEORADIUS", redis_parser::g_geo_radius}, + {"GEORADIUSBYMEMBER", redis_parser::g_geo_radius_by_member}, }; redis_parser::redis_call_handler redis_parser::get_handler(const char *command, unsigned int length) @@ -49,10 +54,18 @@ redis_parser::redis_parser(proxy_stub *op, dsn_message_t first_msg) current_cursor(0) { ::dsn::apps::rrdb_client *r; - if (op) + if (op) { r = new ::dsn::apps::rrdb_client(op->get_service_uri()); - else + if (strlen(op->get_geo_app()) != 0) { + _geo_client = dsn::make_unique("config.ini", + op->get_cluster(), + op->get_app(), + op->get_geo_app(), + new geo::latlng_extractor_for_lbs()); + } + } else { r = new ::dsn::apps::rrdb_client(); + } client.reset(r); } @@ -200,6 +213,7 @@ bool redis_parser::end_bulk_string_size() dverify_logged(result, LOG_LEVEL_ERROR, "invalid size string \"%s\"", current_size.c_str()); current_str.length = l; + current_str.data.assign(nullptr, 0, 0); current_size.clear(); if (-1 == current_str.length) { append_current_bulk_string(); @@ -351,6 +365,15 @@ void redis_parser::default_handler(redis_parser::message_entry &entry) } void redis_parser::set(redis_parser::message_entry &entry) +{ + if (_geo_client == nullptr) { + set_internal(entry); + } else { + set_geo_internal(entry); + } +} + +void redis_parser::set_internal(redis_parser::message_entry &entry) { redis_request &request = entry.request; if (request.buffers.size() < 3) { @@ -362,9 +385,11 @@ void redis_parser::set(redis_parser::message_entry &entry) result.message = "ERR wrong number of arguments for 'set' command"; reply_message(entry, result); } else { - // with a reference to prevent the object from being destoryed - std::shared_ptr ref_this = shared_from_this(); + int ttl_seconds = 0; + parse_set_parameters(request.buffers, ttl_seconds); + // with a reference to prevent the object from being destroyed + std::shared_ptr ref_this = shared_from_this(); dinfo("%s: send set command(%" PRId64 ")", remote_address.to_string(), entry.sequence_id); auto on_set_reply = [ref_this, this, &entry](::dsn::error_code ec, dsn_message_t, dsn_message_t response) { @@ -416,13 +441,59 @@ void redis_parser::set(redis_parser::message_entry &entry) ::dsn::blob null_blob; pegasus_generate_key(req.key, request.buffers[1].data, null_blob); req.value = request.buffers[2].data; - req.expire_ts_seconds = 0; + req.expire_ts_seconds = ttl_seconds; auto partition_hash = pegasus_key_hash(req.key); // TODO: set the timeout client->put(req, on_set_reply, std::chrono::milliseconds(2000), 0, partition_hash); } } +// origin command format: +// SET key value [EX seconds] [PX milliseconds] [NX|XX] +// NOTE: only 'EX' option is supported +void redis_parser::set_geo_internal(message_entry &entry) +{ + redis_request &redis_request = entry.request; + if (redis_request.buffers.size() < 3) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR wrong number of arguments for 'SET' command"; + reply_message(entry, result); + } else { + int ttl_seconds = 0; + parse_set_parameters(redis_request.buffers, ttl_seconds); + + // with a reference to prevent the object from being destroyed + std::shared_ptr ref_this = shared_from_this(); + auto set_callback = [ref_this, this, &entry](int ec, pegasus_client::internal_info &&) { + if (is_session_reset.load(std::memory_order_acquire)) { + ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset", + remote_address.to_string(), + entry.sequence_id); + return; + } + + if (PERR_OK != ec) { + redis_simple_string result; + result.is_error = true; + result.message = std::string("ERR ") + _geo_client->get_error_string(ec); + reply_message(entry, result); + } else { + redis_simple_string result; + result.is_error = false; + result.message = "OK"; + reply_message(entry, result); + } + }; + _geo_client->async_set(redis_request.buffers[1].data.to_string(), // key => hash_key + std::string(), // "" => sort_key + redis_request.buffers[2].data.to_string(), // value + set_callback, + 2000, // TODO: set the timeout + ttl_seconds); + } +} + void redis_parser::setex(message_entry &entry) { redis_request &redis_req = entry.request; @@ -577,6 +648,15 @@ void redis_parser::get(message_entry &entry) } void redis_parser::del(message_entry &entry) +{ + if (_geo_client == nullptr) { + del_internal(entry); + } else { + del_geo_internal(entry); + } +} + +void redis_parser::del_internal(message_entry &entry) { redis_request &redis_req = entry.request; if (redis_req.buffers.size() != 2) { @@ -638,6 +718,48 @@ void redis_parser::del(message_entry &entry) } } +// origin command format: +// DEL key [key ...] +// NOTE: only one key is supported +void redis_parser::del_geo_internal(message_entry &entry) +{ + redis_request &redis_request = entry.request; + if (redis_request.buffers.size() != 2) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR wrong number of arguments for 'DEL' command"; + reply_message(entry, result); + } else { + // with a reference to prevent the object from being destroyed + std::shared_ptr ref_this = shared_from_this(); + auto del_callback = [ref_this, this, &entry](int ec, pegasus_client::internal_info &&) { + if (is_session_reset.load(std::memory_order_acquire)) { + ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset", + remote_address.to_string(), + entry.sequence_id); + return; + } + + if (PERR_OK != ec) { + redis_simple_string result; + result.is_error = true; + result.message = std::string("ERR ") + _geo_client->get_error_string(ec); + reply_message(entry, result); + } else { + redis_simple_string result; + result.is_error = false; + result.message = "OK"; + reply_message(entry, result); + } + }; + _geo_client->async_del(redis_request.buffers[1].data.to_string(), // key => hash_key + std::string(), // "" => sort_key + false, + del_callback, + 2000); // TODO: set the timeout + } +} + // process 'ttl' and 'pttl' void redis_parser::ttl(message_entry &entry) { @@ -714,6 +836,330 @@ void redis_parser::ttl(message_entry &entry) } } +// command format: +// GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT +// count] [ASC|DESC] [STORE key] [STOREDIST key] +// NOTE: [STORE key] [STOREDIST key] are not supported +// NOTE: [WITHHASH] will return origin value of member when enabled +// NOTE: we use SET instead of GEOADD to insert data into pegasus, so there is not a `key` as in +// Redis(`GEOADD key longitude latitude member`), and we consider that all geo data in pegasus is +// under "" key, so when execute 'GEORADIUS' command, the `key` parameter will always be ignored and +// treated as "". +// eg: GEORADIUS "" 146.123 34.567 1000 +void redis_parser::geo_radius(message_entry &entry) +{ + redis_request &redis_request = entry.request; + if (redis_request.buffers.size() < 5) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR wrong number of arguments for 'GEORADIUS' command"; + reply_message(entry, result); + return; + } + + // longitude latitude + double lng_degrees = 0.0; + const std::string &str_lng_degrees = redis_request.buffers[2].data.to_string(); + if (!dsn::buf2double(str_lng_degrees, lng_degrees)) { + dwarn_f("longitude parameter '{}' is error, use {}", str_lng_degrees, lng_degrees); + } + double lat_degrees = 0.0; + const std::string &str_lat_degrees = redis_request.buffers[3].data.to_string(); + if (!dsn::buf2double(str_lat_degrees, lat_degrees)) { + dwarn_f("latitude parameter '{}' is error, use {}", str_lat_degrees, lat_degrees); + } + + // radius m|km|ft|mi [WITHCOORD] [WITHDIST] [COUNT count] [ASC|DESC] + double radius_m = 100.0; + std::string unit; + geo::geo_client::SortType sort_type = geo::geo_client::SortType::random; + int count = -1; + bool WITHCOORD = false; + bool WITHDIST = false; + bool WITHHASH = false; + parse_geo_radius_parameters( + redis_request.buffers, 4, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHHASH); + + std::shared_ptr ref_this = shared_from_this(); + auto search_callback = [ref_this, this, &entry, unit, WITHCOORD, WITHDIST, WITHHASH]( + int ec, std::list &&results) { + process_geo_radius_result( + entry, unit, WITHCOORD, WITHDIST, WITHHASH, ec, std::move(results)); + }; + + _geo_client->async_search_radial( + lat_degrees, lng_degrees, radius_m, count, sort_type, 2000, search_callback); +} + +// command format: +// GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] +// [ASC|DESC] [STORE key] [STOREDIST key] +// NOTE: [STORE key] [STOREDIST key] are not supported +// NOTE: [WITHHASH] will return origin value of member when enabled +// NOTE: we use SET instead of GEOADD to insert data into pegasus, so there is not a `key` as in +// Redis(`GEOADD key longitude latitude member`), and we consider that all geo data in pegasus is +// under "" key, so when execute 'GEORADIUSBYMEMBER' command, the `key` parameter will always be +// ignored and +// treated as "", and the `member` parameter is treated as `key` which is inserted by SET command. +// eg: GEORADIUSBYMEMBER "" some_key 1000 +void redis_parser::geo_radius_by_member(message_entry &entry) +{ + redis_request &redis_request = entry.request; + if (redis_request.buffers.size() < 4) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR wrong number of arguments for 'GEORADIUSBYMEMBER' command"; + reply_message(entry, result); + return; + } + + // member + std::string hash_key = redis_request.buffers[2].data.to_string(); // member => hash_key + + // radius m|km|ft|mi [WITHCOORD] [WITHDIST] [COUNT count] [ASC|DESC] [WITHHASH] + double radius_m = 100.0; + std::string unit; + geo::geo_client::SortType sort_type = geo::geo_client::SortType::random; + int count = -1; + bool WITHCOORD = false; + bool WITHDIST = false; + bool WITHHASH = false; + parse_geo_radius_parameters( + redis_request.buffers, 3, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHHASH); + + std::shared_ptr ref_this = shared_from_this(); + auto search_callback = [ref_this, this, &entry, unit, WITHCOORD, WITHDIST, WITHHASH]( + int ec, std::list &&results) { + process_geo_radius_result( + entry, unit, WITHCOORD, WITHDIST, WITHHASH, ec, std::move(results)); + }; + + _geo_client->async_search_radial( + hash_key, "", radius_m, count, sort_type, 2000, search_callback); +} + +void redis_parser::parse_set_parameters(const std::vector &opts, + int &ttl_seconds) +{ + // [EX seconds] + ttl_seconds = 0; + for (int i = 3; i < opts.size(); ++i) { + const std::string &opt = opts[i].data.to_string(); + if (strcasecmp(opt.c_str(), "EX") == 0 && i + 1 < opts.size()) { + const std::string &str_ttl_seconds = opts[i + 1].data.to_string(); + if (!dsn::buf2int32(str_ttl_seconds, ttl_seconds)) { + dwarn_f("'EX {}' option is error, use {}", str_ttl_seconds, ttl_seconds); + } + } else { + dwarn_f("only 'EX' option is supported"); + } + } +} + +void redis_parser::parse_geo_radius_parameters(const std::vector &opts, + int base_index, + double &radius_m, + std::string &unit, + geo::geo_client::SortType &sort_type, + int &count, + bool &WITHCOORD, + bool &WITHDIST, + bool &WITHHASH) +{ + // radius + if (base_index >= opts.size()) { + return; + } + const std::string &str_radius = opts[base_index++].data.to_string(); + if (!dsn::buf2double(str_radius, radius_m)) { + dwarn_f("radius parameter '{}' is error, use {}", str_radius, radius_m); + } + + // m|km|ft|mi + if (base_index >= opts.size()) { + return; + } + unit = opts[base_index++].data.to_string(); + if (unit == "km") { + radius_m *= 1000; + } else if (unit == "mi") { + radius_m *= 1609.344; + } else if (unit == "ft") { + radius_m *= 0.3048; + } else { + // keep as meter unit + unit = "m"; + base_index--; + } + + // [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] + while (base_index < opts.size()) { + const std::string &opt = opts[base_index].data.to_string(); + if (strcasecmp(opt.c_str(), "WITHCOORD") == 0) { + WITHCOORD = true; + } else if (strcasecmp(opt.c_str(), "WITHDIST") == 0) { + WITHDIST = true; + } else if (strcasecmp(opt.c_str(), "WITHHASH") == 0) { + WITHHASH = true; + } else if (strcasecmp(opt.c_str(), "COUNT") == 0 && base_index + 1 < opts.size()) { + const std::string &str_count = opts[base_index + 1].data.to_string(); + if (!dsn::buf2int32(str_count, count)) { + derror_f("'COUNT {}' option is error, use {}", str_count, count); + } + } else if (strcasecmp(opt.c_str(), "ASC") == 0) { + sort_type = geo::geo_client::SortType::asc; + } else if (strcasecmp(opt.c_str(), "DESC") == 0) { + sort_type = geo::geo_client::SortType::desc; + } + base_index++; + } +} + +void redis_parser::process_geo_radius_result(message_entry &entry, + const std::string &unit, + bool WITHCOORD, + bool WITHDIST, + bool WITHHASH, + int ec, + std::list &&results) +{ + if (is_session_reset.load(std::memory_order_acquire)) { + ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset", + remote_address.to_string(), + entry.sequence_id); + return; + } + + if (PERR_OK != ec) { + redis_simple_string result; + result.is_error = true; + result.message = std::string("ERR ") + _geo_client->get_error_string(ec); + reply_message(entry, result); + } else { + redis_array result; + result.count = (int)results.size(); + for (const auto &elem : results) { + std::shared_ptr key = std::make_shared( + (int)elem.hash_key.size(), elem.hash_key.data()); // hash_key => member + if (!WITHCOORD && !WITHDIST && !WITHHASH) { + // only member + result.array.push_back(key); + } else { + // member and some WITH* parameters + std::shared_ptr sub_array = std::make_shared(); + + // member + sub_array->array.push_back(key); + sub_array->count++; + + // NOTE: the order of WITH* parameters should not be changed for the redis + // protocol + if (WITHDIST) { + // with distance + double distance = elem.distance; + if (unit == "km") { + distance /= 1000; + } else if (unit == "mi") { + distance /= 1609.344; + } else if (unit == "ft") { + distance /= 0.3048; + } else { + // keep as meter unit + } + std::string dist = std::to_string(distance); + std::shared_ptr dist_buf = + dsn::utils::make_shared_array(dist.size()); + memcpy(dist_buf.get(), dist.data(), dist.size()); + sub_array->array.push_back(std::make_shared( + dsn::blob(std::move(dist_buf), (int)dist.size()))); + sub_array->count++; + } + if (WITHCOORD) { + // with coordinate + std::shared_ptr coordinate = std::make_shared(); + + // longitude + std::string lng = std::to_string(elem.lng_degrees); + std::shared_ptr lng_buf = dsn::utils::make_shared_array(lng.size()); + memcpy(lng_buf.get(), lng.data(), lng.size()); + coordinate->array.push_back(std::make_shared( + dsn::blob(std::move(lng_buf), (int)lng.size()))); + coordinate->count++; + + // latitude + std::string lat = std::to_string(elem.lat_degrees); + std::shared_ptr lat_buf = dsn::utils::make_shared_array(lat.size()); + memcpy(lat_buf.get(), lat.data(), lat.size()); + coordinate->array.push_back(std::make_shared( + dsn::blob(std::move(lat_buf), (int)lat.size()))); + coordinate->count++; + + sub_array->array.push_back(coordinate); + sub_array->count++; + } + if (WITHHASH) { + // with origin value + sub_array->array.push_back(std::make_shared( + (int)elem.value.size(), elem.value.data())); + sub_array->count++; + } + result.array.push_back(sub_array); + } + } + reply_message(entry, result); + } +} + +// command format: +// GEODIST key member1 member2 [unit] +void redis_parser::geo_dist(message_entry &entry) +{ + redis_request &redis_request = entry.request; + if (redis_request.buffers.size() < 4) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR wrong number of arguments for 'GEODIST' command"; + reply_message(entry, result); + } else { + // TODO: set the timeout + std::string hash_key1 = redis_request.buffers[2].data.to_string(); // member1 => hash_key1 + std::string hash_key2 = redis_request.buffers[3].data.to_string(); // member2 => hash_key2 + std::string unit = redis_request.buffers[4].data.to_string(); + + std::shared_ptr ref_this = shared_from_this(); + auto get_callback = [ref_this, this, &entry, unit](int error_code, double &&distance) { + if (is_session_reset.load(std::memory_order_acquire)) { + ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset", + remote_address.to_string(), + entry.sequence_id); + return; + } + + if (PERR_OK != error_code) { + redis_simple_string result; + result.is_error = true; + result.message = std::string("ERR ") + _geo_client->get_error_string(error_code); + reply_message(entry, result); + } else { + if (unit == "km") { + distance /= 1000; + } else if (unit == "mi") { + distance /= 1609.344; + } else if (unit == "ft") { + distance /= 0.3048; + } else { + // keep as meter unit + } + + std::string str_distance = std::to_string(distance); + redis_bulk_string result((int)str_distance.size(), str_distance.data()); + reply_message(entry, result); + } + }; + _geo_client->async_distance(hash_key1, "", hash_key2, "", 2000, get_callback); + } +} + void redis_parser::handle_command(std::unique_ptr &&entry) { message_entry &e = *entry.get(); @@ -732,38 +1178,54 @@ void redis_parser::handle_command(std::unique_ptr &&entry) handler(this, e); } -void redis_parser::marshalling(::dsn::binary_writer &write_stream, const redis_bulk_string &bs) +void redis_parser::redis_integer::marshalling(::dsn::binary_writer &write_stream) const { - write_stream.write_pod('$'); - std::string result = boost::lexical_cast(bs.length); - write_stream.write(result.c_str(), result.length()); + write_stream.write_pod(':'); + std::string result = std::to_string(value); + write_stream.write(result.c_str(), (int)result.length()); write_stream.write_pod(CR); write_stream.write_pod(LF); - if (bs.length < 0) - return; - if (bs.length > 0) { - dassert(bs.data.length() == bs.length, "%u VS %d", bs.data.length(), bs.length); - write_stream.write(bs.data.data(), bs.length); - } +} + +void redis_parser::redis_simple_string::marshalling(::dsn::binary_writer &write_stream) const +{ + write_stream.write_pod(is_error ? '-' : '+'); + write_stream.write(message.c_str(), (int)message.length()); write_stream.write_pod(CR); write_stream.write_pod(LF); } -void redis_parser::marshalling(::dsn::binary_writer &write_stream, const redis_simple_string &data) +void redis_parser::redis_bulk_string::marshalling(::dsn::binary_writer &write_stream) const { - write_stream.write_pod(data.is_error ? '-' : '+'); - write_stream.write(data.message.c_str(), data.message.length()); + write_stream.write_pod('$'); + std::string result = std::to_string(length); + write_stream.write(result.c_str(), (int)result.length()); + write_stream.write_pod(CR); + write_stream.write_pod(LF); + if (length < 0) { + return; + } + if (length > 0) { + dassert(data.length() == length, "%u VS %d", data.length(), length); + write_stream.write(data.data(), length); + } write_stream.write_pod(CR); write_stream.write_pod(LF); } -void redis_parser::marshalling(::dsn::binary_writer &write_stream, const redis_integer &data) +void redis_parser::redis_array::marshalling(::dsn::binary_writer &write_stream) const { - write_stream.write_pod(':'); - std::string result = boost::lexical_cast(data.value); - write_stream.write(result.c_str(), result.length()); + write_stream.write_pod('*'); + std::string result = std::to_string(count); + write_stream.write(result.c_str(), (int)result.length()); write_stream.write_pod(CR); write_stream.write_pod(LF); + if (count > 0) { + dassert_f(array.size() == count, "{} VS {}", array.size(), count); + for (const auto &elem : array) { + elem->marshalling(write_stream); + } + } } } } // namespace diff --git a/src/redis_protocol/proxy_lib/redis_parser.h b/src/redis_protocol/proxy_lib/redis_parser.h index d57efc53ba..75f72b2eeb 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.h +++ b/src/redis_protocol/proxy_lib/redis_parser.h @@ -6,7 +6,9 @@ #include #include +#include #include "proxy_layer.h" +#include "geo/lib/geo_client.h" namespace dsn { namespace apps { @@ -29,23 +31,49 @@ class redis_parser : public proxy_session in_bulk_string_size, start_bulk_string_data, }; - struct redis_bulk_string + struct redis_base_type { - int length; - ::dsn::blob data; - redis_bulk_string() : length(0) {} - redis_bulk_string(int len, const char *str) : length(len), data(str, 0, len) {} - redis_bulk_string(const ::dsn::blob &bb) : length(bb.length()), data(bb) {} + virtual ~redis_base_type() = default; + virtual void marshalling(::dsn::binary_writer &write_stream) const = 0; + }; + struct redis_integer : public redis_base_type + { + int64_t value = 0; + + void marshalling(::dsn::binary_writer &write_stream) const final; }; - struct redis_simple_string + // represent both redis simple string and error + struct redis_simple_string : public redis_base_type { - bool is_error; + bool is_error = false; std::string message; + + void marshalling(::dsn::binary_writer &write_stream) const final; }; - struct redis_integer + struct redis_bulk_string : public redis_base_type { - int64_t value; + int length = 0; + ::dsn::blob data; + + redis_bulk_string(const std::string &str) + : length((int)str.length()), data(str.data(), 0, (unsigned int)str.length()) + { + } + redis_bulk_string(int len = 0, const char *str = nullptr) : length(len), data(str, 0, len) + { + } + explicit redis_bulk_string(const ::dsn::blob &bb) : length(bb.length()), data(bb) {} + + void marshalling(::dsn::binary_writer &write_stream) const final; }; + struct redis_array : public redis_base_type + { + int count = 0; + std::list> array; + + void marshalling(::dsn::binary_writer &write_stream) const final; + }; + struct redis_request { int length; @@ -56,14 +84,10 @@ class redis_parser : public proxy_session { redis_request request; std::atomic response; - int64_t sequence_id; + int64_t sequence_id = 0; }; - static void marshalling(::dsn::binary_writer &write_stream, const redis_simple_string &data); - static void marshalling(::dsn::binary_writer &write_stream, const redis_bulk_string &data); - static void marshalling(::dsn::binary_writer &write_stream, const redis_integer &data); - - virtual bool parse(dsn_message_t msg) override; + bool parse(dsn_message_t msg) override; // this is virtual only because we can override and test other modules virtual void handle_command(std::unique_ptr &&entry); @@ -91,8 +115,9 @@ class redis_parser : public proxy_session // for rrdb std::unique_ptr<::dsn::apps::rrdb_client> client; + std::unique_ptr _geo_client; -private: +protected: // function for data stream void append_message(dsn_message_t msg); void prepare_current_buffer(); @@ -120,8 +145,33 @@ class redis_parser : public proxy_session DECLARE_REDIS_HANDLER(del) DECLARE_REDIS_HANDLER(setex) DECLARE_REDIS_HANDLER(ttl) + DECLARE_REDIS_HANDLER(geo_dist) + DECLARE_REDIS_HANDLER(geo_radius) + DECLARE_REDIS_HANDLER(geo_radius_by_member) DECLARE_REDIS_HANDLER(default_handler) + void set_internal(message_entry &entry); + void set_geo_internal(message_entry &entry); + void del_internal(message_entry &entry); + void del_geo_internal(message_entry &entry); + void parse_set_parameters(const std::vector &opts, int &ttl_seconds); + void parse_geo_radius_parameters(const std::vector &opts, + int base_index, + double &radius_m, + std::string &unit, + geo::geo_client::SortType &sort_type, + int &count, + bool &WITHCOORD, + bool &WITHDIST, + bool &WITHHASH); + void process_geo_radius_result(message_entry &entry, + const std::string &unit, + bool WITHCOORD, + bool WITHDIST, + bool WITHHASH, + int ec, + std::list &&results); + // function for pipeline reply void enqueue_pending_response(std::unique_ptr &&entry); void fetch_and_dequeue_messages(std::vector &msgs, bool only_ready_ones); @@ -136,7 +186,7 @@ class redis_parser : public proxy_session dsn_msg_add_ref(resp); dsn::rpc_write_stream s(resp); - marshalling(s, value); + value.marshalling(s); s.commit_buffer(); entry.response.store(resp, std::memory_order_release); @@ -150,7 +200,7 @@ class redis_parser : public proxy_session public: redis_parser(proxy_stub *op, dsn_message_t first_msg); - virtual ~redis_parser(); + ~redis_parser() override; }; } } // namespace diff --git a/src/redis_protocol/proxy_ut/CMakeLists.txt b/src/redis_protocol/proxy_ut/CMakeLists.txt index a999a71c5e..ccf0f780c7 100644 --- a/src/redis_protocol/proxy_ut/CMakeLists.txt +++ b/src/redis_protocol/proxy_ut/CMakeLists.txt @@ -20,10 +20,17 @@ endif() set(MY_BOOST_PACKAGES system filesystem) -set(MY_PROJ_LIBS pegasus.rproxylib pegasus.base ${MY_PROJ_LIBS}) +set(MY_PROJ_LIBS pegasus.rproxylib + pegasus.base ${MY_PROJ_LIBS} + s2testing + pegasus_geo_lib + s2 + pegasus_client_static + fmt) set(MY_BINPLACES "config.ini" "run.sh") add_compile_options("-Wno-dangling-else") +add_definitions(-Wno-attributes) dsn_add_executable() diff --git a/src/redis_protocol/proxy_ut/config.ini b/src/redis_protocol/proxy_ut/config.ini index 491fd0815d..ac4fa11764 100644 --- a/src/redis_protocol/proxy_ut/config.ini +++ b/src/redis_protocol/proxy_ut/config.ini @@ -8,7 +8,7 @@ count = 1 [apps.proxy] name = proxy type = proxy -arguments = dsn://redis_cluster/temp +arguments = redis_cluster temp ports = 12345 pools = THREAD_POOL_DEFAULT run = true diff --git a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp index b8d812afc8..09c1c3a374 100644 --- a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp +++ b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. #include +#include #include #include #include @@ -21,20 +22,21 @@ using namespace ::pegasus::proxy; class proxy_app : public ::dsn::service_app { public: - proxy_app(const dsn::service_app_info *info) : service_app(info) {} - virtual ~proxy_app() {} + explicit proxy_app(const dsn::service_app_info *info) : service_app(info) {} - virtual ::dsn::error_code start(const std::vector &args) override + ::dsn::error_code start(const std::vector &args) override { - if (args.size() < 2) + if (args.size() < 3) { return ::dsn::ERR_INVALID_PARAMETERS; + } + proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) { return std::make_shared(p, m); }; - _proxy.reset(new proxy_stub(f, args[1].c_str())); + _proxy = dsn::make_unique(f, args[1].c_str(), args[2].c_str()); return ::dsn::ERR_OK; } - virtual ::dsn::error_code stop(bool) override { return ::dsn::ERR_OK; } + ::dsn::error_code stop(bool) override { return ::dsn::ERR_OK; } private: std::unique_ptr _proxy; }; @@ -44,10 +46,10 @@ bool blob_compare(const ::dsn::blob &bb1, const ::dsn::blob &bb2) return bb1.length() == bb2.length() && memcmp(bb1.data(), bb2.data(), bb1.length()) == 0; } -class redis_test_parser1 : public redis_parser +class redis_test_parser : public redis_parser { protected: - virtual void handle_command(std::unique_ptr &&entry) + void handle_command(std::unique_ptr &&entry) override { redis_request &act_request = entry->request; redis_request &exp_request = reserved_entry[entry_index]->request; @@ -67,11 +69,13 @@ class redis_test_parser1 : public redis_parser } public: - redis_test_parser1(proxy_stub *stub, dsn_message_t msg) : redis_parser(stub, msg) + redis_test_parser(proxy_stub *stub, dsn_message_t msg) : redis_parser(stub, msg) { reserved_entry.reserve(20); - for (int i = 0; i < 20; ++i) + for (int i = 0; i < 20; ++i) { reserved_entry.emplace_back(new message_entry()); + } + got_a_message = false; } void test_fixed_cases() @@ -105,6 +109,36 @@ class redis_test_parser1 : public redis_parser ASSERT_TRUE(got_a_message); } + // geo GEORADIUS command + { + got_a_message = false; + entry_index = 0; + rr.length = 6; + rr.buffers = { + {9, "GEORADIUS"}, {0, ""}, {5, "123.4"}, {5, "56.78"}, {3, "100"}, {1, "m"}}; + + const char *request_data = "*6\r\n$9\r\nGEORADIUS\r\n$0\r\n\r\n$5\r\n123.4\r\n$5\r\n56." + "78\r\n$3\r\n100\r\n$1\r\nm\r\n"; + auto request = create_message(request_data); + ASSERT_TRUE(parse(request)); + ASSERT_TRUE(got_a_message); + } + + // geo GEORADIUSBYMEMBER command + { + got_a_message = false; + entry_index = 0; + rr.length = 5; + rr.buffers = { + {17, "GEORADIUSBYMEMBER"}, {0, ""}, {7, "member1"}, {6, "1000.5"}, {2, "km"}}; + + const char *request_data = "*5\r\n$17\r\nGEORADIUSBYMEMBER\r\n$0\r\n\r\n$" + "7\r\nmember1\r\n$6\r\n1000.5\r\n$2\r\nkm\r\n"; + auto request = create_message(request_data); + ASSERT_TRUE(parse(request)); + ASSERT_TRUE(got_a_message); + } + // wrong message { got_a_message = false; @@ -242,6 +276,150 @@ class redis_test_parser1 : public redis_parser } } + void test_parse_parameters() + { + double radius_m = 0; + std::string unit; + pegasus::geo::geo_client::SortType sort_type = pegasus::geo::geo_client::SortType::random; + int count = 0; + bool WITHCOORD = false; + bool WITHDIST = false; + bool WITHVALUE = false; + + { + radius_m = 0; + sort_type = pegasus::geo::geo_client::SortType::random; + count = 0; + WITHCOORD = false; + WITHDIST = false; + WITHVALUE = false; + std::vector opts({{"GEORADIUS"}, + {""}, + {"12.3"}, + {"45.6"}, + {"100"}, + {"m"}, + {"WITHCOORD"}, + {"WITHDIST"}, + {"WITHHASH"}, + {"COUNT"}, + {"-1"}, + {"ASC"}, + {"WITHVALUE"}}); + + parse_geo_radius_parameters( + opts, 4, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHVALUE); + + ASSERT_DOUBLE_EQ(radius_m, 100); + ASSERT_EQ(unit, "m"); + ASSERT_EQ(sort_type, pegasus::geo::geo_client::SortType::asc); + ASSERT_EQ(count, -1); + ASSERT_TRUE(WITHCOORD); + ASSERT_TRUE(WITHDIST); + ASSERT_TRUE(WITHVALUE); + } + + { + radius_m = 0; + sort_type = pegasus::geo::geo_client::SortType::random; + count = 0; + WITHCOORD = false; + WITHDIST = false; + WITHVALUE = false; + std::vector opts({{"GEORADIUS"}, + {""}, + {"12.3"}, + {"45.6"}, + {"100.23"}, + {"km"}, + {"COUNT"}, + {"500"}, + {"DESC"}}); + + parse_geo_radius_parameters( + opts, 4, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHVALUE); + + ASSERT_DOUBLE_EQ(radius_m, 100230); + ASSERT_EQ(unit, "km"); + ASSERT_EQ(sort_type, pegasus::geo::geo_client::SortType::desc); + ASSERT_EQ(count, 500); + ASSERT_FALSE(WITHCOORD); + ASSERT_FALSE(WITHDIST); + ASSERT_FALSE(WITHVALUE); + } + + { + radius_m = 0; + sort_type = pegasus::geo::geo_client::SortType::random; + count = 0; + WITHCOORD = false; + WITHDIST = false; + WITHVALUE = false; + std::vector opts({{"GEORADIUSBYMEMBER"}, + {""}, + {"somekey"}, + {"100"}, + {"m"}, + {"WITHCOORD"}, + {"WITHDIST"}, + {"WITHHASH"}, + {"COUNT"}, + {"-1"}, + {"ASC"}, + {"WITHVALUE"}}); + + parse_geo_radius_parameters( + opts, 3, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHVALUE); + + ASSERT_DOUBLE_EQ(radius_m, 100); + ASSERT_EQ(unit, "m"); + ASSERT_EQ(sort_type, pegasus::geo::geo_client::SortType::asc); + ASSERT_EQ(count, -1); + ASSERT_TRUE(WITHCOORD); + ASSERT_TRUE(WITHDIST); + ASSERT_TRUE(WITHVALUE); + } + + { + radius_m = 0; + sort_type = pegasus::geo::geo_client::SortType::random; + count = 0; + WITHCOORD = false; + WITHDIST = false; + WITHVALUE = false; + std::vector opts({{"GEORADIUSBYMEMBER"}, + {""}, + {"somekey"}, + {"100.23"}, + {"km"}, + {"COUNT"}, + {"500"}, + {"DESC"}}); + + parse_geo_radius_parameters( + opts, 3, radius_m, unit, sort_type, count, WITHCOORD, WITHDIST, WITHVALUE); + + ASSERT_DOUBLE_EQ(radius_m, 100230); + ASSERT_EQ(unit, "km"); + ASSERT_EQ(sort_type, pegasus::geo::geo_client::SortType::desc); + ASSERT_EQ(count, 500); + ASSERT_FALSE(WITHCOORD); + ASSERT_FALSE(WITHDIST); + ASSERT_FALSE(WITHVALUE); + } + + { + int ttl_seconds = 0; + std::vector opts({{"SET"}, + {"KK"}, + {"vv"}, + {"EX"}, + {"123"}}); + parse_set_parameters(opts, ttl_seconds); + ASSERT_EQ(ttl_seconds, 123); + } + } + public: static dsn_message_t create_message(const char *data) { @@ -269,8 +447,9 @@ class redis_test_parser1 : public redis_parser stream.write_pod('\n'); for (unsigned int i = 0; i != ra.length; ++i) { - redis_parser::marshalling(stream, ra.buffers[i]); + ra.buffers[i].marshalling(stream); } + dsn_msg_release_ref(m); return result; } @@ -288,9 +467,10 @@ TEST(proxy, parser) dsn::message_ex *msg = (dsn::message_ex *)m; msg->header->from_address = dsn::rpc_address("127.0.0.1", 123); } - std::shared_ptr parser(new redis_test_parser1(nullptr, m)); + std::shared_ptr parser(new redis_test_parser(nullptr, m)); parser->test_fixed_cases(); parser->test_random_cases(); + parser->test_parse_parameters(); dsn_msg_release_ref(m); } diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 4b98348c50..83c20416d6 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -51,4 +51,6 @@ if (UNIX) SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) endif() +add_definitions(-Wno-attributes) + dsn_add_executable() diff --git a/src/shell/CMakeLists.txt b/src/shell/CMakeLists.txt index 8504681147..f54587aafb 100644 --- a/src/shell/CMakeLists.txt +++ b/src/shell/CMakeLists.txt @@ -29,6 +29,8 @@ set(MY_PROJ_LIBS PocoJSON crypto fmt + pegasus_geo_lib + s2 ) if (UNIX) @@ -48,4 +50,6 @@ if (UNIX) SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) endif() +add_definitions(-Wno-attributes) + dsn_add_executable() diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 94c488d224..55b7a6bf22 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "base/pegasus_key_schema.h" #include "base/pegasus_value_schema.h" @@ -45,7 +46,8 @@ enum scan_data_operator { SCAN_COPY, SCAN_CLEAR, - SCAN_COUNT + SCAN_COUNT, + SCAN_GEN_GEO }; class top_container { @@ -99,6 +101,7 @@ struct scan_data_context int timeout_ms; pegasus::pegasus_client::pegasus_scanner_wrapper scanner; pegasus::pegasus_client *client; + pegasus::geo::geo_client *geoclient; std::atomic_bool *error_occurred; std::atomic_long split_rows; std::atomic_long split_request_count; @@ -119,6 +122,7 @@ struct scan_data_context int timeout_ms_, pegasus::pegasus_client::pegasus_scanner_wrapper scanner_, pegasus::pegasus_client *client_, + pegasus::geo::geo_client *geoclient_, std::atomic_bool *error_occurred_, bool stat_size_ = false, int top_count_ = 0) @@ -128,6 +132,7 @@ struct scan_data_context timeout_ms(timeout_ms_), scanner(scanner_), client(client_), + geoclient(geoclient_), error_occurred(error_occurred_), split_rows(0), split_request_count(0), @@ -236,6 +241,31 @@ inline void scan_data_next(scan_data_context *context) } scan_data_next(context); break; + case SCAN_GEN_GEO: + context->split_request_count++; + context->geoclient->async_set( + hash_key, + sort_key, + value, + [context](int err, pegasus::pegasus_client::internal_info &&info) { + if (err != pegasus::PERR_OK) { + if (!context->split_completed.exchange(true)) { + fprintf(stderr, + "ERROR: split[%d] async set failed: %s\n", + context->split_id, + context->client->get_error_string(err)); + context->error_occurred->store(true); + } + } else { + context->split_rows++; + scan_data_next(context); + } + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. + context->split_request_count--; + }, + context->timeout_ms); + break; default: dassert(false, "op = %d", context->op); break; diff --git a/src/shell/commands.h b/src/shell/commands.h index c6194c0c91..2a0b1f9ef9 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -1742,19 +1742,22 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) {"max_split_count", required_argument, 0, 's'}, {"max_batch_count", required_argument, 0, 'b'}, {"timeout_ms", required_argument, 0, 't'}, + {"geo_data", no_argument, 0, 'g'}, {0, 0, 0, 0}}; std::string target_cluster_name; std::string target_app_name; + std::string target_geo_app_name; int max_split_count = 100000000; int max_batch_count = 500; int timeout_ms = sc->timeout_ms; + bool is_geo_data = false; optind = 0; while (true) { int option_index = 0; int c; - c = getopt_long(args.argc, args.argv, "c:a:s:b:t:", long_options, &option_index); + c = getopt_long(args.argc, args.argv, "c:a:s:b:t:g", long_options, &option_index); if (c == -1) break; switch (c) { @@ -1763,6 +1766,7 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) break; case 'a': target_app_name = optarg; + target_geo_app_name = target_app_name + "_geo"; break; case 's': if (!dsn::buf2int32(optarg, max_split_count)) { @@ -1782,6 +1786,9 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) return false; } break; + case 'g': + is_geo_data = true; + break; default: return false; } @@ -1816,6 +1823,9 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) fprintf(stderr, "INFO: source_app_name = %s\n", sc->pg_client->get_app_name()); fprintf(stderr, "INFO: target_cluster_name = %s\n", target_cluster_name.c_str()); fprintf(stderr, "INFO: target_app_name = %s\n", target_app_name.c_str()); + if (is_geo_data) { + fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str()); + } fprintf(stderr, "INFO: max_split_count = %d\n", max_split_count); fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count); fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms); @@ -1840,6 +1850,16 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) return true; } + pegasus::geo::geo_client *target_geo_client = nullptr; + if (is_geo_data) { + target_geo_client = + new pegasus::geo::geo_client("config.ini", + target_cluster_name.c_str(), + target_app_name.c_str(), + target_geo_app_name.c_str(), + new pegasus::geo::latlng_extractor_for_lbs()); + } + std::vector scanners; pegasus::pegasus_client::scan_options options; options.timeout_ms = timeout_ms; @@ -1848,6 +1868,7 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) fprintf(stderr, "ERROR: open source app scanner failed: %s\n", sc->pg_client->get_error_string(ret)); + delete target_geo_client; return true; } int split_count = scanners.size(); @@ -1856,12 +1877,13 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) std::atomic_bool error_occurred(false); std::vector contexts; for (int i = 0; i < scanners.size(); i++) { - scan_data_context *context = new scan_data_context(SCAN_COPY, + scan_data_context *context = new scan_data_context(is_geo_data ? SCAN_GEN_GEO : SCAN_COPY, i, max_batch_count, timeout_ms, scanners[i]->get_smart_wrapper(), target_client, + target_geo_client, &error_occurred); contexts.push_back(context); dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context)); @@ -1918,6 +1940,7 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) delete contexts[i]; } contexts.clear(); + delete target_geo_client; fprintf(stderr, "\nCopy %s, total %ld rows.\n", @@ -2024,6 +2047,7 @@ inline bool clear_data(command_executor *e, shell_context *sc, arguments args) timeout_ms, scanners[i]->get_smart_wrapper(), sc->pg_client, + nullptr, &error_occurred); contexts.push_back(context); dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context)); @@ -2207,6 +2231,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) timeout_ms, scanners[i]->get_smart_wrapper(), sc->pg_client, + nullptr, &error_occurred, stat_size, top_count); diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 8218fe7d6d..4d15b6055f 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -210,7 +210,7 @@ static command_executor commands[] = { "copy_data", "copy app data", "<-c|--target_cluster_name str> <-a|--target_app_name str> " - "[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num]", + "[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] [-g|--geo_data]", data_operations, }, {