Skip to content

Commit

Permalink
add method to flex base class for acquiring local grpc client
Browse files Browse the repository at this point in the history
Signed-off-by: kta-intel <[email protected]>
  • Loading branch information
kta-intel committed Dec 17, 2024
1 parent 8020a2f commit 700b21b
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 2 deletions.
2 changes: 1 addition & 1 deletion openfl/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ def stop_flex(self):
def get_local_grpc_client(self):
if not self.is_flex_available():
raise RuntimeError("Federated Learning exchange as not been enabled.")
return self.flex.local_grpc_client
return self.flex.get_local_grpc_client()

def _end_of_round_with_stragglers_check(self):
"""
Expand Down
8 changes: 8 additions & 0 deletions openfl/component/interoperability/flex.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def __init__(self, command: list[str], **kwargs):
Args:
command (list[str]): The command to run the server as a subprocess.
"""
self.local_grpc_client = None
self._command = command
self._process = None

Expand All @@ -40,3 +41,10 @@ def stop(self):
logger.info("[FLEX] Subprocess stopped.")
else:
logger.info("[FLEX] No subprocess is currently running.")


def get_local_grpc_client(self):
"""
Get the local gRPC client.
"""
return self.local_grpc_client
1 change: 0 additions & 1 deletion openfl/federated/task/runner_flower.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from flwr.proto import grpcadapter_pb2_grpc
from multiprocessing import cpu_count
from openfl.federated.task.runner import TaskRunner
from openfl.transport import AggregatorGRPCClient
from openfl.transport.grpc.flex.flower.local_grpc_server import LocalGRPCServer
import subprocess

Expand Down
1 change: 1 addition & 0 deletions openfl/transport/grpc/aggregator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ def send_message_to_server(self, flower_message, collaborator_name):
The response from the OpenFL server, converted back to a Flower message.
"""
self._set_header(collaborator_name)
#TODO: use a general to/from openfl_message function with "add_header" option, do the message conversion before calling send_message_to_server
openfl_message = flower_to_openfl_message(flower_message,
header=self.header)
openfl_response = self.stub.PelicanDrop(openfl_message)
Expand Down
1 change: 1 addition & 0 deletions openfl/transport/grpc/flex/flower/local_grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def process_queue(self):
while True:
request, response_queue = self.request_queue.get()
# Send request to the OpenFL server
# TODO: do message conversions here
flower_response = self.openfl_client.send_message_to_server(request, self.collaborator_name)
# Send response to Flower client
response_queue.put(flower_response)
Expand Down

0 comments on commit 700b21b

Please sign in to comment.