Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cylc Clean Support #323

Merged
merged 40 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b5b9425
WIP making cylc clean work on UI
wxtim Feb 16, 2022
5fbcfd1
refactor to common logic from play and clean into one place.
wxtim Feb 18, 2022
cca15e3
just a note
wxtim Feb 21, 2022
670fcd1
implement cylc clean in a blocking way
wxtim Feb 22, 2022
17b2b0b
run clean in subprocpool
wxtim Feb 24, 2022
9cb564d
added testing
wxtim Feb 24, 2022
4023a37
put play function back
wxtim Feb 24, 2022
5baddf2
keep the _build_cmd refactor
wxtim Feb 24, 2022
185edf0
newline
wxtim Feb 24, 2022
60355d0
flake8
wxtim Feb 24, 2022
bdbc5b2
Fix mypy bug
wxtim Feb 25, 2022
cc7ccb6
Made ProcPoolExecutor a whole UIS object
wxtim Feb 25, 2022
1accec8
fix style
wxtim Feb 28, 2022
ff85529
fix tests which check for mutations
wxtim Feb 28, 2022
b2185cd
Update CHANGES.md
wxtim Mar 2, 2022
9138aaa
Apply suggestions from code review
wxtim Mar 10, 2022
3ecb249
fix issue with remote timeout
wxtim Mar 14, 2022
ca399dc
fixed test error
wxtim Mar 14, 2022
9242db1
Update cylc/uiserver/resolvers.py
wxtim Mar 14, 2022
3a8ab0b
Merge branch 'master' into backup
wxtim Apr 7, 2022
01c5868
fix after merge
wxtim Apr 7, 2022
31d9097
make _schema_opts_to_api_opts a generic function
wxtim Apr 7, 2022
6696835
reinstate deleted command building
wxtim Apr 7, 2022
deee2a5
Revert "reinstate deleted command building"
wxtim Apr 7, 2022
11c86ef
reinstate deleted command building
wxtim Apr 7, 2022
6733cab
response to review
wxtim Apr 7, 2022
69332cf
Merge branch 'master' of github.com:cylc/cylc-uiserver into add_clean…
wxtim Apr 29, 2022
00c39ec
Fix broken code caused by booleans testing true as ints and being con…
wxtim Apr 29, 2022
cf7a76b
added log entry for cleaning
wxtim Apr 29, 2022
113183c
produce nicer error message when user attempts to stop running workfl…
wxtim Apr 29, 2022
f050ab6
fix flake8
wxtim Apr 29, 2022
1145ede
Update cylc/uiserver/resolvers.py
wxtim May 31, 2022
5ae1703
Update cylc/uiserver/resolvers.py
wxtim May 31, 2022
875f2a8
response to review
wxtim Jun 1, 2022
3299e00
fix rm bug
wxtim Jun 1, 2022
aca76f9
Update cylc/uiserver/resolvers.py
wxtim Jun 7, 2022
d610994
Update cylc/uiserver/resolvers.py
wxtim Jun 7, 2022
2c3aafd
merge
wxtim Jun 14, 2022
b698883
Merge branch 'add_clean_to_ui2' of github.com:wxtim/cylc-uiserver int…
wxtim Jun 14, 2022
f1d3a6c
fix change in tests
wxtim Jun 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ creating a new release entry be sure to copy & paste the span tag with the
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->

## __cylc-uiserver-1.1.0 (<span actions:bind='release-date'></span>)__

### Enhancements

