Skip to content

Commit

Permalink
Fix sending event messages to non-subscribers (#7014)
Browse files Browse the repository at this point in the history
Co-authored-by: Hendrik Makait <[email protected]>
  • Loading branch information
lwatt and hendrikmakait authored Dec 12, 2022
1 parent 047b082 commit 19deee3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
16 changes: 7 additions & 9 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7634,15 +7634,13 @@ def log_event(self, topic: str | Collection[str], msg: Any) -> None:
logger.info("Plugin failed with exception", exc_info=True)

def _report_event(self, name, event):
for client in self.event_subscriber[name]:
self.report(
{
"op": "event",
"topic": name,
"event": event,
},
client=client,
)
msg = {
"op": "event",
"topic": name,
"event": event,
}
client_msgs = {client: [msg] for client in self.event_subscriber[name]}
self.send_all(client_msgs, worker_msgs={})

def subscribe_topic(self, topic, client):
self.event_subscriber[topic].add(client)
Expand Down
30 changes: 30 additions & 0 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6763,6 +6763,36 @@ def log_scheduler(dask_scheduler):
assert events[1][1] == ("alice", "bob")


@gen_cluster(client=True, nthreads=[])
async def test_log_event_multiple_clients(c, s):
async with Client(s.address, asynchronous=True) as c2, Client(
s.address, asynchronous=True
) as c3:
received_events = []

def get_event_handler(handler_id):
def handler(event):
received_events.append((handler_id, event))

return handler

c.subscribe_topic("test-topic", get_event_handler(1))
c2.subscribe_topic("test-topic", get_event_handler(2))

while len(s.event_subscriber["test-topic"]) != 2:
await asyncio.sleep(0.01)

with captured_logger(logging.getLogger("distributed.client")) as logger:
await c.log_event("test-topic", {})

while len(received_events) < 2:
await asyncio.sleep(0.01)

assert len(received_events) == 2
assert {handler_id for handler_id, _ in received_events} == {1, 2}
assert "ValueError" not in logger.getvalue()


@gen_cluster(client=True)
async def test_annotations_task_state(c, s, a, b):
da = pytest.importorskip("dask.array")
Expand Down

0 comments on commit 19deee3

Please sign in to comment.