Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix state change when workflow output fails rendering #102

Merged
merged 3 commits into from
Nov 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ provider relays the status and result back to the conductor. The conductor then
change, keeps track of the sequence of task execution, manages change history of the runtime
context, evaluate outbound task transitions, identifies any new tasks for execution, and determines
the overall workflow state and result.

When there are no more tasks identified to run next, the workflow is complete. On workflow
completion, regardless of state, the workflow result contains the list of error(s), if any, and the
output as defined in the workflow definition. If the workflow failed, the workflow conductor will do
its best to render the output from the latest version of the runtime context at completion of the
workflow execution.
20 changes: 15 additions & 5 deletions orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,21 @@ def log_entry(self, entry_type, message,
if entry_type not in ['info', 'warn', 'error']:
raise exc.WorkflowLogEntryError('The log entry type "%s" is not valid.' % entry_type)

# Create a log entry.
# Identify the appropriate log and then log the entry.
log = self.errors if entry_type == 'error' else self.log

# Create the log entry.
entry = {'type': entry_type, 'message': message}
dx.set_dict_value(entry, 'task_id', task_id, insert_null=False)
dx.set_dict_value(entry, 'task_transition_id', task_transition_id, insert_null=False)
dx.set_dict_value(entry, 'result', result, insert_null=False)
dx.set_dict_value(entry, 'data', data, insert_null=False)

# Identify the appropriate log and then log the entry.
log = self.errors if entry_type == 'error' else self.log
# Ignore if this is a duplicate.
if len(list(filter(lambda x: x == entry, log))) > 0:
return

# Append the log entry.
log.append(entry)

def log_error(self, e, task_id=None, task_transition_id=None):
Expand Down Expand Up @@ -326,8 +332,10 @@ def _update_workflow_terminal_context(self, ctx_diff, task_flow_idx):
term_ctx_entry['value'] = term_ctx_val

def _render_workflow_outputs(self):
wf_state = self.get_workflow_state()

# Render workflow outputs if workflow is completed.
if self.get_workflow_state() in states.COMPLETED_STATES and not self._outputs:
if wf_state in states.COMPLETED_STATES and not self._outputs:
workflow_context = self.get_workflow_terminal_context()['value']
outputs, errors = self.spec.render_output(workflow_context)

Expand All @@ -338,7 +346,9 @@ def _render_workflow_outputs(self):
# Log errors if any returned and mark workflow as failed.
if errors:
self.log_errors(errors)
self.request_workflow_state(states.FAILED)

if wf_state not in [states.EXPIRED, states.ABANDONED, states.CANCELED]:
self.request_workflow_state(states.FAILED)

def get_workflow_output(self):
return copy.deepcopy(self._outputs) if self._outputs else None
Expand Down
57 changes: 56 additions & 1 deletion orquesta/tests/unit/conducting/test_workflow_conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ def test_set_workflow_paused_when_has_active_tasks(self):
conductor.request_workflow_state(states.PAUSED)
self.assertEqual(conductor.get_workflow_state(), states.PAUSING)

def test_append_log_entry(self):
def test_append_log_entries(self):
inputs = {'a': 123, 'b': True}
conductor = self._prep_conductor(inputs=inputs, state=states.RUNNING)

Expand Down Expand Up @@ -838,3 +838,58 @@ def test_append_log_entry(self):
self.assertIsInstance(conductor.flow, conducting.TaskFlow)
self.assertListEqual(conductor.log, expected_log_entries)
self.assertListEqual(conductor.errors, expected_errors)

def test_append_duplicate_log_entries(self):
    """Ensure identical log and error entries are recorded only once.

    Each entry is emitted twice; the conductor is expected to ignore the
    second (duplicate) occurrence, leaving a single copy in its log and
    error lists.
    """
    conductor = self._prep_conductor(inputs={'a': 123, 'b': True}, state=states.RUNNING)

    extra = {'x': 1234}

    # Emit every entry twice. The second pass consists entirely of
    # duplicates and should leave the logs unchanged.
    for _ in range(2):
        conductor.log_entry('info', 'The workflow is running as expected.', data=extra)
        conductor.log_entry('warn', 'The task may be running a little bit slow.', task_id='task1')
        conductor.log_entry('error', 'This is baloney.', task_id='task1')
        conductor.log_error(TypeError('Something is not right.'), task_id='task1')
        conductor.log_errors([KeyError('task1'), ValueError('foobar')], task_id='task1')

    expected_log_entries = [
        {
            'type': 'info',
            'message': 'The workflow is running as expected.',
            'data': extra
        },
        {
            'type': 'warn',
            'message': 'The task may be running a little bit slow.',
            'task_id': 'task1'
        }
    ]

    # Errors only differ by message; build the expected list from the
    # de-duplicated messages in emission order.
    expected_errors = [
        {'type': 'error', 'message': message, 'task_id': 'task1'}
        for message in [
            'This is baloney.',
            'TypeError: Something is not right.',
            "KeyError: 'task1'",
            'ValueError: foobar'
        ]
    ]

    self.assertListEqual(conductor.log, expected_log_entries)
    self.assertListEqual(conductor.errors, expected_errors)
106 changes: 106 additions & 0 deletions orquesta/tests/unit/conducting/test_workflow_conductor_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from orquesta import conducting
from orquesta import events
from orquesta.specs import native as specs
from orquesta import states
from orquesta.tests.unit import base


class WorkflowConductorCancelTest(base.WorkflowConductorTest):
    """Tests rendering of workflow output when the workflow is canceled."""

    def _run_and_cancel(self, wf_def):
        """Run the given workflow definition, cancel it mid-flight, then
        complete task1 so the conductor reaches a terminal state.

        Returns the conductor for the caller to assert on.
        """
        spec = specs.WorkflowSpec(wf_def)
        self.assertDictEqual(spec.inspect(), {})

        # Run the workflow and keep it running.
        conductor = conducting.WorkflowConductor(spec)
        conductor.request_workflow_state(states.RUNNING)
        conductor.update_task_flow('task1', events.ActionExecutionEvent(states.RUNNING))

        # Cancels the workflow and complete task1.
        conductor.request_workflow_state(states.CANCELING)
        conductor.update_task_flow('task1', events.ActionExecutionEvent(states.SUCCEEDED))

        return conductor

    def test_workflow_output(self):
        wf_def = """
        version: 1.0

        output:
          - x: 123
          - y: <% ctx().x %>

        tasks:
          task1:
            action: core.noop
        """

        conductor = self._run_and_cancel(wf_def)

        # Check workflow status and output.
        self.assertEqual(conductor.get_workflow_state(), states.CANCELED)
        self.assertListEqual(conductor.errors, [])
        self.assertDictEqual(conductor.get_workflow_output(), {'x': 123, 'y': 123})

    def test_workflow_output_with_error(self):
        wf_def = """
        version: 1.0

        output:
          - x: 123
          - y: <% ctx().x %>
          - z: <% ctx().y.value %>

        tasks:
          task1:
            action: core.noop
        """

        # The z expression fails to render because y is an int, not an object.
        expected_errors = [
            {
                'type': 'error',
                'message': (
                    'YaqlEvaluationException: Unable to evaluate expression '
                    '\'<% ctx().y.value %>\'. NoFunctionRegisteredException: '
                    'Unknown function "#property#value"'
                )
            }
        ]

        conductor = self._run_and_cancel(wf_def)

        # Check workflow status is not changed to failed given the output error.
        self.assertEqual(conductor.get_workflow_state(), states.CANCELED)
        self.assertListEqual(conductor.errors, expected_errors)
        self.assertDictEqual(conductor.get_workflow_output(), {'x': 123, 'y': 123})