From c6ba581830aae2f36dc7ed92aec6c9900dabdca3 Mon Sep 17 00:00:00 2001 From: elkana Date: Sun, 4 Dec 2022 22:37:03 +0200 Subject: [PATCH] manager.cpp: write galois mapping distribute work --- src/services/CMakeLists.txt | 2 +- src/services/client_context.hpp | 28 ++++++++++++++++++ src/services/manager.cpp | 51 +++++++++++++++++++++++++++++++-- src/services/manager.hpp | 3 ++ src/services/server.cpp | 3 +- src/services/server.hpp | 18 +----------- 6 files changed, 84 insertions(+), 21 deletions(-) create mode 100644 src/services/client_context.hpp diff --git a/src/services/CMakeLists.txt b/src/services/CMakeLists.txt index 72f209c0..caa2c935 100644 --- a/src/services/CMakeLists.txt +++ b/src/services/CMakeLists.txt @@ -1,6 +1,6 @@ add_library(services worker_strategy.hpp worker_strategy.cpp worker.hpp factory.hpp factory.cpp worker.cpp constants.hpp manager.hpp manager.cpp - server.hpp server.cpp db.hpp db.cpp utils.hpp utils.cpp client_service.cpp client_service.hpp) + server.hpp server.cpp db.hpp db.cpp utils.hpp utils.cpp client_service.cpp client_service.hpp client_context.hpp) target_include_directories(services PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_include_directories(services PRIVATE ${com_sealpir_SOURCE_DIR}/src) diff --git a/src/services/client_context.hpp b/src/services/client_context.hpp new file mode 100644 index 00000000..3fa07789 --- /dev/null +++ b/src/services/client_context.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include "pir_client.hpp" +#include "db.hpp" +#include "manager.hpp" +#include "distribicom.grpc.pb.h" +#include "distribicom.pb.h" + +namespace services { +/** +* ClientInfo stores objects related to an individual client +*/ + struct ClientInfo { + distribicom::ClientQueryRequest query_info_marshaled; + distribicom::GaloisKeys galois_keys_marshaled; + + PirQuery query; + seal::GaloisKeys galois_keys; + unique_ptr client_stub; + }; + struct ClientDB { + shared_mutex mutex; + map> id_to_info; + uint64_t client_counter = 0; + }; +} + diff --git a/src/services/manager.cpp b/src/services/manager.cpp index 2ec25e66..997c3b0b 100644 --- a/src/services/manager.cpp +++ b/src/services/manager.cpp @@ -8,6 +8,9 @@ namespace services { // fulfilled... the map can be refered to using a round, and an epoch as its key. // upon completion of a task it should go back to the map and state all workers have freaking completed their tasks. + std::uint64_t query_count_per_worker(std::uint64_t num_workers, std::uint64_t num_rows, std::uint64_t num_queries); + + ::grpc::Status Manager::RegisterAsWorker(::grpc::ServerContext *context, const ::distribicom::WorkerRegistryRequest *request, @@ -231,6 +234,45 @@ namespace services { worker_counter.wait_for(i); } + void Manager::send_galois_keys( ClientDB &all_clients) { + grpc::ClientContext context; + + std::shared_lock client_db_lock(all_clients.mutex); + auto num_clients = all_clients.client_counter; + auto num_rows = app_configs.configs().db_rows(); + + std::unique_lock lock(mtx); + auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_clients); + + utils::add_metadata_size(context, services::constants::size_md, int(num_queries_per_worker)); + // todo: specify epoch! + utils::add_metadata_size(context, services::constants::round_md, 1); + utils::add_metadata_size(context, services::constants::epoch_md, 1); + + distribicom::Ack response; + distribicom::WorkerTaskPart prt; + + for (auto &worker: worker_stubs) { + { // this stream is released at the end of this scope. + auto stream = worker.second->SendTask(&context, &response); + auto range_start = worker_name_to_work_responsible_for[worker.first].query_range_start; + auto range_end = worker_name_to_work_responsible_for[worker.first].query_range_end; + + for (std::uint64_t i = range_start; i < range_end; ++i) { + prt.mutable_gkey()->CopyFrom(all_clients.id_to_info[i]->galois_keys_marshaled); + stream->Write(prt); + } + stream->WritesDone(); + auto status = stream->Finish(); + if (!status.ok()) { + std::cout << "manager:: distribute_work:: transmitting galois gal_key to " << worker.first << + " failed: " << status.error_message() << std::endl; + continue; + } + } + } + } + void Manager::send_galois_keys(const math_utils::matrix &matrix) { grpc::ClientContext context; @@ -276,11 +318,16 @@ namespace services { return {range_start, range_end}; } + std::uint64_t query_count_per_worker(std::uint64_t num_workers, std::uint64_t num_rows, std::uint64_t num_queries){ + auto num_workers_per_row = num_workers/num_rows; + auto num_queries_per_worker = num_queries / num_workers_per_row; + return num_queries_per_worker; + } + void Manager::map_workers_to_responsibilities(std::uint64_t num_rows, std::uint64_t num_queries) { int i = 0; std::unique_lock lock(mtx); - auto num_workers_per_row = worker_stubs.size()/num_rows; - auto num_queries_per_worker = num_queries / num_workers_per_row; + auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_queries); for(auto &worker: worker_stubs){ this->worker_name_to_work_responsible_for[worker.first].worker_number = i; diff --git a/src/services/manager.hpp b/src/services/manager.hpp index bc44e794..37fcc7bc 100644 --- a/src/services/manager.hpp +++ b/src/services/manager.hpp @@ -10,6 +10,7 @@ #include "marshal/marshal.hpp" #include "concurrency/concurrency.h" #include "utils.hpp" +#include "client_context.hpp" namespace services { @@ -120,6 +121,8 @@ namespace services { // assumes num workers map well to db and queries void map_workers_to_responsibilities(uint64_t num_rows, uint64_t num_queries); + + void send_galois_keys( ClientDB &all_clients); }; } diff --git a/src/services/server.cpp b/src/services/server.cpp index 95a808b9..c5262c3c 100644 --- a/src/services/server.cpp +++ b/src/services/server.cpp @@ -3,6 +3,7 @@ // #include "server.hpp" +#include "client_context.hpp" services::FullServer::FullServer(math_utils::matrix &db, math_utils::matrix &queries, @@ -47,7 +48,7 @@ services::FullServer::RegisterAsClient(grpc::ServerContext *context, const distr ) )); - auto client_info = std::make_unique(ClientInfo()); + auto client_info = std::make_unique(ClientInfo()); client_info->galois_keys_marshaled.set_keys(request->galois_keys()); client_info->client_stub = std::move(client_conn); diff --git a/src/services/server.hpp b/src/services/server.hpp index d4fcb056..952f19d0 100644 --- a/src/services/server.hpp +++ b/src/services/server.hpp @@ -6,26 +6,10 @@ #include "manager.hpp" #include "db.hpp" #include "pir_client.hpp" +#include "client_context.hpp" #include namespace services { - /** - * ClientInfo stores objects related to an individual client - */ - struct ClientInfo { - distribicom::ClientQueryRequest query_info_marshaled; - distribicom::GaloisKeys galois_keys_marshaled; - - PirQuery query; - seal::GaloisKeys galois_keys; - std::unique_ptr client_stub; - }; - - struct ClientDB { - std::shared_mutex mutex; - std::map> id_to_info; - std::uint64_t client_counter=0; - }; // uses both the Manager and the Server services to complete a full distribicom server.