Skip to content

Commit

Permalink
manager: fix bug and factor out old db constructs
Browse files Browse the repository at this point in the history
server: factor out old db constructs
proto: minor update to unsigned integer types for consistency
  • Loading branch information
elkanatovey committed Dec 6, 2022
1 parent ecda72a commit e54a9d2
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 140 deletions.
8 changes: 4 additions & 4 deletions src/internals/protos/distribicom.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ message MatrixPart {
Plaintext ptx = 2;
}

int32 row = 3;
int32 col = 4;
uint32 row = 3;
uint32 col = 4;
}

message QueryCiphertext {
Expand All @@ -90,7 +90,7 @@ message QueryCiphertext {
message GaloisKeys{
bytes keys = 1;
// should point to the matching ciphertext.
int64 key_pos = 2;
uint64 key_pos = 2;
}

message Ciphertext {bytes data = 1;}
Expand Down Expand Up @@ -145,7 +145,7 @@ message ClientConfigs {


message TellNewRoundRequest {
int32 round = 1;
uint32 round = 1;
}

message WorkerRegistryRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/services/db.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "db.hpp"
namespace services{
template<>
void DB<seal::Plaintext>::write(const vector<uint64_t> &new_element, const int ptx_num, uint64_t offset,
void DB<seal::Plaintext>::write(const std::vector<std::uint64_t> &new_element, const std::uint32_t ptx_num, std::uint64_t offset,
PIRClient *replacer_machine) {
mtx.lock();
memory_matrix.data[ptx_num] = replacer_machine->replace_element(memory_matrix.data[ptx_num], new_element,
Expand All @@ -11,7 +11,7 @@ namespace services{

template<typename T>
void
DB<T>::write(const vector<uint64_t> &new_element, const int ptx_num, uint64_t offset, PIRClient *replacer_machine) {
DB<T>::write(const std::vector<std::uint64_t> &new_element, const std::uint32_t ptx_num, std::uint64_t offset, PIRClient *replacer_machine) {
throw std::runtime_error("unsupported type");
}

Expand Down
6 changes: 3 additions & 3 deletions src/services/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace services{

public:
// TODO: add a proper way to create PIR DB.
explicit DB(const int a, const int b) : memory_matrix(), mtx() {}
explicit DB(const std::uint32_t a, const std::uint32_t b) : memory_matrix(), mtx() {}

// used mainly for testing.
explicit DB(math_utils::matrix<T> &mat) : memory_matrix(mat), mtx() {}
Expand All @@ -38,14 +38,14 @@ namespace services{
return shared_mat(memory_matrix, mtx);
}

void write(const T &t, const int row, const int col) {
void write(const T &t, const std::uint32_t row, const std::uint32_t col) {
mtx.lock();
memory_matrix(row, col) = t;
mtx.unlock();
}

void
write(const std::vector<uint64_t> &new_element, const int ptx_num, uint64_t offset, PIRClient
write(const std::vector<uint64_t> &new_element, const std::uint32_t ptx_num, uint64_t offset, PIRClient
*replacer_machine);


Expand Down
92 changes: 6 additions & 86 deletions src/services/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace services {

mtx.lock_shared();
auto exists = worker_stubs.find(sending_worker) != worker_stubs.end();
mtx.lock_shared();
mtx.unlock_shared();

if (!exists) {
return {grpc::StatusCode::INVALID_ARGUMENT, "worker not registered"};
Expand All @@ -75,8 +75,8 @@ namespace services {
// TODO: should verify the incoming data - corresponding to the expected {ctx, row, col} from each worker.
distribicom::MatrixPart tmp;
while (reader->Read(&tmp)) {
int row = tmp.row();
int col = tmp.col();
std::uint32_t row = tmp.row();
std::uint32_t col = tmp.col();

#ifdef DISTRIBICOM_DEBUG
matops->w_evaluator->evaluator->sub_inplace(
Expand Down Expand Up @@ -207,7 +207,7 @@ namespace services {

for(const auto &db_row: db_rows) {
// first send db
for (int j = 0; j < int(db.cols); ++j) {
for (std::uint32_t j = 0; j < db.cols; ++j) {
std::string payload = marshalled_db(db_row, j);

part.mutable_matrixpart()->set_row(db_row);
Expand Down Expand Up @@ -270,56 +270,6 @@ namespace services {
}


std::shared_ptr<WorkDistributionLedger>
Manager::sendtask(const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
grpc::ClientContext &context, std::shared_ptr<WorkDistributionLedger> ledger) {

// TODO: write into map


distribicom::Ack response;
distribicom::WorkerTaskPart part;
for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
auto stream = worker.second->SendTask(&context, &response);
for (int i = 0; i < int(db.rows); ++i) {
for (int j = 0; j < int(db.cols); ++j) {
std::string payload = marshal->marshal_seal_object(db(i, j));

part.mutable_matrixpart()->set_row(i);
part.mutable_matrixpart()->set_col(j);
part.mutable_matrixpart()->mutable_ptx()->set_data(payload);

stream->Write(part);
part.mutable_matrixpart()->clear_ptx();
}
}

for (int i = 0; i < int(compressed_queries.data.size()); ++i) {

std::string payload = marshal->marshal_seal_object(compressed_queries.data[i]);
part.mutable_matrixpart()->set_row(0);
part.mutable_matrixpart()->set_col(i);
part.mutable_matrixpart()->mutable_ctx()->set_data(payload);

stream->Write(part);
part.mutable_matrixpart()->clear_ctx();

}

stream->WritesDone();
auto status = stream->Finish();
if (!status.ok()) {
std::cout << "manager:: distribute_work:: transmitting db to " << worker.first <<
" failed: " << status.error_message() << std::endl;
continue;
}
}
}
return ledger;
}

void Manager::wait_for_workers(int i) {
worker_counter.wait_for(i);
}
Expand Down Expand Up @@ -358,36 +308,6 @@ namespace services {
}
}

void Manager::send_galois_keys(const math_utils::matrix<seal::GaloisKeys> &matrix) {
grpc::ClientContext context;

utils::add_metadata_size(context, services::constants::round_md, 1);
utils::add_metadata_size(context, services::constants::epoch_md, 1);

distribicom::Ack response;
distribicom::WorkerTaskPart prt;
std::unique_lock lock(mtx);

for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
auto stream = worker.second->SendTask(&context, &response);
for (int i = 0; i < int(matrix.cols); ++i) {
prt.mutable_gkey()->set_key_pos(i);
prt.mutable_gkey()->mutable_keys()->assign(
marshal->marshal_seal_object(matrix(0, i))
);
stream->Write(prt);
}
stream->WritesDone();
auto status = stream->Finish();
if (!status.ok()) {
std::cout << "manager:: distribute_work:: transmitting galois gal_key to " << worker.first <<
" failed: " << status.error_message() << std::endl;
continue;
}
}
}
}

std::vector<std::uint64_t> get_row_id_to_work_with(std::uint64_t id, std::uint64_t num_rows){
auto row_id = id%num_rows;
Expand All @@ -414,7 +334,7 @@ namespace services {
}

void Manager::map_workers_to_responsibilities(std::uint64_t num_rows, std::uint64_t num_queries) {
int i = 0;
std::uint32_t i = 0;
std::shared_lock lock(mtx);
auto num_queries_per_worker = query_count_per_worker(worker_stubs.size(), num_rows, num_queries);

Expand All @@ -424,7 +344,7 @@ namespace services {
#ifdef DISTRIBICOM_DEBUG
if(worker_stubs.size()==1){
std::vector<std::uint64_t> temp(num_rows);
for(int j=0; j<num_rows; j++){
for(std::uint32_t j=0; j<num_rows; j++){
temp[j] =j;
}
this->worker_name_to_work_responsible_for[worker.first].db_rows = std::move(temp);
Expand Down
14 changes: 1 addition & 13 deletions src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace services {
private:
distribicom::AppConfigs app_configs;

std::shared_mutex mtx; // todo: use shared_mtx,
std::shared_mutex mtx;
std::map<std::string, std::unique_ptr<distribicom::Worker::Stub>> worker_stubs;

std::map<std::string, WorkerInfo> worker_name_to_work_responsible_for; // todo: refactor into struct
Expand Down Expand Up @@ -87,11 +87,6 @@ namespace services {
ReturnLocalWork(::grpc::ServerContext *context, ::grpc::ServerReader<::distribicom::MatrixPart> *reader,
::distribicom::Ack *response) override;

// This one is the holder of the DB.


// todo:
// void distribute_work(const math_utils::matrix<seal::Plaintext> &db);

// todo: break up query distribution, create unified structure for id lookups, modify ledger accoringly

Expand All @@ -105,15 +100,8 @@ namespace services {
#endif
);

std::shared_ptr<WorkDistributionLedger> sendtask(const math_utils::matrix<seal::Plaintext> &db,
const math_utils::matrix<seal::Ciphertext> &compressed_queries,
grpc::ClientContext &context,
std::shared_ptr<WorkDistributionLedger> ptr);

void wait_for_workers(int i);

void send_galois_keys(const math_utils::matrix<seal::GaloisKeys> &matrix);


void create_res_matrix(const math_utils::matrix<seal::Plaintext> &db,
const ClientDB &all_clients,
Expand Down
17 changes: 4 additions & 13 deletions src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,19 @@
#include "client_context.hpp"


services::FullServer::FullServer(math_utils::matrix<seal::Plaintext> &db, math_utils::matrix<seal::Ciphertext> &queries,
math_utils::matrix<seal::GaloisKeys> &gal_keys,
const distribicom::AppConfigs &app_configs) :
db(db), queries(queries), gal_keys(gal_keys), manager(app_configs) {
finish_construction(app_configs);

}

services::FullServer::FullServer(math_utils::matrix<seal::Plaintext> &db, std::map<uint32_t,
std::unique_ptr<services::ClientInfo>> &client_db,
const distribicom::AppConfigs &app_configs) :
db(db),queries(0, 0), gal_keys(0, 0), manager(app_configs) {
db(db), manager(app_configs) {
this->client_query_manager.client_counter = client_db.size();
this->client_query_manager.id_to_info = std::move(client_db);
finish_construction(app_configs);

}

services::FullServer::FullServer(const distribicom::AppConfigs &app_configs) :
db(0, 0), queries(0, 0), gal_keys(0, 0), manager(app_configs) {
db(app_configs.configs().db_rows(), app_configs.configs().db_cols()), manager(app_configs) {
finish_construction(app_configs);

}
Expand Down Expand Up @@ -116,10 +109,10 @@ grpc::Status services::FullServer::WriteToDB(grpc::ServerContext *context, const
std::shared_ptr<services::WorkDistributionLedger> services::FullServer::distribute_work() {
std::shared_ptr<services::WorkDistributionLedger> ledger;

// block is to destroy the db and querries handles.
// block is to destroy the db handle.
{
auto db_handle = db.many_reads();
// auto queries_handle = queries.many_reads();

// // todo: set specific round and handle.


Expand All @@ -139,9 +132,7 @@ void services::FullServer::start_epoch() {
client_query_manager.mutex.unlock_shared();

//todo: start epoch for registered clients as well -> make them send queries.
auto handle = gal_keys.many_reads();
manager.send_galois_keys(client_query_manager);
// manager.send_galois_keys(handle.mat);
// wait_for_workers(0); todo: wait for app_configs.num_workers
}

Expand Down
8 changes: 1 addition & 7 deletions src/services/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace services {
class FullServer final : public distribicom::Server::Service {
// used for tests
services::DB<seal::Plaintext> db;
services::DB<seal::Ciphertext> queries;
services::DB<seal::GaloisKeys> gal_keys;

// using composition to implement the interface of the manager.
services::Manager manager;
Expand All @@ -34,18 +32,14 @@ namespace services {
std::vector<std::future<int>> db_write_requests;

public:
// mainly for testing.
explicit FullServer(math_utils::matrix<seal::Plaintext> &db,
std::map<uint32_t, std::unique_ptr<services::ClientInfo>> &client_db,
const distribicom::AppConfigs &app_configs);

explicit FullServer(const distribicom::AppConfigs &app_configs);


// mainly for testing.
explicit FullServer(math_utils::matrix<seal::Plaintext> &db,
math_utils::matrix<seal::Ciphertext> &queries,
math_utils::matrix<seal::GaloisKeys> &gal_keys,
const distribicom::AppConfigs &app_configs);;

grpc::Status
RegisterAsClient(grpc::ServerContext *context, const distribicom::ClientRegistryRequest *request,
Expand Down
13 changes: 1 addition & 12 deletions test/services/worker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,12 @@ services::FullServer
full_server_instance(std::shared_ptr<TestUtils::CryptoObjects> &all, const distribicom::AppConfigs &configs) {
auto n = 5;
math_utils::matrix<seal::Plaintext> db(n, n);
// a single row of ctxs and their respective gal_key.
// math_utils::matrix<seal::Ciphertext> queries(1, n);
// math_utils::matrix<seal::GaloisKeys> gal_keys(1, n);

for (auto &p: db.data) {
p = all->random_plaintext();
}

// for (auto &q: queries.data) {
// q = all->random_ciphertext();
// }
//
// for (auto &g: gal_keys.data) {
// g = all->gal_keys;
// }
auto cdb = create_client_db(n, all);
// auto foo = services::FullServer(db, cdb, configs);


return services::FullServer(db, cdb, configs);
}
Expand Down

0 comments on commit e54a9d2

Please sign in to comment.