Skip to content

Commit

Permalink
Honor optional flag at streaming level (#2771)
Browse files Browse the repository at this point in the history
* Updated FOBS readme to add DatumManager, added agrpcs as secure scheme

* Added optional flag support in streaming layer
  • Loading branch information
nvidianz authored Aug 8, 2024
1 parent 4c6d19d commit fe56bd7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
12 changes: 11 additions & 1 deletion nvflare/fuel/f3/streaming/byte_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,17 @@ def stop_task(self, task: RxTask, error: StreamError = None, notify=True):
self.rx_task_map.pop(task.sid, None)

if error:
log.error(f"Stream error: {error}")
if task.headers:
optional = task.headers.get(MessageHeaderKey.OPTIONAL, False)
else:
optional = False

msg = f"Stream error: {error}"
if optional:
log.debug(msg)
else:
log.error(msg)

task.stream_future.set_exception(error)

if notify:
Expand Down
1 change: 1 addition & 0 deletions nvflare/fuel/f3/streaming/byte_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def _transmit(self, task: TxTask, final=False):
StreamHeaderKey.DATA_TYPE: StreamDataType.FINAL if final else StreamDataType.CHUNK,
StreamHeaderKey.SEQUENCE: task.seq,
StreamHeaderKey.OFFSET: task.offset,
StreamHeaderKey.OPTIONAL: task.optional,
}
)

Expand Down
1 change: 1 addition & 0 deletions nvflare/fuel/f3/streaming/stream_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ class StreamHeaderKey:
OBJECT_INDEX = STREAM_PREFIX + "oi"
STREAM_REQ_ID = STREAM_PREFIX + "ri"
PAYLOAD_ENCODING = STREAM_PREFIX + "pe"
OPTIONAL = STREAM_PREFIX + "op"

0 comments on commit fe56bd7

Please sign in to comment.