Skip to content

Commit

Permalink
manager: mostly done integration
Browse files Browse the repository at this point in the history
  • Loading branch information
elkanatovey committed Dec 6, 2022
1 parent 1a96c9e commit ecda72a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 27 deletions.
51 changes: 28 additions & 23 deletions src/services/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ namespace services {
::distribicom::Ack *response) {
auto sending_worker = utils::extract_ip(context);

mtx.lock();
mtx.lock_shared();
auto exists = worker_stubs.find(sending_worker) != worker_stubs.end();
mtx.unlock();
mtx.lock_shared();

if (!exists) {
return {grpc::StatusCode::INVALID_ARGUMENT, "worker not registered"};
Expand Down Expand Up @@ -120,33 +120,38 @@ namespace services {
const seal::GaloisKeys &expansion_key
#endif
) {
grpc::ClientContext context;

utils::add_metadata_size(context, services::constants::round_md, rnd);
utils::add_metadata_size(context, services::constants::epoch_md, epoch);

// currently, there is no work distribution. everyone receives the entire DB and the entire queries.
std::lock_guard<std::mutex> lock(mtx);
auto ledger = std::make_shared<WorkDistributionLedger>();
ledgers.insert({{rnd, epoch}, ledger});
ledger->worker_list = std::vector<std::string>();
ledger->worker_list.reserve(worker_stubs.size());
all_clients.mutex.lock_shared();
ledger->result_mat = math_utils::matrix<seal::Ciphertext>(
db.cols, all_clients.client_counter);
all_clients.mutex.unlock_shared();
for (auto &worker: worker_stubs) {
ledger->worker_list.push_back(worker.first);
{
// currently, there is no work distribution. everyone receives the entire DB and the entire queries.
std::shared_lock lock(mtx);
ledgers.insert({{rnd, epoch}, ledger});
ledger->worker_list = std::vector<std::string>();
ledger->worker_list.reserve(worker_stubs.size());
all_clients.mutex.lock_shared();
ledger->result_mat = math_utils::matrix<seal::Ciphertext>(
db.cols, all_clients.client_counter);
all_clients.mutex.unlock_shared();
for (auto &worker: worker_stubs) {
ledger->worker_list.push_back(worker.first);
}
}

#ifdef DISTRIBICOM_DEBUG
create_res_matrix(db, all_clients, expansion_key, ledger);
#endif
if(rnd==1){
grpc::ClientContext context;
utils::add_metadata_size(context, services::constants::round_md, rnd);
utils::add_metadata_size(context, services::constants::epoch_md, epoch);

send_queries(all_clients, context);
}

send_db(db, context);
grpc::ClientContext context1;
utils::add_metadata_size(context1, services::constants::round_md, rnd);
utils::add_metadata_size(context1, services::constants::epoch_md, epoch);

send_db(db, context1);

return ledger;
}
Expand Down Expand Up @@ -193,7 +198,7 @@ namespace services {

distribicom::Ack response;
distribicom::WorkerTaskPart part;
std::unique_lock lock(mtx);
std::shared_lock lock(mtx);

for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
Expand Down Expand Up @@ -233,7 +238,7 @@ namespace services {
distribicom::Ack response;
distribicom::WorkerTaskPart part;
std::shared_lock client_db_lock(all_clients.mutex);
std::unique_lock lock(mtx);
std::shared_lock lock(mtx);
for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
auto stream = worker.second->SendTask(&context, &response);
Expand Down Expand Up @@ -330,7 +335,7 @@ namespace services {
distribicom::WorkerTaskPart prt;

std::shared_lock client_db_lock(all_clients.mutex);
std::unique_lock lock(mtx);
std::shared_lock lock(mtx);

for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
Expand Down Expand Up @@ -410,7 +415,7 @@ namespace services {

void Manager::map_workers_to_responsibilities(std::uint64_t num_rows, std::uint64_t num_queries) {
int i = 0;
std::unique_lock lock(mtx);
std::shared_lock lock(mtx);
auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_queries);

for(auto &worker: worker_stubs){
Expand Down
6 changes: 4 additions & 2 deletions src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace services {
private:
distribicom::AppConfigs app_configs;

std::mutex mtx; // todo: use shared_mtx,
std::shared_mutex mtx; // todo: use shared_mtx,
std::map<std::string, std::unique_ptr<distribicom::Worker::Stub>> worker_stubs;

std::map<std::string, WorkerInfo> worker_name_to_work_responsible_for; // todo: refactor into struct
Expand Down Expand Up @@ -120,7 +120,9 @@ namespace services {
const seal::GaloisKeys &expansion_key,
std::shared_ptr<WorkDistributionLedger> &ledger) const;

// assumes num workers map well to db and queries
/**
* assumes num workers map well to db and queries
*/
void map_workers_to_responsibilities(uint64_t num_rows, uint64_t num_queries);

void send_galois_keys(const ClientDB &all_clients);
Expand Down
2 changes: 1 addition & 1 deletion src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ std::shared_ptr<services::WorkDistributionLedger> services::FullServer::distribu

ledger = manager.distribute_work(db_handle.mat, client_query_manager, 1, 1,
#ifdef DISTRIBICOM_DEBUG
gal_keys.many_reads().mat.data[0]
client_query_manager.id_to_info.begin()->second->galois_keys
#endif
);
}
Expand Down
2 changes: 1 addition & 1 deletion src/services/worker_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace services::work_strategy {
void process_task(WorkerServiceTask &&task) override {
// expand all queries.
expand_queries(task);

if(task.ptx_rows.empty()){ return;}
auto answer = multiply_rows(task);

send_response(task, answer);
Expand Down

0 comments on commit ecda72a

Please sign in to comment.