Skip to content

Commit

Permalink
Update history: add timestamp of each state
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Leclercq committed Mar 12, 2015
1 parent b5809f9 commit 96d49f6
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
class History(object):
def __init__(self, history):
self._history = history
self._activities = collections.defaultdict(
lambda: {'type': 'activity'})
self._child_workflows = collections.defaultdict(
lambda: {'type': 'child_workflow'})
self._activities = collections.OrderedDict()
self._child_workflows = collections.OrderedDict()
self._tasks = []

@property
def events(self):
Expand All @@ -29,9 +28,11 @@ def get_activity(event):
'version': event.activity_type['version'],
'state': event.state,
'scheduled_id': event.id,
'scheduled_timestamp': event.timestamp,
}
if event.activity_id not in self._activities:
self._activities[event.activity_id] = activity
self._tasks.append(activity)
else:
# When the executor retries a task, it schedules it again.
# We have to take care of not overriding some values set by the
Expand All @@ -40,20 +41,37 @@ def get_activity(event):
# corresponds to the last execution.
self._activities[event.activity_id].update(activity)
elif event.state == 'schedule_failed':
activity = self._activities[event.activity_id]
activity['state'] = event.state
activity['cause'] = event.cause
activity['activity_type'] = event.activity_type.copy()
activity = {
'type': 'activity',
'state': event.state,
'cause': event.cause,
'activity_type': event.activity_type.copy(),
'schedule_failed_timestamp': event.timestamp,
}

if event.activity_id not in self._activities:
self._activities[event.activity_id] = activity
self._tasks.append(activity)
else:
# When the executor retries a task, it schedules it again.
# We have to take care of not overriding some values set by the
# previous execution of the task such as the number of retries
# in ``retry``. As the state of the event mutates, it
# corresponds to the last execution.
self._activities[event.activity_id].update(activity)

elif event.state == 'started':
activity = get_activity(event)
activity['state'] = event.state
activity['identity'] = event.identity
activity['started_id'] = event.id
activity['started_timestamp'] = event.timestamp
elif event.state == 'completed':
activity = get_activity(event)
activity['state'] = event.state
activity['result'] = event.result
activity['completed_id'] = event.id
activity['completed_timestamp'] = event.timestamp
elif event.state == 'timed_out':
activity = get_activity(event)
activity['state'] = event.state
Expand All @@ -62,6 +80,7 @@ def get_activity(event):
events[activity['scheduled_id'] - 1],
'{}_timeout'.format(event.timeout_type.lower()))
activity['timed_out_id'] = event.id
activity['timed_out_timestamp'] = event.timestamp
if 'retry' not in activity:
activity['retry'] = 0
else:
Expand All @@ -71,6 +90,7 @@ def get_activity(event):
activity['state'] = event.state
activity['reason'] = event.reason
activity['details'] = event.details
activity['failed_timestamp'] = event.timestamp
if 'retry' not in activity:
activity['retry'] = 0
else:
Expand All @@ -80,6 +100,7 @@ def get_activity(event):
activity['state'] = event.state
if hasattr(event, 'details'):
activity['details'] = event.details
activity['cancelled_timestamp'] = event.timestamp

def parse_child_workflow_event(self, events, event):
"""Aggregate all the attributes of a workflow in a single entry.
Expand All @@ -106,18 +127,25 @@ def get_workflow(event):
'state': event.state,
'initiated_event_id': event.id,
}
self._child_workflows[event.workflow_id] = workflow
workflow['initiated_event_timestamp'] = event.timestamp
if event.workflow_id not in self._child_workflows:
self._child_workflows[event.workflow_id] = workflow
self._tasks.append(workflow)
else:
self._child_workflows[event.workflow_id].update(workflow)
elif event.state == 'started':
workflow = get_workflow(event)
workflow['state'] = event.state
workflow['run_id'] = event.workflow_execution['runId']
workflow['workflow_id'] = event.workflow_execution['workflowId']
workflow['started_id'] = event.id
workflow['started_timestamp'] = event.timestamp
elif event.state == 'completed':
workflow = get_workflow(event)
workflow['state'] = event.state
workflow['result'] = event.result
workflow['completed_id'] = event.id
workflow['completed_timestamp'] = event.timestamp

def parse(self):
events = self.events
Expand Down

0 comments on commit 96d49f6

Please sign in to comment.