[#323](https://github.com/cylc/cylc-uiserver/pull/323) -
`cylc clean` made available for runs and files within runs.

-------------------------------------------------------------------------------
## __cylc-uiserver-1.0.1 (<span actions:bind='release-date'>Released 2022-03-23</span>)__

Expand Down
13 changes: 13 additions & 0 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
the environment variable ``CYLC_SITE_CONF_PATH``.
"""

from concurrent.futures import ProcessPoolExecutor
import getpass
from pathlib import Path, PurePath
import sys
Expand All @@ -64,6 +65,7 @@
from traitlets import (
Dict,
Float,
Int,
TraitError,
TraitType,
Undefined,
Expand Down Expand Up @@ -306,6 +308,13 @@ class CylcUIServer(ExtensionApp):
''',
default_value=5.0 # default values as kwargs correctly display in docs
)
max_workers = Int(
config=True,
help='''
Set the maximum number of workers for process pools.
''',
default_value=1
)

@validate('ui_build_dir')
def _check_ui_build_dir_exists(self, proposed):
Expand Down Expand Up @@ -364,11 +373,13 @@ def _get_ui_path(self):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ProcessPoolExecutor(max_workers=self.max_workers)
self.workflows_mgr = WorkflowsManager(self, log=self.log)
self.data_store_mgr = DataStoreMgr(self.workflows_mgr, self.log)
self.resolvers = Resolvers(
self.data_store_mgr,
log=self.log,
executor=self.executor,
workflows_mgr=self.workflows_mgr,
)

Expand Down Expand Up @@ -510,5 +521,7 @@ async def stop_extension(self):
# Shutdown the thread pool executor
for executor in self.data_store_mgr.executors.values():
executor.shutdown(wait=False)
# Shutdown processpool executor.
self.executor.shutdown(wait=False)
# Destroy ZeroMQ context of all sockets
self.workflows_mgr.context.destroy()
192 changes: 170 additions & 22 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,27 @@

"""GraphQL resolvers for use in data accessing and mutation of workflows."""

import asyncio
from getpass import getuser
import os
from copy import deepcopy
from functools import partial
from subprocess import Popen, PIPE, DEVNULL
from types import SimpleNamespace
from typing import (
TYPE_CHECKING, Any, Dict, Iterable, List, Union
TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Union
)

from graphql.language.base import print_ast

from cylc.flow.data_store_mgr import WORKFLOW
from cylc.flow.exceptions import CylcError, ServiceFileError
from cylc.flow.network.resolvers import BaseResolvers
from cylc.flow.workflow_files import init_clean


class InvalidSchemaOptionError(CylcError):
...


if TYPE_CHECKING:
Expand All @@ -39,6 +48,21 @@

# show traceback from cylc commands
DEBUG = True
CLEAN = 'clean'
OPT_CONVERTERS: Dict[str, Dict[str, Union[Callable, None]]] = {
CLEAN: {
'rm': lambda opt, value: (
'rm_dirs', [v.strip() for v in value.split(':')]
),
wxtim marked this conversation as resolved.
Show resolved Hide resolved
'local_only': None,
'remote_only': None,
'debug':
lambda opt, value:
('verbosity', 2) if value is True else ('verbosity', 0),
wxtim marked this conversation as resolved.
Show resolved Hide resolved
'no_timestamp': lambda opt, value: ('log_timestamp', not value),
}
}
WORKFLOW_RUNNING_MSG = 'You can\'t clean a running workflow'


def snake_to_kebab(snake):
Expand Down Expand Up @@ -81,6 +105,103 @@ def check_cylc_version(version):
return ret or out.strip() == version


def _build_cmd(cmd: List, args: Dict) -> List:
"""Add args to command.

Args:
cmd: A base command.
args: Args to append to base command.

Returns: An elaborated command.

Examples:
It adds one arg to a command:
>>> _build_cmd(['foo', 'bar'], {'set_baz': 'qux'})
['foo', 'bar', '--set-baz', 'qux']

It adds one integer arg to a command:
>>> _build_cmd(['foo', 'bar'], {'set_baz': 42})
['foo', 'bar', '--set-baz', '42']

It adds a list of the same arg to a command:
>>> _build_cmd(['foo', 'bar'], {'set_baz': ['qux', 'quiz']})
['foo', 'bar', '--set-baz', 'qux', '--set-baz', 'quiz']

It doesn't append args == False:
>>> _build_cmd(['foo', 'bar'], {'set_baz': False})
['foo', 'bar']

It doesn't add boolean values, just the switch
>>> _build_cmd(['foo', 'bar'], {'set_baz': True})
['foo', 'bar', '--set-baz']
"""
for key, value in args.items():
if value is False:
# don't add binary flags
continue
key = snake_to_kebab(key)
if not isinstance(value, list):
if isinstance(value, int) and not isinstance(value, bool):
# Any integer items need converting to strings:
value = str(value)
value = [value]
for item in value:
cmd.append(key)
if item is not True:
# don't provide values for binary flags
cmd.append(item)
return cmd


def _schema_opts_to_api_opts(
schema_opts: Dict, schema: str
) -> SimpleNamespace:
"""Convert Schema opts to api Opts

Contains data SCHEMA_TO_API:
A mapping of schema options to functions in the form:
def func(schema_key, schema_value):
return (option_key, option _value)

Args:
schema_opts: Opts as described by the schema.
schema: Name of schema for conversion - used to select
converter functions from SCHEMA_TO_API.

Returns:
Namespace for use as options.
"""
converters = OPT_CONVERTERS[schema]
api_opts = {}
for opt, value in schema_opts.items():
# All valid options should be in SCHEMA_TO_API:
if opt not in converters:
raise InvalidSchemaOptionError(
f'{opt} is not a valid option for Cylc Clean'
)

# If converter is callable, call it on opt, value,
# else just copy them verbatim to api_opts
converter = converters[opt]
if callable(converter):
api_opt_name, api_opt_value = converter(opt, value)
api_opts[api_opt_name] = api_opt_value
else:
api_opts[opt] = value
return SimpleNamespace(**api_opts)


def _clean(tokens, opts):
"""Run Cylc Clean using `cylc.flow.workflow_files` api.
"""
try:
init_clean(tokens.pop('workflow'), opts)
except ServiceFileError as exc:
return str(exc).split('\n')[0]
else:
return 'Workflow cleaned'


class Services:
"""Cylc services provided by the UI Server."""

Expand All @@ -100,11 +221,39 @@ def _return(message):
message
]

@classmethod
async def clean(cls, workflows, args, workflows_mgr, executor, log):
"""Calls `init_clean`"""
# Convert Schema options → cylc.flow.workflow_files.init_clean opts:
try:
opts = _schema_opts_to_api_opts(args, schema=CLEAN)
except Exception as exc:
return cls._error(exc)
# Hard set remote timeout.
opts.remote_timeout = "600"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor point but this defaults to 120 in cylc flow. Wonder if it would be worth making them consistent, for documentation purposes mainly.
I think 600 may seem like the better timeout than 120, since nfs and file deletions can perhaps take a while?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the CLI 120 is the default, but the user can over-ride it.
In the UI I haven't exposed it to the user, so I think it's reasonable to set it longer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to expose it to the user? (Happy to leave this as is for now, but perhaps there should be a question issue for how long the timeouts should be on the CLI vs UI and whether to expose it to user on the UI)

Copy link
Member Author

@wxtim wxtim May 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember the reason, but I think @oliver-sanders and I agreed not to - I think it ought to be documented for discussion...


# clean each requested flow
for tokens in workflows:
clean_func = partial(_clean, tokens, opts)
try:
log.info(f'Cleaning {tokens}')
future = await asyncio.wrap_future(executor.submit(clean_func))
except Exception as exc:
log.exception(exc)
return cls._error(exc)
else:
if future == WORKFLOW_RUNNING_MSG:
log.error(ServiceFileError(WORKFLOW_RUNNING_MSG))
return cls._error(WORKFLOW_RUNNING_MSG)

# trigger a re-scan
await workflows_mgr.update()
return cls._return("Workflow(s) cleaned")

@classmethod
async def play(cls, workflows, args, workflows_mgr, log):
"""Calls `cylc play`."""
response = []

# get ready to run the command
try:
# check that the request cylc version is available
Expand All @@ -120,23 +269,11 @@ async def play(cls, workflows, args, workflows_mgr, log):

# build the command
cmd = ['cylc', 'play', '--color=never']
for key, value in args.items():
if value is False:
# don't add binary flags
continue
key = snake_to_kebab(key)
if not isinstance(value, list):
value = [value]
for item in value:
cmd.append(key)
if item is not True:
# don't provide values for binary flags
cmd.append(item)
cmd = _build_cmd(cmd, args)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)

# start each requested flow
for tokens in workflows:
try:
Expand Down Expand Up @@ -182,7 +319,6 @@ async def play(cls, workflows, args, workflows_mgr, log):
return cls._return(
'Workflow started'
)

# trigger a re-scan
await workflows_mgr.update()
return response
Expand All @@ -196,11 +332,13 @@ def __init__(
data: 'DataStoreMgr',
log: 'Logger',
workflows_mgr: 'WorkflowsManager',
executor,
**kwargs
):
super().__init__(data)
self.log = log
self.workflows_mgr = workflows_mgr
self.executor = executor

# Set extra attributes
for key, value in kwargs.items():
Expand Down Expand Up @@ -252,9 +390,19 @@ async def service(
workflows: Iterable['Tokens'],
kwargs: Dict[str, Any]
) -> List[Union[bool, str]]:
return await Services.play(
workflows,
kwargs,
self.workflows_mgr,
log=self.log
)
if command == 'clean':
return await Services.clean(
workflows,
kwargs,
self.workflows_mgr,
log=self.log,
executor=self.executor
)

else:
return await Services.play(
workflows,
kwargs,
self.workflows_mgr,
log=self.log
)
48 changes: 48 additions & 0 deletions cylc/uiserver/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,57 @@ class Arguments:
result = GenericScalar()


class Clean(graphene.Mutation):
class Meta:
description = sstrip('''
Clean a workflow from the run directory.
''')
resolver = partial(mutator, command='clean')

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
rm = graphene.String(
Copy link
Member

@MetRonnie MetRonnie Jun 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested it out with rm and it failed. This should be a list of strings rather than a single string, I think

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - according to the documentation it can be a colon-separated list of strings. I've added something to deal.

default_value='',
description=sstrip('''
Only clean the specified subdirectories (or files) in
the run directory, rather than the whole run

Can be a colon separated list:
E.g. '.service/db:log:share:work'.
''')
)
local_only = graphene.Boolean(
default_value=False,
description=sstrip('''
Only clean on the local filesystem (not remote hosts).
''')
)
remote_only = graphene.Boolean(
default_value=False,
description=sstrip('''
Only clean on remote hosts (not the local filesystem).
''')
)
debug = graphene.Boolean(
default_value=False,
description=sstrip('''
Output developer information and show exception tracebacks.
''')
)
no_timestamp = graphene.Boolean(
default_value=False,
description=sstrip('''
Don't timestamp logged messages.
''')
)

result = GenericScalar()


class UISMutations(Mutations):

play = _mut_field(Play)
clean = _mut_field(Clean)


schema = graphene.Schema(
Expand Down
Loading