Skip to content

Commit

Permalink
Merge pull request #645 from oliver-sanders/643
Browse files Browse the repository at this point in the history
cat-log: add timeout to processes
  • Loading branch information
hjoliver authored Nov 12, 2024
2 parents 08a20c3 + b137137 commit 8461385
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 11 deletions.
2 changes: 2 additions & 0 deletions changes.d/645.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add a default timeout for the `cylc cat-log` command which is used to provide access to log files in the cylc-ui.
This timeout can be adjusted with the `log_timeout` option.
21 changes: 21 additions & 0 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,26 @@ class CylcUIServer(ExtensionApp):
default_value=False,
)

log_timeout = Float(
    # Note: This timeout is intended to clean up log streams that are no
    # longer being actively monitored and prevent the associated "cat-log"
    # processes from persisting in situations where they should not be
    # (e.g. if the websocket connection unexpectedly closes)
    config=True,
    help='''
The maximum length of time Cylc will stream a log file for in
seconds.
The "Log" view in the Cylc GUI streams log files allowing you to
monitor the file while it grows.
After the configured timeout, the stream will close. The log
view in the GUI will display a "reconnect" button allowing you
to restart the stream if desired.
''',
    # the timeout is expressed in seconds
    default_value=(60 * 60 * 4),  # 4 hours
)

@validate('ui_build_dir')
def _check_ui_build_dir_exists(self, proposed):
if proposed['value'].exists():
Expand Down Expand Up @@ -408,6 +428,7 @@ def __init__(self, *args, **kwargs):
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
self.resolvers = Resolvers(
self,
self.data_store_mgr,
log=self.log,
executor=self.executor,
Expand Down
56 changes: 51 additions & 5 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
PIPE,
Popen,
)
from time import time
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -57,6 +58,7 @@
from cylc.flow.option_parsers import Options
from graphql import ResolveInfo

from cylc.uiserver.app import CylcUIServer
from cylc.uiserver.workflows_mgr import WorkflowsManager


Expand Down Expand Up @@ -205,6 +207,9 @@ def _clean(workflow_ids, opts):
class Services:
"""Cylc services provided by the UI Server."""

# log file stream lag
CAT_LOG_SLEEP = 1

@staticmethod
def _error(message: Union[Exception, str]):
"""Format error case response."""
Expand Down Expand Up @@ -351,7 +356,7 @@ async def enqueue(stream, queue):
await queue.put(line.decode())

