feat(online_migration): part3 - support ingestion_behind for pegasus #862

Merged: 5 commits, Jan 6, 2022
2 changes: 1 addition & 1 deletion rdsn
2 changes: 2 additions & 0 deletions src/base/pegasus_const.cpp
@@ -99,4 +99,6 @@ const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partitio
const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");

const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_const.h
@@ -70,4 +70,6 @@ extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
extern const std::string USER_SPECIFIED_COMPACTION;

extern const std::string READ_SIZE_THROTTLING;

extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
} // namespace pegasus
18 changes: 18 additions & 0 deletions src/server/pegasus_server_impl.cpp
@@ -1487,11 +1487,13 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
// When create new DB, we have to create a new column family to store meta data (meta column
// family).
_db_opts.create_missing_column_families = true;
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
@@ -2695,6 +2697,22 @@ void pegasus_server_impl::update_user_specified_compaction(
}
}

bool pegasus_server_impl::parse_allow_ingest_behind(const std::map<std::string, std::string> &envs)
{
bool allow_ingest_behind = false;
const auto &iter = envs.find(ROCKSDB_ALLOW_INGEST_BEHIND);
if (iter == envs.end()) {
return allow_ingest_behind;
}
if (!dsn::buf2bool(iter->second, allow_ingest_behind)) {
dwarn_replica(
"{}={} is invalid, set allow_ingest_behind = false", iter->first, iter->second);
return false;
}
ddebug_replica("update allow_ingest_behind = {}", allow_ingest_behind);
return allow_ingest_behind;
}

bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
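The new env is read once in start(), on both the load-existing-DB and the create-new-DB paths, so a change to rocksdb.allow_ingest_behind only takes effect after the replica's RocksDB instance is reopened (the function test below restarts onebox for exactly this reason). Below is a standalone sketch of the parsing fallback; the helper standing in for dsn::buf2bool is hypothetical, and none of this code is part of the PR:

#include <iostream>
#include <map>
#include <string>

// Stand-in for dsn::buf2bool: only "true"/"false" are accepted here, for brevity.
static bool to_bool(const std::string &s, bool &out)
{
    if (s == "true") { out = true; return true; }
    if (s == "false") { out = false; return true; }
    return false;
}

// Mirrors parse_allow_ingest_behind: a missing or malformed value degrades to false
// instead of failing the replica start.
static bool parse_allow_ingest_behind_sketch(const std::map<std::string, std::string> &envs)
{
    const auto iter = envs.find("rocksdb.allow_ingest_behind");
    bool allow = false;
    if (iter == envs.end() || !to_bool(iter->second, allow)) {
        return false;
    }
    return allow;
}

int main()
{
    std::cout << parse_allow_ingest_behind_sketch({{"rocksdb.allow_ingest_behind", "true"}})  // 1
              << parse_allow_ingest_behind_sketch({{"rocksdb.allow_ingest_behind", "oops"}})  // 0
              << parse_allow_ingest_behind_sketch({}) << std::endl;                           // 0
    return 0;
}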
2 changes: 2 additions & 0 deletions src/server/pegasus_server_impl.h
@@ -284,6 +284,8 @@ class pegasus_server_impl : public pegasus_read_service

void update_throttling_controller(const std::map<std::string, std::string> &envs);

bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
2 changes: 1 addition & 1 deletion src/server/pegasus_server_write.cpp
@@ -195,7 +195,7 @@ void pegasus_server_write::init_non_batch_write_handlers()
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
return _write_svc->ingest_files(_decree, rpc.request(), rpc.response());
}},
};
}
8 changes: 4 additions & 4 deletions src/server/pegasus_write_service.cpp
@@ -373,9 +373,9 @@ int pegasus_write_service::duplicate(int64_t decree,
return empty_put(ctx.decree);
}

int pegasus_write_service::ingestion_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp)
int pegasus_write_service::ingest_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp)
{
// TODO(heyuchen): consider cu

@@ -391,7 +391,7 @@ int pegasus_write_service::ingestion_files(int64_t decree,
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
dsn::error_code err =
_impl->ingestion_files(decree, _server->bulk_load_dir(), req.metadata);
_impl->ingest_files(decree, _server->bulk_load_dir(), req.metadata, req.ingest_behind);
if (err == dsn::ERR_OK) {
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED);
} else {
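Note that ingest_files does not block the write pipeline on the actual RocksDB ingestion: it marks the status IS_RUNNING, enqueues the work on LPC_INGESTION, and the status is only updated again when the background task finishes. A rough standalone sketch of that handoff pattern follows; std::async stands in for dsn::tasking::enqueue and the enum is declared locally just for the sketch:

#include <atomic>
#include <future>
#include <iostream>

enum class ingestion_status { IS_RUNNING, IS_SUCCEED, IS_FAILED };

int main()
{
    std::atomic<ingestion_status> status{ingestion_status::IS_RUNNING};

    // The RPC handler returns right after scheduling the work; callers observe
    // progress only through the status, never through the RPC response itself.
    auto task = std::async(std::launch::async, [&status]() {
        const bool ok = true; // stands in for the real ingestion returning dsn::ERR_OK
        status = ok ? ingestion_status::IS_SUCCEED : ingestion_status::IS_FAILED;
    });

    task.wait();
    std::cout << (status == ingestion_status::IS_SUCCEED ? "IS_SUCCEED" : "IS_FAILED") << std::endl;
    return 0;
}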
6 changes: 3 additions & 3 deletions src/server/pegasus_write_service.h
@@ -142,9 +142,9 @@ class pegasus_write_service : dsn::replication::replica_base
dsn::apps::duplicate_response &resp);

// Execute bulk load ingestion
int ingestion_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp);
int ingest_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp);

/// For batch write.

10 changes: 6 additions & 4 deletions src/server/pegasus_write_service_impl.h
@@ -483,9 +483,10 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
// \return ERR_WRONG_CHECKSUM: verify files failed
// \return ERR_INGESTION_FAILED: rocksdb ingestion failed
// \return ERR_OK: rocksdb ingestion succeed
dsn::error_code ingestion_files(const int64_t decree,
const std::string &bulk_load_dir,
const dsn::replication::bulk_load_metadata &metadata)
dsn::error_code ingest_files(const int64_t decree,
const std::string &bulk_load_dir,
const dsn::replication::bulk_load_metadata &metadata,
const bool ingest_behind)
{
// verify external files before ingestion
std::vector<std::string> sst_file_list;
@@ -495,7 +496,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

// ingest external files
if (dsn_unlikely(_rocksdb_wrapper->ingestion_files(decree, sst_file_list) != 0)) {
if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, ingest_behind) !=
0)) {
return dsn::ERR_INGESTION_FAILED;
}
return dsn::ERR_OK;
5 changes: 4 additions & 1 deletion src/server/rocksdb_wrapper.cpp
@@ -180,10 +180,13 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key

void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }

int rocksdb_wrapper::ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list)
int rocksdb_wrapper::ingest_files(int64_t decree,
const std::vector<std::string> &sst_file_list,
const bool ingest_behind)
{
rocksdb::IngestExternalFileOptions ifo;
ifo.move_files = true;
ifo.ingest_behind = ingest_behind;
rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo);
if (dsn_unlikely(!s.ok())) {
derror_rocksdb("IngestExternalFile", s.ToString(), "decree = {}", decree);
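For context, here is a minimal standalone sketch (not part of this PR; the DB path and SST file name are placeholders) of the RocksDB contract the wrapper relies on: ingest_behind places the external files behind all existing data, so keys already in the DB keep precedence, and the option is only accepted when the DB was opened with allow_ingest_behind = true, which is why the server sets _db_opts.allow_ingest_behind from the app env before opening RocksDB.

#include <iostream>
#include <string>
#include <vector>
#include <rocksdb/db.h>
#include <rocksdb/options.h>

int main()
{
    rocksdb::Options opts;
    opts.create_if_missing = true;
    // Must be set before DB::Open; it cannot be flipped on a live instance,
    // hence the restart in the function test after updating the app env.
    opts.allow_ingest_behind = true;

    rocksdb::DB *db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(opts, "/tmp/ingest_behind_demo", &db);
    if (!s.ok()) {
        std::cerr << "open failed: " << s.ToString() << std::endl;
        return 1;
    }

    rocksdb::IngestExternalFileOptions ifo;
    ifo.move_files = true;
    // Ingested files go behind (below) existing data; RocksDB rejects ingest_behind = true
    // if the DB was not opened with allow_ingest_behind = true.
    ifo.ingest_behind = true;
    // "/tmp/demo.sst" is a placeholder; such files are normally produced by rocksdb::SstFileWriter.
    s = db->IngestExternalFile(std::vector<std::string>{"/tmp/demo.sst"}, ifo);
    std::cout << "ingest status: " << s.ToString() << std::endl;

    delete db;
    return s.ok() ? 0 : 1;
}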
4 changes: 3 additions & 1 deletion src/server/rocksdb_wrapper.h
@@ -64,7 +64,9 @@ class rocksdb_wrapper : public dsn::replication::replica_base
int write(int64_t decree);
int write_batch_delete(int64_t decree, dsn::string_view raw_key);
void clear_up_write_batch();
int ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list);
int ingest_files(int64_t decree,
const std::vector<std::string> &sst_file_list,
const bool ingest_behind);

void set_default_ttl(uint32_t ttl);

9 changes: 7 additions & 2 deletions src/shell/commands/bulk_load.cpp
@@ -25,17 +25,19 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
{"cluster_name", required_argument, 0, 'c'},
{"file_provider_type", required_argument, 0, 'p'},
{"root_path", required_argument, 0, 'r'},
{"ingest_behind", no_argument, 0, 'i'},
{0, 0, 0, 0}};
std::string app_name;
std::string cluster_name;
std::string file_provider_type;
std::string remote_root_path;
bool ingest_behind = false;

optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "a:c:p:r:", long_options, &option_index);
c = getopt_long(args.argc, args.argv, "a:c:p:r:i", long_options, &option_index);
if (c == -1)
break;
switch (c) {
@@ -51,6 +53,9 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
case 'r':
remote_root_path = optarg;
break;
case 'i':
ingest_behind = true;
break;
default:
return false;
}
@@ -76,7 +81,7 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
}

auto err_resp = sc->ddl_client->start_bulk_load(
app_name, cluster_name, file_provider_type, remote_root_path);
app_name, cluster_name, file_provider_type, remote_root_path, ingest_behind);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
2 changes: 1 addition & 1 deletion src/shell/main.cpp
@@ -462,7 +462,7 @@ static command_executor commands[] = {
"start_bulk_load",
"start app bulk load",
"<-a --app_name str> <-c --cluster_name str> <-p --file_provider_type str> <-r "
"--root_path>",
"--root_path> [-i --ingest_behind]",
start_bulk_load,
},
{
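With the flag wired through the shell, an ingest-behind bulk load can be requested like this (the table, cluster, provider and path below are placeholders):

    start_bulk_load -a temp -c onebox -p local_service -r bulk_load_root -i

The target table must already carry rocksdb.allow_ingest_behind=true in its app envs and have been restarted so the option was applied when RocksDB was opened; the function test below expects ERR_INCONSISTENT_STATE when the request asks for ingest_behind but the table does not allow it.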
60 changes: 58 additions & 2 deletions src/test/function_test/test_bulk_load.cpp
@@ -101,9 +101,10 @@ class bulk_load_test : public testing::Test
system(copy_file_cmd.c_str());
}

error_code start_bulk_load()
error_code start_bulk_load(bool ingest_behind = false)
{
auto err_resp = ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT);
auto err_resp =
ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
return err_resp.get_value().err;
}

@@ -123,6 +124,26 @@ class bulk_load_test : public testing::Test
system(cmd.c_str());
}

void update_allow_ingest_behind(const std::string &allow_ingest_behind)
{
// update app envs
std::vector<std::string> keys;
keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
std::vector<std::string> values;
values.emplace_back(allow_ingest_behind);
auto err_resp = ddl_client->set_app_envs(APP_NAME, keys, values);
ASSERT_EQ(err_resp.get_value().err, ERR_OK);
std::cout << "sleep 30s to wait app_envs update" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(30));
// restart onebox to make config works
chdir(pegasus_root_dir.c_str());
system("./run.sh stop_onebox");
std::this_thread::sleep_for(std::chrono::seconds(5));
system("./run.sh start_onebox -w");
std::this_thread::sleep_for(std::chrono::seconds(20));
chdir(working_root_dir.c_str());
}

bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
{
int64_t sleep_time = 5;
@@ -269,3 +290,38 @@ TEST_F(bulk_load_test, bulk_load_tests)
operate_data(operation::DEL, "", 15);
operate_data(operation::NO_VALUE, "", 15);
}

///
/// case1: inconsistent ingest_behind
/// case2: bulk load (ingest_behind) succeeds and the loaded data is verified
/// case3: data consistency after bulk load:
///        - bulk-loaded data is overridden by the pre-existing (old) data
///        - get/set/del succeed after bulk load
///
TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
{
// app envs allow_ingest_behind = false, request ingest_behind = true
ASSERT_EQ(start_bulk_load(true), ERR_INCONSISTENT_STATE);

update_allow_ingest_behind("true");

// write old data
operate_data(operation::SET, "oldValue", 10);
operate_data(operation::GET, "oldValue", 10);

ASSERT_EQ(start_bulk_load(true), ERR_OK);
ASSERT_EQ(wait_bulk_load_finish(300), bulk_load_status::BLS_SUCCEED);

std::cout << "Start to verify data..." << std::endl;
// old values are not overridden by the bulk-loaded data
operate_data(operation::GET, "oldValue", 10);
ASSERT_TRUE(verify_data("hashkey", "sortkey"));

// write data after bulk load succeed
operate_data(operation::SET, "valueAfterBulkLoad", 20);
operate_data(operation::GET, "valueAfterBulkLoad", 20);

// del data after bulk load succeed
operate_data(operation::DEL, "", 15);
operate_data(operation::NO_VALUE, "", 15);
}