From f2084d4462196a8a26a3bcd0e88df831aaa95ceb Mon Sep 17 00:00:00 2001 From: HeYuchen Date: Thu, 27 Jan 2022 11:04:54 +0800 Subject: [PATCH] feat(bulk_load): support disk_level ingesting restriction part1 - add ingestion_context class (#1035) --- src/meta/meta_bulk_load_ingestion_context.cpp | 195 +++++++++ src/meta/meta_bulk_load_ingestion_context.h | 92 +++++ .../test/meta_bulk_load_ingestion_test.cpp | 390 ++++++++++++++++++ 3 files changed, 677 insertions(+) create mode 100644 src/meta/meta_bulk_load_ingestion_context.cpp create mode 100644 src/meta/meta_bulk_load_ingestion_context.h create mode 100644 src/meta/test/meta_bulk_load_ingestion_test.cpp diff --git a/src/meta/meta_bulk_load_ingestion_context.cpp b/src/meta/meta_bulk_load_ingestion_context.cpp new file mode 100644 index 0000000000..4522ee12f8 --- /dev/null +++ b/src/meta/meta_bulk_load_ingestion_context.cpp @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "meta_bulk_load_ingestion_context.h" + +#include +#include + +namespace dsn { +namespace replication { + +DSN_DEFINE_uint32("meta_server", + bulk_load_node_max_ingesting_count, + 4, + "max partition_count executing ingestion for one node at the same time"); +DSN_TAG_VARIABLE(bulk_load_node_max_ingesting_count, FT_MUTABLE); + +DSN_DEFINE_uint32("meta_server", bulk_load_node_min_disk_count, 1, "min disk count of one node"); +DSN_TAG_VARIABLE(bulk_load_node_min_disk_count, FT_MUTABLE); + +ingestion_context::ingestion_context() { reset_all(); } + +ingestion_context::~ingestion_context() { reset_all(); } + +void ingestion_context::partition_node_info::create(const partition_configuration &config, + const config_context &cc) +{ + pid = config.pid; + std::unordered_set current_nodes; + current_nodes.insert(config.primary); + for (const auto &secondary : config.secondaries) { + current_nodes.insert(secondary); + } + for (const auto &node : current_nodes) { + std::string disk_tag; + if (cc.get_disk_tag(node, disk_tag)) { + node_disk[node] = disk_tag; + } + } +} + +void ingestion_context::node_context::init_disk(const std::string &disk_tag) +{ + if (disk_ingesting_counts.find(disk_tag) != disk_ingesting_counts.end()) { + return; + } + disk_ingesting_counts[disk_tag] = 0; +} + +uint32_t ingestion_context::node_context::get_max_disk_ingestion_count( + const uint32_t max_node_ingestion_count) const +{ + FAIL_POINT_INJECT_F("ingestion_node_context_disk_count", [](string_view count_str) -> uint32_t { + uint32_t count = 0; + buf2uint32(count_str, count); + return count; + }); + + const auto node_disk_count = disk_ingesting_counts.size() > FLAGS_bulk_load_node_min_disk_count + ? disk_ingesting_counts.size() + : FLAGS_bulk_load_node_min_disk_count; + return (max_node_ingestion_count + node_disk_count - 1) / node_disk_count; +} + +bool ingestion_context::node_context::check_if_add(const std::string &disk_tag) +{ + auto max_node_ingestion_count = FLAGS_bulk_load_node_max_ingesting_count; + if (node_ingesting_count >= max_node_ingestion_count) { + dwarn_f("node[{}] has {} partition executing ingestion, max_count = {}", + address.to_string(), + node_ingesting_count, + max_node_ingestion_count); + return false; + } + + auto max_disk_ingestion_count = get_max_disk_ingestion_count(max_node_ingestion_count); + if (disk_ingesting_counts[disk_tag] >= max_disk_ingestion_count) { + dwarn_f("node[{}] disk[{}] has {} partition executing ingestion, max_count = {}", + address.to_string(), + disk_tag, + disk_ingesting_counts[disk_tag], + max_disk_ingestion_count); + return false; + } + return true; +} + +void ingestion_context::node_context::add(const std::string &disk_tag) +{ + disk_ingesting_counts[disk_tag]++; + node_ingesting_count++; +} + +void ingestion_context::node_context::decrease(const std::string &disk_tag) +{ + node_ingesting_count--; + disk_ingesting_counts[disk_tag]--; +} + +bool ingestion_context::try_partition_ingestion(const partition_configuration &config, + const config_context &cc) +{ + FAIL_POINT_INJECT_F("ingestion_try_partition_ingestion", [=](string_view) -> bool { + auto info = partition_node_info(); + info.pid = config.pid; + _running_partitions[config.pid] = info; + return true; + }); + partition_node_info info(config, cc); + for (const auto &kv : info.node_disk) { + if (!check_node_ingestion(kv.first, kv.second)) { + return false; + } + } + add_partition(info); + return true; +} + +bool ingestion_context::check_node_ingestion(const rpc_address &node, const std::string &disk_tag) +{ + if (_nodes_context.find(node) == _nodes_context.end()) { + _nodes_context[node] = node_context(node, disk_tag); + } + return _nodes_context[node].check_if_add(disk_tag); +} + +void ingestion_context::add_partition(const partition_node_info &info) +{ + for (const auto &kv : info.node_disk) { + _nodes_context[kv.first].add(kv.second); + } + _running_partitions[info.pid] = info; +} + +void ingestion_context::remove_partition(const gpid &pid) +{ + FAIL_POINT_INJECT_F("ingestion_context_remove_partition", + [=](string_view) { _running_partitions.erase(pid); }); + + if (_running_partitions.find(pid) == _running_partitions.end()) { + return; + } + auto &info = _running_partitions[pid]; + for (const auto &kv : info.node_disk) { + _nodes_context[kv.first].decrease(kv.second); + } + _running_partitions.erase(pid); +} + +uint32_t ingestion_context::get_app_ingesting_count(const uint32_t app_id) const +{ + uint32_t running_count = 0; + for (const auto &kv : _running_partitions) { + if (kv.first.get_app_id() == app_id) { + running_count++; + } + } + return running_count; +} + +void ingestion_context::reset_app(const uint32_t app_id) +{ + std::unordered_set removing_partitions; + for (const auto &kv : _running_partitions) { + if (kv.first.get_app_id() == app_id) { + removing_partitions.insert(kv.first); + } + } + for (const auto &pid : removing_partitions) { + remove_partition(pid); + } +} + +void ingestion_context::reset_all() +{ + _running_partitions.clear(); + _nodes_context.clear(); +} + +} // namespace replication +} // namespace dsn diff --git a/src/meta/meta_bulk_load_ingestion_context.h b/src/meta/meta_bulk_load_ingestion_context.h new file mode 100644 index 0000000000..83107a0988 --- /dev/null +++ b/src/meta/meta_bulk_load_ingestion_context.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "meta_service.h" +#include "server_state.h" + +namespace dsn { +namespace replication { + +DSN_DECLARE_uint32(bulk_load_node_max_ingesting_count); +DSN_DECLARE_uint32(bulk_load_node_min_disk_count); + +// Meta bulk load helper class, used to manage ingesting partitions +class ingestion_context +{ +public: + explicit ingestion_context(); + ~ingestion_context(); + +private: + struct partition_node_info + { + gpid pid; + // node address -> disk_tag + std::unordered_map node_disk; + + partition_node_info() {} + partition_node_info(const partition_configuration &config, const config_context &cc) + { + create(config, cc); + } + void create(const partition_configuration &config, const config_context &cc); + }; + + struct node_context + { + rpc_address address; + uint32_t node_ingesting_count; + // disk tag -> ingesting partition count + std::unordered_map disk_ingesting_counts; + + node_context() {} + node_context(const rpc_address &address, const std::string &disk_tag) + : address(address), node_ingesting_count(0) + { + init_disk(disk_tag); + } + + void init_disk(const std::string &disk_tag); + uint32_t get_max_disk_ingestion_count(const uint32_t max_node_ingestion_count) const; + bool check_if_add(const std::string &disk_tag); + void add(const std::string &disk_tag); + void decrease(const std::string &disk_tag); + }; + + bool try_partition_ingestion(const partition_configuration &config, const config_context &cc); + bool check_node_ingestion(const rpc_address &node, const std::string &disk_tag); + void add_partition(const partition_node_info &info); + void remove_partition(const gpid &pid); + uint32_t get_app_ingesting_count(const uint32_t app_id) const; + void reset_app(const uint32_t app_id); + void reset_all(); + +private: + friend class bulk_load_service; + friend class node_context_test; + friend class ingestion_context_test; + + // ingesting partitions + std::unordered_map _running_partitions; + // every node and every disk ingesting partition count + std::unordered_map _nodes_context; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/meta/test/meta_bulk_load_ingestion_test.cpp b/src/meta/test/meta_bulk_load_ingestion_test.cpp new file mode 100644 index 0000000000..783a97bfb3 --- /dev/null +++ b/src/meta/test/meta_bulk_load_ingestion_test.cpp @@ -0,0 +1,390 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include "meta_test_base.h" +#include "meta_service_test_app.h" +#include "meta/meta_bulk_load_ingestion_context.h" +#include "meta/meta_data.h" + +namespace dsn { +namespace replication { + +class node_context_test : public meta_test_base +{ +public: + void SetUp() + { + _context = ingestion_context::node_context(); + _context.node_ingesting_count = 0; + _context.address = NODE; + FLAGS_bulk_load_node_max_ingesting_count = 1; + FLAGS_bulk_load_node_min_disk_count = 1; + } + + void TearDown() + { + _context.disk_ingesting_counts.clear(); + _context.node_ingesting_count = 0; + } + + void mock_context(const std::string &disk_tag, + const uint32_t disk_count = 0, + const uint32_t total_count = 0) + { + _context.node_ingesting_count = total_count; + _context.disk_ingesting_counts[disk_tag] = disk_count; + } + + void init_disk(const std::string &disk_tag) { _context.init_disk(disk_tag); } + + uint32_t get_disk_count(const std::string &disk_tag) + { + if (_context.disk_ingesting_counts.find(disk_tag) == _context.disk_ingesting_counts.end()) { + return -1; + } + return _context.disk_ingesting_counts[disk_tag]; + } + + void mock_get_max_disk_ingestion_count(const uint32_t node_min_disk_count, + const uint32_t current_disk_count) + { + FLAGS_bulk_load_node_min_disk_count = node_min_disk_count; + _context.disk_ingesting_counts.clear(); + for (auto i = 0; i < current_disk_count; i++) { + _context.init_disk(std::to_string(i)); + } + } + + uint32_t get_max_disk_ingestion_count(const uint32_t max_node_count) const + { + return _context.get_max_disk_ingestion_count(max_node_count); + } + + bool check_if_add() { return _context.check_if_add(TAG); } + +public: + ingestion_context::node_context _context; + const rpc_address NODE = rpc_address("127.0.0.1", 10086); + const std::string TAG = "default"; + const std::string TAG2 = "tag2"; +}; + +TEST_F(node_context_test, init_disk_test) +{ + mock_context(TAG, 1, 1); + struct init_disk_test + { + std::string disk_tag; + uint32_t expected_disk_count; + } tests[] = {{TAG, 1}, {TAG2, 0}}; + for (const auto &test : tests) { + init_disk(test.disk_tag); + ASSERT_EQ(get_disk_count(test.disk_tag), test.expected_disk_count); + } +} + +TEST_F(node_context_test, get_max_disk_ingestion_count_test) +{ + struct get_max_disk_ingestion_count_test + { + uint32_t max_node_count; + uint32_t min_disk_count; + uint32_t current_disk_count; + uint32_t expected_count; + } tests[] = {// min_disk_count = 1 + {1, 1, 1, 1}, + {2, 1, 1, 2}, + // min_disk_count = 3 + {1, 3, 1, 1}, + {4, 3, 3, 2}, + // min_disk_count = 7 + {1, 7, 1, 1}, + {1, 7, 11, 1}, + {7, 7, 1, 1}, + {7, 7, 11, 1}, + {8, 7, 3, 2}, + {8, 7, 8, 1}, + {8, 7, 11, 1}}; + for (const auto &test : tests) { + mock_get_max_disk_ingestion_count(test.min_disk_count, test.current_disk_count); + ASSERT_EQ(get_max_disk_ingestion_count(test.max_node_count), test.expected_count); + } +} + +TEST_F(node_context_test, check_if_add_test) +{ + fail::setup(); + struct check_if_add_test + { + const uint32_t max_node_count; + const uint32_t current_node_count; + std::string max_disk_count_str; + const uint32_t current_disk_count; + bool expected_result; + } tests[] = {{1, 1, "1", 1, false}, {3, 2, "2", 2, false}, {1, 0, "7", 0, true}}; + for (const auto &test : tests) { + FLAGS_bulk_load_node_max_ingesting_count = test.max_node_count; + mock_context(TAG, test.current_disk_count, test.current_node_count); + auto str = "return(" + test.max_disk_count_str + ")"; + fail::cfg("ingestion_node_context_disk_count", str); + ASSERT_EQ(check_if_add(), test.expected_result); + } + fail::teardown(); +} + +class ingestion_context_test : public meta_test_base +{ +public: + /// mock app and node info context + /// node1 node2 node3 node4 + /// p0(tag1) s0(tag1) s0(tag2) + /// s1(tag1) s1(tag2) p1(tag2) + /// s2(tag2) p2(tag1) s2(tag1) + /// p3(tag1) s3(tag1) s3(tag2) + void SetUp() + { + _context = make_unique(); + add_node_context({NODE1, NODE2, NODE3, NODE4}); + mock_app(); + FLAGS_bulk_load_node_min_disk_count = MIN_DISK_COUNT; + FLAGS_bulk_load_node_max_ingesting_count = MAX_NODE_COUNT; + } + + void TearDown() { _context->reset_all(); } + + void update_max_node_count(const uint32_t max_node_count) + { + FLAGS_bulk_load_node_max_ingesting_count = max_node_count; + } + + bool check_node_ingestion(const uint32_t max_node_count, + const rpc_address &node, + const std::string &tag) + { + _context->reset_all(); + update_max_node_count(max_node_count); + _context->_nodes_context[NODE1] = ingestion_context::node_context(NODE1, TAG1); + _context->_nodes_context[NODE1].add(TAG1); + return _context->check_node_ingestion(node, tag); + } + + void mock_app() + { + app_info ainfo; + ainfo.app_id = APP_ID; + ainfo.partition_count = PARTITION_COUNT; + _app = std::make_shared(ainfo); + _app->partitions.reserve(PARTITION_COUNT); + _app->helpers->contexts.reserve(PARTITION_COUNT); + mock_partition(0, + {NODE1, NODE2, NODE3}, + {TAG1, TAG1, TAG2}, + _app->partitions[0], + _app->helpers->contexts[0]); + mock_partition(1, + {NODE4, NODE1, NODE2}, + {TAG2, TAG1, TAG2}, + _app->partitions[1], + _app->helpers->contexts[1]); + mock_partition(2, + {NODE3, NODE1, NODE4}, + {TAG1, TAG2, TAG1}, + _app->partitions[2], + _app->helpers->contexts[2]); + mock_partition(3, + {NODE2, NODE3, NODE4}, + {TAG1, TAG1, TAG2}, + _app->partitions[3], + _app->helpers->contexts[3]); + } + + void mock_partition(const uint32_t pidx, + std::vector nodes, + const std::vector tags, + partition_configuration &config, + config_context &cc) + { + config.pid = gpid(APP_ID, pidx); + config.primary = nodes[0]; + config.secondaries.emplace_back(nodes[1]); + config.secondaries.emplace_back(nodes[2]); + + auto count = nodes.size(); + for (auto i = 0; i < count; i++) { + serving_replica r; + r.node = nodes[i]; + r.disk_tag = tags[i]; + cc.serving.emplace_back(r); + } + } + + void add_node_context(std::vector nodes) + { + for (const auto &address : nodes) { + ingestion_context::node_context node(address, TAG1); + node.init_disk(TAG2); + _context->_nodes_context[address] = node; + } + } + + bool try_partition_ingestion(const uint32_t pidx) + { + return _context->try_partition_ingestion(_app->partitions[pidx], + _app->helpers->contexts[pidx]); + } + + void add_partition(const uint32_t pidx) + { + auto pinfo = ingestion_context::partition_node_info(_app->partitions[pidx], + _app->helpers->contexts[pidx]); + _context->add_partition(pinfo); + } + + void remove_partition(const uint32_t pidx) { _context->remove_partition(gpid(APP_ID, pidx)); } + + bool is_partition_ingesting(const uint32_t pidx) const + { + return _context->_running_partitions.find(gpid(APP_ID, pidx)) != + _context->_running_partitions.end(); + } + + uint32_t get_app_ingesting_count() const { return _context->get_app_ingesting_count(APP_ID); } + + void reset_app() { return _context->reset_app(APP_ID); } + + int32_t get_node_running_count(const rpc_address &node) + { + if (_context->_nodes_context.find(node) == _context->_nodes_context.end()) { + return 0; + } + return _context->_nodes_context[node].node_ingesting_count; + } + + uint32_t get_disk_running_count(const rpc_address &node, const std::string &disk_tag) + { + if (_context->_nodes_context.find(node) == _context->_nodes_context.end()) { + return 0; + } + auto node_cc = _context->_nodes_context[node]; + if (node_cc.disk_ingesting_counts.find(disk_tag) == node_cc.disk_ingesting_counts.end()) { + return 0; + } + return node_cc.disk_ingesting_counts[disk_tag]; + } + + bool validate_count(const rpc_address &node, + const uint32_t expected_node_count, + const uint32_t expected_disk1_count, + const uint32_t expected_disk2_count) + { + return get_node_running_count(node) == expected_node_count && + get_disk_running_count(node, TAG1) == expected_disk1_count && + get_disk_running_count(node, TAG2) == expected_disk2_count; + } + +public: + std::unique_ptr _context; + std::shared_ptr _app; + const uint32_t APP_ID = 1; + const uint32_t PARTITION_COUNT = 4; + const uint32_t MAX_NODE_COUNT = 2; + const uint32_t MIN_DISK_COUNT = 2; + const rpc_address NODE1 = rpc_address("127.0.0.1", 10086); + const rpc_address NODE2 = rpc_address("127.0.0.1", 10085); + const rpc_address NODE3 = rpc_address("127.0.0.1", 10087); + const rpc_address NODE4 = rpc_address("127.0.0.1", 10088); + const std::string TAG1 = "tag1"; + const std::string TAG2 = "tag2"; +}; + +TEST_F(ingestion_context_test, check_node_ingestion_test) +{ + struct check_node_ingestion_test + { + rpc_address node; + std::string tag; + uint32_t max_node_count; + bool expected_result; + } tests[] = {{NODE2, TAG1, 1, true}, {NODE1, TAG2, 2, true}, {NODE1, TAG2, 1, false}}; + for (const auto &test : tests) { + ASSERT_EQ(check_node_ingestion(test.max_node_count, test.node, test.tag), + test.expected_result); + } +} + +TEST_F(ingestion_context_test, try_partition_ingestion_test) +{ + update_max_node_count(1); + ASSERT_EQ(try_partition_ingestion(0), true); + ASSERT_EQ(try_partition_ingestion(1), false); + + update_max_node_count(2); + ASSERT_EQ(try_partition_ingestion(1), false); + ASSERT_EQ(try_partition_ingestion(2), true); + ASSERT_EQ(try_partition_ingestion(3), false); + + update_max_node_count(3); + ASSERT_EQ(try_partition_ingestion(1), true); + ASSERT_EQ(try_partition_ingestion(3), true); + + ASSERT_EQ(get_app_ingesting_count(), 4); +} + +TEST_F(ingestion_context_test, operation_test) +{ + ASSERT_FALSE(is_partition_ingesting(0)); + add_partition(0); + ASSERT_TRUE(is_partition_ingesting(0)); + ASSERT_TRUE(validate_count(NODE1, 1, 1, 0)); + ASSERT_TRUE(validate_count(NODE2, 1, 1, 0)); + ASSERT_TRUE(validate_count(NODE3, 1, 0, 1)); + ASSERT_TRUE(validate_count(NODE4, 0, 0, 0)); + ASSERT_EQ(get_app_ingesting_count(), 1); + + ASSERT_FALSE(is_partition_ingesting(1)); + add_partition(1); + ASSERT_TRUE(is_partition_ingesting(1)); + ASSERT_TRUE(validate_count(NODE1, 2, 2, 0)); + ASSERT_TRUE(validate_count(NODE2, 2, 1, 1)); + ASSERT_TRUE(validate_count(NODE3, 1, 0, 1)); + ASSERT_TRUE(validate_count(NODE4, 1, 0, 1)); + ASSERT_EQ(get_app_ingesting_count(), 2); + + add_partition(2); + remove_partition(0); + ASSERT_TRUE(is_partition_ingesting(2)); + ASSERT_FALSE(is_partition_ingesting(0)); + ASSERT_TRUE(validate_count(NODE1, 2, 1, 1)); + ASSERT_TRUE(validate_count(NODE2, 1, 0, 1)); + ASSERT_TRUE(validate_count(NODE3, 1, 1, 0)); + ASSERT_TRUE(validate_count(NODE4, 2, 1, 1)); + ASSERT_EQ(get_app_ingesting_count(), 2); + + reset_app(); + ASSERT_TRUE(validate_count(NODE1, 0, 0, 0)); + ASSERT_TRUE(validate_count(NODE2, 0, 0, 0)); + ASSERT_TRUE(validate_count(NODE3, 0, 0, 0)); + ASSERT_TRUE(validate_count(NODE4, 0, 0, 0)); + ASSERT_EQ(get_app_ingesting_count(), 0); +} + +} // namespace replication +} // namespace dsn