@classmethod
async def cat_log(cls, id_: Tokens, log, info, file=None):
async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None):
"""Calls `cat log`.
Used for log subscriptions.
Expand All @@ -366,7 +371,7 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
]
if file:
cmd += ['-f', file]
log.info(f'$ {" ".join(cmd)}')
app.log.info(f'$ {" ".join(cmd)}')

# For info, below subprocess is safe (uses shell=false by default)
proc = await asyncio.subprocess.create_subprocess_exec(
Expand All @@ -380,26 +385,51 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
# This is to get around problem where stream is not EOF until
# subprocess ends
enqueue_task = asyncio.create_task(cls.enqueue(proc.stdout, queue))

# GraphQL operation ID
op_id = info.root_value

# track the number of lines received so far
line_count = 0

# the time we started the cylc cat-log process
start_time = time()

# configured cat-log process timeout
timeout = float(app.log_timeout)

try:
while info.context['sub_statuses'].get(op_id) != 'stop':
if time() - start_time > timeout:
# timeout exceeded -> kill the cat-log process
break

if queue.empty():
# there are *no* lines to read from the cat-log process
if buffer:
# yield everything in the buffer
yield {'lines': list(buffer)}
buffer.clear()

if proc.returncode is not None:
# process exited
# -> pass any stderr text to the client
(_, stderr) = await proc.communicate()
# pass any error onto ui
msg = process_cat_log_stderr(stderr) or (
f"cylc cat-log exited {proc.returncode}"
)
yield {'error': msg}

# stop reading log lines
break

# sleep set at 1, which matches the `tail` default interval
await asyncio.sleep(1)
await asyncio.sleep(cls.CAT_LOG_SLEEP)

else:
# there *are* lines to read from the cat-log process
if line_count > MAX_LINES:
# we have read beyond the line count
yield {'lines': buffer}
yield {
'error': (
Expand All @@ -408,25 +438,39 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
)
}
break

elif line_count == 0:
# this is the first line
# (this is a special line which contains the file path)
line_count += 1
yield {
'connected': True,
'path': (await queue.get())[2:].strip(),
}
continue

# read in the log lines and add them to the buffer
line = await queue.get()
line_count += 1
buffer.append(line)
if len(buffer) >= 75:
yield {'lines': list(buffer)}
buffer.clear()
# there is more text to read so don't sleep (but
# still "sleep(0)" to yield control to other
# coroutines)
await asyncio.sleep(0)

finally:
# kill the cat-log process
kill_process_tree(proc.pid)

# terminate the queue
enqueue_task.cancel()
with suppress(asyncio.CancelledError):
await enqueue_task

# tell the client we have disconnected
yield {'connected': False}

@classmethod
Expand Down Expand Up @@ -467,13 +511,15 @@ class Resolvers(BaseResolvers):

def __init__(
self,
app: 'CylcUIServer',
data: 'DataStoreMgr',
log: 'Logger',
workflows_mgr: 'WorkflowsManager',
executor,
**kwargs
):
super().__init__(data)
self.app = app
self.log = log
self.workflows_mgr = workflows_mgr
self.executor = executor
Expand Down Expand Up @@ -561,7 +607,7 @@ async def subscription_service(
):
async for ret in Services.cat_log(
ids[0],
self.log,
self.app,
info,
file
):
Expand Down
73 changes: 67 additions & 6 deletions cylc/uiserver/tests/test_resolvers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Tuple
from async_timeout import timeout
import logging
import os
import pytest
from unittest.mock import MagicMock, Mock
from subprocess import Popen, TimeoutExpired
from types import SimpleNamespace

from cylc.flow import CYLC_LOG
from cylc.flow.id import Tokens
Expand Down Expand Up @@ -228,7 +244,23 @@ def wait(timeout):
]


async def test_cat_log(workflow_run_dir):
@pytest.fixture
def app():
    """Return a minimal stand-in for the UI Server application.

    Only the attributes read by ``Services.cat_log`` are provided: a
    logger (``log``) and the log stream timeout in seconds
    (``log_timeout``).
    """
    stub = SimpleNamespace()
    stub.log = logging.getLogger(CYLC_LOG)
    stub.log_timeout = 10
    return stub


@pytest.fixture
def fast_sleep(monkeypatch):
    """Shorten the cat-log polling interval for the duration of a test.

    Patches ``Services.CAT_LOG_SLEEP`` (the delay used when there are no
    log lines waiting to be read) down to a tenth of a second.
    """
    target = 'cylc.uiserver.resolvers.Services.CAT_LOG_SLEEP'
    monkeypatch.setattr(target, 0.1)


async def test_cat_log(workflow_run_dir, app, fast_sleep):
"""This is a functional test for cat_log subscription resolver.
It creates a log file and then runs the cat_log service. Checking it
Expand Down Expand Up @@ -265,18 +297,17 @@ async def test_cat_log(workflow_run_dir):
# mock the context
info.context = {'sub_statuses': {2: "start"}}
workflow = Tokens(id_)
log = logging.getLogger(CYLC_LOG)
# note - timeout tests that the cat-log process is being stopped correctly

# note - timeout tests that the cat-log process is being stopped correctly
first_response = None
async with timeout(20):
ret = services.cat_log(workflow, log, info)
ret = services.cat_log(workflow, app, info)
actual = ''
is_first = True
async for response in ret:
if err := response.get('error'):
# Surface any unexpected errors for better visibility
log.exception(err)
app.log.exception(err)
if is_first:
first_response = response
is_first = False
Expand All @@ -298,6 +329,36 @@ async def test_cat_log(workflow_run_dir):
assert actual.rstrip() == log_file_content.rstrip()


async def test_cat_log_timeout(workflow_run_dir, app, fast_sleep):
    """Test that the cat_log subscription resolver honours its timeout.

    It creates a log file, sets the configured ``log_timeout`` to zero and
    then runs the cat_log service. The timeout check happens before any
    log lines are read, so the stream should close straight away: the only
    response expected is the final "disconnected" message, with no error.
    """
    (id_, log_dir) = workflow_run_dir
    log_file = log_dir / '01-start-01.log'
    log_file.write_text('forty two')
    info = MagicMock()
    info.root_value = 2
    # mock the context
    info.context = {'sub_statuses': {2: "start"}}
    workflow = Tokens(id_)

    # a zero timeout is exceeded as soon as the stream loop starts
    app.log_timeout = 0

    ret = services.cat_log(workflow, app, info)
    responses = []
    # note - this outer timeout guards against the stream failing to close
    async with timeout(5):
        async for response in ret:
            responses.append(response)
            await asyncio.sleep(0)

    # only the disconnect message should have been sent
    assert len(responses) == 1
    assert responses[0]['connected'] is False
    assert 'error' not in responses[0]


@pytest.mark.parametrize(
'text, expected',
[
Expand Down

0 comments on commit 8461385

Please sign in to comment.