io_tester: implement latency correction for jobs with RPS
Currently, for jobs with an RPS (requests-per-second) rate specified,
io_tester measures only the service time. This means that if servicing
one of the requests takes much longer than the delta time available
between consecutive requests (so issuing the next request is delayed),
the measured latency does not reflect that fact. In such a case we
issue fewer requests than required, and the high latency is reported
only for the first request.

For instance, if one of the requests has a latency spike that exceeds
the time available for servicing it according to the RPS schedule, then
the total number of scheduled requests does not match the expected
count calculated as 'TOTAL = duration_seconds * RPS'.
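
As a hypothetical illustration (these numbers are not taken from the
patch): with RPS = 100 and duration_seconds = 10 the schedule expects
TOTAL = 100 * 10 = 1000 requests, one every 10 ms. If a single request
stalls for 2 seconds, about 200 schedule slots elapse while io_tester
waits for it, so only roughly 800 requests end up being issued.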

Furthermore, the latency percentiles printed at the end of the
simulation may show inaccurate data. Firstly, the count of samples is
lower than expected. Secondly, if the time needed to handle requests
returns to its ordinary value after the latency spike, the statistics
show that only one request took long, which is not true - io_tester
stopped sending requests at the given RPS, so the other requests could
not be measured properly. This indicates that io_tester suffers from
the coordinated omission problem.
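
Continuing the hypothetical numbers above: with the uncorrected
measurement only one of the ~800 recorded samples carries the 2-second
value, so high percentiles barely move, while the ~200 requests that
should have been issued during the stall - and would have observed
delays ranging from ~10 ms up to ~2 s - are never measured at all.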

This change implements a latency correction flag. When it is enabled,
io_tester measures the total request latency, including the delay
between the expected schedule time and the actual schedule time.
Moreover, if any request takes more time than is available, io_tester
tries to schedule the 'delayed' requests as soon as possible in order
to return to the defined schedule.
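
For illustration only, here is a minimal standalone sketch of the
corrected measurement described above. It uses plain blocking C++
rather than Seastar, and do_request() is a stand-in for the actual
I/O; none of these names come from io_tester itself.

#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

using clk = std::chrono::steady_clock;
using usecs = std::chrono::microseconds;

// Stand-in for the real I/O request.
static void do_request() {
    std::this_thread::sleep_for(usecs(500));
}

// Issue `total` requests paced by `pause` and record the corrected
// latency of each one: (actual start - intended start) + service time.
static std::vector<usecs> run_with_latency_correction(unsigned total, usecs pause) {
    std::vector<usecs> latencies;
    const auto issuer_start = clk::now();
    for (unsigned i = 0; i < total; ++i) {
        // The intended start follows the fixed schedule, independent of
        // how long earlier requests took.
        const auto intended_start = issuer_start + i * pause;
        // Never issue early; if we are already late this returns
        // immediately, so delayed requests are issued as soon as
        // possible to catch up with the schedule.
        std::this_thread::sleep_until(intended_start);
        const auto start = clk::now();
        do_request();
        const auto now = clk::now();
        const auto service_time = std::chrono::duration_cast<usecs>(now - start);
        const auto submission_delay = std::chrono::duration_cast<usecs>(start - intended_start);
        latencies.push_back(submission_delay + service_time);
    }
    return latencies;
}

int main() {
    // 100 requests at roughly 1000 RPS (1 ms pause between requests).
    const auto latencies = run_with_latency_correction(100, usecs(1000));
    std::printf("recorded %zu corrected latency samples\n", latencies.size());
    return 0;
}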

Signed-off-by: Patryk Wrobel <[email protected]>
pwrobelse committed May 22, 2024
1 parent 914a424 commit fb659bc
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions apps/io_tester/io_tester.cc
@@ -255,6 +255,7 @@ struct job_config {

std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999};
static bool keep_files = false;
static bool latency_correction_enabled = false;

future<> maybe_remove_file(sstring fname) {
return keep_files ? make_ready_future<>() : remove_file(fname);
@@ -284,6 +285,7 @@ class class_data {
std::uniform_int_distribution<uint32_t> _pos_distribution;
file _file;
bool _think = false;
bool _latency_correction_enabled = false;
::sleep_fn _sleep_fn = timer_sleep<lowres_clock>;
timer<> _thinker;

@@ -296,6 +298,7 @@ class class_data {
, _sg(cfg.shard_info.scheduling_group)
, _latencies(extended_p_square_probabilities = quantiles)
, _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
, _latency_correction_enabled(latency_correction_enabled)
, _sleep_fn(_config.options.sleep_fn)
, _thinker([this] { think_tick(); })
{
@@ -304,6 +307,12 @@
} else if (_config.shard_info.think_time > 0us) {
_think = true;
}

// When using the schedule for issuing requests we need to ensure that requests are not issued before
// their exact schedule time. Therefore, the sleep function must have the desired accuracy.
if (_latency_correction_enabled && rps() != 0) {
_sleep_fn = timer_sleep<std::chrono::steady_clock>;
}
}

virtual ~class_data() = default;
@@ -344,11 +353,28 @@ class class_data {
auto buf = bufptr.get();
auto pause = std::chrono::duration_cast<std::chrono::microseconds>(1s) / rps;
auto pause_dist = _config.options.pause_fn(pause);
return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight] () mutable {
return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); }, [this, buf, stop, pause, &intent, &in_flight] () mutable {
auto request_id_buf = std::make_unique<unsigned>(0u);
auto request_id_ptr = request_id_buf.get();
return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight, request_id_ptr] () mutable {
auto issuer_start = std::chrono::steady_clock::now();
return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); },
[this, buf, stop, pause, &intent, &in_flight, request_id_ptr, issuer_start] () mutable {
auto start = std::chrono::steady_clock::now();
auto& request_id = *request_id_ptr;
auto intended_request_start = start;

if (_latency_correction_enabled) {
auto request_delta_time = pause->template get_as<std::chrono::microseconds>();
intended_request_start = issuer_start + (request_id * request_delta_time);
if (std::chrono::steady_clock::now() < intended_request_start) {
throw std::runtime_error{fmt::format("Issuing request would invalidate the schedule - it is too early!")};
}
}

in_flight++;
return issue_request(buf, &intent).then_wrapped([this, start, pause, stop, &in_flight] (auto size_f) {
request_id++;

return issue_request(buf, &intent).then_wrapped([this, start, intended_request_start, pause, stop, &in_flight] (auto size_f) {
size_t size;
try {
size = size_f.get();
@@ -359,12 +385,20 @@
}

auto now = std::chrono::steady_clock::now();
auto service_time = std::chrono::duration_cast<std::chrono::microseconds>(now - start);
auto request_latency = service_time;
if (_latency_correction_enabled) {
auto submission_delay = std::chrono::duration_cast<std::chrono::microseconds>(start - intended_request_start);
request_latency = (submission_delay + service_time);
}

if (now < stop) {
this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
this->add_result(size, request_latency);
}
in_flight--;

auto p = pause->template get_as<std::chrono::microseconds>();
auto next = start + p;
auto next = intended_request_start + p;

if (next > now) {
return this->_sleep_fn(next, now);
@@ -377,7 +411,7 @@
}).then([&intent, &in_flight] {
intent.cancel();
return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); });
}).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist)] {});
}).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist), request_id_buf = std::move(request_id_buf)] {});
});
});
}
@@ -1090,6 +1124,9 @@ int main(int ac, char** av) {
("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification")
("keep-files", bpo::value<bool>()->default_value(false), "keep test files, next run may re-use them")
("latency-correction", bpo::value<bool>()->default_value(false), "meaningful only for jobs that specify RPS (requests per second) rate; "
"accounts delay between expected request submission time and the actual "
"submission time into measured latency")
;

distributed<context> ctx;
@@ -1111,6 +1148,11 @@
}
}

latency_correction_enabled = opts["latency-correction"].as<bool>();
if (latency_correction_enabled) {
fmt::print("Warning: sleep function for jobs using RPS will be forced to use steady_clock because of latency correction enabled!");
}

keep_files = opts["keep-files"].as<bool>();
auto& duration = opts["duration"].as<unsigned>();
auto& yaml = opts["conf"].as<sstring>();
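
For reference, once a build with this change is available, an RPS job
could be run with the new flag along these lines (the conf.yaml
contents and any additional options a given setup needs are assumptions
here, not part of this diff):

./io_tester --conf ./conf.yaml --duration 30 --latency-correction true

Because the option is declared with bpo::value<bool> rather than as a
plain switch, it expects an explicit value such as 'true' or '1';
enabling it also prints the warning about forcing the steady_clock-based
sleep function, as added in main() above.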
