Skip to content

Commit

Permalink
manager.cpp: write galois mapping distribute work
Browse files Browse the repository at this point in the history
  • Loading branch information
elkanatovey committed Dec 4, 2022
1 parent 5122396 commit c6ba581
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/services/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

add_library(services worker_strategy.hpp worker_strategy.cpp worker.hpp factory.hpp factory.cpp worker.cpp constants.hpp manager.hpp manager.cpp
server.hpp server.cpp db.hpp db.cpp utils.hpp utils.cpp client_service.cpp client_service.hpp)
server.hpp server.cpp db.hpp db.cpp utils.hpp utils.cpp client_service.cpp client_service.hpp client_context.hpp)

target_include_directories(services PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(services PRIVATE ${com_sealpir_SOURCE_DIR}/src)
Expand Down
28 changes: 28 additions & 0 deletions src/services/client_context.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <future>
#include "pir_client.hpp"
#include "db.hpp"
#include "manager.hpp"
#include "distribicom.grpc.pb.h"
#include "distribicom.pb.h"

namespace services {
/**
* ClientInfo stores objects related to an individual client
*/
struct ClientInfo {
distribicom::ClientQueryRequest query_info_marshaled;
distribicom::GaloisKeys galois_keys_marshaled;

PirQuery query;
seal::GaloisKeys galois_keys;
unique_ptr<distribicom::Client::Stub> client_stub;
};
struct ClientDB {
shared_mutex mutex;
map<uint32_t, unique_ptr<ClientInfo>> id_to_info;
uint64_t client_counter = 0;
};
}

51 changes: 49 additions & 2 deletions src/services/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ namespace services {
// fulfilled... the map can be refered to using a round, and an epoch as its key.
// upon completion of a task it should go back to the map and state all workers have freaking completed their tasks.

std::uint64_t query_count_per_worker(std::uint64_t num_workers, std::uint64_t num_rows, std::uint64_t num_queries);


::grpc::Status
Manager::RegisterAsWorker(::grpc::ServerContext *context,
const ::distribicom::WorkerRegistryRequest *request,
Expand Down Expand Up @@ -231,6 +234,45 @@ namespace services {
worker_counter.wait_for(i);
}

void Manager::send_galois_keys( ClientDB &all_clients) {
grpc::ClientContext context;

std::shared_lock client_db_lock(all_clients.mutex);
auto num_clients = all_clients.client_counter;
auto num_rows = app_configs.configs().db_rows();

std::unique_lock lock(mtx);
auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_clients);

utils::add_metadata_size(context, services::constants::size_md, int(num_queries_per_worker));
// todo: specify epoch!
utils::add_metadata_size(context, services::constants::round_md, 1);
utils::add_metadata_size(context, services::constants::epoch_md, 1);

distribicom::Ack response;
distribicom::WorkerTaskPart prt;

for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
auto stream = worker.second->SendTask(&context, &response);
auto range_start = worker_name_to_work_responsible_for[worker.first].query_range_start;
auto range_end = worker_name_to_work_responsible_for[worker.first].query_range_end;

for (std::uint64_t i = range_start; i < range_end; ++i) {
prt.mutable_gkey()->CopyFrom(all_clients.id_to_info[i]->galois_keys_marshaled);
stream->Write(prt);
}
stream->WritesDone();
auto status = stream->Finish();
if (!status.ok()) {
std::cout << "manager:: distribute_work:: transmitting galois gal_key to " << worker.first <<
" failed: " << status.error_message() << std::endl;
continue;
}
}
}
}

void Manager::send_galois_keys(const math_utils::matrix<seal::GaloisKeys> &matrix) {
grpc::ClientContext context;

Expand Down Expand Up @@ -276,11 +318,16 @@ namespace services {
return {range_start, range_end};
}

std::uint64_t query_count_per_worker(std::uint64_t num_workers, std::uint64_t num_rows, std::uint64_t num_queries){
auto num_workers_per_row = num_workers/num_rows;
auto num_queries_per_worker = num_queries / num_workers_per_row;
return num_queries_per_worker;
}

void Manager::map_workers_to_responsibilities(std::uint64_t num_rows, std::uint64_t num_queries) {
int i = 0;
std::unique_lock lock(mtx);
auto num_workers_per_row = worker_stubs.size()/num_rows;
auto num_queries_per_worker = num_queries / num_workers_per_row;
auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_queries);

for(auto &worker: worker_stubs){
this->worker_name_to_work_responsible_for[worker.first].worker_number = i;
Expand Down
3 changes: 3 additions & 0 deletions src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "marshal/marshal.hpp"
#include "concurrency/concurrency.h"
#include "utils.hpp"
#include "client_context.hpp"


namespace services {
Expand Down Expand Up @@ -120,6 +121,8 @@ namespace services {

// assumes num workers map well to db and queries
void map_workers_to_responsibilities(uint64_t num_rows, uint64_t num_queries);

void send_galois_keys( ClientDB &all_clients);
};
}

3 changes: 2 additions & 1 deletion src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include "server.hpp"
#include "client_context.hpp"


services::FullServer::FullServer(math_utils::matrix<seal::Plaintext> &db, math_utils::matrix<seal::Ciphertext> &queries,
Expand Down Expand Up @@ -47,7 +48,7 @@ services::FullServer::RegisterAsClient(grpc::ServerContext *context, const distr
)
));

auto client_info = std::make_unique<services::ClientInfo>(ClientInfo());
auto client_info = std::make_unique<ClientInfo>(ClientInfo());
client_info->galois_keys_marshaled.set_keys(request->galois_keys());
client_info->client_stub = std::move(client_conn);

Expand Down
18 changes: 1 addition & 17 deletions src/services/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,10 @@
#include "manager.hpp"
#include "db.hpp"
#include "pir_client.hpp"
#include "client_context.hpp"
#include <future>

namespace services {
/**
* ClientInfo stores objects related to an individual client
*/
struct ClientInfo {
distribicom::ClientQueryRequest query_info_marshaled;
distribicom::GaloisKeys galois_keys_marshaled;

PirQuery query;
seal::GaloisKeys galois_keys;
std::unique_ptr<distribicom::Client::Stub> client_stub;
};

struct ClientDB {
std::shared_mutex mutex;
std::map<std::uint32_t, std::unique_ptr<ClientInfo>> id_to_info;
std::uint64_t client_counter=0;
};


// uses both the Manager and the Server services to complete a full distribicom server.
Expand Down

0 comments on commit c6ba581

Please sign in to comment.