Skip to content

Commit

Permalink
manager.cpp: continue work on integration
Browse files Browse the repository at this point in the history
  • Loading branch information
elkanatovey committed Dec 6, 2022
1 parent e48447a commit 1a96c9e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 26 deletions.
28 changes: 19 additions & 9 deletions src/services/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace services {
::distribicom::Ack *response) {
try {

auto requesting_worker = utils::extract_ip(context);
std::string subscribing_worker_address = requesting_worker + ":" + std::to_string(request->workerport());
auto requesting_worker = utils::extract_ip(context);
std::string subscribing_worker_address = "localhost:52100";

// creating client to the worker:
auto worker_conn = std::make_unique<distribicom::Worker::Stub>(distribicom::Worker::Stub(
Expand Down Expand Up @@ -114,7 +114,7 @@ namespace services {

std::shared_ptr<WorkDistributionLedger>
Manager::distribute_work(const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
const ClientDB &all_clients,
int rnd, int epoch,
#ifdef DISTRIBICOM_DEBUG
const seal::GaloisKeys &expansion_key
Expand All @@ -131,31 +131,40 @@ namespace services {
ledgers.insert({{rnd, epoch}, ledger});
ledger->worker_list = std::vector<std::string>();
ledger->worker_list.reserve(worker_stubs.size());
all_clients.mutex.lock_shared();
ledger->result_mat = math_utils::matrix<seal::Ciphertext>(
db.cols, compressed_queries.data.size());
db.cols, all_clients.client_counter);
all_clients.mutex.unlock_shared();
for (auto &worker: worker_stubs) {
ledger->worker_list.push_back(worker.first);
}

#ifdef DISTRIBICOM_DEBUG
create_res_matrix(db, compressed_queries, expansion_key, ledger);
create_res_matrix(db, all_clients, expansion_key, ledger);
#endif
if(rnd==1){
send_queries(all_clients, context);
}

send_db(db, context);

sendtask(db, compressed_queries, context, ledger);
return ledger;
}


void Manager::create_res_matrix(const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
const ClientDB &all_clients,
const seal::GaloisKeys &expansion_key,
std::shared_ptr<WorkDistributionLedger> &ledger) const {
#ifdef DISTRIBICOM_DEBUG
auto exp = expansion_key;
auto expand_sz = db.cols;
math_utils::matrix<seal::Ciphertext> query_mat(expand_sz, compressed_queries.data.size());

all_clients.mutex.lock_shared();
math_utils::matrix<seal::Ciphertext> query_mat(expand_sz, all_clients.client_counter);
auto col = -1;
for (auto &c_query: compressed_queries.data) {
for (const auto &client: all_clients.id_to_info) {
auto c_query = client.second->query[0][0];
col++;
std::vector<seal::Ciphertext> quer(1);
quer[0] = c_query;
Expand All @@ -165,6 +174,7 @@ namespace services {
query_mat(i, col) = expanded[i];
}
}
all_clients.mutex.unlock_shared();

matops->to_ntt(query_mat.data);

Expand Down
4 changes: 2 additions & 2 deletions src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ namespace services {

std::shared_ptr<WorkDistributionLedger> distribute_work(
const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
const ClientDB &all_clients,
int rnd,
int epoch,
#ifdef DISTRIBICOM_DEBUG
Expand All @@ -116,7 +116,7 @@ namespace services {


void create_res_matrix(const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
const ClientDB &all_clients,
const seal::GaloisKeys &expansion_key,
std::shared_ptr<WorkDistributionLedger> &ledger) const;

Expand Down
9 changes: 5 additions & 4 deletions src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ std::shared_ptr<services::WorkDistributionLedger> services::FullServer::distribu
// block is to destroy the db and querries handles.
{
auto db_handle = db.many_reads();
auto queries_handle = queries.many_reads();
// todo: set specific round and handle.
// auto queries_handle = queries.many_reads();
// // todo: set specific round and handle.


ledger = manager.distribute_work(db_handle.mat, queries_handle.mat, 1, 1,
ledger = manager.distribute_work(db_handle.mat, client_query_manager, 1, 1,
#ifdef DISTRIBICOM_DEBUG
gal_keys.many_reads().mat.data[0]
#endif
Expand All @@ -140,7 +140,8 @@ void services::FullServer::start_epoch() {

//todo: start epoch for registered clients as well -> make them send queries.
auto handle = gal_keys.many_reads();
manager.send_galois_keys(handle.mat);
manager.send_galois_keys(client_query_manager);
// manager.send_galois_keys(handle.mat);
// wait_for_workers(0); todo: wait for app_configs.num_workers
}

Expand Down
22 changes: 11 additions & 11 deletions test/services/worker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,24 @@ full_server_instance(std::shared_ptr<TestUtils::CryptoObjects> &all, const distr
auto n = 5;
math_utils::matrix<seal::Plaintext> db(n, n);
// a single row of ctxs and their respective gal_key.
math_utils::matrix<seal::Ciphertext> queries(1, n);
math_utils::matrix<seal::GaloisKeys> gal_keys(1, n);
// math_utils::matrix<seal::Ciphertext> queries(1, n);
// math_utils::matrix<seal::GaloisKeys> gal_keys(1, n);
for (auto &p: db.data) {
p = all->random_plaintext();
}

for (auto &q: queries.data) {
q = all->random_ciphertext();
}

for (auto &g: gal_keys.data) {
g = all->gal_keys;
}
// for (auto &q: queries.data) {
// q = all->random_ciphertext();
// }
//
// for (auto &g: gal_keys.data) {
// g = all->gal_keys;
// }
auto cdb = create_client_db(n, all);
auto foo = services::FullServer(db, cdb, configs);
// auto foo = services::FullServer(db, cdb, configs);


return services::FullServer(db, queries, gal_keys, configs);
return services::FullServer(db, cdb, configs);
}


Expand Down

0 comments on commit 1a96c9e

Please sign in to comment.