Skip to content

Commit

Permalink
manager: minor change to db responsibilities
Browse files Browse the repository at this point in the history
  • Loading branch information
elkanatovey committed Dec 5, 2022
1 parent 8c079f0 commit aee2ef2
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 36 deletions.
5 changes: 5 additions & 0 deletions src/concurrency/counter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ void concurrency::Counter::add(int delta) {
count += delta;
cv.notify_all();
}

int concurrency::Counter::val() {
std::lock_guard<std::mutex> lock(m);
return count;
}
2 changes: 2 additions & 0 deletions src/concurrency/counter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace concurrency {
void add(int delta);

void wait_for(int c);

int val();
};

}
44 changes: 32 additions & 12 deletions src/services/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,20 @@ namespace services {
for (auto &worker: worker_stubs) {
{ // this stream is released at the end of this scope.
auto stream = worker.second->SendTask(&context, &response);
auto db_row = worker_name_to_work_responsible_for[worker.first].db_row;
auto db_rows = worker_name_to_work_responsible_for[worker.first].db_rows;

// first send db
for (int j = 0; j < int(db.cols); ++j) {
std::string payload = marshalled_db(db_row, j);
for(const auto &db_row: db_rows) {
// first send db
for (int j = 0; j < int(db.cols); ++j) {
std::string payload = marshalled_db(db_row, j);

part.mutable_matrixpart()->set_row(db_row);
part.mutable_matrixpart()->set_col(j);
part.mutable_matrixpart()->mutable_ptx()->set_data(payload);
part.mutable_matrixpart()->set_row(db_row);
part.mutable_matrixpart()->set_col(j);
part.mutable_matrixpart()->mutable_ptx()->set_data(payload);

stream->Write(part);
part.mutable_matrixpart()->clear_ptx();
stream->Write(part);
part.mutable_matrixpart()->clear_ptx();
}
}
stream->WritesDone();
auto status = stream->Finish();
Expand Down Expand Up @@ -390,9 +392,9 @@ namespace services {
}
}

std::uint64_t get_row_id_to_work_with(std::uint64_t id, std::uint64_t num_rows){
std::vector<std::uint64_t> get_row_id_to_work_with(std::uint64_t id, std::uint64_t num_rows){
auto row_id = id%num_rows;
return row_id;
return {row_id};
}

std::pair<std::uint64_t, std::uint64_t> get_query_range_to_work_with(std::uint64_t worker_id, std::uint64_t num_queries, std::uint64_t num_queries_per_worker){
Expand All @@ -403,6 +405,12 @@ namespace services {
}

std::uint64_t query_count_per_worker(std::uint64_t num_workers, std::uint64_t num_rows, std::uint64_t num_queries){
#ifdef DISTRIBICOM_DEBUG
if(num_workers==1){
std::cout<<"Manager: using only 1 worker"<<std::endl;
return num_queries;}
#endif

auto num_workers_per_row = num_workers/num_rows;
auto num_queries_per_worker = num_queries / num_workers_per_row;
return num_queries_per_worker;
Expand All @@ -415,7 +423,19 @@ namespace services {

for(auto &worker: worker_stubs){
this->worker_name_to_work_responsible_for[worker.first].worker_number = i;
this->worker_name_to_work_responsible_for[worker.first].db_row = get_row_id_to_work_with(i, num_rows);
this->worker_name_to_work_responsible_for[worker.first].db_rows = get_row_id_to_work_with(i, num_rows);
#ifdef DISTRIBICOM_DEBUG
if(worker_stubs.size()==1){
std::vector<std::uint64_t> temp(num_rows);
for(int j=0; j<num_rows; j++){
temp[j] =j;
}
this->worker_name_to_work_responsible_for[worker.first].db_rows = std::move(temp);
this->worker_name_to_work_responsible_for[worker.first].query_range_start = 0;
this->worker_name_to_work_responsible_for[worker.first].query_range_end = num_queries;
return;
}
#endif
auto query_range = get_query_range_to_work_with(i, num_queries, num_queries_per_worker);

this->worker_name_to_work_responsible_for[worker.first].query_range_start = query_range.first;
Expand Down
24 changes: 1 addition & 23 deletions src/services/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace services {
std::uint64_t worker_number;
std::uint64_t query_range_start;
std::uint64_t query_range_end;
std::uint64_t db_row;
std::vector<std::uint64_t> db_rows;
};

/**
Expand Down Expand Up @@ -130,25 +130,3 @@ namespace services {
void send_queries(const ClientDB &all_clients, grpc::ClientContext &context);
};
}



//for (auto &worker: worker_stubs) {
//{ // this stream is released at the end of this scope.
//auto stream = worker.second->SendTask(&context, &response);
//auto range_start = worker_name_to_work_responsible_for[worker.first].query_range_start;
//auto range_end = worker_name_to_work_responsible_for[worker.first].query_range_end;
//
//for (std::uint64_t i = range_start; i < range_end; ++i) {
//prt.mutable_gkey()->CopyFrom(all_clients.id_to_info[i]->galois_keys_marshaled);
//stream->Write(prt);
//}
//stream->WritesDone();
//auto status = stream->Finish();
//if (!status.ok()) {
//std::cout << "manager:: distribute_work:: transmitting galois gal_key to " << worker.first <<
//" failed: " << status.error_message() << std::endl;
//continue;
//}
//}
//}
2 changes: 1 addition & 1 deletion src/services/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ std::shared_ptr<services::WorkDistributionLedger> services::FullServer::distribu

void services::FullServer::start_epoch() {
client_query_manager.mutex.lock_shared();
manager.map_workers_to_responsibilities(client_query_manager.client_counter, pir_configs.db_rows());
manager.map_workers_to_responsibilities(pir_configs.db_rows(), client_query_manager.client_counter);
client_query_manager.mutex.unlock_shared();

//todo: start epoch for registered clients as well -> make them send queries.
Expand Down

0 comments on commit aee2ef2

Please sign in to comment.