Skip to content

Commit

Permalink
feat(task): Inject Task's LatestSuccess Timestamp In Flux Extern (#19402
Browse files Browse the repository at this point in the history
)

* feat(task): Inject latest success/failure into extern.

* chore(task/backend): Don't specify an extern if there are no statements.

* chore(task/executor): Don't apply the latest failure for now.

* chore(changelog): Add 19402 to changelog.

* chore(kit): Introduce feature flag for time injection.

* chore(task/executor): Guard injection into extern by feature flag.

* chore(task/executor): No need for this subtest pattern.

* chore(task/executor): Add tests for extern injection.
  • Loading branch information
brettbuddin authored Aug 25, 2020
1 parent 342a298 commit 1ae2541
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern

### Bug Fixes

Expand Down
6 changes: 6 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,9 @@
key: enforceOrgDashboardLimits
default: false
contact: Compute Team

- name: Inject Latest Success Time
description: Inject the latest successful task run timestamp into a Task query extern when executing.
key: injectLatestSuccessTime
default: false
contact: Compute Team
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 70 additions & 10 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package executor

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/influxdb/v2"
Expand All @@ -22,6 +24,9 @@ import (
const (
maxPromises = 1000
defaultMaxWorkers = 100

latestSuccessOption = "tasks.latestSuccessTime"
latestFailureOption = "tasks.latestFailureTime"
)

var _ scheduler.Executor = (*Executor)(nil)
Expand Down Expand Up @@ -70,7 +75,31 @@ func WithMaxWorkers(n int) executorOption {

// CompilerBuilderFunc is a function that yields a new flux.Compiler. The
// context.Context provided can be assumed to be an authorized context.
type CompilerBuilderFunc func(ctx context.Context, query string, now time.Time) (flux.Compiler, error)
type CompilerBuilderFunc func(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error)

// CompilerBuilderTimestamps contains timestamps which should be provided along
// with a Task query.
type CompilerBuilderTimestamps struct {
Now time.Time
LatestSuccess time.Time
}

func (ts CompilerBuilderTimestamps) Extern() *ast.File {
var body []ast.Statement

if !ts.LatestSuccess.IsZero() {
body = append(body, &ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: ts.LatestSuccess,
},
},
})
}

return &ast.File{Body: body}
}

// WithSystemCompilerBuilder is an Executor option that configures a
// CompilerBuilderFunc to be used when compiling queries for System Tasks.
Expand Down Expand Up @@ -416,8 +445,6 @@ func (w *worker) start(p *promise) {
}

func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) {

// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()

Expand Down Expand Up @@ -471,7 +498,10 @@ func (w *worker) executeQuery(p *promise) {
if p.task.Type != influxdb.TaskSystemType {
buildCompiler = w.nonSystemBuildCompiler
}
compiler, err := buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor)
compiler, err := buildCompiler(ctx, p.task.Flux, CompilerBuilderTimestamps{
Now: p.run.ScheduledFor,
LatestSuccess: p.task.LatestSuccess,
})
if err != nil {
w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err))
return
Expand Down Expand Up @@ -592,21 +622,45 @@ func exhaustResultIterators(res flux.Result) error {
}

// NewASTCompiler parses a Flux query string into an AST representatation.
func NewASTCompiler(_ context.Context, query string, now time.Time) (flux.Compiler, error) {
func NewASTCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
pkg, err := runtime.ParseToJSON(query)
if err != nil {
return nil, err
}
var externBytes []byte
if feature.InjectLatestSuccessTime().Enabled(ctx) {
extern := ts.Extern()
if len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
return nil, err
}
}
}
return lang.ASTCompiler{
AST: pkg,
Now: now,
AST: pkg,
Now: ts.Now,
Extern: externBytes,
}, nil
}

// NewFluxCompiler wraps a Flux query string in a raw-query representation.
func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compiler, error) {
func NewFluxCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
var externBytes []byte
if feature.InjectLatestSuccessTime().Enabled(ctx) {
extern := ts.Extern()
if len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
return nil, err
}
}
}
return lang.FluxCompiler{
Query: query,
Query: query,
Extern: externBytes,
// TODO(brett): This mitigates an immediate problem where
// Checks/Notifications breaks when sending Now, and system Tasks do not
// break when sending Now. We are currently sending C+N through using
Expand All @@ -617,7 +671,13 @@ func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compile
// we are able to locate the root cause and use Flux Compiler for all
// Task types.
//
// This should be removed once we diagnose the problem.
// It turns out this is due to the exclusive nature of the stop time in
// Flux "from" and that we weren't including the left-hand boundary of
// the range check for notifications. We're shipping a fix soon in
//
// https://github.com/influxdata/influxdb/pull/19392
//
// Once this has merged, we can send Now again.
//
// Now: now,
}, nil
Expand Down
Loading

0 comments on commit 1ae2541

Please sign in to comment.