From 50e7c94c752d5a4534ecb1635f187349d6baedd4 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 30 May 2024 18:48:45 +1200 Subject: [PATCH] add workflow only endpoint, sub/unsub subscriber --- cylc/flow/data_store_mgr.py | 17 +++++++++++++++++ cylc/flow/network/resolvers.py | 2 +- cylc/flow/network/server.py | 11 +++++++++++ cylc/flow/network/subscriber.py | 10 ++++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 9b3b39509a2..55b2b5c20ed 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -2705,6 +2705,23 @@ def get_entire_workflow(self): return workflow_msg + def get_workflow_only(self): + """Gather workflow summary data into a Protobuf message. + + No tasks / cycles, etc, just workflow stuff. + + Returns: + cylc.flow.data_messages_pb2.PbEntireWorkflow + + """ + + workflow_msg = PbEntireWorkflow() + workflow_msg.workflow.CopyFrom( + self.data[self.workflow_id][WORKFLOW] + ) + + return workflow_msg + def get_publish_deltas(self): """Return deltas for publishing.""" all_deltas = DELTAS_MAP[ALL_DELTAS]() diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 07b6121805e..6b4e69dc644 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -663,7 +663,7 @@ async def subscribe_delta( import traceback logger.warning(traceback.format_exc()) finally: - self.data_store_mgr.graphql_sub_discard(sub_id) + await self.data_store_mgr.graphql_sub_discard(sub_id) for w_id in w_ids: if delta_queues.get(w_id, {}).get(sub_id): del delta_queues[w_id][sub_id] diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 2c170e61198..a4bd9dcd9bc 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -46,6 +46,7 @@ # maps server methods to the protobuf message (for client/UIS import) PB_METHOD_MAP: Dict[str, Any] = { 'pb_entire_workflow': PbEntireWorkflow, + 'pb_workflow_only': PbEntireWorkflow, 'pb_data_elements': DELTAS_MAP } @@ -416,6 +417,16 @@ def pb_entire_workflow(self, **_kwargs) -> bytes: pb_msg = self.schd.data_store_mgr.get_entire_workflow() return pb_msg.SerializeToString() + @authorise() + @expose + def pb_workflow_only(self, **_kwargs) -> bytes: + """Send only the workflow data, not tasks etc. + + Returns serialised Protobuf message + """ + pb_msg = self.schd.data_store_mgr.get_workflow_only() + return pb_msg.SerializeToString() + @authorise() @expose def pb_data_elements(self, element_type: str, **_kwargs) -> bytes: diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py index 66bd16f81f8..88b45d9c286 100644 --- a/cylc/flow/network/subscriber.py +++ b/cylc/flow/network/subscriber.py @@ -93,6 +93,16 @@ def _socket_options(self) -> None: for topic in self.topics: self.socket.setsockopt(zmq.SUBSCRIBE, topic) + def unsubscribe_topic(self, topic): + if topic in self.topics: + self.socket.setsockopt(zmq.UNSUBSCRIBE, topic) + self.topics.discard(topic) + + def subscribe_topic(self, topic): + if topic not in self.topics: + self.socket.setsockopt(zmq.SUBSCRIBE, topic) + self.topics.add(topic) + async def subscribe(self, msg_handler, *args, **kwargs): """Subscribe to updates from the provided socket.""" while True: