From da9dd6374a692811241f6c341265eded1a6fc046 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 18 Dec 2019 09:48:43 +1300 Subject: [PATCH 1/5] aotf: make graphql mutation arguments more declaritive --- cylc/flow/network/schema.py | 416 ++++++++++++++++++++++-------------- cylc/flow/scheduler.py | 57 ++++- cylc/flow/suite_status.py | 15 +- 3 files changed, 321 insertions(+), 167 deletions(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 6eebbf02847..6b44237957f 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -17,6 +17,9 @@ """GraphQL API schema via Graphene implementation.""" import asyncio +from functools import partial +import logging +from textwrap import dedent from typing import Callable, AsyncGenerator, Any from cylc.flow.task_state import TASK_STATUSES_ORDERED @@ -24,13 +27,21 @@ ID_DELIM, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES ) +from cylc.flow.suite_status import StopMode +from cylc.flow.task_state import TASK_STATUSES_ALL + from graphene import ( Boolean, Field, Float, ID, InputObjectType, Int, - List, Mutation, ObjectType, Schema, String, Union + List, Mutation, ObjectType, Schema, String, Union, Enum ) from graphene.types.generic import GenericScalar from graphene.utils.str_converters import to_snake_case + +def sstrip(text): + return dedent(text).strip() + + PROXY_NODES = 'proxy_nodes' @@ -817,7 +828,7 @@ class Meta: # Mutators: -async def mutator(root, info, command, workflows=None, +async def mutator(root, info, command=None, workflows=None, exworkflows=None, **args): """Call the resolver method that act on the workflow service via the internal command queue.""" @@ -868,22 +879,101 @@ async def nodes_mutator(root, info, command, ids, workflows=None, return GenericResponse(result=res) +class WorkflowName(String): + """a""" + + +class User(String): + """a""" + + # Mutations defined: +class WorkflowID(InputObjectType): + """A workflow identifier of the form `user|workflow_name`.""" + name = WorkflowName(required=True) + user = User() + + +class CyclePoint(String): + """An integer or date-time cyclepoint.""" + + +class CyclePointGlob(String): + """An integer or date-time cyclepoint.""" + + +TaskStatus = Enum( + 'TaskStatus', + [ + (status, status) + for status in TASK_STATUSES_ALL + ] + #description='The status of ac task e.g. waiting, running, succeeded.' +) + + +class TaskState(InputObjectType): + """The state of a task, a combination of status and other field.""" + + status = TaskStatus() + is_held = Boolean(description=sstrip(''' + If a task is held no new job submissions will be made + ''')) + + +class TaskName(String): + """The name a task. + + * Must be a task not a family. + * Does not include the cycle point. + * Any parameters must be expanded (e.g. can't be `foo`). 
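+
+    For example (hypothetical names) `foo_station1` would be acceptable here,
+    whereas `FAM` (a family), `foo_station1.20100101T0000Z` (includes the
+    cycle point) and the unexpanded parameterisation `foo<station>` would not.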
+ """ + + +class NamespaceName(String): + """The name of a task or family.""" + + +class NamespaceIDGlob(String): + """A glob search for an active task or family.""" + + cycle = CyclePointGlob() + namespace = NamespaceName() + status = TaskState() + + +class TaskID(InputObjectType): + """The name of an active task.""" + + cycle = CyclePoint(required=True) + name = TaskName(required=True) + + +class JobID(InputObjectType): + """A job submission from an active task.""" + + task = TaskID(required=None) + submission_number = Int(default=-1) + + +class TimePoint(String): + """A date-time in the ISO8601 format.""" + + class ClearBroadcast(Mutation): class Meta: description = """Expire all settings targeting cycle points earlier than cutoff.""" - resolver = mutator + resolver = partial(mutator, command='clear_broadcast') class Arguments: - workflows = List(String, required=True) - command = String(default_value='clear_broadcast') + workflows = List(WorkflowID, required=True) point_strings = List( - String, + CyclePoint, description="""`["*"]`""", default_value=['*']) namespaces = List( - String, + NamespaceName, description="""namespaces: `["foo", "BAZ"]`""",) cancel_settings = List( GenericScalar, @@ -897,11 +987,10 @@ class ExpireBroadcast(Mutation): class Meta: description = """Clear settings globally, or for listed namespaces and/or points.""" - resolver = mutator + resolver = partial(mutator, command='expire_broadcast') class Arguments: - workflows = List(String, required=True) - command = String(default_value='expire_broadcast') + workflows = List(WorkflowID, required=True) cutoff = String(description="""String""") result = GenericScalar() @@ -909,19 +998,15 @@ class Arguments: class HoldWorkflow(Mutation): class Meta: - description = """Hold workflow. -- hold on workflow. (default) -- hold point of workflow.""" - resolver = mutator + description = 'Hold workflow.' + resolver = partial(mutator, command='hold_workflow') class Arguments: - command = String( - description="""options: -- `hold_suite` (default) -- `hold_after_point_string`""", - default_value='hold_suite') - point_string = String() - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) + time = TimePoint(description=sstrip(''' + Get the workflow to hold after the specified wallclock time + has passed. 
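+            e.g. `2019-12-18T09:48Z` (any ISO8601 date-time; example value
+            only).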
+ ''')) result = GenericScalar() @@ -929,11 +1014,10 @@ class Arguments: class NudgeWorkflow(Mutation): class Meta: description = """Tell workflow to try task processing.""" - resolver = mutator + resolver = partial(mutator, command='nudge') class Arguments: - command = String(default_value='nudge') - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) result = GenericScalar() @@ -942,17 +1026,16 @@ class PutBroadcast(Mutation): class Meta: description = """Put up new broadcast settings (server side interface).""" - resolver = mutator + resolver = partial(mutator, command='put_broadcast') class Arguments: - command = String(default_value='put_broadcast') - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) point_strings = List( - String, + CyclePointGlob, description="""`["*"]`""", default_value=['*']) namespaces = List( - String, + NamespaceName, description="""namespaces: `["foo", "BAZ"]`""",) settings = List( GenericScalar, @@ -964,18 +1047,14 @@ class Arguments: class PutMessages(Mutation): class Meta: - description = """Put task messages in queue for processing -later by the main loop.""" - resolver = nodes_mutator + description = sstrip(''' + Put task messages in queue for processing later by the main loop. + ''') + resolver = partial(nodes_mutator, command='put_messages') class Arguments: - workflows = List(String, required=True) - command = String(default_value='put_messages') - ids = List( - String, - description="""Task job in the form -`"CYCLE%TASK_NAME%SUBMIT_NUM"`""", - required=True) + workflows = List(WorkflowID, required=True) + ids = List(JobID, required=True) event_time = String(default_value=None) messages = List( List(String), @@ -988,11 +1067,10 @@ class Arguments: class ReleaseWorkflow(Mutation): class Meta: description = """Reload workflow definitions.""" - resolver = mutator + resolver = partial(mutator, command='release_suite') class Arguments: - command = String(default_value='release_suite') - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) result = GenericScalar() @@ -1000,136 +1078,48 @@ class Arguments: class ReloadWorkflow(Mutation): class Meta: description = """Tell workflow to reload the workflow definition.""" - resolver = mutator + resolver = partial(mutator, command='reload_suite') class Arguments: - workflows = List(String, required=True) - command = String(default_value='reload_suite') + workflows = List(WorkflowID, required=True) result = GenericScalar() +LogLevels = Enum( + 'LogLevels', + list(logging._nameToLevel.items()) + #description='Logging severity level.' 
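+    # NOTE: logging._nameToLevel is a private stdlib mapping of level names to
+    # numeric values (roughly CRITICAL=50, ERROR=40, WARNING=30, INFO=20,
+    # DEBUG=10, NOTSET=0), so the schema exposes members such as `INFO`.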
+) + + class SetVerbosity(Mutation): class Meta: description = """Set workflow verbosity to new level.""" - resolver = mutator + resolver = partial(mutator, command='set_verbosity') class Arguments: - workflows = List(String, required=True) - command = String(default_value='set_verbosity') - level = String( - description="""levels: -`INFO`, `WARNING`, `NORMAL`, `CRITICAL`, `ERROR`, `DEBUG`""", - required=True) + workflows = List(WorkflowID, required=True) + level = LogLevels(required=True) result = GenericScalar() -class StopWorkflowArgs(InputObjectType): - datetime_string = String(description="""ISO 8601 compatible or -`YYYY/MM/DD-HH:mm` of wallclock/real-world date-time""") - point_string = String(description="""Workflow formatted point string""") - task_id = String() - kill_active_tasks = Boolean(description="""Use with: set_stop_cleanly""") - terminate = Boolean(description="""Use with: `stop_now`""") - - class StopWorkflow(Mutation): class Meta: - description = """Workflow stop actions: -- Cleanly or after kill active tasks. (default) -- After cycle point. -- After wallclock time. -- On event handler completion, or terminate right away. -- After an instance of a task.""" - resolver = mutator + description = 'Stop a running workflow' + resolver = partial(mutator, command='stop_workflow') class Arguments: - workflows = List(String, required=True) - command = String( - description="""String options: -- `set_stop_cleanly` (default) -- `set_stop_after_clock_time` -- `set_stop_after_point` -- `set_stop_after_task` -- `stop_now`""", - default_value='set_stop_cleanly',) - args = StopWorkflowArgs() - - result = GenericScalar() - - -class TaskArgs(InputObjectType): - check_syntax = Boolean(description="""Use with actions: -- `dry_run_tasks`""") - no_check = Boolean(description="""Use with actions: -- `insert_tasks`""") - stop_point_string = String(description="""Use with actions: -- `insert_tasks`""") - poll_succ = Boolean(description="""Use with actions: -- `poll_tasks`""") - spawn = Boolean(description="""Use with actions: -- `remove_tasks`""") - state = String(description="""Use with actions: -- `reset_task_states`""") - outputs = List(String, description="""Use with actions: -- `reset_task_states`""") - back_out = Boolean(description="""Use with actions: -- `trigger_tasks`""") - - -class TaskActions(Mutation): - class Meta: - description = """Task actions: -- Prepare job file for task(s). -- Hold tasks. -- Insert task proxies. -- Kill task jobs. -- Return True if task_id exists (and running). -- Unhold tasks. -- Remove tasks from task pool. -- Reset statuses tasks. -- Spawn tasks. -- Trigger submission of task jobs where possible.""" - resolver = nodes_mutator - - class Arguments: - workflows = List(String) - command = String( - description="""Task actions: -- `dry_run_tasks` -- `hold_tasks` -- `insert_tasks` -- `kill_tasks` -- `poll_tasks` -- `release_tasks` -- `remove_tasks` -- `reset_task_states` -- `spawn_tasks` -- `trigger_tasks`""", - required=True,) - ids = List( - String, - description="""Used with: -- All Commands - -A list of identifiers (family%glob%id) for matching task proxies, i.e. -``` -[ - "owner%workflow%201905*%foo", - "foo.201901*:failed", - "201901*%baa:failed", - "FAM.20190101T0000Z", - "FAM2", - "*.20190101T0000Z" -] -``` -(where % is the delimiter) - -Splits argument into components, creates workflows argument if non-existent. 
-""", - required=True) - args = TaskArgs() + workflows = List(WorkflowID, required=True) + mode = Enum.from_enum( + StopMode, + # TODO: this isn't working + # description=StopMode.description + )() + cycle_point = CyclePoint() + clock_time = TimePoint() + task = TaskID() result = GenericScalar() @@ -1137,14 +1127,13 @@ class Arguments: class TakeCheckpoint(Mutation): class Meta: description = """Checkpoint current task pool.""" - resolver = mutator + resolver = partial(mutator, command='take_checkpoints') class Arguments: - workflows = List(String, required=True) - command = String(default_value='take_checkpoints') + workflows = List(WorkflowID, required=True) name = String( description="""The checkpoint name""", - required=True,) + required=True) result = GenericScalar() @@ -1152,19 +1141,109 @@ class Arguments: class ExternalTrigger(Mutation): class Meta: description = """Server-side external event trigger interface.""" - resolver = mutator + resolver = partial(mutator, command='put_external_trigger') class Arguments: - workflows = List(String, required=True) - command = String(default_value='put_external_trigger') + workflows = List(WorkflowID, required=True) event_message = String(required=True) event_id = String(required=True) result = GenericScalar() +class TaskMutation: + class Arguments: + workflows = List(WorkflowID) + ids = List(NamespaceIDGlob, required=True) + + result = GenericScalar() + + +class DryRunTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + check_syntax = Boolean() + + class Meta: + description = '' + resolver = partial(mutator, command='dry_run_tasks') + + +class HoldTasks(Mutation, TaskMutation): + class Meta: + description = '' + resolver = partial(mutator, command='hold_tasks') + + +class InsertTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + no_check = Boolean() + stop_point_string = String() + + class Meta: + description = '' + resolver = partial(mutator, command='insert_tasks') + + +class KillTaskJobs(Mutation, TaskMutation): + # TODO: This should be a job mutation? + class Meta: + description = '' + # TODO: rename tasks=>jobs ??? 
+ resolver = partial(mutator, command='kill_tasks') + + +class PollTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + poll_succ = Boolean() + + class Meta: + description = '' + resolver = partial(mutator, command='poll_tasks') + + +class ReleaseTasks(Mutation, TaskMutation): + class Meta: + description = '' + resolver = partial(mutator, command='release_tasks') + + +class RemoveTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + spawn = Boolean() + + class Meta: + description = '' + resolver = partial(mutator, command='remove_tasks') + + +class ResetTaskStates(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + state = String() + outputs = List(String) + + class Meta: + description = '' + resolver = partial(mutator, command='reset_task_states') + + +class SpawnTasks(Mutation, TaskMutation): + class Meta: + description = '' + resolver = partial(mutator, command='spawn_tasks') + + +class TriggerTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + back_out = Boolean() + + class Meta: + description = '' + resolver = partial(mutator, command='trigger_tasks') + + # Mutation declarations class Mutations(ObjectType): + # workflow actions clear_broadcast = ClearBroadcast.Field( description=ClearBroadcast._meta.description) expire_broadcast = ExpireBroadcast.Field( @@ -1189,8 +1268,21 @@ class Mutations(ObjectType): description=StopWorkflow._meta.description) take_checkpoint = TakeCheckpoint.Field( description=TakeCheckpoint._meta.description) - task_actions = TaskActions.Field( - description=TaskActions._meta.description) + + # task actions + dry_run_tasks = DryRunTasks.Field() + hold_tasks = HoldTasks.Field() + insert_tasks = InsertTasks.Field() + kill_task_jobs = KillTaskJobs.Field() + poll_tasks = PollTasks.Field() + release_tasks = ReleaseTasks.Field() + remove_tasks = RemoveTasks.Field() + reset_task_states = ResetTaskStates.Field() + spawn_tasks = SpawnTasks.Field() + trigger_tasks = TriggerTasks.Field() + + # job actions + # TODO # ** Subscription Related ** # diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 5831ace8998..182cfe34fbd 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -815,14 +815,57 @@ def info_ping_task(self, task_id, exists_only=False): task_id = self.get_standardised_taskid(task_id) return self.pool.ping_task(task_id, exists_only) + def command_stop_workflow( + self, + stop_mode=None, + cycle_point=None, + # NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed + clock_time=None, + task=None + ): + # immediate shutdown + if stop_mode: + self._set_stop(stop_mode) + elif not any([stop_mode, cycle_point, clock_time, task]): + # if no arguments provided do a standard clean shutdown + self._set_stop(StopMode.REQUEST_CLEAN) + + # schedule shutdown after tasks pass provided cycle point + if cycle_point: + point = self.get-standardised_point(cycle_point) + if self.pool.set_stop_point(point): + self.options.stopcp = str(point) + self.suite_db_mgr.put_suite_stop_cycle_point( + self.options.stopcp) + else: + # TODO: yield warning + pass + + # schedule shutdown after wallclock time passes provided time + if clock_time: + parser = TimePointParser() + time = parser.parse(clock_time) + self.set_stop_clock(int(stop_time.get("seconds_since_unix_epoch"))) + + # schedule shutdown after task succeeds + if task: + task_id = self.get_standardised_taskid(task_id) + if TaskID.is_valid_id(task_id): + self.set_stop_task(task_id) + else: + # TODO: yield warning + pass + def 
command_set_stop_cleanly(self, kill_active_tasks=False): """Stop job submission and set the flag for clean shutdown.""" + # TODO: deprecate self._set_stop() if kill_active_tasks: self.time_next_kill = time() def command_stop_now(self, terminate=False): """Shutdown immediately.""" + # TODO: deprecate if terminate: self._set_stop(StopMode.REQUEST_NOW_NOW) else: @@ -837,6 +880,7 @@ def _set_stop(self, stop_mode=None): def command_set_stop_after_point(self, point_string): """Set stop after ... point.""" + # TODO: deprecate stop_point = self.get_standardised_point(point_string) if self.pool.set_stop_point(stop_point): self.options.stopcp = str(stop_point) @@ -847,6 +891,7 @@ def command_set_stop_after_clock_time(self, arg): format: ISO 8601 compatible or YYYY/MM/DD-HH:mm (backwards comp.) """ + # TODO: deprecate parser = TimePointParser() try: stop_time = parser.parse(arg) @@ -859,6 +904,7 @@ def command_set_stop_after_clock_time(self, arg): def command_set_stop_after_task(self, task_id): """Set stop after a task.""" + # TODO: deprecate task_id = self.get_standardised_taskid(task_id) if TaskID.is_valid_id(task_id): self.set_stop_task(task_id) @@ -899,12 +945,17 @@ def command_hold_tasks(self, items): """Hold selected task proxies in the suite.""" return self.pool.hold_tasks(items) + def command_hold_workflow(self, point_string=None): + self.hold_suite(point_string) + def command_hold_suite(self): """Hold all task proxies in the suite.""" + # TODO: deprecate self.hold_suite() def command_hold_after_point_string(self, point_string): """Hold tasks AFTER this point (itask.point > point).""" + # TODO: deprecate point = self.get_standardised_point(point_string) self.hold_suite(point) LOG.info( @@ -1904,7 +1955,11 @@ def hold_suite(self, point=None): self.task_events_mgr.pflag = True self.suite_db_mgr.put_suite_hold() else: - LOG.info("Setting suite hold cycle point: %s", point) + LOG.info( + 'Setting suite hold cycle point: %s.' 
+ '\nThe suite will hold once all tasks have passed this point.', + point + ) self.pool.set_hold_point(point) self.suite_db_mgr.put_suite_hold_cycle_point(point) diff --git a/cylc/flow/suite_status.py b/cylc/flow/suite_status.py index e350f1e2552..c721943a24c 100644 --- a/cylc/flow/suite_status.py +++ b/cylc/flow/suite_status.py @@ -74,17 +74,24 @@ class StopMode(Enum): def describe(self): """Return a user-friendly description of this state.""" if self == self.AUTO: - return 'suite has completed' + return 'wait until suite has completed' if self == self.AUTO_ON_TASK_FAILURE: - return 'a task has finished' + return 'wait until the first task fails' if self == self.REQUEST_CLEAN: - return 'waiting for active jobs to complete' + return 'wait for active jobs to complete' if self == self.REQUEST_NOW: - return 'waiting for event handlers to complete' + return 'immediate shutdown, wait for event handlers to complete' if self == self.REQUEST_NOW_NOW: return 'immediate shutdown' return '' + @classmethod + def description(cls, value): + try: + cls(value).describe() + except ValueError: + pass + class AutoRestartMode(Enum): """The possible modes of a suite auto-restart.""" From c61e77ea73984067d0c9967d6b6add2b537bde10 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 20 Dec 2019 14:41:17 +1300 Subject: [PATCH 2/5] aotf: bring mutations inline with cli * collect stop, hold, broadcast * use enumerations where appropriate * route GQL mutations via the traditional endpoints this is a temporary interface for simplicity it can be torn down later --- cylc/flow/network/__init__.py | 4 +- cylc/flow/network/client.py | 2 +- cylc/flow/network/resolvers.py | 40 +- cylc/flow/network/schema.py | 648 +++++++++++++++------ cylc/flow/network/server.py | 85 +++ cylc/flow/network/subscriber.py | 2 +- cylc/flow/scheduler.py | 47 +- cylc/flow/suite_status.py | 17 +- cylc/flow/task_state.py | 27 + cylc/flow/tests/network/test_client.py | 6 +- cylc/flow/tests/network/test_publisher.py | 14 +- cylc/flow/tests/network/test_resolvers.py | 11 +- cylc/flow/tests/network/test_server.py | 4 +- cylc/flow/tests/network/test_subscriber.py | 6 +- cylc/flow/tests/network/test_zmq.py | 18 +- 15 files changed, 647 insertions(+), 284 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index b55016dd48a..d98613c67f5 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -126,7 +126,7 @@ def __init__(self, pattern, suite=None, bind=False, context=None, self.loop = None self.stopping = False - def start(self, *args, **kwargs): + def start_(self, *args, **kwargs): """Start the server/network-component. Pass arguments to _start_ @@ -253,7 +253,7 @@ def _bespoke_start(self): self.stopping = False sleep(0) # yield control to other threads - def stop(self, stop_loop=True): + def stop_(self, stop_loop=True): """Stop the server. 
Args: diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 1a99f23630a..c7c4fe013cf 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -127,7 +127,7 @@ def __init__( self._timeout_handler, suite, host, port) self.poller = None # Connect the ZMQ socket on instantiation - self.start(host, port, srv_public_key_loc) + self.start_(host, port, srv_public_key_loc) # gather header info post start self.header = self.get_header() diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 83e72603b0c..e0023c4542f 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -16,8 +16,9 @@ """GraphQL resolvers for use in data accessing and mutation of workflows.""" -from operator import attrgetter from fnmatch import fnmatchcase +from getpass import getuser +from operator import attrgetter from graphene.utils.str_converters import to_snake_case from cylc.flow.data_store_mgr import ( @@ -342,38 +343,7 @@ async def nodes_mutator(self, *m_args): result = (True, 'Command queued') return [{'id': w_id, 'response': result}] - async def _mutation_mapper(self, command, args): + async def _mutation_mapper(self, command, kwargs): """Map between GraphQL resolvers and internal command interface.""" - if command in ['clear_broadcast', - 'expire_broadcast', - 'put_broadcast']: - return getattr(self.schd.task_events_mgr.broadcast_mgr, - command, None)(**args) - if command == 'put_ext_trigger': - return self.schd.ext_trigger_queue.put(( - args.get('event_message'), - args.get('event_id'))) - if command == 'put_messages': - messages = args.get('messages', []) - for severity, message in messages: - self.schd.message_queue.put(( - args.get('task_job', None), - args.get('event_time', None), - severity, message)) - return (True, 'Messages queued: %d' % len(messages)) - if command in ['set_stop_after_clock_time', - 'set_stop_after_point', - 'set_stop_after_task']: - mutate_args = [command, (), {}] - for val in args.values(): - mutate_args[1] = (val,) - return self.schd.command_queue.put(tuple(mutate_args)) - mutate_args = [command, (), {}] - for key, val in args.items(): - if isinstance(val, list): - mutate_args[1] = (val,) - elif isinstance(val, dict): - mutate_args[2] = val - else: - mutate_args[2][key] = val - return self.schd.command_queue.put(tuple(mutate_args)) + method = getattr(self.schd.server, command) + return method(user=getuser(), **kwargs) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 6b44237957f..745b5ed464b 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -22,7 +22,22 @@ from textwrap import dedent from typing import Callable, AsyncGenerator, Any -from cylc.flow.task_state import TASK_STATUSES_ORDERED +from cylc.flow.task_state import ( + TASK_STATUSES_ORDERED, + TASK_STATUS_DESC, + TASK_STATUS_RUNAHEAD, + TASK_STATUS_WAITING, + TASK_STATUS_QUEUED, + TASK_STATUS_EXPIRED, + TASK_STATUS_READY, + TASK_STATUS_SUBMIT_FAILED, + TASK_STATUS_SUBMIT_RETRYING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RETRYING, + TASK_STATUS_RUNNING, + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED +) from cylc.flow.data_store_mgr import ( ID_DELIM, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES @@ -878,20 +893,11 @@ async def nodes_mutator(root, info, command, ids, workflows=None, res = await resolvers.nodes_mutator(info, command, ids, w_args, args) return GenericResponse(result=res) - -class WorkflowName(String): - """a""" - - -class User(String): - """a""" +# Input types: -# 
Mutations defined: -class WorkflowID(InputObjectType): - """A workflow identifier of the form `user|workflow_name`.""" - name = WorkflowName(required=True) - user = User() +class WorkflowID(String): + """A registered workflow.""" class CyclePoint(String): @@ -899,21 +905,73 @@ class CyclePoint(String): class CyclePointGlob(String): - """An integer or date-time cyclepoint.""" + """A glob for integer or date-time cyclepoints. + The wildcard character (`*`) can be used to perform globbing. + For example `2000*` might match `2000-01-01T00:00Z`. -TaskStatus = Enum( - 'TaskStatus', - [ - (status, status) - for status in TASK_STATUSES_ALL - ] - #description='The status of ac task e.g. waiting, running, succeeded.' -) + """ + + +class RuntimeConfiguration(String): + """A configuration item for a task or family e.g. `script`.""" + + +class BroadcastSetting(InputObjectType): + """A task/family runtime setting as a key, value pair.""" + + key = RuntimeConfiguration( + description=sstrip(''' + The cylc namespace for the setting to modify. + e.g. `[environment]variable_name`. + '''), + required=True + ) + value = String( + description='The value of the modification' + ) + + +class BroadcastMode(Enum): + Set = 'put_broadcast' + Clear = 'clear_broadcast' + + @property + def description(self): + if self == BroadcastMode.Set: + return 'Create a new broadcast.' + if self == BroadcastMode.Clear: + return 'Revoke an existing broadcast.' + return '' + + +class TaskStatus(Enum): + """The status of a task in a workflow.""" + + # NOTE: this is an enumeration purely for the GraphQL schema + # TODO: the task statuses should be formally declared in a Python + # enumeration rendering this class unnecessary + # NOTE: runahead purposefully omitted to hide users from the task pool + # Runahead = TASK_STATUS_RUNAHEAD + Waiting = TASK_STATUS_WAITING + Queued = TASK_STATUS_QUEUED + Expired = TASK_STATUS_EXPIRED + Ready = TASK_STATUS_READY + SubmitFailed = TASK_STATUS_SUBMIT_FAILED + SubmitRetrying = TASK_STATUS_SUBMIT_RETRYING + Submitted = TASK_STATUS_SUBMITTED + Retrying = TASK_STATUS_RETRYING + Running = TASK_STATUS_RUNNING + Failed = TASK_STATUS_FAILED + Succeeded = TASK_STATUS_SUCCEEDED + + @property + def description(self): + return TASK_STATUS_DESC.get(self.value, '') class TaskState(InputObjectType): - """The state of a task, a combination of status and other field.""" + """The state of a task, a combination of status and other fields.""" status = TaskStatus() is_held = Boolean(description=sstrip(''' @@ -935,74 +993,144 @@ class NamespaceName(String): class NamespaceIDGlob(String): - """A glob search for an active task or family.""" + """A glob search for an active task or family. - cycle = CyclePointGlob() - namespace = NamespaceName() - status = TaskState() + Can use the wildcard character (`*`), e.g `foo*` might match `foot`. 
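+
+    e.g. `foo.201901*:failed` or `FAM.20190101T0000Z` (the
+    `name.cycle-point[:status]` forms used by the existing task commands).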
+ """ + + # cycle = CyclePointGlob() + # namespace = NamespaceName() + # status = TaskState() -class TaskID(InputObjectType): +class TaskID(String): """The name of an active task.""" - cycle = CyclePoint(required=True) - name = TaskName(required=True) + # cycle = CyclePoint(required=True) + # name = TaskName(required=True) -class JobID(InputObjectType): +class JobID(String): """A job submission from an active task.""" - task = TaskID(required=None) - submission_number = Int(default=-1) + # task = TaskID(required=None) + # submission_number = Int(default=-1) class TimePoint(String): """A date-time in the ISO8601 format.""" -class ClearBroadcast(Mutation): - class Meta: - description = """Expire all settings targeting cycle points -earlier than cutoff.""" - resolver = partial(mutator, command='clear_broadcast') +LogLevels = Enum( + 'LogLevels', + list(logging._nameToLevel.items()), + description=lambda x: f'Python logging level: {x.name} = {x.value}.' + if x else '' +) - class Arguments: - workflows = List(WorkflowID, required=True) - point_strings = List( - CyclePoint, - description="""`["*"]`""", - default_value=['*']) - namespaces = List( - NamespaceName, - description="""namespaces: `["foo", "BAZ"]`""",) - cancel_settings = List( - GenericScalar, - description=""" -settings: `[{environment: {ENVKEY: "env_val"}}, ...]`""",) - result = GenericScalar() +class SuiteStopMode(Enum): + """The mode used to stop a running workflow.""" + + # Note: contains only the REQUEST_* values from StopMode + Clean = StopMode.REQUEST_CLEAN + Now = StopMode.REQUEST_NOW + NowNow = StopMode.REQUEST_NOW_NOW + + @property + def description(self): + return StopMode(self.value).describe() -class ExpireBroadcast(Mutation): +# Mutations: + +# TODO: re-instate: +# - get-broadcast (can just use GraphQL query BUT needs CLI access too) +# - expire-broadcast + +class Broadcast(Mutation): class Meta: - description = """Clear settings globally, -or for listed namespaces and/or points.""" - resolver = partial(mutator, command='expire_broadcast') + description = sstrip(''' + Override or add new [runtime] config in targeted namespaces in + a running suite. + + Uses for broadcast include making temporary changes to task + behaviour, and task-to-downstream-task communication via + environment variables. + + A broadcast can target any [runtime] namespace for all cycles or + for a specific cycle. If a task is affected by specific-cycle and + all-cycle broadcasts at once, the specific takes precedence. If + a task is affected by broadcasts to multiple ancestor + namespaces, the result is determined by normal [runtime] + inheritance. In other words, it follows this order: + + `all:root -> all:FAM -> all:task -> tag:root -> tag:FAM -> + tag:task` + + Broadcasts persist, even across suite restarts, until they expire + when their target cycle point is older than the oldest current in + the suite, or until they are explicitly cancelled with this + command. All-cycle broadcasts do not expire. + + For each task the final effect of all broadcasts to all namespaces + is computed on the fly just prior to job submission. The + `--cancel` and `--clear` options simply cancel (remove) active + broadcasts, they do not act directly on the final task-level + result. 
Consequently, for example, you cannot broadcast to "all + cycles except Tn" with an all-cycle broadcast followed by a cancel + to Tn (there is no direct broadcast to Tn to cancel); and you + cannot broadcast to "all members of FAMILY except member_n" with a + general broadcast to FAMILY followed by a cancel to member_n (there + is no direct broadcast to member_n to cancel). + ''') + resolver = partial(mutator, command='broadcast') class Arguments: workflows = List(WorkflowID, required=True) - cutoff = String(description="""String""") + mode = BroadcastMode( + default_value=1, + required=True + ) + cycle_points = List( + CyclePoint, + description=sstrip(''' + List of cycle points to target or `*` to cancel all all-cycle + broadcasts without canceling all specific-cycle broadcasts. + '''), + default_value=['*']) + namespaces = List( + NamespaceName, + description='Target namespaces.', + default_value=['root'] + ) + settings = List( + BroadcastSetting, + description='Target settings.' + ) + files = List( + String, + description=sstrip(''' + File with config to broadcast. Can be used multiple times + ''') + ) result = GenericScalar() -class HoldWorkflow(Mutation): +class Hold(Mutation): class Meta: - description = 'Hold workflow.' - resolver = partial(mutator, command='hold_workflow') + description = sstrip(''' + Hold a workflow or tasks within it. + ''') + resolver = partial(mutator, command='hold') class Arguments: workflows = List(WorkflowID, required=True) + ids = List( + NamespaceIDGlob, + description='Hold the specified tasks rather than the workflow.' + ) time = TimePoint(description=sstrip(''' Get the workflow to hold after the specified wallclock time has passed. @@ -1011,9 +1139,16 @@ class Arguments: result = GenericScalar() -class NudgeWorkflow(Mutation): +class Nudge(Mutation): class Meta: - description = """Tell workflow to try task processing.""" + description = sstrip(''' + Cause the Cylc task processing loop to be invoked on a running + suite. + + This happens automatically when the state of any task changes + such that task processing (dependency negotiation etc.) + is required, or if a clock-trigger task is ready to run. + ''') resolver = partial(mutator, command='nudge') class Arguments: @@ -1022,33 +1157,33 @@ class Arguments: result = GenericScalar() -class PutBroadcast(Mutation): +class Ping(Mutation): class Meta: - description = """Put up new broadcast settings -(server side interface).""" - resolver = partial(mutator, command='put_broadcast') + description = sstrip(''' + Send a test message to a running suite. + ''') + resolver = partial(mutator, command='ping_suite') class Arguments: workflows = List(WorkflowID, required=True) - point_strings = List( - CyclePointGlob, - description="""`["*"]`""", - default_value=['*']) - namespaces = List( - NamespaceName, - description="""namespaces: `["foo", "BAZ"]`""",) - settings = List( - GenericScalar, - description=""" -settings: `[{environment: {ENVKEY: "env_val"}}, ...]`""",) result = GenericScalar() -class PutMessages(Mutation): +class Message(Mutation): class Meta: description = sstrip(''' - Put task messages in queue for processing later by the main loop. + Record task job messages. + + Send task job messages to: + - The job stdout/stderr. + - The job status file, if there is one. + - The suite server program, if communication is possible. + + Task jobs use this to record and report status such + as success and failure. 
Applications run by task jobs can use + this command to report messages and to report registered task + outputs. ''') resolver = partial(nodes_mutator, command='put_messages') @@ -1059,25 +1194,60 @@ class Arguments: messages = List( List(String), description="""List in the form `[[severity, message], ...]`.""", - default_value=None) + default_value=None + ) result = GenericScalar() -class ReleaseWorkflow(Mutation): +class Release(Mutation): class Meta: - description = """Reload workflow definitions.""" - resolver = partial(mutator, command='release_suite') + description = sstrip(''' + Release a held workflow or tasks within it. + + See also the opposite command `hold`. + ''') + resolver = partial(mutator, command='release') class Arguments: workflows = List(WorkflowID, required=True) + ids = List( + NamespaceIDGlob, + description=sstrip(''' + Release matching tasks rather than the workflow as whole. + ''') + ) result = GenericScalar() -class ReloadWorkflow(Mutation): +class Reload(Mutation): class Meta: - description = """Tell workflow to reload the workflow definition.""" + description = sstrip(''' + Tell a suite to reload its definition at run time. + + All settings including task definitions, with the + exception of suite log configuration, can be changed on reload. + Note that defined tasks can be be added to or removed from a + running suite using "insert" and "remove" without reloading. This + command also allows addition and removal of actual task + definitions, and therefore insertion of tasks that were not defined + at all when the suite started (you will still need to manually + insert a particular instance of a newly defined task). Live task + proxies that are orphaned by a reload (i.e. their task definitions + have been removed) will be removed from the task pool if they have + not started running yet. Changes to task definitions take effect + immediately, unless a task is already running at reload time. + + If the suite was started with Jinja2 template variables + set on the command line (cylc run --set FOO=bar REG) the same + template settings apply to the reload (only changes to the suite.rc + file itself are reloaded). + + If the modified suite definition does not parse, + failure to reload will be reported but no harm will be done to the + running suite. + ''') resolver = partial(mutator, command='reload_suite') class Arguments: @@ -1086,16 +1256,15 @@ class Arguments: result = GenericScalar() -LogLevels = Enum( - 'LogLevels', - list(logging._nameToLevel.items()) - #description='Logging severity level.' -) - - class SetVerbosity(Mutation): class Meta: - description = """Set workflow verbosity to new level.""" + description = sstrip(''' + Change the logging severity level of a running suite. + + Only messages at or above the chosen severity level will be logged; + for example, if you choose `WARNING`, only warnings and critical + messages will be logged. + ''') resolver = partial(mutator, command='set_verbosity') class Arguments: @@ -1105,48 +1274,90 @@ class Arguments: result = GenericScalar() -class StopWorkflow(Mutation): +class Stop(Mutation): class Meta: - description = 'Stop a running workflow' + description = sstrip(f''' + Tell a suite server program to shut down. + + In order to prevent failures going unnoticed, suites only shut down + automatically at a final cycle point if no failed tasks are + present. There are several shutdown methods: + + Tasks that become ready after the shutdown is ordered will be + submitted immediately if the suite is restarted. 
Remaining task + event handlers and job poll and kill commands, however, will be + executed prior to shutdown, unless the stop mode is + `{StopMode.REQUEST_NOW}`. + ''') resolver = partial(mutator, command='stop_workflow') class Arguments: workflows = List(WorkflowID, required=True) - mode = Enum.from_enum( - StopMode, - # TODO: this isn't working - # description=StopMode.description - )() - cycle_point = CyclePoint() - clock_time = TimePoint() - task = TaskID() + mode = SuiteStopMode( + # TODO default + ) + cycle_point = CyclePoint( + description='Stop after the suite reaches this cycle.' + ) + clock_time = TimePoint( + description='Stop after wall-clock time passes this point.' + ) + task = TaskID( + description='Stop after this task succeeds.' + ) result = GenericScalar() class TakeCheckpoint(Mutation): class Meta: - description = """Checkpoint current task pool.""" + description = 'Tell the suite to checkpoint its current state.' resolver = partial(mutator, command='take_checkpoints') class Arguments: workflows = List(WorkflowID, required=True) name = String( - description="""The checkpoint name""", + description='The checkpoint name.', required=True) result = GenericScalar() -class ExternalTrigger(Mutation): +class ExtTrigger(Mutation): class Meta: - description = """Server-side external event trigger interface.""" - resolver = partial(mutator, command='put_external_trigger') + description = sstrip(''' + Report an external event message to a suite server program. + + It is expected that a task in the suite has registered the same + message as an external trigger - a special prerequisite to be + satisfied by an external system, via this command, rather than by + triggering off other tasks. + + The ID argument should uniquely distinguish one external trigger + event from the next. When a task's external trigger is satisfied by + an incoming message, the message ID is broadcast to all downstream + tasks in the cycle point as `$CYLC_EXT_TRIGGER_ID` so that they can + use it - e.g. to identify a new data file that the external + triggering system is responding to. + + Use the retry options in case the target suite is down or out of + contact. + + Note: To manually trigger a task use "Trigger" not + "ExtTrigger". + ''') + resolver = partial(mutator, command='put_ext_trigger') class Arguments: workflows = List(WorkflowID, required=True) - event_message = String(required=True) - event_id = String(required=True) + message = String( + description='External trigger message.', + required=True + ) + id = String( + description='Unique trigger ID.', + required=True + ) result = GenericScalar() @@ -1159,127 +1370,196 @@ class Arguments: result = GenericScalar() -class DryRunTasks(Mutation, TaskMutation): - class Arguments(TaskMutation.Arguments): - check_syntax = Boolean() - +class DryRun(Mutation, TaskMutation): class Meta: - description = '' + description = sstrip(''' + [For internal use] Prepare the job file for a task. + ''') resolver = partial(mutator, command='dry_run_tasks') + class Arguments(TaskMutation.Arguments): + check_syntax = Boolean( + description='Check shell syntax.', + default_value=True + ) + -class HoldTasks(Mutation, TaskMutation): +class Insert(Mutation, TaskMutation): class Meta: - description = '' - resolver = partial(mutator, command='hold_tasks') + description = sstrip(''' + Insert new task proxies into the task pool of a running workflow. + For example to enable re-triggering earlier tasks already removed + from the pool. 
-class InsertTasks(Mutation, TaskMutation): - class Arguments(TaskMutation.Arguments): - no_check = Boolean() - stop_point_string = String() + Note: inserted cycling tasks cycle on as normal, even if another + instance of the same task exists at a later cycle (instances of the + same task at different cycles can coexist, but a newly spawned task + will not be added to the pool if it catches up to another task with + the same ID). - class Meta: - description = '' + See also "Submit", for running tasks without the scheduler. + ''') resolver = partial(mutator, command='insert_tasks') - -class KillTaskJobs(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + no_check = Boolean( + description=sstrip(''' + Add task even if the provided cycle point is not valid for + the given task. + ''') + ) + stop_point = CyclePoint( + description='hold/stop cycle point for inserted task.' + ) + + +class Kill(Mutation, TaskMutation): # TODO: This should be a job mutation? class Meta: - description = '' - # TODO: rename tasks=>jobs ??? + description = sstrip(''' + Kill jobs of active tasks and update their statuses accordingly. + ''') resolver = partial(mutator, command='kill_tasks') -class PollTasks(Mutation, TaskMutation): - class Arguments(TaskMutation.Arguments): - poll_succ = Boolean() - +class Poll(Mutation, TaskMutation): class Meta: - description = '' + description = sstrip(''' + Poll (query) task jobs to verify and update their statuses. + ''') resolver = partial(mutator, command='poll_tasks') + class Arguments(TaskMutation.Arguments): + poll_succ = Boolean( + description='Allow polling of succeeded tasks.' + ) + -class ReleaseTasks(Mutation, TaskMutation): +class Remove(Mutation, TaskMutation): class Meta: - description = '' - resolver = partial(mutator, command='release_tasks') + description = sstrip(''' + Remove one or more task instances from a running workflow. + Tasks will be forced to spawn successors before removal if they + have not done so already, unless you use `no_spawn`. + ''') + resolver = partial(mutator, command='remove_tasks') -class RemoveTasks(Mutation, TaskMutation): class Arguments(TaskMutation.Arguments): - spawn = Boolean() + spawn = Boolean( + description='Spawn successors before removal.', + default_value=True + ) + +class Reset(Mutation, TaskMutation): class Meta: - description = '' - resolver = partial(mutator, command='remove_tasks') + description = sstrip(f''' + Force task instances to a specified state. + Outputs are automatically updated to reflect the new task state, + except for custom message outputs which can be manipulated directly + with `output`. -class ResetTaskStates(Mutation, TaskMutation): - class Arguments(TaskMutation.Arguments): - state = String() - outputs = List(String) + Prerequisites reflect the state of other tasks; they are not + changed except to unset them on resetting state to + `{TASK_STATUS_WAITING}` or earlier. - class Meta: - description = '' + Note: To hold and release tasks use "Hold" and "Release", not this + command. + ''') resolver = partial(mutator, command='reset_task_states') - -class SpawnTasks(Mutation, TaskMutation): + class Arguments(TaskMutation.Arguments): + state = TaskStatus( + description='Reset the task status to this.' + ) + outputs = List( + String, + description=sstrip(''' + Find task output by message string or trigger string, set + complete or incomplete with `!OUTPUT`, `*` to set all + complete, `!*` to set all incomplete. 
+ ''') + ) + + +class Spawn(Mutation, TaskMutation): class Meta: - description = '' + description = sstrip(f''' + Force task proxies to spawn successors at their own next cycle + point. + + Tasks normally spawn on reaching the {TASK_STATUS_SUBMITTED} + status. Spawning them early allows running successive instances of + the same task out of order. See also the `spawn to max active + cycle points` workflow configuration. + + Note this command does not operate on tasks at any arbitrary point + in the abstract workflow graph - tasks not already in the pool must + be inserted first with "Insert". + ''') resolver = partial(mutator, command='spawn_tasks') -class TriggerTasks(Mutation, TaskMutation): - class Arguments(TaskMutation.Arguments): - back_out = Boolean() - +class Trigger(Mutation, TaskMutation): class Meta: - description = '' + description = sstrip(''' + Manually trigger tasks. + + TODO: re-implement edit funtionality! + + For single tasks you can use `edit` to edit the generated job + script before it submits, to apply one-off changes. A diff between + the original and edited job script will be saved to the task job + log directory. + + Warning: waiting tasks that are queue-limited will be queued if + triggered, to submit as normal when released by the queue; queued + tasks will submit immediately if triggered, even if that violates + the queue limit (so you may need to trigger a queue-limited task + twice to get it to submit immediately). + + Note: tasks not already in the pool must be inserted first with + "Insert" in order to be matched. + ''') resolver = partial(mutator, command='trigger_tasks') + class Arguments(TaskMutation.Arguments): + # back_out = Boolean() + # TODO: remove or re-implement? + pass + # Mutation declarations + class Mutations(ObjectType): # workflow actions - clear_broadcast = ClearBroadcast.Field( - description=ClearBroadcast._meta.description) - expire_broadcast = ExpireBroadcast.Field( - description=ExpireBroadcast._meta.description) - put_broadcast = PutBroadcast.Field( - description=PutBroadcast._meta.description) - ext_trigger = ExternalTrigger.Field( - description=ExternalTrigger._meta.description) - hold_workflow = HoldWorkflow.Field( - description=HoldWorkflow._meta.description) - nudge_workflow = NudgeWorkflow.Field( - description=NudgeWorkflow._meta.description) - put_messages = PutMessages.Field( - description=PutMessages._meta.description) - release_workflow = ReleaseWorkflow.Field( - description=ReleaseWorkflow._meta.description) - reload_workflow = ReloadWorkflow.Field( - description=ReloadWorkflow._meta.description) + broadcast = Broadcast.Field(description=Message._meta.description) + ext_trigger = ExtTrigger.Field( + description=ExtTrigger._meta.description) + hold = Hold.Field(description=Hold._meta.description) + nudge = Nudge.Field(description=Nudge._meta.description) + message = Message.Field(description=Message._meta.description) + ping = Ping.Field(description=Ping._meta.description) + release = Release.Field(description=Release._meta.description) + reload = Reload.Field(description=Reload._meta.description) set_verbosity = SetVerbosity.Field( description=SetVerbosity._meta.description) - stop_workflow = StopWorkflow.Field( - description=StopWorkflow._meta.description) + stop = Stop.Field(description=Stop._meta.description) take_checkpoint = TakeCheckpoint.Field( description=TakeCheckpoint._meta.description) # task actions - dry_run_tasks = DryRunTasks.Field() - hold_tasks = HoldTasks.Field() - insert_tasks = InsertTasks.Field() - 
kill_task_jobs = KillTaskJobs.Field() - poll_tasks = PollTasks.Field() - release_tasks = ReleaseTasks.Field() - remove_tasks = RemoveTasks.Field() - reset_task_states = ResetTaskStates.Field() - spawn_tasks = SpawnTasks.Field() - trigger_tasks = TriggerTasks.Field() + dry_run = DryRun.Field(description=DryRun._meta.description) + insert = Insert.Field(description=Insert._meta.description) + kill = Kill.Field(description=Kill._meta.description) + poll = Poll.Field(description=Poll._meta.description) + remove = Remove.Field(description=Remove._meta.description) + reset = Reset.Field(description=Reset._meta.description) + spawn = Spawn.Field(description=Spawn._meta.description) + trigger = Trigger.Field(description=Trigger._meta.description) # job actions # TODO diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 49e952eb3f0..39601a0a775 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -50,6 +50,14 @@ def expose(func=None): return func +def filter_none(dictionary): + return { + key: value + for key, value in dictionary.items() + if value is not None + } + + class SuiteRuntimeServer(ZMQSocketBase): """Suite runtime service API facade exposed via zmq. @@ -291,6 +299,24 @@ def api(self, endpoint=None): return '%s\n%s' % (head, tail) return 'No method by name "%s"' % endpoint + @authorise(Priv.CONTROL) + @expose + def broadcast( + self, + cycle_points=None, + namespaces=None, + settings=None + ): + """Put or clear broadcasts.""" + if mode == 'put_broadcast': + return self.schd.task_events_mgr.broadcast_mgr.put_broadcast( + cycle_points, namespaces, settings) + if mode == 'clear_broadcast': + return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast( + cycle_points, namespaces, cancel_settings) + else: + raise ValueError('TEMP: TODO: FixMe!') + @authorise(Priv.READ) @expose def graphql(self, request_string=None, variables=None): @@ -331,6 +357,7 @@ def graphql(self, request_string=None, variables=None): return errors return executed.data + # TODO: deprecated by broadcast() @authorise(Priv.CONTROL) @expose def clear_broadcast( @@ -565,6 +592,21 @@ def get_task_requisites(self, task_globs=None, list_prereqs=False): return self.schd.info_get_task_requisites( task_globs, list_prereqs=list_prereqs) + @authorise(Priv.CONTROL) + @expose + def hold(self, ids=None, time=None): + """Hold the workflow.""" + self.schd.command_queue.put(( + 'hold', + tuple(), + filter_none({ + 'ids': ids, + 'time': time + }) + )) + return (True, 'Command queued') + + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_after_point_string(self, point_string): @@ -586,6 +628,7 @@ def hold_after_point_string(self, point_string): ("hold_after_point_string", (point_string,), {})) return (True, 'Command queued') + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_suite(self): @@ -603,6 +646,7 @@ def hold_suite(self): self.schd.command_queue.put(("hold_suite", (), {})) return (True, 'Command queued') + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_tasks(self, task_globs): @@ -817,6 +861,7 @@ def poll_tasks(self, task_globs=None, poll_succ=False): ("poll_tasks", (task_globs,), {"poll_succ": poll_succ})) return (True, 'Command queued') + # TODO: deprecated by broadcast() @authorise(Priv.CONTROL) @expose def put_broadcast( @@ -923,6 +968,17 @@ def reload_suite(self): self.schd.command_queue.put(("reload_suite", (), {})) return (True, 'Command queued') + @authorise(Priv.CONTROL) + @expose + def release(self, 
ids=None): + """Release (un-hold) the workflow.""" + if ids: + self.schd.command_queue.put(("release_tasks", (ids,), {})) + else: + self.schd.command_queue.put(("release_suite", (), {})) + return (True, 'Command queued') + + # TODO: deprecated by release() @authorise(Priv.CONTROL) @expose def release_suite(self): @@ -940,6 +996,7 @@ def release_suite(self): self.schd.command_queue.put(("release_suite", (), {})) return (True, 'Command queued') + # TODO: deprecated by release() @authorise(Priv.CONTROL) @expose def release_tasks(self, task_globs): @@ -1014,6 +1071,7 @@ def reset_task_states(self, task_globs, state=None, outputs=None): (task_globs,), {"state": state, "outputs": outputs})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_clock_time(self, datetime_string): @@ -1038,6 +1096,7 @@ def set_stop_after_clock_time(self, datetime_string): ("set_stop_after_clock_time", (datetime_string,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_point(self, point_string): @@ -1060,6 +1119,7 @@ def set_stop_after_point(self, point_string): ("set_stop_after_point", (point_string,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_task(self, task_id): @@ -1081,6 +1141,7 @@ def set_stop_after_task(self, task_id): ("set_stop_after_task", (task_id,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_cleanly(self, kill_active_tasks=False): @@ -1147,6 +1208,30 @@ def spawn_tasks(self, task_globs): self.schd.command_queue.put(("spawn_tasks", (task_globs,), {})) return (True, 'Command queued') + @authorise(Priv.SHUTDOWN) + @expose + def stop( + self, + mode=None, + cycle_point=None, + clock_time=None, + task=None + ): + """Stop the workflow.""" + + self.schd.command_queue.put(( + "stop", + (), + filter_none({ + mode: mode, + cycle_point: cycle_point, + clock_time: clock_time, + task: task + }) + )) + return (True, 'Command queued') + + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def stop_now(self, terminate=False): diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py index fa1b492901c..11d2405fcf3 100644 --- a/cylc/flow/network/subscriber.py +++ b/cylc/flow/network/subscriber.py @@ -77,7 +77,7 @@ def __init__( self.topics = set(b'') self.topics = topics # Connect the ZMQ socket on instantiation - self.start(host, port, srv_public_key_loc) + self.start_(host, port, srv_public_key_loc) def _socket_options(self): """Set options after socket instantiation and before connect. 
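As a rough illustration of the routing described in the commit message above
("route GQL mutations via the traditional endpoints"), with example values
assumed: mutation arguments are passed through the new `filter_none` helper in
server.py so that only explicitly-set values are queued for the scheduler:

    >>> filter_none({'mode': None, 'cycle_point': '2050', 'clock_time': None})
    {'cycle_point': '2050'}

The `stop()` endpoint is then intended to queue a command tuple of the form
`('stop', (), {'cycle_point': '2050'})` for `Scheduler.command_stop()` to act
on.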
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 182cfe34fbd..2b1a133e762 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -285,10 +285,10 @@ def start(self): port_range = glbl_cfg().get(['suite servers', 'run ports']) self.server = SuiteRuntimeServer( self, context=self.zmq_context, barrier=barrier) - self.server.start(port_range[0], port_range[-1]) + self.server.start_(port_range[0], port_range[-1]) self.publisher = WorkflowPublisher( self.suite, context=self.zmq_context, barrier=barrier) - self.publisher.start(port_range[0], port_range[-1]) + self.publisher.start_(port_range[0], port_range[-1]) # wait for threads to setup socket ports before continuing barrier.wait() self.port = self.server.port @@ -815,7 +815,7 @@ def info_ping_task(self, task_id, exists_only=False): task_id = self.get_standardised_taskid(task_id) return self.pool.ping_task(task_id, exists_only) - def command_stop_workflow( + def command_stop( self, stop_mode=None, cycle_point=None, @@ -832,7 +832,7 @@ def command_stop_workflow( # schedule shutdown after tasks pass provided cycle point if cycle_point: - point = self.get-standardised_point(cycle_point) + point = self.get_standardised_point(cycle_point) if self.pool.set_stop_point(point): self.options.stopcp = str(point) self.suite_db_mgr.put_suite_stop_cycle_point( @@ -845,7 +845,8 @@ def command_stop_workflow( if clock_time: parser = TimePointParser() time = parser.parse(clock_time) - self.set_stop_clock(int(stop_time.get("seconds_since_unix_epoch"))) + self.set_stop_clock( + int(stop_time.get("seconds_since_unix_epoch"))) # schedule shutdown after task succeeds if task: @@ -858,14 +859,14 @@ def command_stop_workflow( def command_set_stop_cleanly(self, kill_active_tasks=False): """Stop job submission and set the flag for clean shutdown.""" - # TODO: deprecate + # TODO: deprecated by command_stop() self._set_stop() if kill_active_tasks: self.time_next_kill = time() def command_stop_now(self, terminate=False): """Shutdown immediately.""" - # TODO: deprecate + # TODO: deprecated by command_stop() if terminate: self._set_stop(StopMode.REQUEST_NOW_NOW) else: @@ -880,7 +881,7 @@ def _set_stop(self, stop_mode=None): def command_set_stop_after_point(self, point_string): """Set stop after ... 
point.""" - # TODO: deprecate + # TODO: deprecated by command_stop() stop_point = self.get_standardised_point(point_string) if self.pool.set_stop_point(stop_point): self.options.stopcp = str(stop_point) @@ -909,8 +910,14 @@ def command_set_stop_after_task(self, task_id): if TaskID.is_valid_id(task_id): self.set_stop_task(task_id) + def command_release(self, ids=None): + if ids: + return self.pool.release_tasks(ids) + self.release_suite() + def command_release_tasks(self, items): """Release tasks.""" + # TODO: deprecated by command_release() return self.pool.release_tasks(items) def command_poll_tasks(self, items=None, poll_succ=False): @@ -939,23 +946,33 @@ def command_kill_tasks(self, items=None): def command_release_suite(self): """Release all task proxies in the suite.""" + # TODO: deprecated by command_release() self.release_suite() + def command_hold(self, ids=None, time=None): + if ids: + self.pool.hold_tasks(ids) + if time: + point = self.get_standardised_point(time) + self.hold_suite(point) + LOG.info( + 'The suite will pause when all tasks have passed %s', point) + if not (ids or time): + self.hold_suite() + def command_hold_tasks(self, items): """Hold selected task proxies in the suite.""" + # TODO: deprecated by command_hold() return self.pool.hold_tasks(items) - def command_hold_workflow(self, point_string=None): - self.hold_suite(point_string) - def command_hold_suite(self): """Hold all task proxies in the suite.""" - # TODO: deprecate + # TODO: deprecated by command_hold() self.hold_suite() def command_hold_after_point_string(self, point_string): """Hold tasks AFTER this point (itask.point > point).""" - # TODO: deprecate + # TODO: deprecated by command_hold() point = self.get_standardised_point(point_string) self.hold_suite(point) LOG.info( @@ -1839,11 +1856,11 @@ def shutdown(self, reason): LOG.exception(exc) if self.server: - self.server.stop() + self.server.stop_() if self.publisher: self.publisher.publish( [(b'shutdown', f'{str(reason)}'.encode('utf-8'))]) - self.publisher.stop() + self.publisher.stop_() self.curve_auth.stop() # stop the authentication thread # Flush errors and info before removing suite contact file diff --git a/cylc/flow/suite_status.py b/cylc/flow/suite_status.py index c721943a24c..dc84ade63a7 100644 --- a/cylc/flow/suite_status.py +++ b/cylc/flow/suite_status.py @@ -74,24 +74,17 @@ class StopMode(Enum): def describe(self): """Return a user-friendly description of this state.""" if self == self.AUTO: - return 'wait until suite has completed' + return 'Wait until suite has completed.' if self == self.AUTO_ON_TASK_FAILURE: - return 'wait until the first task fails' + return 'Wait until the first task fails.' if self == self.REQUEST_CLEAN: - return 'wait for active jobs to complete' + return 'Wait for active jobs to complete.' if self == self.REQUEST_NOW: - return 'immediate shutdown, wait for event handlers to complete' + return 'Immediate shutdown, wait for event handlers to complete.' if self == self.REQUEST_NOW_NOW: - return 'immediate shutdown' + return 'Immediate shutdown.' 
return ''
 
-    @classmethod
-    def description(cls, value):
-        try:
-            cls(value).describe()
-        except ValueError:
-            pass
-
 
 class AutoRestartMode(Enum):
     """The possible modes of a suite auto-restart."""
diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py
index 20e9fe0e085..76382fcfb73 100644
--- a/cylc/flow/task_state.py
+++ b/cylc/flow/task_state.py
@@ -53,6 +53,33 @@
 # Job execution failed, but will try again soon:
 TASK_STATUS_RETRYING = "retrying"
 
+TASK_STATUS_DESC = {
+    TASK_STATUS_RUNAHEAD:
+        'Waiting for dependencies to be satisfied.',
+    TASK_STATUS_WAITING:
+        'Waiting for dependencies to be satisfied.',
+    TASK_STATUS_QUEUED:
+        'Dependencies are satisfied, placed in internal queue.',
+    TASK_STATUS_EXPIRED:
+        'Execution skipped.',
+    TASK_STATUS_READY:
+        'Cylc is preparing a job for submission.',
+    TASK_STATUS_SUBMIT_FAILED:
+        'Job submission has failed.',
+    TASK_STATUS_SUBMIT_RETRYING:
+        'Attempting to re-submit job after previous failed attempt.',
+    TASK_STATUS_SUBMITTED:
+        'Job has been submitted.',
+    TASK_STATUS_RETRYING:
+        'Attempting to re-run job after previous failed attempt.',
+    TASK_STATUS_RUNNING:
+        'Job is running.',
+    TASK_STATUS_FAILED:
+        'Job has returned non-zero exit code.',
+    TASK_STATUS_SUCCEEDED:
+        'Job has returned zero exit code.'
+}
+
 # Tasks statuses ordered according to task runtime progression.
 TASK_STATUSES_ORDERED = [
     TASK_STATUS_RUNAHEAD,
diff --git a/cylc/flow/tests/network/test_client.py b/cylc/flow/tests/network/test_client.py
index 4e7a076b0a0..3a2ea5f68dc 100644
--- a/cylc/flow/tests/network/test_client.py
+++ b/cylc/flow/tests/network/test_client.py
@@ -89,7 +89,7 @@ def setUp(self) -> None:
             barrier=barrier, daemon=True)
 
         port_range = glbl_cfg().get(['suite servers', 'run ports'])
-        self.server.start(port_range[0], port_range[-1])
+        self.server.start_(port_range[0], port_range[-1])
         # barrier.wait() doesn't seem to work properly here
         # so this workaround will do
         while barrier.n_waiting < 1:
@@ -103,8 +103,8 @@ def setUp(self) -> None:
             sleep(0.5)
 
     def tearDown(self):
-        self.server.stop()
-        self.client.stop()
+        self.server.stop_()
+        self.client.stop_()
 
     def test_constructor(self):
         self.assertFalse(self.client.socket.closed)
diff --git a/cylc/flow/tests/network/test_publisher.py b/cylc/flow/tests/network/test_publisher.py
index 796d78a63e3..671bc2e86fc 100644
--- a/cylc/flow/tests/network/test_publisher.py
+++ b/cylc/flow/tests/network/test_publisher.py
@@ -87,7 +87,7 @@ def setUp(self) -> None:
         self.pub_data = self.scheduler.ws_data_mgr.get_publish_deltas()
 
     def tearDown(self):
-        self.publisher.stop()
+        self.publisher.stop_()
 
     def test_constructor(self):
         self.assertFalse(self.publisher.threaded)
@@ -95,7 +95,7 @@ def test_constructor(self):
 
     def test_publish(self):
         """Test publishing data."""
-        self.publisher.start(*PORT_RANGE)
+        self.publisher.start_(*PORT_RANGE)
         subscriber = WorkflowSubscriber(
             self.suite_name,
             host=self.scheduler.host,
@@ -110,7 +110,7 @@ def test_publish(self):
         delta = DELTAS_MAP[btopic.decode('utf-8')]()
         delta.ParseFromString(msg)
         self.assertEqual(delta.id, self.workflow_id)
-        subscriber.stop()
+        subscriber.stop_()
         with self.assertLogs(LOG, level='ERROR') as cm:
             self.publisher.publish(None)
         self.assertIn('publish: ', cm.output[0])
@@ -119,16 +119,16 @@ def test_start(self):
         """Test publisher start."""
         self.assertIsNone(self.publisher.loop)
         self.assertIsNone(self.publisher.port)
-        self.publisher.start(*PORT_RANGE)
+        self.publisher.start_(*PORT_RANGE)
         self.assertIsNotNone(self.publisher.loop)
         self.assertIsNotNone(self.publisher.port)
-
self.publisher.stop() + self.publisher.stop_() def test_stop(self): """Test publisher stop.""" - self.publisher.start(*PORT_RANGE) + self.publisher.start_(*PORT_RANGE) self.assertFalse(self.publisher.socket.closed) - self.publisher.stop() + self.publisher.stop_() self.assertTrue(self.publisher.socket.closed) diff --git a/cylc/flow/tests/network/test_resolvers.py b/cylc/flow/tests/network/test_resolvers.py index d28b19ba9ad..646ade2a98d 100644 --- a/cylc/flow/tests/network/test_resolvers.py +++ b/cylc/flow/tests/network/test_resolvers.py @@ -23,7 +23,7 @@ DataStoreMgr, ID_DELIM, EDGES, TASK_PROXIES, WORKFLOW ) from cylc.flow.network.schema import parse_node_id -from cylc.flow.network.resolvers import workflow_filter, node_filter, Resolvers +from cylc.flow.network.resolvers import node_filter, Resolvers def _run_coroutine(coro): @@ -55,15 +55,6 @@ class FakeFlow: status = 'running' -def test_workflow_filter(): - data = {WORKFLOW: FakeFlow()} - args = deepcopy(FLOW_ARGS) - args['workflows'].append(('*', 'jin', None)) - assert not workflow_filter(data, args) - args['workflows'].append(('qux', 'baz', 'running')) - assert workflow_filter(data, args) - - class FakeNode: id = f'qux{ID_DELIM}baz{ID_DELIM}20130808T00{ID_DELIM}foo{ID_DELIM}1' namespace = ['root', 'foo'] diff --git a/cylc/flow/tests/network/test_server.py b/cylc/flow/tests/network/test_server.py index 95b53c14c93..857b9e62c10 100644 --- a/cylc/flow/tests/network/test_server.py +++ b/cylc/flow/tests/network/test_server.py @@ -91,7 +91,7 @@ def setUp(self) -> None: daemon=True ) self.server.public_priv = Priv.CONTROL - self.server.start(*PORT_RANGE) + self.server.start_(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while barrier.n_waiting < 1: @@ -100,7 +100,7 @@ def setUp(self) -> None: sleep(0.5) def tearDown(self): - self.server.stop() + self.server.stop_() def test_constructor(self): self.assertFalse(self.server.socket.closed) diff --git a/cylc/flow/tests/network/test_subscriber.py b/cylc/flow/tests/network/test_subscriber.py index 91786e866f0..71cb627937b 100644 --- a/cylc/flow/tests/network/test_subscriber.py +++ b/cylc/flow/tests/network/test_subscriber.py @@ -89,7 +89,7 @@ def setUp(self) -> None: self.workflow_id = self.scheduler.ws_data_mgr.workflow_id self.publisher = WorkflowPublisher( self.suite_name, threaded=False, daemon=True) - self.publisher.start(*PORT_RANGE) + self.publisher.start_(*PORT_RANGE) self.subscriber = WorkflowSubscriber( self.suite_name, host=self.scheduler.host, @@ -102,8 +102,8 @@ def setUp(self) -> None: self.data = None def tearDown(self): - self.subscriber.stop() - self.publisher.stop() + self.subscriber.stop_() + self.publisher.stop_() def test_constructor(self): """Test class constructor result.""" diff --git a/cylc/flow/tests/network/test_zmq.py b/cylc/flow/tests/network/test_zmq.py index 8a91aabf0c1..a98b48ee619 100644 --- a/cylc/flow/tests/network/test_zmq.py +++ b/cylc/flow/tests/network/test_zmq.py @@ -47,9 +47,9 @@ def test_server_requires_valid_keys(): server = ZMQSocketBase(zmq.REQ, bind=True, daemon=True) with pytest.raises(ValueError, match=r"No public key found in "): - server.start(*PORT_RANGE, private_key_location=fake.name) + server.start_(*PORT_RANGE, private_key_location=fake.name) - server.stop() + server.stop_() def test_client_requires_valid_keys(): @@ -62,7 +62,7 @@ def test_client_requires_valid_keys(): ClientError, match=r"Failed to load the suite's public " "key, so cannot connect."): # Assign a blank file masquerading as a 
CurveZMQ certificate - client.start(HOST, port, srv_public_key_loc=fake.name) + client.start_(HOST, port, srv_public_key_loc=fake.name) def test_single_port(): @@ -80,8 +80,8 @@ def test_single_port(): with pytest.raises(CylcError, match=r"Address already in use") as exc: serv2._socket_bind(port, port) - serv2.stop() - serv1.stop() + serv2.stop_() + serv1.stop_() context.destroy() @@ -94,7 +94,7 @@ def test_start(): assert publisher.barrier.n_waiting == 0 assert publisher.loop is None assert publisher.port is None - publisher.start(*PORT_RANGE) + publisher.start_(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while publisher.barrier.n_waiting < 1: @@ -102,7 +102,7 @@ def test_start(): assert barrier.wait() == 1 assert publisher.loop is not None assert publisher.port is not None - publisher.stop() + publisher.stop_() def test_stop(): @@ -111,7 +111,7 @@ def test_stop(): barrier = Barrier(2, timeout=20) publisher = ZMQSocketBase(zmq.PUB, suite='test_zmq_stop', bind=True, barrier=barrier, threaded=True, daemon=True) - publisher.start(*PORT_RANGE) + publisher.start_(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while publisher.barrier.n_waiting < 1: @@ -119,7 +119,7 @@ def test_stop(): barrier.wait() assert not publisher.socket.closed assert publisher.thread.is_alive() - publisher.stop() + publisher.stop_() assert publisher.socket.closed assert not publisher.thread.is_alive() From b4323c96778721388006183e92b400acc809e34d Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 16 Jan 2020 14:34:08 +1300 Subject: [PATCH 3/5] aotf: tidy and fixes --- cylc/flow/network/__init__.py | 4 +- cylc/flow/network/client.py | 2 +- cylc/flow/network/schema.py | 85 +++++++++------- cylc/flow/network/server.py | 108 ++++++++++++--------- cylc/flow/network/subscriber.py | 2 +- cylc/flow/scheduler.py | 44 ++++----- cylc/flow/scripts/cylc_ext_trigger.py | 2 +- cylc/flow/scripts/cylc_insert.py | 4 +- cylc/flow/scripts/cylc_kill.py | 2 +- cylc/flow/scripts/cylc_poll.py | 2 +- cylc/flow/scripts/cylc_remove.py | 2 +- cylc/flow/scripts/cylc_reset.py | 2 +- cylc/flow/scripts/cylc_spawn.py | 2 +- cylc/flow/scripts/cylc_trigger.py | 4 +- cylc/flow/task_pool.py | 4 +- cylc/flow/tests/network/test_client.py | 8 +- cylc/flow/tests/network/test_publisher.py | 16 +-- cylc/flow/tests/network/test_resolvers.py | 2 +- cylc/flow/tests/network/test_server.py | 6 +- cylc/flow/tests/network/test_subscriber.py | 8 +- cylc/flow/tests/network/test_zmq.py | 18 ++-- cylc/flow/tests/test_data_store_mgr.py | 2 +- tests/cylc-insert/12-cycle-500-tasks.t | 2 +- 23 files changed, 181 insertions(+), 150 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index d98613c67f5..b55016dd48a 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -126,7 +126,7 @@ def __init__(self, pattern, suite=None, bind=False, context=None, self.loop = None self.stopping = False - def start_(self, *args, **kwargs): + def start(self, *args, **kwargs): """Start the server/network-component. Pass arguments to _start_ @@ -253,7 +253,7 @@ def _bespoke_start(self): self.stopping = False sleep(0) # yield control to other threads - def stop_(self, stop_loop=True): + def stop(self, stop_loop=True): """Stop the server. 
Args: diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index c7c4fe013cf..1a99f23630a 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -127,7 +127,7 @@ def __init__( self._timeout_handler, suite, host, port) self.poller = None # Connect the ZMQ socket on instantiation - self.start_(host, port, srv_public_key_loc) + self.start(host, port, srv_public_key_loc) # gather header info post start self.header = self.get_header() diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 745b5ed464b..a5a7ba5cedb 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -22,10 +22,17 @@ from textwrap import dedent from typing import Callable, AsyncGenerator, Any +from graphene import ( + Boolean, Field, Float, ID, InputObjectType, Int, + List, Mutation, ObjectType, Schema, String, Union, Enum +) +from graphene.types.generic import GenericScalar +from graphene.utils.str_converters import to_snake_case + from cylc.flow.task_state import ( TASK_STATUSES_ORDERED, TASK_STATUS_DESC, - TASK_STATUS_RUNAHEAD, + # TASK_STATUS_RUNAHEAD, TASK_STATUS_WAITING, TASK_STATUS_QUEUED, TASK_STATUS_EXPIRED, @@ -43,17 +50,22 @@ JOBS, TASKS, TASK_PROXIES ) from cylc.flow.suite_status import StopMode -from cylc.flow.task_state import TASK_STATUSES_ALL - -from graphene import ( - Boolean, Field, Float, ID, InputObjectType, Int, - List, Mutation, ObjectType, Schema, String, Union, Enum -) -from graphene.types.generic import GenericScalar -from graphene.utils.str_converters import to_snake_case def sstrip(text): + """Simple function to dedent and strip text. + + Examples: + >>> print(sstrip(''' + ... foo + ... bar + ... baz + ... ''')) + foo + bar + baz + + """ return dedent(text).strip() @@ -928,7 +940,8 @@ class BroadcastSetting(InputObjectType): required=True ) value = String( - description='The value of the modification' + description='The value of the modification', + required=True ) @@ -998,24 +1011,14 @@ class NamespaceIDGlob(String): Can use the wildcard character (`*`), e.g `foo*` might match `foot`. """ - # cycle = CyclePointGlob() - # namespace = NamespaceName() - # status = TaskState() - class TaskID(String): """The name of an active task.""" - # cycle = CyclePoint(required=True) - # name = TaskName(required=True) - class JobID(String): """A job submission from an active task.""" - # task = TaskID(required=None) - # submission_number = Int(default=-1) - class TimePoint(String): """A date-time in the ISO8601 format.""" @@ -1099,7 +1102,7 @@ class Arguments: broadcasts without canceling all specific-cycle broadcasts. '''), default_value=['*']) - namespaces = List( + tasks = List( NamespaceName, description='Target namespaces.', default_value=['root'] @@ -1127,7 +1130,7 @@ class Meta: class Arguments: workflows = List(WorkflowID, required=True) - ids = List( + tasks = List( NamespaceIDGlob, description='Hold the specified tasks rather than the workflow.' ) @@ -1189,7 +1192,7 @@ class Meta: class Arguments: workflows = List(WorkflowID, required=True) - ids = List(JobID, required=True) + task_job = String(required=True) event_time = String(default_value=None) messages = List( List(String), @@ -1211,7 +1214,7 @@ class Meta: class Arguments: workflows = List(WorkflowID, required=True) - ids = List( + tasks = List( NamespaceIDGlob, description=sstrip(''' Release matching tasks rather than the workflow as whole. 
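
For orientation, once the argument types above are in place a client would drive the `release` mutation with a request along the following lines. This is purely illustrative: the mutation field name and the identifier formats are assumptions, not something this patch pins down.

    # Hypothetical GraphQL request built against the Release mutation above.
    RELEASE_REQUEST = '''
    mutation {
      release(workflows: ["my_suite"], tasks: ["foo.20100101T00"]) {
        result
      }
    }
    '''
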
@@ -1309,7 +1312,7 @@ class Arguments: result = GenericScalar() -class TakeCheckpoint(Mutation): +class Checkpoint(Mutation): class Meta: description = 'Tell the suite to checkpoint its current state.' resolver = partial(mutator, command='take_checkpoints') @@ -1318,7 +1321,8 @@ class Arguments: workflows = List(WorkflowID, required=True) name = String( description='The checkpoint name.', - required=True) + required=True + ) result = GenericScalar() @@ -1364,8 +1368,14 @@ class Arguments: class TaskMutation: class Arguments: - workflows = List(WorkflowID) - ids = List(NamespaceIDGlob, required=True) + workflows = List( + WorkflowID, + required=True + ) + tasks = List( + NamespaceIDGlob, + required=True + ) result = GenericScalar() @@ -1403,11 +1413,13 @@ class Meta: resolver = partial(mutator, command='insert_tasks') class Arguments(TaskMutation.Arguments): - no_check = Boolean( + check_point = Boolean( description=sstrip(''' - Add task even if the provided cycle point is not valid for - the given task. - ''') + Check that the provided cycle point is on one of the task's + recurrences as defined in the suite configuration before + inserting. + '''), + default_value=True ) stop_point = CyclePoint( description='hold/stop cycle point for inserted task.' @@ -1431,8 +1443,9 @@ class Meta: resolver = partial(mutator, command='poll_tasks') class Arguments(TaskMutation.Arguments): - poll_succ = Boolean( - description='Allow polling of succeeded tasks.' + poll_succeeded = Boolean( + description='Allow polling of succeeded tasks.', + default_value=False ) @@ -1548,8 +1561,8 @@ class Mutations(ObjectType): set_verbosity = SetVerbosity.Field( description=SetVerbosity._meta.description) stop = Stop.Field(description=Stop._meta.description) - take_checkpoint = TakeCheckpoint.Field( - description=TakeCheckpoint._meta.description) + checkpoint = Checkpoint.Field( + description=Checkpoint._meta.description) # task actions dry_run = DryRun.Field(description=DryRun._meta.description) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 39601a0a775..01ee5265b7f 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -51,6 +51,17 @@ def expose(func=None): def filter_none(dictionary): + """Filter out `None` items from a dictionary: + + Examples: + >>> filter_none({ + ... 'a': 0, + ... 'b': '', + ... 'c': None + ... }) + {'a': 0, 'b': ''} + + """ return { key: value for key, value in dictionary.items() @@ -303,19 +314,19 @@ def api(self, endpoint=None): @expose def broadcast( self, + mode, cycle_points=None, - namespaces=None, + tasks=None, settings=None ): """Put or clear broadcasts.""" if mode == 'put_broadcast': return self.schd.task_events_mgr.broadcast_mgr.put_broadcast( - cycle_points, namespaces, settings) + cycle_points, tasks, settings) if mode == 'clear_broadcast': return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast( - cycle_points, namespaces, cancel_settings) - else: - raise ValueError('TEMP: TODO: FixMe!') + cycle_points, tasks, settings) + raise ValueError('TEMP: TODO: FixMe!') @authorise(Priv.READ) @expose @@ -397,7 +408,7 @@ def clear_broadcast( @authorise(Priv.CONTROL) @expose - def dry_run_tasks(self, task_globs, check_syntax=True): + def dry_run_tasks(self, tasks, check_syntax=True): """Prepare job file for a task. Args: @@ -413,8 +424,11 @@ def dry_run_tasks(self, task_globs, check_syntax=True): Information about outcome. 
""" - self.schd.command_queue.put(('dry_run_tasks', (task_globs,), - {'check_syntax': check_syntax})) + self.schd.command_queue.put(( + 'dry_run_tasks', + (tasks,), + {'check_syntax': check_syntax} + )) return (True, 'Command queued') @authorise(Priv.CONTROL) @@ -594,13 +608,13 @@ def get_task_requisites(self, task_globs=None, list_prereqs=False): @authorise(Priv.CONTROL) @expose - def hold(self, ids=None, time=None): + def hold(self, tasks=None, time=None): """Hold the workflow.""" self.schd.command_queue.put(( 'hold', tuple(), filter_none({ - 'ids': ids, + 'tasks': tasks, 'time': time }) )) @@ -732,14 +746,14 @@ def state_totals(self): @authorise(Priv.CONTROL) @expose - def insert_tasks(self, items, stop_point_string=None, no_check=False): + def insert_tasks(self, tasks, stop_point=None, check_point=True): """Insert task proxies. Args: - items (list): + tasks (list): A list of `task globs`_ (strings) which *cannot* contain any glob characters (``*``). - stop_point_string (str, optional): + stop_point (str, optional): Optional hold/stop cycle point for inserted task. no_check (bool, optional): Add task even if the provided cycle point is not valid @@ -755,17 +769,21 @@ def insert_tasks(self, items, stop_point_string=None, no_check=False): """ self.schd.command_queue.put(( "insert_tasks", - (items,), - {"stop_point_string": stop_point_string, "no_check": no_check})) + (tasks,), + { + "stop_point_string": stop_point, + "check_point": check_point + } + )) return (True, 'Command queued') @authorise(Priv.CONTROL) @expose - def kill_tasks(self, task_globs): + def kill_tasks(self, tasks): """Kill task jobs. Args: - task_globs (list): List of identifiers, see `task globs`_ + tasks (list): List of identifiers, see `task globs`_ Returns: tuple: (outcome, message) @@ -776,7 +794,7 @@ def kill_tasks(self, task_globs): Information about outcome. """ - self.schd.command_queue.put(("kill_tasks", (task_globs,), {})) + self.schd.command_queue.put(("kill_tasks", (tasks,), {})) return (True, 'Command queued') @authorise(Priv.CONTROL) @@ -839,11 +857,11 @@ def ping_task(self, task_id, exists_only=False): @authorise(Priv.CONTROL) @expose - def poll_tasks(self, task_globs=None, poll_succ=False): + def poll_tasks(self, tasks=None, poll_succeeded=False): """Request the suite to poll task jobs. Args: - task_globs (list, optional): + tasks (list, optional): List of identifiers, see `task globs`_ poll_succ (bool, optional): Allow polling of remote tasks if True. @@ -858,7 +876,7 @@ def poll_tasks(self, task_globs=None, poll_succ=False): """ self.schd.command_queue.put( - ("poll_tasks", (task_globs,), {"poll_succ": poll_succ})) + ("poll_tasks", (tasks,), {"poll_succ": poll_succeeded})) return (True, 'Command queued') # TODO: deprecated by broadcast() @@ -903,12 +921,12 @@ def put_broadcast( @authorise(Priv.CONTROL) @expose - def put_ext_trigger(self, event_message, event_id): + def put_ext_trigger(self, message, id): """Server-side external event trigger interface. Args: - event_message (str): The external trigger message. - event_id (str): The unique trigger ID. + message (str): The external trigger message. + id (str): The unique trigger ID. Returns: tuple: (outcome, message) @@ -919,7 +937,7 @@ def put_ext_trigger(self, event_message, event_id): Information about outcome. 
""" - self.schd.ext_trigger_queue.put((event_message, event_id)) + self.schd.ext_trigger_queue.put((message, id)) return (True, 'Event queued') @authorise(Priv.CONTROL) @@ -970,10 +988,10 @@ def reload_suite(self): @authorise(Priv.CONTROL) @expose - def release(self, ids=None): + def release(self, tasks=None): """Release (un-hold) the workflow.""" - if ids: - self.schd.command_queue.put(("release_tasks", (ids,), {})) + if tasks: + self.schd.command_queue.put(("release_tasks", (tasks,), {})) else: self.schd.command_queue.put(("release_suite", (), {})) return (True, 'Command queued') @@ -1019,11 +1037,11 @@ def release_tasks(self, task_globs): @authorise(Priv.CONTROL) @expose - def remove_tasks(self, task_globs, spawn=False): + def remove_tasks(self, tasks, spawn=False): """Remove tasks from task pool. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ spawn (bool, optional): If True ensure task has spawned before removal. @@ -1038,16 +1056,16 @@ def remove_tasks(self, task_globs, spawn=False): """ self.schd.command_queue.put( - ("remove_tasks", (task_globs,), {"spawn": spawn})) + ("remove_tasks", (tasks,), {"spawn": spawn})) return (True, 'Command queued') @authorise(Priv.CONTROL) @expose - def reset_task_states(self, task_globs, state=None, outputs=None): + def reset_task_states(self, tasks, state=None, outputs=None): """Reset statuses tasks. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ state (str, optional): Task state to reset task to. @@ -1068,7 +1086,7 @@ def reset_task_states(self, task_globs, state=None, outputs=None): """ self.schd.command_queue.put(( "reset_task_states", - (task_globs,), {"state": state, "outputs": outputs})) + (tasks,), {"state": state, "outputs": outputs})) return (True, 'Command queued') # TODO: deprecated by stop() @@ -1190,11 +1208,11 @@ def set_verbosity(self, level): @authorise(Priv.CONTROL) @expose - def spawn_tasks(self, task_globs): + def spawn_tasks(self, tasks): """Spawn tasks. Args: - task_globs (list): List of identifiers, see `task globs`_ + tasks (list): List of identifiers, see `task globs`_ Returns: tuple: (outcome, message) @@ -1205,12 +1223,12 @@ def spawn_tasks(self, task_globs): Information about outcome. """ - self.schd.command_queue.put(("spawn_tasks", (task_globs,), {})) + self.schd.command_queue.put(("spawn_tasks", (tasks,), {})) return (True, 'Command queued') @authorise(Priv.SHUTDOWN) @expose - def stop( + def stop_workflow( self, mode=None, cycle_point=None, @@ -1223,10 +1241,10 @@ def stop( "stop", (), filter_none({ - mode: mode, - cycle_point: cycle_point, - clock_time: clock_time, - task: task + 'mode': mode, + 'cycle_point': cycle_point, + 'clock_time': clock_time, + 'task': task }) )) return (True, 'Command queued') @@ -1275,11 +1293,11 @@ def take_checkpoints(self, name): @authorise(Priv.CONTROL) @expose - def trigger_tasks(self, task_globs, back_out=False): + def trigger_tasks(self, tasks, back_out=False): """Trigger submission of task jobs where possible. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ back_out (bool, optional): Abort e.g. in the event of a rejected trigger-edit. 
@@ -1294,7 +1312,7 @@ def trigger_tasks(self, task_globs, back_out=False): """ self.schd.command_queue.put( - ("trigger_tasks", (task_globs,), {"back_out": back_out})) + ("trigger_tasks", (tasks,), {"back_out": back_out})) return (True, 'Command queued') # UIServer Data Commands diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py index 11d2405fcf3..fa1b492901c 100644 --- a/cylc/flow/network/subscriber.py +++ b/cylc/flow/network/subscriber.py @@ -77,7 +77,7 @@ def __init__( self.topics = set(b'') self.topics = topics # Connect the ZMQ socket on instantiation - self.start_(host, port, srv_public_key_loc) + self.start(host, port, srv_public_key_loc) def _socket_options(self): """Set options after socket instantiation and before connect. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 2b1a133e762..d7425ffa6ae 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -285,10 +285,10 @@ def start(self): port_range = glbl_cfg().get(['suite servers', 'run ports']) self.server = SuiteRuntimeServer( self, context=self.zmq_context, barrier=barrier) - self.server.start_(port_range[0], port_range[-1]) + self.server.start(port_range[0], port_range[-1]) self.publisher = WorkflowPublisher( self.suite, context=self.zmq_context, barrier=barrier) - self.publisher.start_(port_range[0], port_range[-1]) + self.publisher.start(port_range[0], port_range[-1]) # wait for threads to setup socket ports before continuing barrier.wait() self.port = self.server.port @@ -816,17 +816,17 @@ def info_ping_task(self, task_id, exists_only=False): return self.pool.ping_task(task_id, exists_only) def command_stop( - self, - stop_mode=None, - cycle_point=None, - # NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed - clock_time=None, - task=None + self, + mode=None, + cycle_point=None, + # NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed + clock_time=None, + task=None ): # immediate shutdown - if stop_mode: - self._set_stop(stop_mode) - elif not any([stop_mode, cycle_point, clock_time, task]): + if mode: + self._set_stop(mode) + elif not any([mode, cycle_point, clock_time, task]): # if no arguments provided do a standard clean shutdown self._set_stop(StopMode.REQUEST_CLEAN) @@ -844,13 +844,13 @@ def command_stop( # schedule shutdown after wallclock time passes provided time if clock_time: parser = TimePointParser() - time = parser.parse(clock_time) + clock_time = parser.parse(clock_time) self.set_stop_clock( - int(stop_time.get("seconds_since_unix_epoch"))) + int(clock_time.get("seconds_since_unix_epoch"))) # schedule shutdown after task succeeds if task: - task_id = self.get_standardised_taskid(task_id) + task_id = self.get_standardised_taskid(task) if TaskID.is_valid_id(task_id): self.set_stop_task(task_id) else: @@ -949,15 +949,15 @@ def command_release_suite(self): # TODO: deprecated by command_release() self.release_suite() - def command_hold(self, ids=None, time=None): - if ids: - self.pool.hold_tasks(ids) + def command_hold(self, tasks=None, time=None): + if tasks: + self.pool.hold_tasks(tasks) if time: point = self.get_standardised_point(time) self.hold_suite(point) LOG.info( 'The suite will pause when all tasks have passed %s', point) - if not (ids or time): + if not (tasks or time): self.hold_suite() def command_hold_tasks(self, items): @@ -994,9 +994,9 @@ def command_remove_tasks(self, items, spawn=False): return self.pool.remove_tasks(items, spawn) def command_insert_tasks(self, items, stop_point_string=None, - no_check=False): + check_point=True): 
"""Insert tasks.""" - return self.pool.insert_tasks(items, stop_point_string, no_check) + return self.pool.insert_tasks(items, stop_point_string, check_point) def command_nudge(self): """Cause the task processing loop to be invoked""" @@ -1856,11 +1856,11 @@ def shutdown(self, reason): LOG.exception(exc) if self.server: - self.server.stop_() + self.server.stop() if self.publisher: self.publisher.publish( [(b'shutdown', f'{str(reason)}'.encode('utf-8'))]) - self.publisher.stop_() + self.publisher.stop() self.curve_auth.stop() # stop the authentication thread # Flush errors and info before removing suite contact file diff --git a/cylc/flow/scripts/cylc_ext_trigger.py b/cylc/flow/scripts/cylc_ext_trigger.py index 84269066e01..89ffbf483b8 100755 --- a/cylc/flow/scripts/cylc_ext_trigger.py +++ b/cylc/flow/scripts/cylc_ext_trigger.py @@ -86,7 +86,7 @@ def main(parser, options, suite, event_msg, event_id): try: pclient( 'put_ext_trigger', - {'event_message': event_msg, 'event_id': event_id}, + {'message': event_msg, 'id': event_id}, timeout=options.comms_timeout ) except ClientError as exc: diff --git a/cylc/flow/scripts/cylc_insert.py b/cylc/flow/scripts/cylc_insert.py index 67290741b4d..da9b548ffca 100755 --- a/cylc/flow/scripts/cylc_insert.py +++ b/cylc/flow/scripts/cylc_insert.py @@ -74,8 +74,8 @@ def main(parser, options, suite, *items): pclient( 'insert_tasks', - {'items': items, 'no_check': options.no_check, - 'stop_point_string': options.stop_point_string}, + {'tasks': items, 'check_point': not options.no_check, + 'stop_point': options.stop_point_string}, timeout=options.comms_timeout ) diff --git a/cylc/flow/scripts/cylc_kill.py b/cylc/flow/scripts/cylc_kill.py index d78e6438374..6cbbb5af188 100755 --- a/cylc/flow/scripts/cylc_kill.py +++ b/cylc/flow/scripts/cylc_kill.py @@ -56,7 +56,7 @@ def main(parser, options, suite, *task_globs): suite, options.owner, options.host, options.port) pclient( 'kill_tasks', - {'task_globs': task_globs}, + {'tasks': task_globs}, timeout=options.comms_timeout ) diff --git a/cylc/flow/scripts/cylc_poll.py b/cylc/flow/scripts/cylc_poll.py index 26eca9bb5ac..6993d04546e 100755 --- a/cylc/flow/scripts/cylc_poll.py +++ b/cylc/flow/scripts/cylc_poll.py @@ -60,7 +60,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'poll_tasks', - {'task_globs': task_globs, 'poll_succ': options.poll_succ} + {'tasks': task_globs, 'poll_succeeded': options.poll_succ} ) diff --git a/cylc/flow/scripts/cylc_remove.py b/cylc/flow/scripts/cylc_remove.py index a53126f26e1..56ac043c820 100755 --- a/cylc/flow/scripts/cylc_remove.py +++ b/cylc/flow/scripts/cylc_remove.py @@ -59,7 +59,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'remove_tasks', - {'task_globs': task_globs, 'spawn': (not options.no_spawn)} + {'tasks': task_globs, 'spawn': (not options.no_spawn)} ) diff --git a/cylc/flow/scripts/cylc_reset.py b/cylc/flow/scripts/cylc_reset.py index 2cb6bbb854e..8f5d2615e78 100755 --- a/cylc/flow/scripts/cylc_reset.py +++ b/cylc/flow/scripts/cylc_reset.py @@ -97,7 +97,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'reset_task_states', - {'task_globs': task_globs, 'state': options.state, + {'tasks': task_globs, 'state': options.state, 'outputs': options.outputs} ) diff --git a/cylc/flow/scripts/cylc_spawn.py b/cylc/flow/scripts/cylc_spawn.py index c8498dcd9c7..226a8bc2970 100755 --- a/cylc/flow/scripts/cylc_spawn.py +++ b/cylc/flow/scripts/cylc_spawn.py @@ -58,7 +58,7 @@ def main(parser, 
options, suite, *task_globs): pclient( 'spawn_tasks', - {'task_globs': task_globs} + {'tasks': task_globs} ) diff --git a/cylc/flow/scripts/cylc_trigger.py b/cylc/flow/scripts/cylc_trigger.py index fbc4b9afb11..27a19b16128 100755 --- a/cylc/flow/scripts/cylc_trigger.py +++ b/cylc/flow/scripts/cylc_trigger.py @@ -119,7 +119,7 @@ def main(parser, options, suite, *task_globs): # Tell the suite server program to generate the job file. pclient( 'dry_run_tasks', - {'task_globs': [task_id], 'check_syntax': False} + {'tasks': [task_id], 'check_syntax': False} ) # Wait for the new job file to be written. Use mtime because the same @@ -198,7 +198,7 @@ def main(parser, options, suite, *task_globs): # Trigger the task proxy(s). pclient( 'trigger_tasks', - {'task_globs': task_globs, 'back_out': aborted} + {'tasks': task_globs, 'back_out': aborted} ) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a70bb5ad1c8..e67199aaa98 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -101,7 +101,7 @@ def assign_queues(self): for queue, qconfig in self.config.cfg['scheduling']['queues'].items(): self.myq.update((name, queue) for name in qconfig['members']) - def insert_tasks(self, items, stopcp, no_check=False): + def insert_tasks(self, items, stopcp, check_point=True): """Insert tasks.""" n_warnings = 0 task_items = {} @@ -144,7 +144,7 @@ def insert_tasks(self, items, stopcp, no_check=False): # Check that the cycle point is on one of the tasks sequences. point = get_point(key[1]) - if not no_check: # Check if cycle point is on the tasks sequence. + if check_point: # Check if cycle point is on the tasks sequence. for sequence in taskdef.sequences: if sequence.is_on_sequence(point): break diff --git a/cylc/flow/tests/network/test_client.py b/cylc/flow/tests/network/test_client.py index 3a2ea5f68dc..a67d0a94b90 100644 --- a/cylc/flow/tests/network/test_client.py +++ b/cylc/flow/tests/network/test_client.py @@ -74,7 +74,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert warnings == 0 self.task_pool.release_runahead_tasks() @@ -89,7 +89,7 @@ def setUp(self) -> None: barrier=barrier, daemon=True) port_range = glbl_cfg().get(['suite servers', 'run ports']) - self.server.start_(port_range[0], port_range[-1]) + self.server.start(port_range[0], port_range[-1]) # barrier.wait() doesn't seem to work properly here # so this workaround will do while barrier.n_waiting < 1: @@ -103,8 +103,8 @@ def setUp(self) -> None: sleep(0.5) def tearDown(self): - self.server.stop_() - self.client.stop_() + self.server.stop() + self.client.stop() def test_constructor(self): self.assertFalse(self.client.socket.closed) diff --git a/cylc/flow/tests/network/test_publisher.py b/cylc/flow/tests/network/test_publisher.py index 671bc2e86fc..3fd0be5ecce 100644 --- a/cylc/flow/tests/network/test_publisher.py +++ b/cylc/flow/tests/network/test_publisher.py @@ -76,7 +76,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() @@ -87,7 +87,7 @@ def setUp(self) -> None: self.pub_data = self.scheduler.ws_data_mgr.get_publish_deltas() def tearDown(self): - self.publisher.stop_() + self.publisher.stop() def test_constructor(self): self.assertFalse(self.publisher.threaded) @@ -95,7 +95,7 @@ def test_constructor(self): def test_publish(self): """Test publishing data.""" - 
self.publisher.start_(*PORT_RANGE) + self.publisher.start(*PORT_RANGE) subscriber = WorkflowSubscriber( self.suite_name, host=self.scheduler.host, @@ -110,7 +110,7 @@ def test_publish(self): delta = DELTAS_MAP[btopic.decode('utf-8')]() delta.ParseFromString(msg) self.assertEqual(delta.id, self.workflow_id) - subscriber.stop_() + subscriber.stop() with self.assertLogs(LOG, level='ERROR') as cm: self.publisher.publish(None) self.assertIn('publish: ', cm.output[0]) @@ -119,16 +119,16 @@ def test_start(self): """Test publisher start.""" self.assertIsNone(self.publisher.loop) self.assertIsNone(self.publisher.port) - self.publisher.start_(*PORT_RANGE) + self.publisher.start(*PORT_RANGE) self.assertIsNotNone(self.publisher.loop) self.assertIsNotNone(self.publisher.port) - self.publisher.stop_() + self.publisher.stop() def test_stop(self): """Test publisher stop.""" - self.publisher.start_(*PORT_RANGE) + self.publisher.start(*PORT_RANGE) self.assertFalse(self.publisher.socket.closed) - self.publisher.stop_() + self.publisher.stop() self.assertTrue(self.publisher.socket.closed) diff --git a/cylc/flow/tests/network/test_resolvers.py b/cylc/flow/tests/network/test_resolvers.py index 646ade2a98d..566eb1d1ee4 100644 --- a/cylc/flow/tests/network/test_resolvers.py +++ b/cylc/flow/tests/network/test_resolvers.py @@ -110,7 +110,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/network/test_server.py b/cylc/flow/tests/network/test_server.py index 857b9e62c10..0a21892d80c 100644 --- a/cylc/flow/tests/network/test_server.py +++ b/cylc/flow/tests/network/test_server.py @@ -75,7 +75,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() @@ -91,7 +91,7 @@ def setUp(self) -> None: daemon=True ) self.server.public_priv = Priv.CONTROL - self.server.start_(*PORT_RANGE) + self.server.start(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while barrier.n_waiting < 1: @@ -100,7 +100,7 @@ def setUp(self) -> None: sleep(0.5) def tearDown(self): - self.server.stop_() + self.server.stop() def test_constructor(self): self.assertFalse(self.server.socket.closed) diff --git a/cylc/flow/tests/network/test_subscriber.py b/cylc/flow/tests/network/test_subscriber.py index 71cb627937b..91ab9f7a4e6 100644 --- a/cylc/flow/tests/network/test_subscriber.py +++ b/cylc/flow/tests/network/test_subscriber.py @@ -81,7 +81,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert warnings == 0 self.task_pool.release_runahead_tasks() @@ -89,7 +89,7 @@ def setUp(self) -> None: self.workflow_id = self.scheduler.ws_data_mgr.workflow_id self.publisher = WorkflowPublisher( self.suite_name, threaded=False, daemon=True) - self.publisher.start_(*PORT_RANGE) + self.publisher.start(*PORT_RANGE) self.subscriber = WorkflowSubscriber( self.suite_name, host=self.scheduler.host, @@ -102,8 +102,8 @@ def setUp(self) -> None: self.data = None def tearDown(self): - self.subscriber.stop_() - self.publisher.stop_() + self.subscriber.stop() + self.publisher.stop() def test_constructor(self): """Test class constructor result.""" diff --git a/cylc/flow/tests/network/test_zmq.py 
b/cylc/flow/tests/network/test_zmq.py index a98b48ee619..8a91aabf0c1 100644 --- a/cylc/flow/tests/network/test_zmq.py +++ b/cylc/flow/tests/network/test_zmq.py @@ -47,9 +47,9 @@ def test_server_requires_valid_keys(): server = ZMQSocketBase(zmq.REQ, bind=True, daemon=True) with pytest.raises(ValueError, match=r"No public key found in "): - server.start_(*PORT_RANGE, private_key_location=fake.name) + server.start(*PORT_RANGE, private_key_location=fake.name) - server.stop_() + server.stop() def test_client_requires_valid_keys(): @@ -62,7 +62,7 @@ def test_client_requires_valid_keys(): ClientError, match=r"Failed to load the suite's public " "key, so cannot connect."): # Assign a blank file masquerading as a CurveZMQ certificate - client.start_(HOST, port, srv_public_key_loc=fake.name) + client.start(HOST, port, srv_public_key_loc=fake.name) def test_single_port(): @@ -80,8 +80,8 @@ def test_single_port(): with pytest.raises(CylcError, match=r"Address already in use") as exc: serv2._socket_bind(port, port) - serv2.stop_() - serv1.stop_() + serv2.stop() + serv1.stop() context.destroy() @@ -94,7 +94,7 @@ def test_start(): assert publisher.barrier.n_waiting == 0 assert publisher.loop is None assert publisher.port is None - publisher.start_(*PORT_RANGE) + publisher.start(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while publisher.barrier.n_waiting < 1: @@ -102,7 +102,7 @@ def test_start(): assert barrier.wait() == 1 assert publisher.loop is not None assert publisher.port is not None - publisher.stop_() + publisher.stop() def test_stop(): @@ -111,7 +111,7 @@ def test_stop(): barrier = Barrier(2, timeout=20) publisher = ZMQSocketBase(zmq.PUB, suite='test_zmq_stop', bind=True, barrier=barrier, threaded=True, daemon=True) - publisher.start_(*PORT_RANGE) + publisher.start(*PORT_RANGE) # barrier.wait() doesn't seem to work properly here # so this workaround will do while publisher.barrier.n_waiting < 1: @@ -119,7 +119,7 @@ def test_stop(): barrier.wait() assert not publisher.socket.closed assert publisher.thread.is_alive() - publisher.stop_() + publisher.stop() assert publisher.socket.closed assert not publisher.thread.is_alive() diff --git a/cylc/flow/tests/test_data_store_mgr.py b/cylc/flow/tests/test_data_store_mgr.py index 022b3892cf0..2ca63ac7361 100644 --- a/cylc/flow/tests/test_data_store_mgr.py +++ b/cylc/flow/tests/test_data_store_mgr.py @@ -69,7 +69,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/tests/cylc-insert/12-cycle-500-tasks.t b/tests/cylc-insert/12-cycle-500-tasks.t index 6346227ba73..91ec2764111 100755 --- a/tests/cylc-insert/12-cycle-500-tasks.t +++ b/tests/cylc-insert/12-cycle-500-tasks.t @@ -31,7 +31,7 @@ cut -d' ' -f 2- "${SUITE_RUN_DIR}/log/suite/log" >'trimmed-log' for I in {001..500}; do echo "INFO - [v_i${I}.2008] -submit-num=00, inserted" done - echo "INFO - Command succeeded: insert_tasks(['2008/*'], stop_point_string=None, no_check=False)" + echo "INFO - Command succeeded: insert_tasks(['2008/*'], stop_point_string=None, check_point=True)" } >'expected-log' contains_ok 'trimmed-log' 'expected-log' From 29a1775cd06b78ec23f9652f29e5a1e8241be29b Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 28 Jan 2020 13:52:32 +1300 Subject: [PATCH 4/5] aotf: disable the files field of the broadcast mutation * this client-side logic doesn't fit into aoft very 
nicely * will require a bespoke solution later, perhaps an interface? --- cylc/flow/network/schema.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index a5a7ba5cedb..0af1fdd1ada 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1111,12 +1111,16 @@ class Arguments: BroadcastSetting, description='Target settings.' ) - files = List( - String, - description=sstrip(''' - File with config to broadcast. Can be used multiple times - ''') - ) + # TODO: work out how to implement this feature, it needs to be + # handled client-side which makes it slightly awkward in + # api-on-the-fly land + + # files = List( + # String, + # description=sstrip(''' + # File with config to broadcast. Can be used multiple times + # ''') + # ) result = GenericScalar() From b9613623fa4488c88c1781eaa6ccfadc8a2c5d2b Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 29 Jan 2020 18:29:30 +1300 Subject: [PATCH 5/5] doc fixes --- cylc/flow/network/schema.py | 16 +++++----------- cylc/flow/network/server.py | 13 +++++++------ cylc/flow/suite_status.py | 18 +++++++++++++++--- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 0af1fdd1ada..e7d748de4e3 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1286,15 +1286,9 @@ class Meta: description = sstrip(f''' Tell a suite server program to shut down. - In order to prevent failures going unnoticed, suites only shut down - automatically at a final cycle point if no failed tasks are - present. There are several shutdown methods: - - Tasks that become ready after the shutdown is ordered will be - submitted immediately if the suite is restarted. Remaining task - event handlers and job poll and kill commands, however, will be - executed prior to shutdown, unless the stop mode is - `{StopMode.REQUEST_NOW}`. + By default suites wait for all submitted and running tasks to + complete before shutting down. You can change this behaviour + with the "mode" option. ''') resolver = partial(mutator, command='stop_workflow') @@ -1459,7 +1453,7 @@ class Meta: Remove one or more task instances from a running workflow. Tasks will be forced to spawn successors before removal if they - have not done so already, unless you use `no_spawn`. + have not done so already, unless you change the `spawn` option. ''') resolver = partial(mutator, command='remove_tasks') @@ -1477,7 +1471,7 @@ class Meta: Outputs are automatically updated to reflect the new task state, except for custom message outputs which can be manipulated directly - with `output`. + with `outputs`. Prerequisites reflect the state of other tasks; they are not changed except to unset them on resetting state to diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 01ee5265b7f..01495db954f 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -326,7 +326,8 @@ def broadcast( if mode == 'clear_broadcast': return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast( cycle_points, tasks, settings) - raise ValueError('TEMP: TODO: FixMe!') + # TODO implement other broadcast interfaces (i.e. expire, display) + raise ValueError('Unsupported broadcast mode') @authorise(Priv.READ) @expose @@ -412,7 +413,7 @@ def dry_run_tasks(self, tasks, check_syntax=True): """Prepare job file for a task. 
Args:
-            task_globs (list): List of identifiers, see `task globs`_
+            tasks (list): List of identifiers, see `task globs`_
             check_syntax (bool, optional): Check shell syntax.
 
         Returns:
@@ -755,9 +756,9 @@ def insert_tasks(self, tasks, stop_point=None, check_point=True):
                 any glob characters (``*``).
             stop_point (str, optional):
                 Optional hold/stop cycle point for inserted task.
-            no_check (bool, optional):
-                Add task even if the provided cycle point is not valid
-                for the given task.
+            check_point (bool, optional):
+                If True check that the cycle point is valid for the
+                given task and fail if it is not.
 
         Returns:
             tuple: (outcome, message)
@@ -863,7 +864,7 @@ def poll_tasks(self, tasks=None, poll_succeeded=False):
         Args:
             tasks (list, optional):
                 List of identifiers, see `task globs`_
-            poll_succ (bool, optional):
+            poll_succeeded (bool, optional):
                 Allow polling of remote tasks if True.
 
         Returns:
diff --git a/cylc/flow/suite_status.py b/cylc/flow/suite_status.py
index dc84ade63a7..008eede79e1 100644
--- a/cylc/flow/suite_status.py
+++ b/cylc/flow/suite_status.py
@@ -78,11 +78,23 @@ def describe(self):
         if self == self.AUTO_ON_TASK_FAILURE:
             return 'Wait until the first task fails.'
         if self == self.REQUEST_CLEAN:
-            return 'Wait for active jobs to complete.'
+            return (
+                'Regular shutdown:\n'
+                '* Wait for all active jobs to complete.\n'
+                '* Run suite event handlers and wait for them to complete.'
+            )
         if self == self.REQUEST_NOW:
-            return 'Immediate shutdown, wait for event handlers to complete.'
+            return (
+                'Immediate shutdown:\n'
+                "* Don't kill submitted or running jobs.\n"
+                '* Run suite event handlers and wait for them to complete.'
+            )
         if self == self.REQUEST_NOW_NOW:
-            return 'Immediate shutdown.'
+            return (
+                'Immediate shutdown:\n'
+                "* Don't kill submitted or running jobs.\n"
+                "* Don't run event handlers."
+            )
         return ''
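
For reference, with the richer descriptions above a caller now gets a multi-line summary for each stop mode. A small sketch (output taken from the strings added in this hunk):

    from cylc.flow.suite_status import StopMode

    print(StopMode.REQUEST_CLEAN.describe())
    # Regular shutdown:
    # * Wait for all active jobs to complete.
    # * Run suite event handlers and wait for them to complete.
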