Skip to content

Commit

Permalink
bigtrace: Add preemption for multi-user
Browse files Browse the repository at this point in the history
- Add preemption of queries using a ResizableTaskPool per query, which changes size based on the number of users in order to allocate an equal amount of workers to all users

Change-Id: I6777857961a098789f7575212a1854f7b0cc12eb
  • Loading branch information
ivan-chong committed Sep 9, 2024
1 parent 7a40138 commit aceafed
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 114 deletions.
4 changes: 4 additions & 0 deletions src/bigtrace/orchestrator/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ if (enable_perfetto_grpc) {
"orchestrator_impl.cc",
"orchestrator_impl.h",
"orchestrator_main.cc",
"resizable_task_pool.cc",
"resizable_task_pool.h",
"trace_address_pool.cc",
"trace_address_pool.h",
]
deps = [
"../../../gn:default_deps",
Expand Down
234 changes: 157 additions & 77 deletions src/bigtrace/orchestrator/orchestrator_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,115 +15,195 @@
*/

#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

#include <grpcpp/client_context.h>
#include <grpcpp/support/status.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/utils.h"
#include "protos/perfetto/bigtrace/orchestrator.pb.h"
#include "src/bigtrace/orchestrator/orchestrator_impl.h"
#include "src/bigtrace/orchestrator/resizable_task_pool.h"
#include "src/bigtrace/orchestrator/trace_address_pool.h"

namespace perfetto::bigtrace {

namespace {
const uint32_t kBufferPushDelay = 100;
const uint32_t kBufferPushDelayMicroseconds = 100;

grpc::Status ExecuteQueryOnTrace(
std::string sql_query,
std::string trace,
grpc::Status& query_status,
std::mutex& worker_lock,
std::vector<protos::BigtraceQueryResponse>& response_buffer,
std::unique_ptr<protos::BigtraceWorker::Stub>& stub,
ThreadWithContext* contextual_thread) {
protos::BigtraceQueryTraceArgs trace_args;
protos::BigtraceQueryTraceResponse trace_response;

trace_args.set_sql_query(sql_query);
trace_args.set_trace(trace);
grpc::Status status = stub->QueryTrace(
contextual_thread->client_context.get(), trace_args, &trace_response);

if (!status.ok()) {
{
std::lock_guard<std::mutex> status_guard(worker_lock);
// We check and only update the query status if it was not already errored
// to avoid unnecessary updates.
if (query_status.ok()) {
query_status = status;
}
}

return status;
}

protos::BigtraceQueryResponse response;
response.set_trace(trace_response.trace());
for (const protos::QueryResult& query_result : trace_response.result()) {
response.add_result()->CopyFrom(query_result);
}
std::lock_guard<std::mutex> buffer_guard(worker_lock);
response_buffer.emplace_back(std::move(response));

return grpc::Status::OK;
}

void ThreadRunLoop(ThreadWithContext* contextual_thread,
TraceAddressPool& address_pool,
std::string sql_query,
grpc::Status& query_status,
std::mutex& worker_lock,
std::vector<protos::BigtraceQueryResponse>& response_buffer,
std::unique_ptr<protos::BigtraceWorker::Stub>& stub) {
for (;;) {
auto maybe_trace_address = address_pool.Pop();
if (!maybe_trace_address) {
return;
}

// The ordering of this context swap followed by the check on thread
// cancellation is essential and should not be changed to avoid a race where
// a request to cancel a thread is sent, followed by a context swap, causing
// the cancel to not be caught and the execution of the loop body to
// continue.
contextual_thread->client_context = std::make_unique<grpc::ClientContext>();

if (contextual_thread->IsCancelled()) {
address_pool.MarkCancelled(std::move(*maybe_trace_address));
return;
}

grpc::Status status = ExecuteQueryOnTrace(
sql_query, *maybe_trace_address, query_status, worker_lock,
response_buffer, stub, contextual_thread);

if (!status.ok()) {
if (status.error_code() == grpc::StatusCode::CANCELLED) {
address_pool.MarkCancelled(std::move(*maybe_trace_address));
}
return;
}
}
}

} // namespace

OrchestratorImpl::OrchestratorImpl(
std::unique_ptr<protos::BigtraceWorker::Stub> stub,
uint32_t pool_size)
: stub_(std::move(stub)),
pool_(std::make_unique<base::ThreadPool>(pool_size)),
semaphore_(pool_size) {}
uint32_t max_query_concurrency)
: stub_(std::move(stub)), max_query_concurrency_(max_query_concurrency) {}

grpc::Status OrchestratorImpl::Query(
grpc::ServerContext*,
const protos::BigtraceQueryArgs* args,
grpc::ServerWriter<protos::BigtraceQueryResponse>* writer) {
grpc::Status query_status;
std::mutex status_lock;
std::mutex worker_lock;
const std::string& sql_query = args->sql_query();
std::vector<std::string> traces(args->traces().begin(), args->traces().end());

std::vector<protos::BigtraceQueryResponse> response_buffer;
uint64_t trace_count = static_cast<uint64_t>(args->traces_size());

std::thread push_response_buffer_thread([&]() {
uint64_t pushed_response_count = 0;
for (;;) {
{
std::lock_guard<std::mutex> status_guard(status_lock);
if (pushed_response_count == trace_count || !query_status.ok()) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(kBufferPushDelay));
if (response_buffer.empty()) {
continue;
}
std::vector<protos::BigtraceQueryResponse> buffer;
{
std::lock_guard<std::mutex> buffer_guard(buffer_lock_);
buffer = std::move(response_buffer);
response_buffer.clear();
}
for (protos::BigtraceQueryResponse& response : buffer) {
writer->Write(std::move(response));
}
pushed_response_count += buffer.size();
}
TraceAddressPool address_pool(std::move(traces));

// Update the query count on start and end ensuring that the query count is
// always decremented whenever the function is exited.
{
std::lock_guard<std::mutex> lk(query_count_mutex_);
query_count_++;
}
auto query_count_decrement = base::OnScopeExit([&]() {
std::lock_guard<std::mutex> lk(query_count_mutex_);
query_count_--;
});

for (const std::string& trace : args->traces()) {
ResizableTaskPool task_pool([&](ThreadWithContext* new_contextual_thread) {
ThreadRunLoop(new_contextual_thread, address_pool, sql_query, query_status,
worker_lock, response_buffer, stub_);
});

uint64_t pushed_response_count = 0;
uint32_t last_query_count = 0;
uint32_t current_query_count = 0;

for (;;) {
{
std::lock_guard<std::mutex> lk(query_count_mutex_);
current_query_count = query_count_;
}

PERFETTO_CHECK(current_query_count != 0);

// Update the number of threads to the lower of {the remaining number of
// traces} and the {maximum concurrency divided by the number of active
// queries}. This ensures that at most |max_query_concurrency_| calls to the
// backend are outstanding at any one point.
if (last_query_count != current_query_count) {
auto new_size =
std::min(std::max<uint32_t>(address_pool.RemainingCount(), 1u),
max_query_concurrency_ / current_query_count);
task_pool.Resize(new_size);
last_query_count = current_query_count;
}

// Exit the loop when either all responses have been successfully completed
// or if there is an error.
{
std::lock_guard<std::mutex> status_guard(status_lock);
if (!query_status.ok()) {
std::lock_guard<std::mutex> status_guard(worker_lock);
if (pushed_response_count == trace_count || !query_status.ok()) {
break;
}
}
semaphore_.Acquire();
pool_->PostTask([&]() {
grpc::ClientContext client_context;
protos::BigtraceQueryTraceArgs trace_args;
protos::BigtraceQueryTraceResponse trace_response;

trace_args.set_sql_query(sql_query);
trace_args.set_trace(trace);
grpc::Status status =
stub_->QueryTrace(&client_context, trace_args, &trace_response);
if (!status.ok()) {
PERFETTO_ELOG("QueryTrace returned an error status %s",
status.error_message().c_str());
{
std::lock_guard<std::mutex> status_guard(status_lock);
query_status = status;
}
} else {
protos::BigtraceQueryResponse response;
response.set_trace(trace_response.trace());
for (const protos::QueryResult& query_result :
trace_response.result()) {
response.add_result()->CopyFrom(query_result);
}
std::lock_guard<std::mutex> buffer_guard(buffer_lock_);
response_buffer.emplace_back(std::move(response));
}
semaphore_.Release();
});
}
push_response_buffer_thread.join();
return query_status;
}

void OrchestratorImpl::Semaphore::Acquire() {
std::unique_lock<std::mutex> lk(mutex_);
while (!count_) {
cv_.wait(lk);
// A buffer is used to periodically make writes to the client instead of
// writing every individual response in order to reduce contention on the
// writer.
base::SleepMicroseconds(kBufferPushDelayMicroseconds);
if (response_buffer.empty()) {
continue;
}
std::vector<protos::BigtraceQueryResponse> buffer;
{
std::lock_guard<std::mutex> buffer_guard(worker_lock);
buffer = std::move(response_buffer);
response_buffer.clear();
}
for (protos::BigtraceQueryResponse& response : buffer) {
writer->Write(std::move(response));
}
pushed_response_count += buffer.size();
}
--count_;
}

void OrchestratorImpl::Semaphore::Release() {
std::lock_guard<std::mutex> lk(mutex_);
++count_;
cv_.notify_one();
task_pool.JoinAll();

return query_status;
}

} // namespace perfetto::bigtrace
28 changes: 12 additions & 16 deletions src/bigtrace/orchestrator/orchestrator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,35 @@
#ifndef SRC_BIGTRACE_ORCHESTRATOR_ORCHESTRATOR_IMPL_H_
#define SRC_BIGTRACE_ORCHESTRATOR_ORCHESTRATOR_IMPL_H_

#include <grpcpp/client_context.h>
#include <memory>
#include <mutex>
#include <optional>
#include "perfetto/ext/base/threading/thread_pool.h"
#include "protos/perfetto/bigtrace/orchestrator.grpc.pb.h"
#include "protos/perfetto/bigtrace/worker.grpc.pb.h"

namespace perfetto::bigtrace {
namespace {
const uint64_t kDefaultMaxQueryConcurrency = 8;
} // namespace

class OrchestratorImpl final : public protos::BigtraceOrchestrator::Service {
public:
explicit OrchestratorImpl(std::unique_ptr<protos::BigtraceWorker::Stub> stub,
uint32_t pool_size);
uint32_t max_query_concurrency);

grpc::Status Query(
grpc::ServerContext*,
const protos::BigtraceQueryArgs* args,
grpc::ServerWriter<protos::BigtraceQueryResponse>* writer) override;

private:
class Semaphore {
public:
explicit Semaphore(uint32_t count) : count_(count) {}
void Acquire();
void Release();

private:
std::mutex mutex_;
std::condition_variable cv_;
uint32_t count_;
};
std::unique_ptr<protos::BigtraceWorker::Stub> stub_;
std::unique_ptr<base::ThreadPool> pool_;
std::mutex buffer_lock_;
// Used to interleave requests to the Orchestrator to distribute jobs more
// fairly
Semaphore semaphore_;
uint32_t max_query_concurrency_ = kDefaultMaxQueryConcurrency;
uint32_t query_count_ = 0;
std::mutex query_count_mutex_;
};

} // namespace perfetto::bigtrace
Expand Down
6 changes: 2 additions & 4 deletions src/bigtrace/orchestrator/orchestrator_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ Usage: %s [OPTIONS]
-w -p -n EXCLUSIVELY)
-r --name_resolution_scheme SCHEME Specify the name resolution
scheme for gRPC (e.g. ipv4:, dns://)
-t -thread_pool_size POOL_SIZE Specify the size of the thread pool
which determines number of concurrent
gRPCs from the Orchestrator
-t -max_query_concurrency Specify the number of concurrent
MAX_QUERY_CONCURRENCY queries/gRPCs from the Orchestrator
)",
argv[0]);
}
Expand Down Expand Up @@ -157,7 +156,6 @@ base::Status OrchestratorMain(int argc, char** argv) {
std::string worker_address_list = options->worker_address_list;
uint64_t worker_count = options->worker_count;

// TODO(ivankc) Replace with DNS resolver
std::string target_address = options->name_resolution_scheme.empty()
? "ipv4:"
: options->name_resolution_scheme;
Expand Down
Loading

0 comments on commit aceafed

Please sign in to comment.