diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 83e72603b0c..e0023c4542f 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -16,8 +16,9 @@ """GraphQL resolvers for use in data accessing and mutation of workflows.""" -from operator import attrgetter from fnmatch import fnmatchcase +from getpass import getuser +from operator import attrgetter from graphene.utils.str_converters import to_snake_case from cylc.flow.data_store_mgr import ( @@ -342,38 +343,7 @@ async def nodes_mutator(self, *m_args): result = (True, 'Command queued') return [{'id': w_id, 'response': result}] - async def _mutation_mapper(self, command, args): + async def _mutation_mapper(self, command, kwargs): """Map between GraphQL resolvers and internal command interface.""" - if command in ['clear_broadcast', - 'expire_broadcast', - 'put_broadcast']: - return getattr(self.schd.task_events_mgr.broadcast_mgr, - command, None)(**args) - if command == 'put_ext_trigger': - return self.schd.ext_trigger_queue.put(( - args.get('event_message'), - args.get('event_id'))) - if command == 'put_messages': - messages = args.get('messages', []) - for severity, message in messages: - self.schd.message_queue.put(( - args.get('task_job', None), - args.get('event_time', None), - severity, message)) - return (True, 'Messages queued: %d' % len(messages)) - if command in ['set_stop_after_clock_time', - 'set_stop_after_point', - 'set_stop_after_task']: - mutate_args = [command, (), {}] - for val in args.values(): - mutate_args[1] = (val,) - return self.schd.command_queue.put(tuple(mutate_args)) - mutate_args = [command, (), {}] - for key, val in args.items(): - if isinstance(val, list): - mutate_args[1] = (val,) - elif isinstance(val, dict): - mutate_args[2] = val - else: - mutate_args[2][key] = val - return self.schd.command_queue.put(tuple(mutate_args)) + method = getattr(self.schd.server, command) + return method(user=getuser(), **kwargs) diff --git 
a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 6eebbf02847..e7d748de4e3 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -17,20 +17,58 @@ """GraphQL API schema via Graphene implementation.""" import asyncio +from functools import partial +import logging +from textwrap import dedent from typing import Callable, AsyncGenerator, Any -from cylc.flow.task_state import TASK_STATUSES_ORDERED -from cylc.flow.data_store_mgr import ( - ID_DELIM, FAMILIES, FAMILY_PROXIES, - JOBS, TASKS, TASK_PROXIES -) from graphene import ( Boolean, Field, Float, ID, InputObjectType, Int, - List, Mutation, ObjectType, Schema, String, Union + List, Mutation, ObjectType, Schema, String, Union, Enum ) from graphene.types.generic import GenericScalar from graphene.utils.str_converters import to_snake_case +from cylc.flow.task_state import ( + TASK_STATUSES_ORDERED, + TASK_STATUS_DESC, + # TASK_STATUS_RUNAHEAD, + TASK_STATUS_WAITING, + TASK_STATUS_QUEUED, + TASK_STATUS_EXPIRED, + TASK_STATUS_READY, + TASK_STATUS_SUBMIT_FAILED, + TASK_STATUS_SUBMIT_RETRYING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RETRYING, + TASK_STATUS_RUNNING, + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED +) +from cylc.flow.data_store_mgr import ( + ID_DELIM, FAMILIES, FAMILY_PROXIES, + JOBS, TASKS, TASK_PROXIES +) +from cylc.flow.suite_status import StopMode + + +def sstrip(text): + """Simple function to dedent and strip text. + + Examples: + >>> print(sstrip(''' + ... foo + ... bar + ... baz + ... 
''')) + foo + bar + baz + + """ + return dedent(text).strip() + + PROXY_NODES = 'proxy_nodes' @@ -817,7 +855,7 @@ class Meta: # Mutators: -async def mutator(root, info, command, workflows=None, +async def mutator(root, info, command=None, workflows=None, exworkflows=None, **args): """Call the resolver method that act on the workflow service via the internal command queue.""" @@ -867,330 +905,675 @@ async def nodes_mutator(root, info, command, ids, workflows=None, res = await resolvers.nodes_mutator(info, command, ids, w_args, args) return GenericResponse(result=res) +# Input types: -# Mutations defined: -class ClearBroadcast(Mutation): - class Meta: - description = """Expire all settings targeting cycle points -earlier than cutoff.""" - resolver = mutator - class Arguments: - workflows = List(String, required=True) - command = String(default_value='clear_broadcast') - point_strings = List( - String, - description="""`["*"]`""", - default_value=['*']) - namespaces = List( - String, - description="""namespaces: `["foo", "BAZ"]`""",) - cancel_settings = List( - GenericScalar, - description=""" -settings: `[{environment: {ENVKEY: "env_val"}}, ...]`""",) +class WorkflowID(String): + """A registered workflow.""" - result = GenericScalar() +class CyclePoint(String): + """An integer or date-time cyclepoint.""" + + +class CyclePointGlob(String): + """A glob for integer or date-time cyclepoints. + + The wildcard character (`*`) can be used to perform globbing. + For example `2000*` might match `2000-01-01T00:00Z`. + + """ + + +class RuntimeConfiguration(String): + """A configuration item for a task or family e.g. `script`.""" + + +class BroadcastSetting(InputObjectType): + """A task/family runtime setting as a key, value pair.""" + + key = RuntimeConfiguration( + description=sstrip(''' + The cylc namespace for the setting to modify. + e.g. `[environment]variable_name`. 
+ '''), + required=True + ) + value = String( + description='The value of the modification', + required=True + ) + + +class BroadcastMode(Enum): + Set = 'put_broadcast' + Clear = 'clear_broadcast' + + @property + def description(self): + if self == BroadcastMode.Set: + return 'Create a new broadcast.' + if self == BroadcastMode.Clear: + return 'Revoke an existing broadcast.' + return '' + + +class TaskStatus(Enum): + """The status of a task in a workflow.""" + + # NOTE: this is an enumeration purely for the GraphQL schema + # TODO: the task statuses should be formally declared in a Python + # enumeration rendering this class unnecessary + # NOTE: runahead purposefully omitted to hide users from the task pool + # Runahead = TASK_STATUS_RUNAHEAD + Waiting = TASK_STATUS_WAITING + Queued = TASK_STATUS_QUEUED + Expired = TASK_STATUS_EXPIRED + Ready = TASK_STATUS_READY + SubmitFailed = TASK_STATUS_SUBMIT_FAILED + SubmitRetrying = TASK_STATUS_SUBMIT_RETRYING + Submitted = TASK_STATUS_SUBMITTED + Retrying = TASK_STATUS_RETRYING + Running = TASK_STATUS_RUNNING + Failed = TASK_STATUS_FAILED + Succeeded = TASK_STATUS_SUCCEEDED + + @property + def description(self): + return TASK_STATUS_DESC.get(self.value, '') + + +class TaskState(InputObjectType): + """The state of a task, a combination of status and other fields.""" + + status = TaskStatus() + is_held = Boolean(description=sstrip(''' + If a task is held no new job submissions will be made + ''')) + + +class TaskName(String): + """The name a task. + + * Must be a task not a family. + * Does not include the cycle point. + * Any parameters must be expanded (e.g. can't be `foo`). + """ + + +class NamespaceName(String): + """The name of a task or family.""" + + +class NamespaceIDGlob(String): + """A glob search for an active task or family. + + Can use the wildcard character (`*`), e.g `foo*` might match `foot`. 
+ """ + + +class TaskID(String): + """The name of an active task.""" + + +class JobID(String): + """A job submission from an active task.""" + + +class TimePoint(String): + """A date-time in the ISO8601 format.""" + + +LogLevels = Enum( + 'LogLevels', + list(logging._nameToLevel.items()), + description=lambda x: f'Python logging level: {x.name} = {x.value}.' + if x else '' +) + + +class SuiteStopMode(Enum): + """The mode used to stop a running workflow.""" + + # Note: contains only the REQUEST_* values from StopMode + Clean = StopMode.REQUEST_CLEAN + Now = StopMode.REQUEST_NOW + NowNow = StopMode.REQUEST_NOW_NOW -class ExpireBroadcast(Mutation): + @property + def description(self): + return StopMode(self.value).describe() + + +# Mutations: + +# TODO: re-instate: +# - get-broadcast (can just use GraphQL query BUT needs CLI access too) +# - expire-broadcast + +class Broadcast(Mutation): class Meta: - description = """Clear settings globally, -or for listed namespaces and/or points.""" - resolver = mutator + description = sstrip(''' + Override or add new [runtime] config in targeted namespaces in + a running suite. + + Uses for broadcast include making temporary changes to task + behaviour, and task-to-downstream-task communication via + environment variables. + + A broadcast can target any [runtime] namespace for all cycles or + for a specific cycle. If a task is affected by specific-cycle and + all-cycle broadcasts at once, the specific takes precedence. If + a task is affected by broadcasts to multiple ancestor + namespaces, the result is determined by normal [runtime] + inheritance. In other words, it follows this order: + + `all:root -> all:FAM -> all:task -> tag:root -> tag:FAM -> + tag:task` + + Broadcasts persist, even across suite restarts, until they expire + when their target cycle point is older than the oldest current in + the suite, or until they are explicitly cancelled with this + command. All-cycle broadcasts do not expire. 
+ + For each task the final effect of all broadcasts to all namespaces + is computed on the fly just prior to job submission. The + `--cancel` and `--clear` options simply cancel (remove) active + broadcasts, they do not act directly on the final task-level + result. Consequently, for example, you cannot broadcast to "all + cycles except Tn" with an all-cycle broadcast followed by a cancel + to Tn (there is no direct broadcast to Tn to cancel); and you + cannot broadcast to "all members of FAMILY except member_n" with a + general broadcast to FAMILY followed by a cancel to member_n (there + is no direct broadcast to member_n to cancel). + ''') + resolver = partial(mutator, command='broadcast') class Arguments: - workflows = List(String, required=True) - command = String(default_value='expire_broadcast') - cutoff = String(description="""String""") + workflows = List(WorkflowID, required=True) + mode = BroadcastMode( + default_value=1, + required=True + ) + cycle_points = List( + CyclePoint, + description=sstrip(''' + List of cycle points to target or `*` to cancel all all-cycle + broadcasts without canceling all specific-cycle broadcasts. + '''), + default_value=['*']) + tasks = List( + NamespaceName, + description='Target namespaces.', + default_value=['root'] + ) + settings = List( + BroadcastSetting, + description='Target settings.' + ) + # TODO: work out how to implement this feature, it needs to be + # handled client-side which makes it slightly awkward in + # api-on-the-fly land + + # files = List( + # String, + # description=sstrip(''' + # File with config to broadcast. Can be used multiple times + # ''') + # ) result = GenericScalar() -class HoldWorkflow(Mutation): +class Hold(Mutation): class Meta: - description = """Hold workflow. -- hold on workflow. (default) -- hold point of workflow.""" - resolver = mutator + description = sstrip(''' + Hold a workflow or tasks within it. 
+ ''') + resolver = partial(mutator, command='hold') class Arguments: - command = String( - description="""options: -- `hold_suite` (default) -- `hold_after_point_string`""", - default_value='hold_suite') - point_string = String() - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) + tasks = List( + NamespaceIDGlob, + description='Hold the specified tasks rather than the workflow.' + ) + time = TimePoint(description=sstrip(''' + Get the workflow to hold after the specified wallclock time + has passed. + ''')) result = GenericScalar() -class NudgeWorkflow(Mutation): +class Nudge(Mutation): class Meta: - description = """Tell workflow to try task processing.""" - resolver = mutator + description = sstrip(''' + Cause the Cylc task processing loop to be invoked on a running + suite. + + This happens automatically when the state of any task changes + such that task processing (dependency negotiation etc.) + is required, or if a clock-trigger task is ready to run. + ''') + resolver = partial(mutator, command='nudge') class Arguments: - command = String(default_value='nudge') - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) result = GenericScalar() -class PutBroadcast(Mutation): +class Ping(Mutation): class Meta: - description = """Put up new broadcast settings -(server side interface).""" - resolver = mutator + description = sstrip(''' + Send a test message to a running suite. 
+ ''') + resolver = partial(mutator, command='ping_suite') class Arguments: - command = String(default_value='put_broadcast') - workflows = List(String, required=True) - point_strings = List( - String, - description="""`["*"]`""", - default_value=['*']) - namespaces = List( - String, - description="""namespaces: `["foo", "BAZ"]`""",) - settings = List( - GenericScalar, - description=""" -settings: `[{environment: {ENVKEY: "env_val"}}, ...]`""",) + workflows = List(WorkflowID, required=True) result = GenericScalar() -class PutMessages(Mutation): +class Message(Mutation): class Meta: - description = """Put task messages in queue for processing -later by the main loop.""" - resolver = nodes_mutator + description = sstrip(''' + Record task job messages. + + Send task job messages to: + - The job stdout/stderr. + - The job status file, if there is one. + - The suite server program, if communication is possible. + + Task jobs use this to record and report status such + as success and failure. Applications run by task jobs can use + this command to report messages and to report registered task + outputs. + ''') + resolver = partial(nodes_mutator, command='put_messages') class Arguments: - workflows = List(String, required=True) - command = String(default_value='put_messages') - ids = List( - String, - description="""Task job in the form -`"CYCLE%TASK_NAME%SUBMIT_NUM"`""", - required=True) + workflows = List(WorkflowID, required=True) + task_job = String(required=True) event_time = String(default_value=None) messages = List( List(String), description="""List in the form `[[severity, message], ...]`.""", - default_value=None) + default_value=None + ) result = GenericScalar() -class ReleaseWorkflow(Mutation): +class Release(Mutation): class Meta: - description = """Reload workflow definitions.""" - resolver = mutator + description = sstrip(''' + Release a held workflow or tasks within it. + + See also the opposite command `hold`. 
+ ''') + resolver = partial(mutator, command='release') class Arguments: - command = String(default_value='release_suite') - workflows = List(String, required=True) + workflows = List(WorkflowID, required=True) + tasks = List( + NamespaceIDGlob, + description=sstrip(''' + Release matching tasks rather than the workflow as a whole. + ''') + ) result = GenericScalar() -class ReloadWorkflow(Mutation): +class Reload(Mutation): class Meta: - description = """Tell workflow to reload the workflow definition.""" - resolver = mutator + description = sstrip(''' + Tell a suite to reload its definition at run time. + + All settings including task definitions, with the + exception of suite log configuration, can be changed on reload. + Note that defined tasks can be added to or removed from a + running suite using "insert" and "remove" without reloading. This + command also allows addition and removal of actual task + definitions, and therefore insertion of tasks that were not defined + at all when the suite started (you will still need to manually + insert a particular instance of a newly defined task). Live task + proxies that are orphaned by a reload (i.e. their task definitions + have been removed) will be removed from the task pool if they have + not started running yet. Changes to task definitions take effect + immediately, unless a task is already running at reload time. + + If the suite was started with Jinja2 template variables + set on the command line (cylc run --set FOO=bar REG) the same + template settings apply to the reload (only changes to the suite.rc + file itself are reloaded). + + If the modified suite definition does not parse, + failure to reload will be reported but no harm will be done to the + running suite. 
+ ''') + resolver = partial(mutator, command='reload_suite') class Arguments: - workflows = List(String, required=True) - command = String(default_value='reload_suite') + workflows = List(WorkflowID, required=True) result = GenericScalar() class SetVerbosity(Mutation): class Meta: - description = """Set workflow verbosity to new level.""" - resolver = mutator + description = sstrip(''' + Change the logging severity level of a running suite. + + Only messages at or above the chosen severity level will be logged; + for example, if you choose `WARNING`, only warnings and critical + messages will be logged. + ''') + resolver = partial(mutator, command='set_verbosity') class Arguments: - workflows = List(String, required=True) - command = String(default_value='set_verbosity') - level = String( - description="""levels: -`INFO`, `WARNING`, `NORMAL`, `CRITICAL`, `ERROR`, `DEBUG`""", - required=True) + workflows = List(WorkflowID, required=True) + level = LogLevels(required=True) result = GenericScalar() -class StopWorkflowArgs(InputObjectType): - datetime_string = String(description="""ISO 8601 compatible or -`YYYY/MM/DD-HH:mm` of wallclock/real-world date-time""") - point_string = String(description="""Workflow formatted point string""") - task_id = String() - kill_active_tasks = Boolean(description="""Use with: set_stop_cleanly""") - terminate = Boolean(description="""Use with: `stop_now`""") +class Stop(Mutation): + class Meta: + description = sstrip(f''' + Tell a suite server program to shut down. + + By default suites wait for all submitted and running tasks to + complete before shutting down. You can change this behaviour + with the "mode" option. + ''') + resolver = partial(mutator, command='stop_workflow') + + class Arguments: + workflows = List(WorkflowID, required=True) + mode = SuiteStopMode( + # TODO default + ) + cycle_point = CyclePoint( + description='Stop after the suite reaches this cycle.' 
+ ) + clock_time = TimePoint( + description='Stop after wall-clock time passes this point.' + ) + task = TaskID( + description='Stop after this task succeeds.' + ) + + result = GenericScalar() -class StopWorkflow(Mutation): +class Checkpoint(Mutation): class Meta: - description = """Workflow stop actions: -- Cleanly or after kill active tasks. (default) -- After cycle point. -- After wallclock time. -- On event handler completion, or terminate right away. -- After an instance of a task.""" - resolver = mutator + description = 'Tell the suite to checkpoint its current state.' + resolver = partial(mutator, command='take_checkpoints') class Arguments: - workflows = List(String, required=True) - command = String( - description="""String options: -- `set_stop_cleanly` (default) -- `set_stop_after_clock_time` -- `set_stop_after_point` -- `set_stop_after_task` -- `stop_now`""", - default_value='set_stop_cleanly',) - args = StopWorkflowArgs() + workflows = List(WorkflowID, required=True) + name = String( + description='The checkpoint name.', + required=True + ) result = GenericScalar() -class TaskArgs(InputObjectType): - check_syntax = Boolean(description="""Use with actions: -- `dry_run_tasks`""") - no_check = Boolean(description="""Use with actions: -- `insert_tasks`""") - stop_point_string = String(description="""Use with actions: -- `insert_tasks`""") - poll_succ = Boolean(description="""Use with actions: -- `poll_tasks`""") - spawn = Boolean(description="""Use with actions: -- `remove_tasks`""") - state = String(description="""Use with actions: -- `reset_task_states`""") - outputs = List(String, description="""Use with actions: -- `reset_task_states`""") - back_out = Boolean(description="""Use with actions: -- `trigger_tasks`""") - - -class TaskActions(Mutation): +class ExtTrigger(Mutation): class Meta: - description = """Task actions: -- Prepare job file for task(s). -- Hold tasks. -- Insert task proxies. -- Kill task jobs. 
-- Return True if task_id exists (and running). -- Unhold tasks. -- Remove tasks from task pool. -- Reset statuses tasks. -- Spawn tasks. -- Trigger submission of task jobs where possible.""" - resolver = nodes_mutator + description = sstrip(''' + Report an external event message to a suite server program. + + It is expected that a task in the suite has registered the same + message as an external trigger - a special prerequisite to be + satisfied by an external system, via this command, rather than by + triggering off other tasks. + + The ID argument should uniquely distinguish one external trigger + event from the next. When a task's external trigger is satisfied by + an incoming message, the message ID is broadcast to all downstream + tasks in the cycle point as `$CYLC_EXT_TRIGGER_ID` so that they can + use it - e.g. to identify a new data file that the external + triggering system is responding to. + + Use the retry options in case the target suite is down or out of + contact. + + Note: To manually trigger a task use "Trigger" not + "ExtTrigger". + ''') + resolver = partial(mutator, command='put_ext_trigger') class Arguments: - workflows = List(String) - command = String( - description="""Task actions: -- `dry_run_tasks` -- `hold_tasks` -- `insert_tasks` -- `kill_tasks` -- `poll_tasks` -- `release_tasks` -- `remove_tasks` -- `reset_task_states` -- `spawn_tasks` -- `trigger_tasks`""", - required=True,) - ids = List( - String, - description="""Used with: -- All Commands - -A list of identifiers (family%glob%id) for matching task proxies, i.e. 
-``` -[ - "owner%workflow%201905*%foo", - "foo.201901*:failed", - "201901*%baa:failed", - "FAM.20190101T0000Z", - "FAM2", - "*.20190101T0000Z" -] -``` -(where % is the delimiter) + workflows = List(WorkflowID, required=True) + message = String( + description='External trigger message.', + required=True + ) + id = String( + description='Unique trigger ID.', + required=True + ) + + result = GenericScalar() -Splits argument into components, creates workflows argument if non-existent. -""", - required=True) - args = TaskArgs() + +class TaskMutation: + class Arguments: + workflows = List( + WorkflowID, + required=True + ) + tasks = List( + NamespaceIDGlob, + required=True + ) result = GenericScalar() -class TakeCheckpoint(Mutation): +class DryRun(Mutation, TaskMutation): class Meta: - description = """Checkpoint current task pool.""" - resolver = mutator + description = sstrip(''' + [For internal use] Prepare the job file for a task. + ''') + resolver = partial(mutator, command='dry_run_tasks') - class Arguments: - workflows = List(String, required=True) - command = String(default_value='take_checkpoints') - name = String( - description="""The checkpoint name""", - required=True,) + class Arguments(TaskMutation.Arguments): + check_syntax = Boolean( + description='Check shell syntax.', + default_value=True + ) - result = GenericScalar() +class Insert(Mutation, TaskMutation): + class Meta: + description = sstrip(''' + Insert new task proxies into the task pool of a running workflow. + + For example to enable re-triggering earlier tasks already removed + from the pool. + + Note: inserted cycling tasks cycle on as normal, even if another + instance of the same task exists at a later cycle (instances of the + same task at different cycles can coexist, but a newly spawned task + will not be added to the pool if it catches up to another task with + the same ID). + + See also "Submit", for running tasks without the scheduler. 
+ ''') + resolver = partial(mutator, command='insert_tasks') + + class Arguments(TaskMutation.Arguments): + check_point = Boolean( + description=sstrip(''' + Check that the provided cycle point is on one of the task's + recurrences as defined in the suite configuration before + inserting. + '''), + default_value=True + ) + stop_point = CyclePoint( + description='hold/stop cycle point for inserted task.' + ) + + +class Kill(Mutation, TaskMutation): + # TODO: This should be a job mutation? + class Meta: + description = sstrip(''' + Kill jobs of active tasks and update their statuses accordingly. + ''') + resolver = partial(mutator, command='kill_tasks') -class ExternalTrigger(Mutation): + +class Poll(Mutation, TaskMutation): class Meta: - description = """Server-side external event trigger interface.""" - resolver = mutator + description = sstrip(''' + Poll (query) task jobs to verify and update their statuses. + ''') + resolver = partial(mutator, command='poll_tasks') - class Arguments: - workflows = List(String, required=True) - command = String(default_value='put_external_trigger') - event_message = String(required=True) - event_id = String(required=True) + class Arguments(TaskMutation.Arguments): + poll_succeeded = Boolean( + description='Allow polling of succeeded tasks.', + default_value=False + ) - result = GenericScalar() + +class Remove(Mutation, TaskMutation): + class Meta: + description = sstrip(''' + Remove one or more task instances from a running workflow. + + Tasks will be forced to spawn successors before removal if they + have not done so already, unless you change the `spawn` option. + ''') + resolver = partial(mutator, command='remove_tasks') + + class Arguments(TaskMutation.Arguments): + spawn = Boolean( + description='Spawn successors before removal.', + default_value=True + ) + + +class Reset(Mutation, TaskMutation): + class Meta: + description = sstrip(f''' + Force task instances to a specified state. 
+ + Outputs are automatically updated to reflect the new task state, + except for custom message outputs which can be manipulated directly + with `outputs`. + + Prerequisites reflect the state of other tasks; they are not + changed except to unset them on resetting state to + `{TASK_STATUS_WAITING}` or earlier. + + Note: To hold and release tasks use "Hold" and "Release", not this + command. + ''') + resolver = partial(mutator, command='reset_task_states') + + class Arguments(TaskMutation.Arguments): + state = TaskStatus( + description='Reset the task status to this.' + ) + outputs = List( + String, + description=sstrip(''' + Find task output by message string or trigger string, set + complete or incomplete with `!OUTPUT`, `*` to set all + complete, `!*` to set all incomplete. + ''') + ) + + +class Spawn(Mutation, TaskMutation): + class Meta: + description = sstrip(f''' + Force task proxies to spawn successors at their own next cycle + point. + + Tasks normally spawn on reaching the {TASK_STATUS_SUBMITTED} + status. Spawning them early allows running successive instances of + the same task out of order. See also the `spawn to max active + cycle points` workflow configuration. + + Note this command does not operate on tasks at any arbitrary point + in the abstract workflow graph - tasks not already in the pool must + be inserted first with "Insert". + ''') + resolver = partial(mutator, command='spawn_tasks') + + +class Trigger(Mutation, TaskMutation): + class Meta: + description = sstrip(''' + Manually trigger tasks. + + TODO: re-implement edit functionality! + + For single tasks you can use `edit` to edit the generated job + script before it submits, to apply one-off changes. A diff between + the original and edited job script will be saved to the task job + log directory. 
+ + Warning: waiting tasks that are queue-limited will be queued if + triggered, to submit as normal when released by the queue; queued + tasks will submit immediately if triggered, even if that violates + the queue limit (so you may need to trigger a queue-limited task + twice to get it to submit immediately). + + Note: tasks not already in the pool must be inserted first with + "Insert" in order to be matched. + ''') + resolver = partial(mutator, command='trigger_tasks') + + class Arguments(TaskMutation.Arguments): + # back_out = Boolean() + # TODO: remove or re-implement? + pass # Mutation declarations + class Mutations(ObjectType): - clear_broadcast = ClearBroadcast.Field( - description=ClearBroadcast._meta.description) - expire_broadcast = ExpireBroadcast.Field( - description=ExpireBroadcast._meta.description) - put_broadcast = PutBroadcast.Field( - description=PutBroadcast._meta.description) - ext_trigger = ExternalTrigger.Field( - description=ExternalTrigger._meta.description) - hold_workflow = HoldWorkflow.Field( - description=HoldWorkflow._meta.description) - nudge_workflow = NudgeWorkflow.Field( - description=NudgeWorkflow._meta.description) - put_messages = PutMessages.Field( - description=PutMessages._meta.description) - release_workflow = ReleaseWorkflow.Field( - description=ReleaseWorkflow._meta.description) - reload_workflow = ReloadWorkflow.Field( - description=ReloadWorkflow._meta.description) + # workflow actions + broadcast = Broadcast.Field(description=Broadcast._meta.description) + ext_trigger = ExtTrigger.Field( + description=ExtTrigger._meta.description) + hold = Hold.Field(description=Hold._meta.description) + nudge = Nudge.Field(description=Nudge._meta.description) + message = Message.Field(description=Message._meta.description) + ping = Ping.Field(description=Ping._meta.description) + release = Release.Field(description=Release._meta.description) + reload = Reload.Field(description=Reload._meta.description) set_verbosity = 
SetVerbosity.Field( description=SetVerbosity._meta.description) - stop_workflow = StopWorkflow.Field( - description=StopWorkflow._meta.description) - take_checkpoint = TakeCheckpoint.Field( - description=TakeCheckpoint._meta.description) - task_actions = TaskActions.Field( - description=TaskActions._meta.description) + stop = Stop.Field(description=Stop._meta.description) + checkpoint = Checkpoint.Field( + description=Checkpoint._meta.description) + + # task actions + dry_run = DryRun.Field(description=DryRun._meta.description) + insert = Insert.Field(description=Insert._meta.description) + kill = Kill.Field(description=Kill._meta.description) + poll = Poll.Field(description=Poll._meta.description) + remove = Remove.Field(description=Remove._meta.description) + reset = Reset.Field(description=Reset._meta.description) + spawn = Spawn.Field(description=Spawn._meta.description) + trigger = Trigger.Field(description=Trigger._meta.description) + + # job actions + # TODO # ** Subscription Related ** # diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 49e952eb3f0..01495db954f 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -50,6 +50,25 @@ def expose(func=None): return func +def filter_none(dictionary): + """Filter out `None` items from a dictionary: + + Examples: + >>> filter_none({ + ... 'a': 0, + ... 'b': '', + ... 'c': None + ... }) + {'a': 0, 'b': ''} + + """ + return { + key: value + for key, value in dictionary.items() + if value is not None + } + + class SuiteRuntimeServer(ZMQSocketBase): """Suite runtime service API facade exposed via zmq. 
@@ -291,6 +310,25 @@ def api(self, endpoint=None): return '%s\n%s' % (head, tail) return 'No method by name "%s"' % endpoint + @authorise(Priv.CONTROL) + @expose + def broadcast( + self, + mode, + cycle_points=None, + tasks=None, + settings=None + ): + """Put or clear broadcasts.""" + if mode == 'put_broadcast': + return self.schd.task_events_mgr.broadcast_mgr.put_broadcast( + cycle_points, tasks, settings) + if mode == 'clear_broadcast': + return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast( + cycle_points, tasks, settings) + # TODO implement other broadcast interfaces (i.e. expire, display) + raise ValueError('Unsupported broadcast mode') + @authorise(Priv.READ) @expose def graphql(self, request_string=None, variables=None): @@ -331,6 +369,7 @@ def graphql(self, request_string=None, variables=None): return errors return executed.data + # TODO: deprecated by broadcast() @authorise(Priv.CONTROL) @expose def clear_broadcast( @@ -370,11 +409,11 @@ def clear_broadcast( @authorise(Priv.CONTROL) @expose - def dry_run_tasks(self, task_globs, check_syntax=True): + def dry_run_tasks(self, tasks, check_syntax=True): """Prepare job file for a task. Args: - task_globs (list): List of identifiers, see `task globs`_ + tasks (list): List of identifiers, see `task globs`_ check_syntax (bool, optional): Check shell syntax. Returns: @@ -386,8 +425,11 @@ def dry_run_tasks(self, tasks, check_syntax=True): Information about outcome. 
""" - self.schd.command_queue.put(('dry_run_tasks', (task_globs,), - {'check_syntax': check_syntax})) + self.schd.command_queue.put(( + 'dry_run_tasks', + (tasks,), + {'check_syntax': check_syntax} + )) return (True, 'Command queued') @authorise(Priv.CONTROL) @@ -565,6 +607,21 @@ def get_task_requisites(self, task_globs=None, list_prereqs=False): return self.schd.info_get_task_requisites( task_globs, list_prereqs=list_prereqs) + @authorise(Priv.CONTROL) + @expose + def hold(self, tasks=None, time=None): + """Hold the workflow.""" + self.schd.command_queue.put(( + 'hold', + tuple(), + filter_none({ + 'tasks': tasks, + 'time': time + }) + )) + return (True, 'Command queued') + + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_after_point_string(self, point_string): @@ -586,6 +643,7 @@ def hold_after_point_string(self, point_string): ("hold_after_point_string", (point_string,), {})) return (True, 'Command queued') + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_suite(self): @@ -603,6 +661,7 @@ def hold_suite(self): self.schd.command_queue.put(("hold_suite", (), {})) return (True, 'Command queued') + # TODO: deprecated by hold() @authorise(Priv.CONTROL) @expose def hold_tasks(self, task_globs): @@ -688,18 +747,18 @@ def state_totals(self): @authorise(Priv.CONTROL) @expose - def insert_tasks(self, items, stop_point_string=None, no_check=False): + def insert_tasks(self, tasks, stop_point=None, check_point=True): """Insert task proxies. Args: - items (list): + tasks (list): A list of `task globs`_ (strings) which *cannot* contain any glob characters (``*``). - stop_point_string (str, optional): + stop_point (str, optional): Optional hold/stop cycle point for inserted task. - no_check (bool, optional): - Add task even if the provided cycle point is not valid - for the given task. + check_point (bool, optional): + If True check that the cycle point is valid for the + given task and fail if it is not. 
Returns: tuple: (outcome, message) @@ -711,17 +770,21 @@ def insert_tasks(self, items, stop_point_string=None, no_check=False): """ self.schd.command_queue.put(( "insert_tasks", - (items,), - {"stop_point_string": stop_point_string, "no_check": no_check})) + (tasks,), + { + "stop_point_string": stop_point, + "check_point": check_point + } + )) return (True, 'Command queued') @authorise(Priv.CONTROL) @expose - def kill_tasks(self, task_globs): + def kill_tasks(self, tasks): """Kill task jobs. Args: - task_globs (list): List of identifiers, see `task globs`_ + tasks (list): List of identifiers, see `task globs`_ Returns: tuple: (outcome, message) @@ -732,7 +795,7 @@ def kill_tasks(self, task_globs): Information about outcome. """ - self.schd.command_queue.put(("kill_tasks", (task_globs,), {})) + self.schd.command_queue.put(("kill_tasks", (tasks,), {})) return (True, 'Command queued') @authorise(Priv.CONTROL) @@ -795,13 +858,13 @@ def ping_task(self, task_id, exists_only=False): @authorise(Priv.CONTROL) @expose - def poll_tasks(self, task_globs=None, poll_succ=False): + def poll_tasks(self, tasks=None, poll_succeeded=False): """Request the suite to poll task jobs. Args: - task_globs (list, optional): + tasks (list, optional): List of identifiers, see `task globs`_ - poll_succ (bool, optional): + poll_succeeded (bool, optional): Allow polling of remote tasks if True. Returns: @@ -814,9 +877,10 @@ def poll_tasks(self, task_globs=None, poll_succ=False): """ self.schd.command_queue.put( - ("poll_tasks", (task_globs,), {"poll_succ": poll_succ})) + ("poll_tasks", (tasks,), {"poll_succ": poll_succeeded})) return (True, 'Command queued') + # TODO: deprecated by broadcast() @authorise(Priv.CONTROL) @expose def put_broadcast( @@ -858,12 +922,12 @@ def put_broadcast( @authorise(Priv.CONTROL) @expose - def put_ext_trigger(self, event_message, event_id): + def put_ext_trigger(self, message, id): """Server-side external event trigger interface. 
Args: - event_message (str): The external trigger message. - event_id (str): The unique trigger ID. + message (str): The external trigger message. + id (str): The unique trigger ID. Returns: tuple: (outcome, message) @@ -874,7 +938,7 @@ def put_ext_trigger(self, event_message, event_id): Information about outcome. """ - self.schd.ext_trigger_queue.put((event_message, event_id)) + self.schd.ext_trigger_queue.put((message, id)) return (True, 'Event queued') @authorise(Priv.CONTROL) @@ -923,6 +987,17 @@ def reload_suite(self): self.schd.command_queue.put(("reload_suite", (), {})) return (True, 'Command queued') + @authorise(Priv.CONTROL) + @expose + def release(self, tasks=None): + """Release (un-hold) the workflow.""" + if tasks: + self.schd.command_queue.put(("release_tasks", (tasks,), {})) + else: + self.schd.command_queue.put(("release_suite", (), {})) + return (True, 'Command queued') + + # TODO: deprecated by release() @authorise(Priv.CONTROL) @expose def release_suite(self): @@ -940,6 +1015,7 @@ def release_suite(self): self.schd.command_queue.put(("release_suite", (), {})) return (True, 'Command queued') + # TODO: deprecated by release() @authorise(Priv.CONTROL) @expose def release_tasks(self, task_globs): @@ -962,11 +1038,11 @@ def release_tasks(self, task_globs): @authorise(Priv.CONTROL) @expose - def remove_tasks(self, task_globs, spawn=False): + def remove_tasks(self, tasks, spawn=False): """Remove tasks from task pool. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ spawn (bool, optional): If True ensure task has spawned before removal. 
@@ -981,16 +1057,16 @@ def remove_tasks(self, task_globs, spawn=False): """ self.schd.command_queue.put( - ("remove_tasks", (task_globs,), {"spawn": spawn})) + ("remove_tasks", (tasks,), {"spawn": spawn})) return (True, 'Command queued') @authorise(Priv.CONTROL) @expose - def reset_task_states(self, task_globs, state=None, outputs=None): + def reset_task_states(self, tasks, state=None, outputs=None): """Reset statuses tasks. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ state (str, optional): Task state to reset task to. @@ -1011,9 +1087,10 @@ def reset_task_states(self, task_globs, state=None, outputs=None): """ self.schd.command_queue.put(( "reset_task_states", - (task_globs,), {"state": state, "outputs": outputs})) + (tasks,), {"state": state, "outputs": outputs})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_clock_time(self, datetime_string): @@ -1038,6 +1115,7 @@ def set_stop_after_clock_time(self, datetime_string): ("set_stop_after_clock_time", (datetime_string,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_point(self, point_string): @@ -1060,6 +1138,7 @@ def set_stop_after_point(self, point_string): ("set_stop_after_point", (point_string,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_after_task(self, task_id): @@ -1081,6 +1160,7 @@ def set_stop_after_task(self, task_id): ("set_stop_after_task", (task_id,), {})) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def set_stop_cleanly(self, kill_active_tasks=False): @@ -1129,11 +1209,11 @@ def set_verbosity(self, level): @authorise(Priv.CONTROL) @expose - def spawn_tasks(self, task_globs): + def spawn_tasks(self, tasks): """Spawn tasks. 
Args: - task_globs (list): List of identifiers, see `task globs`_ + tasks (list): List of identifiers, see `task globs`_ Returns: tuple: (outcome, message) @@ -1144,9 +1224,33 @@ def spawn_tasks(self, task_globs): Information about outcome. """ - self.schd.command_queue.put(("spawn_tasks", (task_globs,), {})) + self.schd.command_queue.put(("spawn_tasks", (tasks,), {})) + return (True, 'Command queued') + + @authorise(Priv.SHUTDOWN) + @expose + def stop_workflow( + self, + mode=None, + cycle_point=None, + clock_time=None, + task=None + ): + """Stop the workflow.""" + + self.schd.command_queue.put(( + "stop", + (), + filter_none({ + 'mode': mode, + 'cycle_point': cycle_point, + 'clock_time': clock_time, + 'task': task + }) + )) return (True, 'Command queued') + # TODO: deprecated by stop() @authorise(Priv.SHUTDOWN) @expose def stop_now(self, terminate=False): @@ -1190,11 +1294,11 @@ def take_checkpoints(self, name): @authorise(Priv.CONTROL) @expose - def trigger_tasks(self, task_globs, back_out=False): + def trigger_tasks(self, tasks, back_out=False): """Trigger submission of task jobs where possible. Args: - task_globs (list): + tasks (list): List of identifiers, see `task globs`_ back_out (bool, optional): Abort e.g. in the event of a rejected trigger-edit. 
@@ -1209,7 +1313,7 @@ def trigger_tasks(self, task_globs, back_out=False): """ self.schd.command_queue.put( - ("trigger_tasks", (task_globs,), {"back_out": back_out})) + ("trigger_tasks", (tasks,), {"back_out": back_out})) return (True, 'Command queued') # UIServer Data Commands diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 5831ace8998..d7425ffa6ae 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -815,14 +815,58 @@ def info_ping_task(self, task_id, exists_only=False): task_id = self.get_standardised_taskid(task_id) return self.pool.ping_task(task_id, exists_only) + def command_stop( + self, + mode=None, + cycle_point=None, + # NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed + clock_time=None, + task=None + ): + # immediate shutdown + if mode: + self._set_stop(mode) + elif not any([mode, cycle_point, clock_time, task]): + # if no arguments provided do a standard clean shutdown + self._set_stop(StopMode.REQUEST_CLEAN) + + # schedule shutdown after tasks pass provided cycle point + if cycle_point: + point = self.get_standardised_point(cycle_point) + if self.pool.set_stop_point(point): + self.options.stopcp = str(point) + self.suite_db_mgr.put_suite_stop_cycle_point( + self.options.stopcp) + else: + # TODO: yield warning + pass + + # schedule shutdown after wallclock time passes provided time + if clock_time: + parser = TimePointParser() + clock_time = parser.parse(clock_time) + self.set_stop_clock( + int(clock_time.get("seconds_since_unix_epoch"))) + + # schedule shutdown after task succeeds + if task: + task_id = self.get_standardised_taskid(task) + if TaskID.is_valid_id(task_id): + self.set_stop_task(task_id) + else: + # TODO: yield warning + pass + def command_set_stop_cleanly(self, kill_active_tasks=False): """Stop job submission and set the flag for clean shutdown.""" + # TODO: deprecated by command_stop() self._set_stop() if kill_active_tasks: self.time_next_kill = time() def command_stop_now(self, terminate=False): 
"""Shutdown immediately.""" + # TODO: deprecated by command_stop() if terminate: self._set_stop(StopMode.REQUEST_NOW_NOW) else: @@ -837,6 +881,7 @@ def _set_stop(self, stop_mode=None): def command_set_stop_after_point(self, point_string): """Set stop after ... point.""" + # TODO: deprecated by command_stop() stop_point = self.get_standardised_point(point_string) if self.pool.set_stop_point(stop_point): self.options.stopcp = str(stop_point) @@ -847,6 +892,7 @@ def command_set_stop_after_clock_time(self, arg): format: ISO 8601 compatible or YYYY/MM/DD-HH:mm (backwards comp.) """ + # TODO: deprecate parser = TimePointParser() try: stop_time = parser.parse(arg) @@ -859,12 +905,19 @@ def command_set_stop_after_clock_time(self, arg): def command_set_stop_after_task(self, task_id): """Set stop after a task.""" + # TODO: deprecate task_id = self.get_standardised_taskid(task_id) if TaskID.is_valid_id(task_id): self.set_stop_task(task_id) + def command_release(self, ids=None): + if ids: + return self.pool.release_tasks(ids) + self.release_suite() + def command_release_tasks(self, items): """Release tasks.""" + # TODO: deprecated by command_release() return self.pool.release_tasks(items) def command_poll_tasks(self, items=None, poll_succ=False): @@ -893,18 +946,33 @@ def command_kill_tasks(self, items=None): def command_release_suite(self): """Release all task proxies in the suite.""" + # TODO: deprecated by command_release() self.release_suite() + def command_hold(self, tasks=None, time=None): + if tasks: + self.pool.hold_tasks(tasks) + if time: + point = self.get_standardised_point(time) + self.hold_suite(point) + LOG.info( + 'The suite will pause when all tasks have passed %s', point) + if not (tasks or time): + self.hold_suite() + def command_hold_tasks(self, items): """Hold selected task proxies in the suite.""" + # TODO: deprecated by command_hold() return self.pool.hold_tasks(items) def command_hold_suite(self): """Hold all task proxies in the suite.""" + # TODO: 
deprecated by command_hold() self.hold_suite() def command_hold_after_point_string(self, point_string): """Hold tasks AFTER this point (itask.point > point).""" + # TODO: deprecated by command_hold() point = self.get_standardised_point(point_string) self.hold_suite(point) LOG.info( @@ -926,9 +994,9 @@ def command_remove_tasks(self, items, spawn=False): return self.pool.remove_tasks(items, spawn) def command_insert_tasks(self, items, stop_point_string=None, - no_check=False): + check_point=True): """Insert tasks.""" - return self.pool.insert_tasks(items, stop_point_string, no_check) + return self.pool.insert_tasks(items, stop_point_string, check_point) def command_nudge(self): """Cause the task processing loop to be invoked""" @@ -1904,7 +1972,11 @@ def hold_suite(self, point=None): self.task_events_mgr.pflag = True self.suite_db_mgr.put_suite_hold() else: - LOG.info("Setting suite hold cycle point: %s", point) + LOG.info( + 'Setting suite hold cycle point: %s.' + '\nThe suite will hold once all tasks have passed this point.', + point + ) self.pool.set_hold_point(point) self.suite_db_mgr.put_suite_hold_cycle_point(point) diff --git a/cylc/flow/scripts/cylc_ext_trigger.py b/cylc/flow/scripts/cylc_ext_trigger.py index 84269066e01..89ffbf483b8 100755 --- a/cylc/flow/scripts/cylc_ext_trigger.py +++ b/cylc/flow/scripts/cylc_ext_trigger.py @@ -86,7 +86,7 @@ def main(parser, options, suite, event_msg, event_id): try: pclient( 'put_ext_trigger', - {'event_message': event_msg, 'event_id': event_id}, + {'message': event_msg, 'id': event_id}, timeout=options.comms_timeout ) except ClientError as exc: diff --git a/cylc/flow/scripts/cylc_insert.py b/cylc/flow/scripts/cylc_insert.py index 67290741b4d..da9b548ffca 100755 --- a/cylc/flow/scripts/cylc_insert.py +++ b/cylc/flow/scripts/cylc_insert.py @@ -74,8 +74,8 @@ def main(parser, options, suite, *items): pclient( 'insert_tasks', - {'items': items, 'no_check': options.no_check, - 'stop_point_string': options.stop_point_string}, + 
{'tasks': items, 'check_point': not options.no_check, + 'stop_point': options.stop_point_string}, timeout=options.comms_timeout ) diff --git a/cylc/flow/scripts/cylc_kill.py b/cylc/flow/scripts/cylc_kill.py index d78e6438374..6cbbb5af188 100755 --- a/cylc/flow/scripts/cylc_kill.py +++ b/cylc/flow/scripts/cylc_kill.py @@ -56,7 +56,7 @@ def main(parser, options, suite, *task_globs): suite, options.owner, options.host, options.port) pclient( 'kill_tasks', - {'task_globs': task_globs}, + {'tasks': task_globs}, timeout=options.comms_timeout ) diff --git a/cylc/flow/scripts/cylc_poll.py b/cylc/flow/scripts/cylc_poll.py index 26eca9bb5ac..6993d04546e 100755 --- a/cylc/flow/scripts/cylc_poll.py +++ b/cylc/flow/scripts/cylc_poll.py @@ -60,7 +60,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'poll_tasks', - {'task_globs': task_globs, 'poll_succ': options.poll_succ} + {'tasks': task_globs, 'poll_succeeded': options.poll_succ} ) diff --git a/cylc/flow/scripts/cylc_remove.py b/cylc/flow/scripts/cylc_remove.py index a53126f26e1..56ac043c820 100755 --- a/cylc/flow/scripts/cylc_remove.py +++ b/cylc/flow/scripts/cylc_remove.py @@ -59,7 +59,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'remove_tasks', - {'task_globs': task_globs, 'spawn': (not options.no_spawn)} + {'tasks': task_globs, 'spawn': (not options.no_spawn)} ) diff --git a/cylc/flow/scripts/cylc_reset.py b/cylc/flow/scripts/cylc_reset.py index 2cb6bbb854e..8f5d2615e78 100755 --- a/cylc/flow/scripts/cylc_reset.py +++ b/cylc/flow/scripts/cylc_reset.py @@ -97,7 +97,7 @@ def main(parser, options, suite, *task_globs): options.comms_timeout) pclient( 'reset_task_states', - {'task_globs': task_globs, 'state': options.state, + {'tasks': task_globs, 'state': options.state, 'outputs': options.outputs} ) diff --git a/cylc/flow/scripts/cylc_spawn.py b/cylc/flow/scripts/cylc_spawn.py index c8498dcd9c7..226a8bc2970 100755 --- a/cylc/flow/scripts/cylc_spawn.py +++ 
b/cylc/flow/scripts/cylc_spawn.py @@ -58,7 +58,7 @@ def main(parser, options, suite, *task_globs): pclient( 'spawn_tasks', - {'task_globs': task_globs} + {'tasks': task_globs} ) diff --git a/cylc/flow/scripts/cylc_trigger.py b/cylc/flow/scripts/cylc_trigger.py index fbc4b9afb11..27a19b16128 100755 --- a/cylc/flow/scripts/cylc_trigger.py +++ b/cylc/flow/scripts/cylc_trigger.py @@ -119,7 +119,7 @@ def main(parser, options, suite, *task_globs): # Tell the suite server program to generate the job file. pclient( 'dry_run_tasks', - {'task_globs': [task_id], 'check_syntax': False} + {'tasks': [task_id], 'check_syntax': False} ) # Wait for the new job file to be written. Use mtime because the same @@ -198,7 +198,7 @@ def main(parser, options, suite, *task_globs): # Trigger the task proxy(s). pclient( 'trigger_tasks', - {'task_globs': task_globs, 'back_out': aborted} + {'tasks': task_globs, 'back_out': aborted} ) diff --git a/cylc/flow/suite_status.py b/cylc/flow/suite_status.py index e350f1e2552..008eede79e1 100644 --- a/cylc/flow/suite_status.py +++ b/cylc/flow/suite_status.py @@ -74,15 +74,27 @@ class StopMode(Enum): def describe(self): """Return a user-friendly description of this state.""" if self == self.AUTO: - return 'suite has completed' + return 'Wait until suite has completed.' if self == self.AUTO_ON_TASK_FAILURE: - return 'a task has finished' + return 'Wait until the first task fails.' if self == self.REQUEST_CLEAN: - return 'waiting for active jobs to complete' + return ( + 'Regular shutdown:\n' + '* Wait for all active jobs to complete.\n' + '* Run suite event handlers and wait for them to complete.' + ) if self == self.REQUEST_NOW: - return 'waiting for event handlers to complete' + return ( + 'Immediate shutdown\n' + "* Don't kill submitted or running jobs.\n" + '* Run suite event handlers and wait for them to complete.' 
+ ) if self == self.REQUEST_NOW_NOW: - return 'immediate shutdown' + return ( + 'Immediate shutdown\n' + "* Don't kill submitted or running jobs.\n" + "* Don't run event handlers." + ) return '' diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a70bb5ad1c8..e67199aaa98 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -101,7 +101,7 @@ def assign_queues(self): for queue, qconfig in self.config.cfg['scheduling']['queues'].items(): self.myq.update((name, queue) for name in qconfig['members']) - def insert_tasks(self, items, stopcp, no_check=False): + def insert_tasks(self, items, stopcp, check_point=True): """Insert tasks.""" n_warnings = 0 task_items = {} @@ -144,7 +144,7 @@ def insert_tasks(self, items, stopcp, no_check=False): # Check that the cycle point is on one of the tasks sequences. point = get_point(key[1]) - if not no_check: # Check if cycle point is on the tasks sequence. + if check_point: # Check if cycle point is on the tasks sequence. for sequence in taskdef.sequences: if sequence.is_on_sequence(point): break diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 20e9fe0e085..76382fcfb73 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -53,6 +53,33 @@ # Job execution failed, but will try again soon: TASK_STATUS_RETRYING = "retrying" +TASK_STATUS_DESC = { + TASK_STATUS_RUNAHEAD: + 'Waiting for dependencies to be satisfied.', + TASK_STATUS_WAITING: + 'Waiting for dependencies to be satisfied.', + TASK_STATUS_QUEUED: + 'Dependencies are satisfied, placed in internal queue.', + TASK_STATUS_EXPIRED: + 'Execution skipped.', + TASK_STATUS_READY: + 'Cylc is preparing a job for submission.', + TASK_STATUS_SUBMIT_FAILED: + 'Job submission has failed.', + TASK_STATUS_SUBMIT_RETRYING: + 'Attempting to re-submit job after previous failed attempt.', + TASK_STATUS_SUBMITTED: + 'Job has been submitted.', + TASK_STATUS_RETRYING: + 'Attempting to re-run job after previous failed attempt.', + 
TASK_STATUS_RUNNING: + 'Job is running.', + TASK_STATUS_FAILED: + 'Job has returned non-zero exit code.', + TASK_STATUS_SUCCEEDED: + 'Job has returned zero exit code.' +} + # Tasks statuses ordered according to task runtime progression. TASK_STATUSES_ORDERED = [ TASK_STATUS_RUNAHEAD, diff --git a/cylc/flow/tests/network/test_client.py b/cylc/flow/tests/network/test_client.py index 4e7a076b0a0..a67d0a94b90 100644 --- a/cylc/flow/tests/network/test_client.py +++ b/cylc/flow/tests/network/test_client.py @@ -74,7 +74,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert warnings == 0 self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/network/test_publisher.py b/cylc/flow/tests/network/test_publisher.py index 796d78a63e3..3fd0be5ecce 100644 --- a/cylc/flow/tests/network/test_publisher.py +++ b/cylc/flow/tests/network/test_publisher.py @@ -76,7 +76,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/network/test_resolvers.py b/cylc/flow/tests/network/test_resolvers.py index d28b19ba9ad..566eb1d1ee4 100644 --- a/cylc/flow/tests/network/test_resolvers.py +++ b/cylc/flow/tests/network/test_resolvers.py @@ -23,7 +23,7 @@ DataStoreMgr, ID_DELIM, EDGES, TASK_PROXIES, WORKFLOW ) from cylc.flow.network.schema import parse_node_id -from cylc.flow.network.resolvers import workflow_filter, node_filter, Resolvers +from cylc.flow.network.resolvers import node_filter, Resolvers def _run_coroutine(coro): @@ -55,15 +55,6 @@ class FakeFlow: status = 'running' -def test_workflow_filter(): - data = {WORKFLOW: FakeFlow()} - args = deepcopy(FLOW_ARGS) - args['workflows'].append(('*', 'jin', None)) - assert not workflow_filter(data, args) - args['workflows'].append(('qux', 'baz', 
'running')) - assert workflow_filter(data, args) - - class FakeNode: id = f'qux{ID_DELIM}baz{ID_DELIM}20130808T00{ID_DELIM}foo{ID_DELIM}1' namespace = ['root', 'foo'] @@ -119,7 +110,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/network/test_server.py b/cylc/flow/tests/network/test_server.py index 95b53c14c93..0a21892d80c 100644 --- a/cylc/flow/tests/network/test_server.py +++ b/cylc/flow/tests/network/test_server.py @@ -75,7 +75,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/network/test_subscriber.py b/cylc/flow/tests/network/test_subscriber.py index 91786e866f0..91ab9f7a4e6 100644 --- a/cylc/flow/tests/network/test_subscriber.py +++ b/cylc/flow/tests/network/test_subscriber.py @@ -81,7 +81,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert warnings == 0 self.task_pool.release_runahead_tasks() diff --git a/cylc/flow/tests/test_data_store_mgr.py b/cylc/flow/tests/test_data_store_mgr.py index 022b3892cf0..2ca63ac7361 100644 --- a/cylc/flow/tests/test_data_store_mgr.py +++ b/cylc/flow/tests/test_data_store_mgr.py @@ -69,7 +69,7 @@ def setUp(self) -> None: warnings = self.task_pool.insert_tasks( items=[task_proxy.identity], stopcp=None, - no_check=False + check_point=True ) assert 0 == warnings self.task_pool.release_runahead_tasks() diff --git a/tests/cylc-insert/12-cycle-500-tasks.t b/tests/cylc-insert/12-cycle-500-tasks.t index 6346227ba73..91ec2764111 100755 --- a/tests/cylc-insert/12-cycle-500-tasks.t +++ b/tests/cylc-insert/12-cycle-500-tasks.t @@ -31,7 +31,7 @@ cut -d' ' -f 2- 
"${SUITE_RUN_DIR}/log/suite/log" >'trimmed-log' for I in {001..500}; do echo "INFO - [v_i${I}.2008] -submit-num=00, inserted" done - echo "INFO - Command succeeded: insert_tasks(['2008/*'], stop_point_string=None, no_check=False)" + echo "INFO - Command succeeded: insert_tasks(['2008/*'], stop_point_string=None, check_point=True)" } >'expected-log' contains_ok 'trimmed-log' 'expected-log'