diff --git a/simpleflow/history.py b/simpleflow/history.py index ccc3dd95d..dfed7c7d3 100644 --- a/simpleflow/history.py +++ b/simpleflow/history.py @@ -4,10 +4,9 @@ class History(object): def __init__(self, history): self._history = history - self._activities = collections.defaultdict( - lambda: {'type': 'activity'}) - self._child_workflows = collections.defaultdict( - lambda: {'type': 'child_workflow'}) + self._activities = collections.OrderedDict() + self._child_workflows = collections.OrderedDict() + self._tasks = [] @property def events(self): @@ -29,9 +28,11 @@ def get_activity(event): 'version': event.activity_type['version'], 'state': event.state, 'scheduled_id': event.id, + 'scheduled_timestamp': event.timestamp, } if event.activity_id not in self._activities: self._activities[event.activity_id] = activity + self._tasks.append(activity) else: # When the executor retries a task, it schedules it again. # We have to take care of not overriding some values set by the @@ -40,20 +41,37 @@ def get_activity(event): # corresponds to the last execution. self._activities[event.activity_id].update(activity) elif event.state == 'schedule_failed': - activity = self._activities[event.activity_id] - activity['state'] = event.state - activity['cause'] = event.cause - activity['activity_type'] = event.activity_type.copy() + activity = { + 'type': 'activity', + 'state': event.state, + 'cause': event.cause, + 'activity_type': event.activity_type.copy(), + 'schedule_failed_timestamp': event.timestamp, + } + + if event.activity_id not in self._activities: + self._activities[event.activity_id] = activity + self._tasks.append(activity) + else: + # When the executor retries a task, it schedules it again. + # We have to take care of not overriding some values set by the + # previous execution of the task such as the number of retries + # in ``retry``. As the state of the event mutates, it + # corresponds to the last execution. + self._activities[event.activity_id].update(activity) + elif event.state == 'started': activity = get_activity(event) activity['state'] = event.state activity['identity'] = event.identity activity['started_id'] = event.id + activity['started_timestamp'] = event.timestamp elif event.state == 'completed': activity = get_activity(event) activity['state'] = event.state activity['result'] = event.result activity['completed_id'] = event.id + activity['completed_timestamp'] = event.timestamp elif event.state == 'timed_out': activity = get_activity(event) activity['state'] = event.state @@ -62,6 +80,7 @@ def get_activity(event): events[activity['scheduled_id'] - 1], '{}_timeout'.format(event.timeout_type.lower())) activity['timed_out_id'] = event.id + activity['timed_out_timestamp'] = event.timestamp if 'retry' not in activity: activity['retry'] = 0 else: @@ -71,6 +90,7 @@ def get_activity(event): activity['state'] = event.state activity['reason'] = event.reason activity['details'] = event.details + activity['failed_timestamp'] = event.timestamp if 'retry' not in activity: activity['retry'] = 0 else: @@ -80,6 +100,7 @@ def get_activity(event): activity['state'] = event.state if hasattr(event, 'details'): activity['details'] = event.details + activity['cancelled_timestamp'] = event.timestamp def parse_child_workflow_event(self, events, event): """Aggregate all the attributes of a workflow in a single entry. @@ -106,18 +127,25 @@ def get_workflow(event): 'state': event.state, 'initiated_event_id': event.id, } - self._child_workflows[event.workflow_id] = workflow + workflow['initiated_event_timestamp'] = event.timestamp + if event.workflow_id not in self._child_workflows: + self._child_workflows[event.workflow_id] = workflow + self._tasks.append(workflow) + else: + self._child_workflows[event.workflow_id].update(workflow) elif event.state == 'started': workflow = get_workflow(event) workflow['state'] = event.state workflow['run_id'] = event.workflow_execution['runId'] workflow['workflow_id'] = event.workflow_execution['workflowId'] workflow['started_id'] = event.id + workflow['started_timestamp'] = event.timestamp elif event.state == 'completed': workflow = get_workflow(event) workflow['state'] = event.state workflow['result'] = event.result workflow['completed_id'] = event.id + workflow['completed_timestamp'] = event.timestamp def parse(self): events = self.events