Skip to content

Commit

Permalink
Add logging for number of id columns (#2127)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #2127

## Why
The purpose is to log how many ID columns (multi-key) advertisers are using.
In the PID SHARD stage, we already have "rowsInShard" to log the number of rows in each shard. We now also need to log the number of ID columns in the file, so we rename "rowsInShard" to the more generic "pid_shard_info".
## What
Read the file header to calculate the number of ID columns.

Reviewed By: yuyashiraki

Differential Revision: D43066928

fbshipit-source-id: 2ae778a39fbbad5a1b152cb2507bd8bbd215e01c
  • Loading branch information
Mao Ye authored and facebook-github-bot committed Feb 18, 2023
1 parent fb54de5 commit be333ab
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 27 deletions.
33 changes: 17 additions & 16 deletions fbpcs/data_processing/sharding/GenericSharder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ void strReplaceNullColumnWithEmpty(std::string& str) {
} // namespace detail

static const std::string kIdColumnPrefix = "id_";
static const std::string numIds = "num_ids";

/*
The chunk size for writing to cloud storage (currently
Expand Down Expand Up @@ -125,6 +126,8 @@ void GenericSharder::shard() {
<< "Header: [" << folly::join(",", header) << "]";
}

pid_shard_info[numIds] = idColumnIndices.size();

std::string newLine = "\n";
std::size_t i = 0;
for (const auto& outFile : outFiles) {
Expand All @@ -150,9 +153,9 @@ void GenericSharder::shard() {
}
}

// Log number of rows in each shard to the
// Log number of rows in each shard and num_ids to the
// "<filepath_for_0th_shard>_shardDistribution" file.
logShardDistribution();
logShardInfo();

XLOG(INFO) << "Finished after processing "
<< private_lift::logging::formatNumber(lineIdx) << " lines.";
Expand Down Expand Up @@ -198,32 +201,30 @@ void GenericSharder::shardLine(
outFiles.at(shard)->writeString(newLine);
}

void GenericSharder::logShardDistribution() {
void GenericSharder::logShardInfo() {
std::size_t numShards = getOutputPaths().size();
if (numShards == 0) {
XLOG(INFO) << "No shard to write distribution to.";
XLOG(INFO) << "No shard to write pid_shard info to.";
return;
}
std::string outputPath = getOutputPaths().at(0);
if (outputPath.empty()) {
XLOG(INFO) << "No filepath present to log shard distribution to.";
XLOG(INFO) << "No filepath present to log shard info to.";
return;
}
const std::string& shardDistributionPath =
outputPath + '_' + "shardDistribution";
auto fWriter = std::make_unique<fbpcf::io::FileWriter>(shardDistributionPath);
const std::string& shardInfoPath = outputPath + '_' + "shardDistribution";
auto fWriter = std::make_unique<fbpcf::io::FileWriter>(shardInfoPath);
auto bWriter =
std::make_unique<fbpcf::io::BufferedWriter>(std::move(fWriter));
std::string shardDstributionJson = getShardDistributionJson();
bWriter->writeString(shardDstributionJson);
std::string shardInfoJson = getShardInfoJson();
bWriter->writeString(shardInfoJson);
bWriter->close();
XLOG(INFO) << "Distribution of shards written to: '" << shardDistributionPath
<< "'";
XLOG(INFO) << "PID shard info written to: '" << shardInfoPath << "'";
}

std::string GenericSharder::getShardDistributionJson() {
auto rowsInShardDynamic = folly::toDynamic(rowsInShard);
std::string shardDstributionStr = folly::toPrettyJson(rowsInShardDynamic);
return shardDstributionStr;
std::string GenericSharder::getShardInfoJson() {
auto pidShardInfoDynamic = folly::toDynamic(pid_shard_info);
std::string shardInfoStr = folly::toPrettyJson(pidShardInfoDynamic);
return shardInfoStr;
}
} // namespace data_processing::sharder
10 changes: 5 additions & 5 deletions fbpcs/data_processing/sharding/GenericSharder.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ class GenericSharder {
std::stringstream ss;
ss << shard;
std::string key = ss.str();
rowsInShard[key]++;
pid_shard_info[key]++;
}

int getRowsForShard(std::size_t shard) {
std::stringstream ss;
ss << shard;
std::string key = ss.str();
return rowsInShard[key];
return pid_shard_info[key];
}

/**
Expand Down Expand Up @@ -175,17 +175,17 @@ class GenericSharder {
/**
* Log the number of rows in each shard and the number of id columns
* ("num_ids") to a json file
*/
void logShardDistribution();
void logShardInfo();

/**
* Return the json string that contains the number of rows in each shard
* and the number of id columns ("num_ids")
*/
std::string getShardDistributionJson();
std::string getShardInfoJson();

private:
std::string inputPath_;
std::vector<std::string> outputPaths_;
int32_t logEveryN_;
std::unordered_map<std::string, int> rowsInShard;
std::unordered_map<std::string, int> pid_shard_info;
};
} // namespace data_processing::sharder
12 changes: 6 additions & 6 deletions fbpcs/data_processing/sharding/test/GenericSharderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class GenericSharderTest final : public GenericSharder {
const std::vector<int32_t>& /* unused */) final {
linesCalledWith_.push_back(line);
// shardLine is overridden here, so we should manually call the
// logRowsToShard to make changes to rowsInShard.
// logRowsToShard to make changes to pid_shard_info.
std::size_t s = {0};
logRowsToShard(s);
}
Expand Down Expand Up @@ -147,7 +147,7 @@ TEST(GenericSharderTest, TestShardLine) {
EXPECT_EQ(actual.linesCalledWith_, expected);
}

TEST(GenericSharderTest, TestGetShardDistributionJson) {
TEST(GenericSharderTest, TestGetShardInfoJson) {
// This test is just ensuring that the json file is correctly written
auto randStart = folly::Random::secureRand64();
std::string inputPath =
Expand All @@ -170,12 +170,12 @@ TEST(GenericSharderTest, TestGetShardDistributionJson) {
// There are 2 output paths, so there will be two shards.
// Since there are 4 rows in the input file, the shardLine() is called for 4
// times. logRowsToShard(0) is also called for 4 times. Thus, the first pair
// in json should be "0":4. As rowsInShard[1] is never specified, it should be
// a default 0, thus the second pair should be "1":0.
// in json should be "0":4. As pid_shard_info[1] is never specified, it should
// be a default 0, thus the second pair should be "1":0.
std::string expected{
"{\n \"0\": 4,\n \"1\": 0\n}",
"{\n \"0\": 4,\n \"1\": 0,\n \"num_ids\": 1\n}",
};
EXPECT_EQ(actual.getShardDistributionJson(), expected);
EXPECT_EQ(actual.getShardInfoJson(), expected);
}

} // namespace data_processing::sharder

0 comments on commit be333ab

Please sign in to comment.