Skip to content

Commit

Permalink
Merge pull request #364 from MetRonnie/thread-pool
Browse files Browse the repository at this point in the history
DataStoreMgr - use single thread pool
  • Loading branch information
MetRonnie authored Jun 10, 2022
2 parents f415676 + d5a8091 commit 4373c0c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
3 changes: 1 addition & 2 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ async def stop_extension(self):
for sub in self.data_store_mgr.w_subs.values():
sub.stop()
# Shutdown the thread pool executor
for executor in self.data_store_mgr.executors.values():
executor.shutdown(wait=False)
self.data_store_mgr.executor.shutdown(wait=False)
# Destroy ZeroMQ context of all sockets
self.workflows_mgr.context.destroy()
25 changes: 9 additions & 16 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from functools import partial
from pathlib import Path
import time
from typing import Optional, Set
from typing import Dict, Optional, Set

from cylc.flow.id import Tokens
from cylc.flow.network.server import PB_METHOD_MAP
Expand Down Expand Up @@ -85,10 +84,10 @@ def __init__(self, workflows_mgr, log):
self.workflows_mgr = workflows_mgr
self.log = log
self.data = {}
self.w_subs = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.loop = None
self.executors = {}
self.executor = ThreadPoolExecutor()
self.delta_queues = {}

@log_call
Expand Down Expand Up @@ -148,15 +147,12 @@ async def connect_workflow(self, w_id, contact_data):

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executors[w_id] = ThreadPoolExecutor()
self.executors[w_id].submit(
partial(
self._start_subscription,
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT]
)
self.executor.submit(
self._start_subscription,
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT]
)
successful_updates = await self._entire_workflow_update(ids=[w_id])

Expand Down Expand Up @@ -184,9 +180,6 @@ def disconnect_workflow(self, w_id):
if w_id in self.w_subs:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.executors:
self.executors[w_id].shutdown(wait=True)
del self.executors[w_id]

def get_workflows(self):
"""Return all workflows the data store is currently tracking.
Expand Down

0 comments on commit 4373c0c

Please sign in to comment.