diff --git a/src/internals/protos/distribicom.proto b/src/internals/protos/distribicom.proto index 92e47fc..f8f97fe 100644 --- a/src/internals/protos/distribicom.proto +++ b/src/internals/protos/distribicom.proto @@ -204,4 +204,11 @@ message WorkerConfigs { // the port the worker is listening to. uint32 workerPort = 2; +}; + +// used to group info at server to send onward to client +message QueryInfo { + uint32 client_mailbox = 1; + bytes galois_keys = 2; + ClientQueryRequest query = 3; // this is single ctx for now }; \ No newline at end of file diff --git a/src/services/manager.hpp b/src/services/manager.hpp index e4cafc5..47b2680 100644 --- a/src/services/manager.hpp +++ b/src/services/manager.hpp @@ -66,7 +66,7 @@ namespace services { // todo: // void distribute_work(const math_utils::matrix &db); - // todo: + // todo: break up query distribution, create unified structure for id lookups, modify ledger accoringly std::unique_ptr distribute_work( const math_utils::matrix &db, diff --git a/src/services/server.cpp b/src/services/server.cpp index c6abdf9..c3aec8a 100644 --- a/src/services/server.cpp +++ b/src/services/server.cpp @@ -32,13 +32,58 @@ void services::FullServer::finish_construction(const distribicom::AppConfigs &ap grpc::Status services::FullServer::RegisterAsClient(grpc::ServerContext *context, const distribicom::ClientRegistryRequest *request, distribicom::ClientRegistryReply *response) { - return Service::RegisterAsClient(context, request, response); + + + try { + + auto requesting_client = utils::extract_ipv4(context); + std::string subscribing_client_address = context->auth_context()->GetPeerIdentityPropertyName(); //@todo see if valid + + // creating stub to the client: + auto client_conn = std::make_unique(distribicom::Client::Stub( + grpc::CreateChannel( + subscribing_client_address, + grpc::InsecureChannelCredentials() + ) + )); + auto client_info = std::make_unique(ClientInfo()); + client_info->client_info_marshaled.set_galois_keys(request->galois_keys()); + + registration_mutex.lock(); + if (client_stubs.find(requesting_client) == client_stubs.end()) { + client_info->client_info_marshaled.set_client_mailbox(client_counter); + id_to_mailbox.insert({requesting_client ,client_counter}); + query_bookeeper.insert({client_counter, std::move(client_info)}); + + client_stubs.insert({requesting_client, std::move(client_conn)}); + response->set_mailbox_id(client_counter); + client_counter+=1; + + + } + registration_mutex.unlock(); + + + } catch (std::exception &e) { + std::cout << "Error: " << e.what() << std::endl; + return {grpc::StatusCode::INTERNAL, e.what()}; + } + + response->set_num_mailboxes(pir_configs.number_of_elements()); + + return grpc::Status::OK; } +//@todo this assumes that no one is registering, very dangerous grpc::Status services::FullServer::StoreQuery(grpc::ServerContext *context, const distribicom::ClientQueryRequest *request, distribicom::Ack *response) { - return Service::StoreQuery(context, request, response); + + auto id = context->auth_context()->GetPeerIdentityPropertyName(); + auto num_id = id_to_mailbox[id]; + query_bookeeper[num_id]->client_info_marshaled.CopyFrom(*request); + response->set_success(true); + return grpc::Status::OK; } //@todo fix this to translate bytes to coeffs properly when writing diff --git a/src/services/server.hpp b/src/services/server.hpp index e7e91c9..1a88553 100644 --- a/src/services/server.hpp +++ b/src/services/server.hpp @@ -9,6 +9,16 @@ #include namespace services { + /** + * ClientInfo stores objects related to an individual client + */ + struct ClientInfo { + distribicom::QueryInfo client_info_marshaled; + std::string stub_id; + PirQuery query; + seal::GaloisKeys galois_keys; + }; + // uses both the Manager and the Server services to complete a full distribicom server. class FullServer final : public distribicom::Server::Service{ // used for tests @@ -25,6 +35,15 @@ namespace services { std::unique_ptr client; // concurrency stuff + std::map> client_stubs; + std::map id_to_mailbox; + std::map> query_bookeeper; + std::mutex registration_mutex; + std::uint64_t client_counter=0; + + + + std::vector> db_write_requests; public: