Skip to content

Commit

Permalink
reorg streaming and rm for engine
Browse files Browse the repository at this point in the history
  • Loading branch information
yanchengnv committed Dec 11, 2024
1 parent 8db69a6 commit 6880ac2
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 567 deletions.
84 changes: 84 additions & 0 deletions nvflare/apis/aux_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import enum
from abc import ABC, abstractmethod
from typing import Dict

from .fl_context import FLContext
from .shareable import Shareable
Expand Down Expand Up @@ -44,3 +46,85 @@ def aux_request_handle_func_signature(topic: str, request: Shareable, fl_ctx: FL
"""
pass


class AuxMessenger(ABC):
@abstractmethod
def register_aux_message_handler(self, topic: str, message_handle_func):
"""Register aux message handling function with specified topics.
Exception is raised when:
a handler is already registered for the topic;
bad topic - must be a non-empty string
bad message_handle_func - must be callable
Implementation Note:
This method should simply call the ServerAuxRunner's register_aux_message_handler method.
Args:
topic: the topic to be handled by the func
message_handle_func: the func to handle the message. Must follow aux_message_handle_func_signature.
"""
pass

@abstractmethod
def send_aux_request(
self,
targets: [],
topic: str,
request: Shareable,
timeout: float,
fl_ctx: FLContext,
optional=False,
secure=False,
) -> dict:
"""Send a request to specified clients via the aux channel.
Implementation: simply calls the AuxRunner's send_aux_request method.
Args:
targets: target clients. None or empty list means all clients.
topic: topic of the request.
request: request to be sent
timeout: number of secs to wait for replies. 0 means fire-and-forget.
fl_ctx: FL context
optional: whether this message is optional
secure: send the aux request in a secure way
Returns: a dict of replies (client name => reply Shareable)
"""
pass

@abstractmethod
def multicast_aux_requests(
self,
topic: str,
target_requests: Dict[str, Shareable],
timeout: float,
fl_ctx: FLContext,
optional: bool = False,
secure: bool = False,
) -> dict:
"""Send requests to specified clients via the aux channel.
Implementation: simply calls the AuxRunner's multicast_aux_requests method.
Args:
topic: topic of the request
target_requests: requests of the target clients. Different target can have different request.
timeout: amount of time to wait for responses. 0 means fire and forget.
fl_ctx: FL context
optional: whether this request is optional
secure: whether to send the aux request in P2P secure
Returns: a dict of replies (client name => reply Shareable)
"""
pass

def fire_and_forget_aux_request(
self, targets: [], topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False
) -> dict:
return self.send_aux_request(targets, topic, request, 0.0, fl_ctx, optional, secure=secure)
2 changes: 1 addition & 1 deletion nvflare/apis/rm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def send_reliable_request(
per_msg_timeout: float,
tx_timeout: float,
fl_ctx: FLContext,
secure=False,
optional=False,
secure=False,
) -> Shareable:
"""Send a reliable request.
Expand Down
84 changes: 2 additions & 82 deletions nvflare/apis/server_engine_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from nvflare.apis.shareable import Shareable
from nvflare.apis.aux_spec import AuxMessenger
from nvflare.widgets.widget import Widget

from .client import Client
Expand All @@ -26,7 +26,7 @@
from .workspace import Workspace


class ServerEngineSpec(EngineSpec, ABC):
class ServerEngineSpec(EngineSpec, AuxMessenger, ABC):
@abstractmethod
def fire_event(self, event_type: str, fl_ctx: FLContext):
pass
Expand Down Expand Up @@ -84,86 +84,6 @@ def get_component(self, component_id: str) -> object:
"""
pass

@abstractmethod
def register_aux_message_handler(self, topic: str, message_handle_func):
"""Register aux message handling function with specified topics.
Exception is raised when:
a handler is already registered for the topic;
bad topic - must be a non-empty string
bad message_handle_func - must be callable
Implementation Note:
This method should simply call the ServerAuxRunner's register_aux_message_handler method.
Args:
topic: the topic to be handled by the func
message_handle_func: the func to handle the message. Must follow aux_message_handle_func_signature.
"""
pass

@abstractmethod
def send_aux_request(
self,
targets: [],
topic: str,
request: Shareable,
timeout: float,
fl_ctx: FLContext,
optional=False,
secure=False,
) -> dict:
"""Send a request to specified clients via the aux channel.
Implementation: simply calls the AuxRunner's send_aux_request method.
Args:
targets: target clients. None or empty list means all clients.
topic: topic of the request.
request: request to be sent
timeout: number of secs to wait for replies. 0 means fire-and-forget.
fl_ctx: FL context
optional: whether this message is optional
secure: send the aux request in a secure way
Returns: a dict of replies (client name => reply Shareable)
"""
pass

@abstractmethod
def multicast_aux_requests(
self,
topic: str,
target_requests: Dict[str, Shareable],
timeout: float,
fl_ctx: FLContext,
optional: bool = False,
secure: bool = False,
) -> dict:
"""Send requests to specified clients via the aux channel.
Implementation: simply calls the AuxRunner's multicast_aux_requests method.
Args:
topic: topic of the request
target_requests: requests of the target clients. Different target can have different request.
timeout: amount of time to wait for responses. 0 means fire and forget.
fl_ctx: FL context
optional: whether this request is optional
secure: whether to send the aux request in P2P secure
Returns: a dict of replies (client name => reply Shareable)
"""
pass

def fire_and_forget_aux_request(
self, targets: [], topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False
) -> dict:
return self.send_aux_request(targets, topic, request, 0.0, fl_ctx, optional, secure=secure)

@abstractmethod
def get_widget(self, widget_id: str) -> Widget:
"""Get the widget with the specified ID.
Expand Down
1 change: 0 additions & 1 deletion nvflare/app_opt/job_launcher/docker_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class DOCKER_STATE:


class DockerJobHandle(JobHandleSpec):

def __init__(self, container, timeout=None):
super().__init__()

Expand Down
Loading

0 comments on commit 6880ac2

Please sign in to comment.