diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index 0630c68131..309fc06515 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -332,6 +332,7 @@ class meta_service : public serverlet friend class meta_partition_guardian_test; friend class meta_service_test; friend class meta_service_test_app; + friend class server_state_test; friend class meta_split_service_test; friend class meta_test_base; friend class policy_context_test; diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 8d7c97f2f1..fe18206885 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -28,14 +28,15 @@ #include // IWYU pragma: no_include #include -#include #include #include #include #include +#include #include #include // IWYU pragma: keep #include +#include #include #include @@ -75,6 +76,7 @@ #include "utils/blob.h" #include "utils/command_manager.h" #include "utils/config_api.h" +#include "utils/fail_point.h" #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/metrics.h" @@ -1004,7 +1006,7 @@ void server_state::query_configuration_by_index(const query_cfg_request &request /*out*/ query_cfg_response &response) { zauto_read_lock l(_lock); - auto iter = _exist_apps.find(request.app_name.c_str()); + auto iter = _exist_apps.find(request.app_name); if (iter == _exist_apps.end()) { response.err = ERR_OBJECT_NOT_FOUND; return; @@ -2990,12 +2992,13 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) LOG_WARNING("set app envs failed with invalid request"); return; } + const std::vector &keys = request.keys; const std::vector &values = request.values; const std::string &app_name = request.app_name; std::ostringstream os; - for (int i = 0; i < keys.size(); i++) { + for (size_t i = 0; i < keys.size(); ++i) { if (i != 0) { os << ", "; } @@ -3012,6 +3015,7 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) os << keys[i] << "=" << values[i]; } + LOG_INFO("set app envs for app({}) from remote({}): kvs = {}", app_name, env_rpc.remote_address(), @@ -3020,31 +3024,84 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) app_info ainfo; std::string app_path; { + FAIL_POINT_INJECT_NOT_RETURN_F("set_app_envs_failed", [app_name, this](std::string_view s) { + zauto_write_lock l(_lock); + + if (s == "not_found") { + CHECK_EQ(_exist_apps.erase(app_name), 1); + return; + } + + if (s == "dropping") { + gutil::FindOrDie(_exist_apps, app_name)->status = app_status::AS_DROPPING; + return; + } + }); + zauto_read_lock l(_lock); - std::shared_ptr app = get_app(app_name); - if (app == nullptr) { - LOG_WARNING("set app envs failed with invalid app_name({})", app_name); - env_rpc.response().err = ERR_INVALID_PARAMETERS; - env_rpc.response().hint_message = "invalid app name"; + + const auto &app = get_app(app_name); + if (!app) { + LOG_WARNING("set app envs failed since app_name({}) cannot be found", app_name); + env_rpc.response().err = ERR_APP_NOT_EXIST; + env_rpc.response().hint_message = "app cannot be found"; + return; + } + + if (app->status == app_status::AS_DROPPING) { + LOG_WARNING("set app envs failed since app(name={}, id={}) is being dropped", + app_name, + app->app_id); + env_rpc.response().err = ERR_BUSY_DROPPING; + env_rpc.response().hint_message = "app is being dropped"; return; - } else { - ainfo = *(reinterpret_cast(app.get())); - app_path = get_app_path(*app); } + + ainfo = *app; + app_path = get_app_path(*app); } - for (int idx = 0; idx < keys.size(); idx++) { + + for (size_t idx = 0; idx < keys.size(); ++idx) { ainfo.envs[keys[idx]] = values[idx]; } + do_update_app_info(app_path, ainfo, [this, app_name, keys, values, env_rpc](error_code ec) { - CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage failed"); + CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage failed", app_name); zauto_write_lock l(_lock); + + FAIL_POINT_INJECT_NOT_RETURN_F("set_app_envs_failed", [app_name, this](std::string_view s) { + if (s == "dropped_after") { + CHECK_EQ(_exist_apps.erase(app_name), 1); + return; + } + }); + std::shared_ptr app = get_app(app_name); - std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); - for (int idx = 0; idx < keys.size(); idx++) { + + // The table might be removed just before the callback function is invoked, thus we must + // check if this table still exists. + // + // TODO(wangdan): should make updates to remote storage sequential by supporting atomic + // set, otherwise update might be missing. For example, an update is setting the envs + // while another is dropping a table. The update setting the envs does not contain the + // dropped state. Once it is applied by remote storage after another update dropping + // the table, the state of the table would always be non-dropped on remote storage. + if (!app) { + LOG_ERROR("set app envs failed since app({}) has just been dropped", app_name); + env_rpc.response().err = ERR_APP_DROPPED; + env_rpc.response().hint_message = "app has just been dropped"; + return; + } + + const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + + // Update envs of local memory. + for (size_t idx = 0; idx < keys.size(); ++idx) { app->envs[keys[idx]] = values[idx]; } - std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + + const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs, new_envs); }); } diff --git a/src/meta/server_state.h b/src/meta/server_state.h index ea9a418bad..56c7d6c039 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -29,7 +29,7 @@ // IWYU pragma: no_include #include #include -#include +#include #include #include #include @@ -42,6 +42,7 @@ #include "common/gpid.h" #include "common/manual_compact.h" #include "dsn.layer2_types.h" +#include "gutil/map_util.h" #include "meta/meta_rpc_types.h" #include "meta_data.h" #include "table_metrics.h" @@ -140,20 +141,17 @@ class server_state void lock_read(zauto_read_lock &other); void lock_write(zauto_write_lock &other); - const meta_view get_meta_view() { return {&_all_apps, &_nodes}; } - std::shared_ptr get_app(const std::string &name) const + + meta_view get_meta_view() { return {&_all_apps, &_nodes}; } + + std::shared_ptr get_app(const std::string &app_name) const { - auto iter = _exist_apps.find(name); - if (iter == _exist_apps.end()) - return nullptr; - return iter->second; + return gutil::FindWithDefault(_exist_apps, app_name); } + std::shared_ptr get_app(int32_t app_id) const { - auto iter = _all_apps.find(app_id); - if (iter == _all_apps.end()) - return nullptr; - return iter->second; + return gutil::FindWithDefault(_all_apps, app_id); } void query_configuration_by_index(const query_cfg_request &request, @@ -409,6 +407,7 @@ class server_state friend class meta_split_service; friend class meta_split_service_test; friend class meta_service_test_app; + friend class server_state_test; friend class meta_test_base; friend class test::test_checker; friend class server_state_restore_test; diff --git a/src/meta/test/meta_service_test_app.h b/src/meta/test/meta_service_test_app.h index 1171315502..2394701c5e 100644 --- a/src/meta/test/meta_service_test_app.h +++ b/src/meta/test/meta_service_test_app.h @@ -137,7 +137,7 @@ class meta_service_test_app : public dsn::service_app void json_compacity(); // test server_state set_app_envs/del_app_envs/clear_app_envs - void app_envs_basic_test(); + static void app_envs_basic_test(); // test for bug found void adjust_dropped_size(); diff --git a/src/meta/test/server_state_test.cpp b/src/meta/test/server_state_test.cpp index b77890d314..d84d586fe4 100644 --- a/src/meta/test/server_state_test.cpp +++ b/src/meta/test/server_state_test.cpp @@ -24,7 +24,11 @@ * THE SOFTWARE. */ +#include +#include +#include #include +#include #include #include #include @@ -42,9 +46,11 @@ #include "meta/server_state.h" #include "meta_admin_types.h" #include "meta_service_test_app.h" +#include "rpc/rpc_holder.h" #include "rpc/rpc_message.h" #include "rpc/serialization.h" #include "utils/error_code.h" +#include "utils/fail_point.h" #include "utils/flags.h" DSN_DECLARE_string(cluster_root); @@ -97,138 +103,259 @@ static std::string acquire_prefix(const std::string &str) } } +class server_state_test +{ +public: + server_state_test() : _ms(create_meta_service()), _ss(create_server_state(_ms.get())) {} + + void load_apps(const std::vector &app_names) + { + const auto &apps = fake_apps(app_names); + for (const auto &[_, app] : apps) { + _ss->_all_apps.emplace(std::make_pair(app->app_id, app)); + } + + ASSERT_EQ(dsn::ERR_OK, _ss->sync_apps_to_remote_storage()); + } + + [[nodiscard]] std::shared_ptr get_app(const std::string &app_name) const + { + return _ss->get_app(app_name); + } + + app_env_rpc set_app_envs(const configuration_update_app_env_request &request) + { + auto rpc = create_app_env_rpc(request); + _ss->set_app_envs(rpc); + _ss->wait_all_task(); + + return rpc; + } + + app_env_rpc del_app_envs(const configuration_update_app_env_request &request) + { + auto rpc = create_app_env_rpc(request); + _ss->del_app_envs(rpc); + _ss->wait_all_task(); + + return rpc; + } + + app_env_rpc clear_app_envs(const configuration_update_app_env_request &request) + { + auto rpc = create_app_env_rpc(request); + _ss->clear_app_envs(rpc); + _ss->wait_all_task(); + + return rpc; + } + +private: + static std::shared_ptr fake_app_state(const std::string &app_name, + const int32_t app_id) + { + dsn::app_info info; + info.is_stateful = true; + info.app_id = app_id; + info.app_type = "simple_kv"; + info.app_name = app_name; + info.max_replica_count = 3; + info.partition_count = 32; + info.status = dsn::app_status::AS_CREATING; + info.envs.clear(); + return app_state::create(info); + } + + static std::map> + fake_apps(const std::vector &app_names) + { + std::map> apps; + + int32_t app_id = 1; + std::transform(app_names.begin(), + app_names.end(), + std::inserter(apps, apps.end()), + [&app_id](const std::string &app_name) { + return std::make_pair(app_name, fake_app_state(app_name, app_id++)); + }); + + return apps; + } + + static std::unique_ptr create_meta_service() + { + auto ms = std::make_unique(); + + FLAGS_cluster_root = "/meta_test"; + FLAGS_meta_state_service_type = "meta_state_service_simple"; + ms->remote_storage_initialize(); + + return ms; + } + + static std::shared_ptr create_server_state(meta_service *ms) + { + std::string apps_root("/meta_test/apps"); + const auto &ss = ms->_state; + ss->initialize(ms, apps_root); + + return ss; + } + + static app_env_rpc create_app_env_rpc(const configuration_update_app_env_request &request) + { + dsn::message_ptr binary_req(dsn::message_ex::create_request(RPC_CM_UPDATE_APP_ENV)); + dsn::marshall(binary_req, request); + dsn::message_ex *recv_msg = create_corresponding_receive(binary_req); + return app_env_rpc(recv_msg); // Don't need to reply. + } + + std::unique_ptr _ms; + std::shared_ptr _ss; +}; + void meta_service_test_app::app_envs_basic_test() { - // create a fake app - dsn::app_info info; - info.is_stateful = true; - info.app_id = 1; - info.app_type = "simple_kv"; - info.app_name = "test_app1"; - info.max_replica_count = 3; - info.partition_count = 32; - info.status = dsn::app_status::AS_CREATING; - info.envs.clear(); - std::shared_ptr fake_app = app_state::create(info); - - // create meta_service - std::shared_ptr meta_svc = std::make_shared(); - meta_service *svc = meta_svc.get(); - - FLAGS_cluster_root = "/meta_test"; - FLAGS_meta_state_service_type = "meta_state_service_simple"; - svc->remote_storage_initialize(); - - std::string apps_root = "/meta_test/apps"; - std::shared_ptr ss = svc->_state; - ss->initialize(svc, apps_root); - - ss->_all_apps.emplace(std::make_pair(fake_app->app_id, fake_app)); - ASSERT_EQ(dsn::ERR_OK, ss->sync_apps_to_remote_storage()); - - std::cout << "test server_state::set_app_envs()..." << std::endl; + server_state_test test; + test.load_apps({"test_app1", + "test_set_app_envs_not_found", + "test_set_app_envs_dropping", + "test_set_app_envs_dropped_after"}); + +#define TEST_SET_APP_ENVS_FAILED(action, err_code) \ + std::cout << "test server_state::set_app_envs(" #action ")..." << std::endl; \ + do { \ + configuration_update_app_env_request request; \ + request.__set_app_name("test_set_app_envs_" #action); \ + request.__set_op(app_env_operation::type::APP_ENV_OP_SET); \ + request.__set_keys({replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}); \ + request.__set_values({"67108864"}); \ + \ + fail::setup(); \ + fail::cfg("set_app_envs_failed", "void(" #action ")"); \ + \ + auto rpc = test.set_app_envs(request); \ + ASSERT_EQ(err_code, rpc.response().err); \ + \ + fail::teardown(); \ + } while (0) + + // Failed to setting envs while table was not found. + TEST_SET_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST); + + // Failed to setting envs while table was being dropped as the intermediate state. + TEST_SET_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING); + + // The table was found dropped after the new envs had been persistent on the remote + // meta storage. + TEST_SET_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED); + +#undef TEST_SET_APP_ENVS_FAILED + + // Normal case for setting envs. + std::cout << "test server_state::set_app_envs(success)..." << std::endl; { configuration_update_app_env_request request; - request.__set_app_name(fake_app->app_name); + request.__set_app_name("test_app1"); request.__set_op(app_env_operation::type::APP_ENV_OP_SET); request.__set_keys(keys); request.__set_values(values); - dsn::message_ptr binary_req = dsn::message_ex::create_request(RPC_CM_UPDATE_APP_ENV); - dsn::marshall(binary_req, request); - dsn::message_ex *recv_msg = create_corresponding_receive(binary_req); - app_env_rpc rpc(recv_msg); // don't need reply - ss->set_app_envs(rpc); - ss->wait_all_task(); - std::shared_ptr app = ss->get_app(fake_app->app_name); - ASSERT_TRUE(app != nullptr); - for (int idx = 0; idx < keys.size(); idx++) { - const std::string &key = keys[idx]; - ASSERT_EQ(app->envs.count(key), 1); - ASSERT_EQ(app->envs.at(key), values[idx]); + auto rpc = test.set_app_envs(request); + ASSERT_EQ(ERR_OK, rpc.response().err); + + const auto &app = test.get_app("test_app1"); + ASSERT_TRUE(app); + + for (size_t idx = 0; idx < keys.size(); ++idx) { + const auto &key = keys[idx]; + + // Every env should be inserted. + ASSERT_EQ(1, app->envs.count(key)); + ASSERT_EQ(values[idx], app->envs.at(key)); } } std::cout << "test server_state::del_app_envs()..." << std::endl; { configuration_update_app_env_request request; - request.__set_app_name(fake_app->app_name); + request.__set_app_name("test_app1"); request.__set_op(app_env_operation::type::APP_ENV_OP_DEL); request.__set_keys(del_keys); - dsn::message_ptr binary_req = dsn::message_ex::create_request(RPC_CM_UPDATE_APP_ENV); - dsn::marshall(binary_req, request); - dsn::message_ex *recv_msg = create_corresponding_receive(binary_req); - app_env_rpc rpc(recv_msg); // don't need reply - ss->del_app_envs(rpc); - ss->wait_all_task(); + auto rpc = test.del_app_envs(request); + ASSERT_EQ(ERR_OK, rpc.response().err); - std::shared_ptr app = ss->get_app(fake_app->app_name); - ASSERT_TRUE(app != nullptr); - for (int idx = 0; idx < keys.size(); idx++) { + const auto &app = test.get_app("test_app1"); + ASSERT_TRUE(app); + + for (size_t idx = 0; idx < keys.size(); ++idx) { const std::string &key = keys[idx]; - if (del_keys_set.count(key) >= 1) { - ASSERT_EQ(app->envs.count(key), 0); - } else { - ASSERT_EQ(app->envs.count(key), 1); - ASSERT_EQ(app->envs.at(key), values[idx]); + if (del_keys_set.count(key) > 0) { + // The env in `del_keys_set` should be deleted. + ASSERT_EQ(0, app->envs.count(key)); + continue; } + + // The env should still exist if it is not in `del_keys_set`. + ASSERT_EQ(1, app->envs.count(key)); + ASSERT_EQ(values[idx], app->envs.at(key)); } } std::cout << "test server_state::clear_app_envs()..." << std::endl; { - // test specify prefix + // Test specifying prefix. { configuration_update_app_env_request request; - request.__set_app_name(fake_app->app_name); + request.__set_app_name("test_app1"); request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR); request.__set_clear_prefix(clear_prefix); - dsn::message_ptr binary_req = dsn::message_ex::create_request(RPC_CM_UPDATE_APP_ENV); - dsn::marshall(binary_req, request); - dsn::message_ex *recv_msg = create_corresponding_receive(binary_req); - app_env_rpc rpc(recv_msg); // don't need reply - ss->clear_app_envs(rpc); - ss->wait_all_task(); + auto rpc = test.clear_app_envs(request); + ASSERT_EQ(ERR_OK, rpc.response().err); - std::shared_ptr app = ss->get_app(fake_app->app_name); - ASSERT_TRUE(app != nullptr); - for (int idx = 0; idx < keys.size(); idx++) { + const auto &app = test.get_app("test_app1"); + ASSERT_TRUE(app); + + for (size_t idx = 0; idx < keys.size(); ++idx) { const std::string &key = keys[idx]; - if (del_keys_set.count(key) <= 0) { - if (acquire_prefix(key) == clear_prefix) { - ASSERT_EQ(app->envs.count(key), 0); - } else { - ASSERT_EQ(app->envs.count(key), 1); - ASSERT_EQ(app->envs.at(key), values[idx]); - } - } else { - // key already delete - ASSERT_EQ(app->envs.count(key), 0); + if (del_keys_set.count(key) > 0) { + // The env should have been deleted during test for `del_app_envs`. + ASSERT_EQ(0, app->envs.count(key)); + continue; + } + + if (acquire_prefix(key) == clear_prefix) { + // The env with specified prefix should be deleted. + ASSERT_EQ(0, app->envs.count(key)); + continue; } + + // Otherwise, the env should still exist. + ASSERT_EQ(1, app->envs.count(key)); + ASSERT_EQ(values[idx], app->envs.at(key)); } } - // test clear all + // Test clearing all. { configuration_update_app_env_request request; - request.__set_app_name(fake_app->app_name); + request.__set_app_name("test_app1"); request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR); request.__set_clear_prefix(""); - dsn::message_ptr binary_req = dsn::message_ex::create_request(RPC_CM_UPDATE_APP_ENV); - dsn::marshall(binary_req, request); - dsn::message_ex *recv_msg = create_corresponding_receive(binary_req); - app_env_rpc rpc(recv_msg); // don't need reply - ss->clear_app_envs(rpc); - ss->wait_all_task(); + auto rpc = test.clear_app_envs(request); + ASSERT_EQ(ERR_OK, rpc.response().err); - std::shared_ptr app = ss->get_app(fake_app->app_name); - ASSERT_TRUE(app != nullptr); + const auto &app = test.get_app("test_app1"); + ASSERT_TRUE(app); + + // All envs should be cleared. ASSERT_TRUE(app->envs.empty()); } } } + } // namespace replication } // namespace dsn diff --git a/src/meta/test/state_sync_test.cpp b/src/meta/test/state_sync_test.cpp index fedb1ab384..d440c00dcb 100644 --- a/src/meta/test/state_sync_test.cpp +++ b/src/meta/test/state_sync_test.cpp @@ -393,7 +393,7 @@ void meta_service_test_app::construct_apps_test() generate_node_list(nodes, 1, 1); svc->_state->construct_apps({resp}, nodes, hint_message); - meta_view mv = svc->_state->get_meta_view(); + const meta_view mv = svc->_state->get_meta_view(); const app_mapper &mapper = *(mv.apps); ASSERT_EQ(6, mv.apps->size()); diff --git a/src/utils/fail_point.h b/src/utils/fail_point.h index 2c0e4111bf..f8ae60aefd 100644 --- a/src/utils/fail_point.h +++ b/src/utils/fail_point.h @@ -46,8 +46,9 @@ // argument, preprocess for this macro would fail for mismatched arguments. #define FAIL_POINT_INJECT_F(name, ...) \ do { \ - if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \ + if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) { \ break; \ + } \ auto __Func = __VA_ARGS__; \ auto __Res = ::dsn::fail::eval(name); \ if (__Res != nullptr) { \ @@ -63,8 +64,9 @@ // argument, preprocess for this macro would fail for mismatched arguments. #define FAIL_POINT_INJECT_NOT_RETURN_F(name, ...) \ do { \ - if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \ + if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) { \ break; \ + } \ auto __Func = __VA_ARGS__; \ auto __Res = ::dsn::fail::eval(name); \ if (__Res != nullptr) { \