Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 4, 2024
1 parent 8a4bd8c commit 8f1e2b7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 85 deletions.
51 changes: 51 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,57 @@ struct list_nodes_helper
}
};

// A helper macro to parse command argument, the result is filled in a string vector variable named
// 'container'.
#define PARSE_STRS(container) \
do { \
const auto param = cmd(param_index++).str(); \
::dsn::utils::split_args(param.c_str(), container, ','); \
if (container.empty()) { \
fmt::print(stderr, \
"invalid command, '{}' should be in the form of 'val1,val2,val3' and " \
"should not be empty\n", \
param); \
return false; \
} \
std::set<std::string> str_set(container.begin(), container.end()); \
if (str_set.size() != container.size()) { \
fmt::print(stderr, "invalid command, '{}' has duplicate values\n", param); \
return false; \
} \
} while (false)

// A helper macro to parse command argument, the result is filled in an uint32_t variable named
// 'value'.
#define PARSE_UINT(value) \
do { \
const auto param = cmd(param_index++).str(); \
if (!::dsn::buf2uint32(param, value)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", param); \
return false; \
} \
} while (false)

// A helper macro to parse command argument, the result is filled in an uint32_t vector variable
// named 'container'.
#define PARSE_UINTS(container) \
do { \
std::vector<std::string> strs; \
PARSE_STRS(strs); \
container.clear(); \
for (const auto &str : strs) { \
uint32_t v; \
if (!::dsn::buf2uint32(str, v)) { \
fmt::print(stderr, \
"invalid command, '{}' in <" #container \
"> should be an unsigned integer\n", \
str); \
return false; \
} \
container.push_back(v); \
} \
} while (false)

// == miscellaneous (see 'commands/misc.cpp') == //

bool help_info(command_executor *e, shell_context *sc, arguments args);
Expand Down
133 changes: 50 additions & 83 deletions src/shell/commands/local_partition_split.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "shell/commands.h"

Expand All @@ -17,63 +30,20 @@
#include "utils/load_dump_object.h"
#include "utils/string_conv.h"

// TODO(yingchun): improve the parameters, use flags (i.e. --src_data_dirs=xxx) would be more
// friendly.
const std::string local_partition_split_help = "<src_data_dirs> <dst_data_dirs> <src_app_id> "
"<dst_app_id> <partition_ids> <src_partition_count> "
"<dst_partition_count>";
const std::string local_partition_split_help =
"<src_data_dirs> <dst_data_dirs> <src_app_id> "
"<dst_app_id> <src_partition_ids> <src_partition_count> "
"<dst_partition_count>";
bool local_partition_split(command_executor *e, shell_context *sc, arguments args)
{
// Parse parameters.
argh::parser cmd(args.argc, args.argv);
if (cmd.pos_args().size() != 8) {
fmt::print(
stderr, "invalid command, should be in the form of {}\n", local_partition_split_help);
stderr, "invalid command, should be in the form of '{}'\n", local_partition_split_help);
return false;
}
int index = 1;

#define PARSE_STRS(container) \
do { \
::dsn::utils::split_args(cmd(index++).str().c_str(), container, ','); \
if (container.empty()) { \
fmt::print(stderr, \
"invalid command, <" #container \
"> should be in the form of 'val1,val2,val3' and should not be empty\n"); \
return false; \
} \
std::set<std::string> str_set(container.begin(), container.end()); \
if (str_set.size() != container.size()) { \
fmt::print(stderr, "invalid command, <" #container "> has duplicate values\n"); \
return false; \
} \
} while (false)

#define PARSE_UINT(value) \
do { \
if (!::dsn::buf2uint32(cmd(index++).str(), value)) { \
fmt::print(stderr, "invalid command, <" #value "> should be an unsigned integer\n"); \
return false; \
} \
} while (false)

#define PARSE_UINTS(container) \
do { \
std::vector<std::string> strs; \
PARSE_STRS(strs); \
container.clear(); \
for (const auto &str : strs) { \
uint32_t v; \
if (!::dsn::buf2uint32(str, v)) { \
fmt::print(stderr, \
"invalid command, '{}' in <" #container \
"> should be an unsigned integer\n", \
str); \
return false; \
} \
container.push_back(v); \
} \
} while (false)
int param_index = 1;

std::vector<std::string> src_data_dirs;
PARSE_STRS(src_data_dirs);
Expand All @@ -94,41 +64,38 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
uint32_t dst_app_id;
PARSE_UINT(dst_app_id);

std::vector<uint32_t> partition_ids;
PARSE_UINTS(partition_ids);
std::set<uint32_t> src_partition_ids(partition_ids.begin(), partition_ids.end());
std::vector<uint32_t> src_partition_ids;
PARSE_UINTS(src_partition_ids);
std::set<uint32_t> sorted_src_partition_ids(src_partition_ids.begin(), src_partition_ids.end());

uint32_t src_partition_count;
PARSE_UINT(src_partition_count);

uint32_t dst_partition_count;
PARSE_UINT(dst_partition_count);

#undef PARSE_STRS
#undef PARSE_UINT
#undef PARSE_UINTS

// Check parameters.
// TODO(yingchun): 1. check disk space.
// 2. check app id
// TODO(yingchun): check 'dst_app_id' is not exist.
// TODO(yingchun): check disk space.
// Check 'dst_app_id'.
if (src_app_id == dst_app_id) {
fmt::print(stderr,
"invalid command, <src_app_id> and <dst_app_id> should be equal ({} vs. {})\n",
src_app_id,
dst_app_id);
return false;
}
// 3. check 'partition_ids'.
for (const auto partition_id : partition_ids) {
if (partition_id >= src_partition_count) {
// Check 'src_partition_ids'.
for (const auto src_partition_id : sorted_src_partition_ids) {
if (src_partition_id >= src_partition_count) {
fmt::print(stderr,
"invalid command, <partition_ids> should be in range [0, {})\n",
"invalid command, partition ids in <src_partition_ids> should be in range "
"[0, {})\n",
src_partition_count);
return false;
}
}

// Check 'dst_partition_count'.
if (dst_partition_count <= src_partition_count) {
fmt::print(stderr,
"invalid command, <dst_partition_count> should be larger than "
Expand All @@ -137,12 +104,9 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
src_partition_count);
return false;
}

// TODO(yingchun): check the correction.
const int split_count = dst_partition_count / src_partition_count;
CHECK_GT(split_count, 0);
int log2n = log2(split_count);
if (pow(2, log2n) != split_count) {
const uint32_t kSplitCount = dst_partition_count / src_partition_count;
const auto log2n = static_cast<uint32_t>(log2(kSplitCount));
if (pow(2, log2n) != kSplitCount) {
fmt::print(stderr,
"invalid command, <dst_partition_count> should be 2^n times of "
"<src_partition_count> ({} vs. {})\n",
Expand Down Expand Up @@ -214,6 +178,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

exist_app_ids.insert(app_id);
// The 'dst_app_id' is not exist.
if (exist_app_ids.count(dst_app_id) > 0) {
fmt::print(
stderr, "invalid command, <dst_app_id> {} is already exist\n", dst_app_id);
Expand Down Expand Up @@ -249,7 +214,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// All checks passed, do the split.
if (src_partition_ids.count(pidx) > 0) {
if (sorted_src_partition_ids.count(pidx) > 0) {
to_split_partitions.push_back({replica_dir, info, pidx});
}
}
Expand Down Expand Up @@ -334,7 +299,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// 'fname' has a duplicate '/' when contact 'file.db_path' and 'file.name', it
// doesn't matter.
const auto fname = file.db_path + file.name;
// fmt::print(stdout, "column family: {}, file: {}\n", file.column_family_name, fname);
// fmt::print(stdout, "column family: {}, file: {}\n", file.column_family_name,
// fname);
// Open reader.
// TODO(yingchun): options?
auto reader = std::make_unique<rocksdb::SstFileReader>(rocksdb::Options());
Expand All @@ -358,8 +324,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg

// Open writers.
std::vector<std::string> dst_tmp_rdb_dirs;
dst_tmp_rdb_dirs.resize(split_count);
for (int i = 0; i < split_count; i++) {
dst_tmp_rdb_dirs.resize(kSplitCount);
for (int i = 0; i < kSplitCount; i++) {
// Split temporary dir.
auto dst_tmp_rdb_dir = fmt::format("{}/{}.{}.pegasus",
split_data_dir_replica_dirs,
Expand All @@ -383,8 +349,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
int total_count = 0;
int empty_count = 0;
std::vector<int> split_counts;
split_counts.resize(split_count);
std::shared_ptr<rocksdb::SstFileWriter> writers[split_count];
split_counts.resize(kSplitCount);
std::shared_ptr<rocksdb::SstFileWriter> writers[kSplitCount];
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
total_count++;
// Calc the hash value and corresponding new partition index and sst writer.
Expand All @@ -401,7 +367,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
CHECK_LT(new_pidx, dst_partition_count);
int writer_idx = new_pidx / src_partition_count;
CHECK_LE(0, writer_idx);
CHECK_LT(writer_idx, split_count);
CHECK_LT(writer_idx, kSplitCount);

// TODO(yingchun): improve to check expired data.

Expand Down Expand Up @@ -433,7 +399,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg

// Release reader and writers.
fmt::print(stderr, "total_count: {}, empty_count: {}\n", total_count, empty_count);
for (int i = 0; i < split_count; i++) {
for (int i = 0; i < kSplitCount; i++) {
fmt::print(stderr, "[{}] count: {}\n", i, split_counts[i]);
if (split_counts[i] == 0) {
continue;
Expand All @@ -450,7 +416,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// Create new partitions.
for (int i = 0; i < split_count; i++) {
for (int i = 0; i < kSplitCount; i++) {
const auto new_data_dir_replica_dirs =
fmt::format("{}/{}.{}.pegasus",
data_dir_replica_dirs,
Expand Down Expand Up @@ -517,7 +483,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

std::unique_ptr<rocksdb::Iterator> iter(
_db->NewIterator(rocksdb::ReadOptions()));
_db->NewIterator(rocksdb::ReadOptions()));
int new_total_count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
new_total_count++;
Expand Down Expand Up @@ -568,7 +534,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
new_ai.app_name += "_new"; // TODO(yingchun): customize it.
new_ai.app_id = dst_app_id;
new_ai.partition_count = dst_partition_count;
// new_ai.create_second = dsn_now_s(); // TODO(yingchun): all partitions must be the same!
// new_ai.create_second = dsn_now_s(); // TODO(yingchun): all
// partitions must be the same!
dsn::replication::replica_app_info rai(&new_ai);
const auto rai_path =
dsn::utils::filesystem::path_combine(new_data_dir_replica_dirs, kAppInfo);
Expand Down
4 changes: 2 additions & 2 deletions src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <string>
#include <utility>
#include <vector>
#include <fmt/format.h>

#include "args.h"
#include "client/replication_ddl_client.h"
Expand Down Expand Up @@ -535,7 +534,8 @@ static command_executor commands[] = {
},
{
"local_partition_split",
"split the local partitions",
"split the local partitions offline. It's needed to stop the replica server before "
"executing this command.",
local_partition_split_help.c_str(),
local_partition_split,
},
Expand Down

0 comments on commit 8f1e2b7

Please sign in to comment.