Skip to content

Commit

Permalink
support for dynamic header format
Browse files Browse the repository at this point in the history
Summary:
This change implements the feature for TraceFileStream to parse the trace line (i.e., setNextLine)
and allow the replay generator to retrieve the field with the field ID (i.e., getField) later on. In
order to support this, the replay generator provides the column names to field ID mapping table when
constructing the TraceFileStream.

This change also modifies the KVReplayGenerator to apply the new feature. With this change the
memcached traces can be shared in any format in forward compatible manner as long as the appropriate
header line is specified.

This change also adds an unit test to verify the new format and backward compatibility.

This change discards (most if not all) and reimplements D41403750.

Reviewed By: therealgymmy

Differential Revision: D46456693

fbshipit-source-id: d5b044816675127a740b20f62bba0563f3079916
  • Loading branch information
Jaesoo Lee authored and facebook-github-bot committed Jun 6, 2023
1 parent f101577 commit d8577df
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 117 deletions.
87 changes: 59 additions & 28 deletions cachelib/cachebench/workload/KVReplayGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,31 @@ struct ReqWrapper {
// the requests are sharded to each stressor by doing hashing over the key.
class KVReplayGenerator : public ReplayGeneratorBase {
public:
// input format is: key,op,size,op_count,key_size,ttl
enum SampleFields { KEY = 0, OP, SIZE, OP_COUNT, KEY_SIZE, TTL, END };
// Default order is key,op,size,op_count,key_size,ttl
enum SampleFields : uint8_t {
KEY = 0,
OP,
SIZE,
OP_COUNT,
KEY_SIZE,
TTL,
OP_TIME,
CACHE_HIT,
END
};

const ColumnTable columnTable_ = {
{SampleFields::OP_TIME, false, {"op_time"}},
{SampleFields::KEY, true, {"key"}}, /* required */
{SampleFields::KEY_SIZE, false, {"key_size"}},
{SampleFields::OP, true, {"op"}}, /* required */
{SampleFields::OP_COUNT, false, {"op_count"}},
{SampleFields::SIZE, true, {"size"}}, /* required */
{SampleFields::CACHE_HIT, false, {"cache_hits"}},
{SampleFields::TTL, false, {"ttl"}}};

explicit KVReplayGenerator(const StressorConfig& config)
: ReplayGeneratorBase(config), traceStream_(config, 0) {
: ReplayGeneratorBase(config), traceStream_(config, 0, columnTable_) {
for (uint32_t i = 0; i < numShards_; ++i) {
stressorCtxs_.emplace_back(std::make_unique<StressorCtx>(i));
}
Expand Down Expand Up @@ -117,6 +137,11 @@ class KVReplayGenerator : public ReplayGeneratorBase {
// Parse the request from the trace line and set the ReqWrapper
bool parseRequest(const std::string& line, std::unique_ptr<ReqWrapper>& req);

// for unit test
bool setHeaderRow(const std::string& header) {
return traceStream_.setHeaderRow(header);
}

private:
// Interval at which the submission queue is polled when it is either
// full (producer) or empty (consumer).
Expand Down Expand Up @@ -196,34 +221,40 @@ class KVReplayGenerator : public ReplayGeneratorBase {

inline bool KVReplayGenerator::parseRequest(const std::string& line,
std::unique_ptr<ReqWrapper>& req) {
// input format is: key,op,size,op_count,key_size,ttl
std::vector<folly::StringPiece> fields;
folly::split(",", line, fields);

if (fields.size() <= SampleFields::KEY_SIZE) {
if (!traceStream_.setNextLine(line)) {
return false;
}

auto keySizeField = folly::tryTo<size_t>(fields[SampleFields::KEY_SIZE]);
auto sizeField = folly::tryTo<size_t>(fields[SampleFields::SIZE]);
auto opCountField = folly::tryTo<uint32_t>(fields[SampleFields::OP_COUNT]);

if (!keySizeField.hasValue() || !sizeField.hasValue() ||
!opCountField.hasValue()) {
auto sizeField = traceStream_.template getField<size_t>(SampleFields::SIZE);
if (!sizeField.hasValue()) {
return false;
}

// Set key
req->key_ = fields[SampleFields::KEY];
req->key_ = traceStream_.template getField<>(SampleFields::KEY).value();

auto keySizeField =
traceStream_.template getField<size_t>(SampleFields::KEY_SIZE);
if (keySizeField.hasValue()) {
// The key is encoded as <encoded key, key size>.
// Generate key whose size matches with that of the original one
size_t keySize = std::max<size_t>(keySizeField.value(), req->key_.size());
// The key size should not exceed 256
keySize = std::min<size_t>(keySize, 256);
req->key_.resize(keySize, '0');
}

// Generate key whose size matches with that of the original one.
size_t keySize = std::max<size_t>(keySizeField.value(), req->key_.size());
// The key size should not exceed 256
keySize = std::min<size_t>(keySize, 256);
req->key_.resize(keySize, '0');
// Convert timestamp to seconds.
auto timestampField =
traceStream_.template getField<uint64_t>(SampleFields::OP_TIME);
if (timestampField.hasValue()) {
uint64_t timestampRaw = timestampField.value();
uint64_t timestampSeconds = timestampRaw / timestampFactor_;
req->req_.timestamp = timestampSeconds;
}

// Set op
const auto& op = fields[SampleFields::OP];
auto op = traceStream_.template getField<>(SampleFields::OP).value();
// TODO implement GET_LEASE and SET_LEASE emulations
if (!op.compare("GET") || !op.compare("GET_LEASE")) {
req->req_.setOp(OpType::kGet);
Expand All @@ -239,7 +270,9 @@ inline bool KVReplayGenerator::parseRequest(const std::string& line,
req->sizes_[0] = sizeField.value();

// Set op_count
req->repeats_ = opCountField.value();
auto opCountField =
traceStream_.template getField<uint32_t>(SampleFields::OP_COUNT);
req->repeats_ = opCountField.value_or(1);
if (!req->repeats_) {
return false;
}
Expand All @@ -248,12 +281,8 @@ inline bool KVReplayGenerator::parseRequest(const std::string& line,
}

// Set TTL (optional)
if (fields.size() > SampleFields::TTL) {
auto ttlField = folly::tryTo<size_t>(fields[SampleFields::TTL]);
req->req_.ttlSecs = ttlField.hasValue() ? ttlField.value() : 0;
} else {
req->req_.ttlSecs = 0;
}
auto ttlField = traceStream_.template getField<size_t>(SampleFields::TTL);
req->req_.ttlSecs = ttlField.value_or(0);

return true;
}
Expand All @@ -266,6 +295,8 @@ inline std::unique_ptr<ReqWrapper> KVReplayGenerator::getReqInternal() {

if (!parseRequest(line, reqWrapper)) {
parseError++;
XLOG_N_PER_MS(ERR, 10, 1000) << folly::sformat(
"Parsing error (total {}): {}", parseError.load(), line);
} else {
parseSuccess++;
}
Expand Down
9 changes: 2 additions & 7 deletions cachelib/cachebench/workload/PieceWiseReplayGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ class PieceWiseReplayGenerator : public ReplayGeneratorBase {
public:
explicit PieceWiseReplayGenerator(const StressorConfig& config)
: ReplayGeneratorBase(config),
traceStream_(config, 0),
traceStream_(config, 0, {}),
pieceCacheAdapter_(config.maxCachePieces,
config.replayGeneratorConfig.numAggregationFields,
config.replayGeneratorConfig.statsPerAggField),
activeReqQ_(config.numThreads),
threadFinished_(config.numThreads),
timestampFactor_(config.timestampFactor) {
threadFinished_(config.numThreads) {
for (uint32_t i = 0; i < numShards_; ++i) {
activeReqQ_[i] =
std::make_unique<folly::ProducerConsumerQueue<PieceWiseReqWrapper>>(
Expand Down Expand Up @@ -164,10 +163,6 @@ class PieceWiseReplayGenerator : public ReplayGeneratorBase {
std::thread traceGenThread_;
std::atomic<bool> isEndOfFile_{false};

// The constant to be divided from the timestamp value
// to turn the timestamp into seconds.
const uint64_t timestampFactor_{1};

AtomicCounter queueProducerWaitCounts_{0};
AtomicCounter queueConsumerWaitCounts_{0};

Expand Down
Loading

0 comments on commit d8577df

Please sign in to comment.