diff --git a/go.mod b/go.mod index 6f8505d21..a2e9a134b 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.3.12 - github.com/flyteorg/flyteplugins v1.0.42 + github.com/flyteorg/flyteidl v1.3.14 + github.com/flyteorg/flyteplugins v1.0.43 github.com/flyteorg/flytestdlib v1.0.15 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 44da2028e..92e4c9d37 100644 --- a/go.sum +++ b/go.sum @@ -260,10 +260,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q= -github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.42 h1:vkBe8hzG7jlpqE6ZBKf7cENjkN8n+t6ff+EpxYeRrec= -github.com/flyteorg/flyteplugins v1.0.42/go.mod h1:5in2y2zWO6fbheoPJ44wNRppfVpjkWXCs0dy+oA232o= +github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= +github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteplugins v1.0.43 h1:uI/Y88xqJKfvfuxfu0Sw9CNZ7iu3+HUwwRhxh558cbs= +github.com/flyteorg/flyteplugins v1.0.43/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 
h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 7e42e0ee3..3d0707469 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -524,6 +524,7 @@ func (c *nodeExecutor) finalize(ctx context.Context, h handler.Node, nCtx handle func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executors.DAGStructure, nCtx *nodeExecContext, _ handler.Node) (executors.NodeStatus, error) { logger.Debugf(ctx, "Node not yet started, running pre-execute") defer logger.Debugf(ctx, "Node pre-execute completed") + occurredAt := time.Now() p, err := c.preExecute(ctx, dag, nCtx) if err != nil { logger.Errorf(ctx, "failed preExecute for node. Error: %s", err.Error()) @@ -547,6 +548,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) + p = p.WithOccuredAt(occurredAt) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(), @@ -691,6 +693,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node Message: err.Error(), }, }, + ReportedAt: ptypes.TimestampNow(), }) if err != nil { @@ -1152,6 +1155,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E }, }, ProducerId: c.clusterID, + ReportedAt: ptypes.TimestampNow(), }) if err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { if errors2.IsCausedBy(err, errors.IllegalStateError) { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index 400b00935..5d302f4fa 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ 
b/pkg/controller/nodes/handler/transition_info.go @@ -105,6 +105,16 @@ func (p PhaseInfo) WithInfo(i *ExecutionInfo) PhaseInfo { } } +func (p PhaseInfo) WithOccuredAt(t time.Time) PhaseInfo { + return PhaseInfo{ + p: p.p, + occurredAt: t, + err: p.err, + info: p.info, + reason: p.reason, + } +} + var PhaseInfoUndefined = PhaseInfo{p: EPhaseUndefined} func phaseInfo(p EPhase, err *core.ExecutionError, info *ExecutionInfo, reason string) PhaseInfo { diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 9d4b2e4e2..8c1ce6575 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -659,6 +659,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) } barrierTick := uint32(0) + occurredAt := time.Now() // STEP 2: If no cache-hit and not transitioning to PhaseWaitingForCache, then lets invoke the plugin and wait for a transition out of undefined if pluginTrns.execInfo.TaskNodeInfo == nil || (pluginTrns.pInfo.Phase() != pluginCore.PhaseWaitingForCache && pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.CacheStatus != core.CatalogCacheStatus_CACHE_HIT) { @@ -724,6 +725,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) PluginID: p.GetID(), ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(), ClusterID: t.clusterID, + OccurredAt: occurredAt, }) if err != nil { return handler.UnknownTransition, err @@ -750,6 +752,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) PluginID: p.GetID(), ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(), ClusterID: t.clusterID, + OccurredAt: occurredAt, }) if err != nil { logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. 
Error: %s", err.Error()) diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index db4e8a5a9..6faa93f70 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -1,6 +1,8 @@ package task import ( + "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" @@ -9,9 +11,10 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" - "github.com/golang/protobuf/ptypes" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" + + "github.com/golang/protobuf/ptypes" + timestamppb "github.com/golang/protobuf/ptypes/timestamp" ) // This is used by flyteadmin to indicate that map tasks now report subtask metadata individually. 
@@ -78,15 +81,27 @@ type ToTaskExecutionEventInputs struct { PluginID string ResourcePoolInfo []*event.ResourcePoolInfo ClusterID string + OccurredAt time.Time } func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) { // Transitions to a new phase - tm := ptypes.TimestampNow() var err error + var occurredAt *timestamppb.Timestamp if i := input.Info.Info(); i != nil && i.OccurredAt != nil { - tm, err = ptypes.TimestampProto(*i.OccurredAt) + occurredAt, err = ptypes.TimestampProto(*i.OccurredAt) + } else { + occurredAt, err = ptypes.TimestampProto(input.OccurredAt) + } + + if err != nil { + return nil, err + } + + reportedAt := ptypes.TimestampNow() + if i := input.Info.Info(); i != nil && i.ReportedAt != nil { + reportedAt, err = ptypes.TimestampProto(*i.ReportedAt) if err != nil { return nil, err } @@ -127,11 +142,12 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio Phase: ToTaskEventPhase(input.Info.Phase()), PhaseVersion: input.Info.Version(), ProducerId: input.ClusterID, - OccurredAt: tm, + OccurredAt: occurredAt, TaskType: input.TaskType, Reason: input.Info.Reason(), Metadata: metadata, EventVersion: taskExecutionEventVersion, + ReportedAt: reportedAt, } if input.Info.Phase().IsSuccess() && input.OutputWriter != nil { diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 91cb7ea38..d34fc7a7a 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -114,6 +114,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, OccurredAt: occurredTime, ProducerId: clusterID, EventVersion: nodeExecutionEventVersion, + ReportedAt: ptypes.TimestampNow(), } } else { nev = &event.NodeExecutionEvent{ @@ -122,6 +123,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, OccurredAt: occurredTime, ProducerId: clusterID, EventVersion: nodeExecutionEventVersion, + ReportedAt: ptypes.TimestampNow(), 
} }