Skip to content

Commit

Permalink
server: put in stubs for client connections, start work on bookeeping…
Browse files Browse the repository at this point in the history
… for work distribution
  • Loading branch information
elkanatovey committed Nov 30, 2022
1 parent 80f53c6 commit 7649d7f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
7 changes: 7 additions & 0 deletions src/internals/protos/distribicom.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,11 @@ message WorkerConfigs {

// the port the worker is listening to.
uint32 workerPort = 2;
};

// used to group info at server to send onward to client
message QueryInfo {
uint32 client_mailbox = 1;
bytes galois_keys = 2;
ClientQueryRequest query = 3; // this is single ctx for now
};
2 changes: 1 addition & 1 deletion src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace services {
// todo:
// void distribute_work(const math_utils::matrix<seal::Plaintext> &db);

// todo:
// todo: break up query distribution, create unified structure for id lookups, modify ledger accoringly

std::unique_ptr<WorkDistributionLedger> distribute_work(
const math_utils::matrix<seal::Plaintext> &db,
Expand Down
49 changes: 47 additions & 2 deletions src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,58 @@ void services::FullServer::finish_construction(const distribicom::AppConfigs &ap
grpc::Status
services::FullServer::RegisterAsClient(grpc::ServerContext *context, const distribicom::ClientRegistryRequest *request,
distribicom::ClientRegistryReply *response) {
return Service::RegisterAsClient(context, request, response);


try {

auto requesting_client = utils::extract_ipv4(context);
std::string subscribing_client_address = context->auth_context()->GetPeerIdentityPropertyName(); //@todo see if valid

// creating stub to the client:
auto client_conn = std::make_unique<distribicom::Client::Stub>(distribicom::Client::Stub(
grpc::CreateChannel(
subscribing_client_address,
grpc::InsecureChannelCredentials()
)
));
auto client_info = std::make_unique<services::ClientInfo>(ClientInfo());
client_info->client_info_marshaled.set_galois_keys(request->galois_keys());

registration_mutex.lock();
if (client_stubs.find(requesting_client) == client_stubs.end()) {
client_info->client_info_marshaled.set_client_mailbox(client_counter);
id_to_mailbox.insert({requesting_client ,client_counter});
query_bookeeper.insert({client_counter, std::move(client_info)});

client_stubs.insert({requesting_client, std::move(client_conn)});
response->set_mailbox_id(client_counter);
client_counter+=1;


}
registration_mutex.unlock();


} catch (std::exception &e) {
std::cout << "Error: " << e.what() << std::endl;
return {grpc::StatusCode::INTERNAL, e.what()};
}

response->set_num_mailboxes(pir_configs.number_of_elements());

return grpc::Status::OK;
}

//@todo this assumes that no one is registering, very dangerous
grpc::Status
services::FullServer::StoreQuery(grpc::ServerContext *context, const distribicom::ClientQueryRequest *request,
distribicom::Ack *response) {
return Service::StoreQuery(context, request, response);

auto id = context->auth_context()->GetPeerIdentityPropertyName();
auto num_id = id_to_mailbox[id];
query_bookeeper[num_id]->client_info_marshaled.CopyFrom(*request);
response->set_success(true);
return grpc::Status::OK;
}

//@todo fix this to translate bytes to coeffs properly when writing
Expand Down
19 changes: 19 additions & 0 deletions src/services/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
#include <future>

namespace services {
/**
* ClientInfo stores objects related to an individual client
*/
struct ClientInfo {
distribicom::QueryInfo client_info_marshaled;
std::string stub_id;
PirQuery query;
seal::GaloisKeys galois_keys;
};

// uses both the Manager and the Server services to complete a full distribicom server.
class FullServer final : public distribicom::Server::Service{
// used for tests
Expand All @@ -25,6 +35,15 @@ namespace services {
std::unique_ptr<PIRClient> client;

// concurrency stuff
std::map<std::string, std::unique_ptr<distribicom::Client::Stub>> client_stubs;
std::map<std::string, std::uint32_t> id_to_mailbox;
std::map<std::uint32_t, std::unique_ptr<ClientInfo>> query_bookeeper;
std::mutex registration_mutex;
std::uint64_t client_counter=0;




std::vector<std::future<int>> db_write_requests;

public:
Expand Down

0 comments on commit 7649d7f

Please sign in to comment.