Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkger): add support for applying checks #16275

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
1. [16234](https://github.com/influxdata/influxdb/pull/16234): add support for notification endpoints to influx templates/pkgs.
2. [16242](https://github.com/influxdata/influxdb/pull/16242): drop id prefix for secret key requirement for notification endpoints
3. [16259](https://github.com/influxdata/influxdb/pull/16259): add support for check resource to pkger parser
4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality
5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality

### Bug Fixes

Expand Down
4 changes: 2 additions & 2 deletions authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,15 @@ func (p *Permission) Valid() error {
}
}

if p.Resource.OrgID != nil && !(*p.Resource.OrgID).Valid() {
if p.Resource.OrgID != nil && !p.Resource.OrgID.Valid() {
return &Error{
Code: EInvalid,
Err: ErrInvalidID,
Msg: "invalid org id for permission",
}
}

if p.Resource.ID != nil && !(*p.Resource.ID).Valid() {
if p.Resource.ID != nil && !p.Resource.ID.Valid() {
return &Error{
Code: EInvalid,
Err: ErrInvalidID,
Expand Down
17 changes: 17 additions & 0 deletions cmd/influx/pkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,23 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) {
})
}

if checks := diff.Checks; len(checks) > 0 {
headers := []string{"New", "ID", "Name", "Description"}
tablePrintFn("CHECKS", headers, len(checks), func(i int) []string {
c := checks[i]
var oldDesc string
if c.Old != nil {
oldDesc = c.Old.GetDescription()
}
return []string{
boolDiff(c.IsNew()),
c.ID.String(),
c.Name,
diffLn(c.IsNew(), oldDesc, c.New.GetDescription()),
}
})
}

if dashes := diff.Dashboards; len(dashes) > 0 {
headers := []string{"New", "Name", "Description", "Num Charts"}
tablePrintFn("DASHBOARDS", headers, len(dashes), func(i int) []string {
Expand Down
5 changes: 4 additions & 1 deletion cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,12 +836,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var pkgSVC pkger.SVC
{
b := m.apibackend
authedOrgSVC := authorizer.NewOrgService(b.OrganizationService)
authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
pkgSVC = pkger.NewService(
pkger.WithLogger(m.log.With(zap.String("service", "pkger"))),
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)),
pkger.WithNoticationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, b.UserResourceMappingService, b.OrganizationService)),
pkger.WithNoticationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)),
pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)),
pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)),
pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)),
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb), OpPrefix: kv.OpPrefix}
}

func (tl *TestLauncher) CheckService() platform.CheckService {
return tl.kvService
}

func (tl *TestLauncher) DashboardService(tb testing.TB) *http.DashboardService {
tb.Helper()
return &http.DashboardService{Client: tl.HTTPClient(tb)}
Expand Down
69 changes: 65 additions & 4 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestLauncher_Pkger(t *testing.T) {
svc := pkger.NewService(
pkger.WithBucketSVC(l.BucketService(t)),
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithCheckSVC(l.CheckService()),
pkger.WithLabelSVC(&fakeLabelSVC{
LabelService: l.LabelService(t),
killCount: 2, // hits error on 3rd attempt at creating a mapping
Expand Down Expand Up @@ -92,9 +93,6 @@ func TestLauncher_Pkger(t *testing.T) {

hasLabelAssociations := func(t *testing.T, associations []pkger.SummaryLabel, numAss int, expectedNames ...string) {
t.Helper()

require.Len(t, associations, numAss)

hasAss := func(t *testing.T, expected string) {
t.Helper()
for _, ass := range associations {
Expand All @@ -105,6 +103,7 @@ func TestLauncher_Pkger(t *testing.T) {
require.FailNow(t, "did not find expected association: "+expected)
}

require.Len(t, associations, numAss)
for _, expected := range expectedNames {
hasAss(t, expected)
}
Expand Down Expand Up @@ -137,6 +136,11 @@ func TestLauncher_Pkger(t *testing.T) {
require.Len(t, diffVars, 1)
assert.True(t, diffVars[0].IsNew())

require.Len(t, diff.Checks, 2)
for _, ch := range diff.Checks {
assert.True(t, ch.IsNew())
}

require.Len(t, diff.Dashboards, 1)
require.Len(t, diff.NotificationEndpoints, 1)
require.Len(t, diff.Telegrafs, 1)
Expand All @@ -150,6 +154,13 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "rucket_1", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1")

checks := sum.Checks
require.Len(t, checks, 2)
for i, ch := range checks {
assert.Equal(t, fmt.Sprintf("check_%d", i), ch.Check.GetName())
hasLabelAssociations(t, ch.LabelAssociations, 1, "label_1")
}

dashs := sum.Dashboards
require.Len(t, dashs, 1)
assert.Equal(t, "dash_1", dashs[0].Name)
Expand Down Expand Up @@ -197,6 +208,14 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "rucket_1", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1")

checks := sum1.Checks
require.Len(t, checks, 2)
for i, ch := range checks {
assert.NotZero(t, ch.Check.GetID())
assert.Equal(t, fmt.Sprintf("check_%d", i), ch.Check.GetName())
hasLabelAssociations(t, ch.LabelAssociations, 1, "label_1")
}

dashs := sum1.Dashboards
require.Len(t, dashs, 1)
assert.NotZero(t, dashs[0].ID)
Expand Down Expand Up @@ -246,7 +265,7 @@ func TestLauncher_Pkger(t *testing.T) {
}

mappings := sum1.LabelMappings
require.Len(t, mappings, 5)
require.Len(t, mappings, 7)
hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType))
hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
Expand Down Expand Up @@ -549,6 +568,48 @@ spec:
associations:
- kind: Label
name: label_1
- kind: Check_Threshold
name: check_0
every: 1m
query: >
from(bucket: "rucket_1")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")
statusMessageTemplate: "Check: ${ r._check_name } is: ${ r._level }"
tags:
- key: tag_1
value: val_1
thresholds:
- type: inside_range
level: INfO
min: 30.0
max: 45.0
associations:
- kind: Label
name: label_1
- kind: Check_Deadman
name: check_1
description: desc_1
every: 5m
level: cRiT
offset: 10s
query: >
from(bucket: "rucket_1")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")
reportZero: true
staleTime: 10m
statusMessageTemplate: "Check: ${ r._check_name } is: ${ r._level }"
timeSince: 90s
associations:
- kind: Label
name: label_1
`

const updatePkgYMLStr = `apiVersion: 0.1.0
Expand Down
10 changes: 10 additions & 0 deletions http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,22 @@ import (
"strings"
"time"

"github.com/influxdata/influxdb/kit/tracing"
"go.uber.org/zap"
)

// Middleware constructor.
type Middleware func(http.Handler) http.Handler

func traceMW(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpanFromContext(r.Context())
defer span.Finish()
next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
}

func skipOptionsMW(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
if r.Method == "OPTIONS" {
Expand Down
10 changes: 0 additions & 10 deletions http/pkger_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/pkger"
"go.uber.org/zap"
Expand Down Expand Up @@ -334,12 +333,3 @@ func newDecodeErr(encoding string, err error) *influxdb.Error {
Err: err,
}
}

func traceMW(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpanFromContext(r.Context())
defer span.Finish()
next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
}
24 changes: 24 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7209,6 +7209,17 @@ components:
type: array
items:
$ref: "#/components/schemas/PkgSummaryLabel"
checks:
type: array
items:
allOf:
- $ref: "#/components/schemas/CheckDiscriminator"
- type: object
properties:
labelAssociations:
type: array
items:
$ref: "#/components/schemas/Label"
labels:
type: array
items:
Expand Down Expand Up @@ -7316,6 +7327,19 @@ components:
type: string
retentionRules:
$ref: "#/components/schemas/RetentionRules"
checks:
type: array
items:
type: object
properties:
id:
type: string
name:
type: string
new:
$ref: "#/components/schemas/CheckDiscriminator"
old:
$ref: "#/components/schemas/CheckDiscriminator"
dashboards:
type: array
items:
Expand Down
3 changes: 1 addition & 2 deletions notification/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (b Base) generateTaskOption() ast.Statement {
}

func (b Base) generateFluxASTCheckDefinition(checkType string) ast.Statement {
props := []*ast.Property{}
props = append(props, flux.Property("_check_id", flux.String(b.ID.String())))
props := append([]*ast.Property{}, flux.Property("_check_id", flux.String(b.ID.String())))
props = append(props, flux.Property("_check_name", flux.String(b.Name)))
props = append(props, flux.Property("_type", flux.String(checkType)))

Expand Down
3 changes: 1 addition & 2 deletions notification/check/deadman.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (c Deadman) generateFluxASTBody() []ast.Statement {
statements = append(statements, c.generateFluxASTCheckDefinition("deadman"))
statements = append(statements, c.generateLevelFn())
statements = append(statements, c.generateFluxASTMessageFunction())
statements = append(statements, c.generateFluxASTChecksFunction())
return statements
return append(statements, c.generateFluxASTChecksFunction())
}

func (c Deadman) generateLevelFn() ast.Statement {
Expand Down
38 changes: 26 additions & 12 deletions notification/check/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,16 @@ func (t Threshold) GenerateFluxAST() (*ast.Package, error) {
return nil, fmt.Errorf("expect a single file to be returned from query parsing got %d", len(p.Files))
}

fields := getFields(p)
if len(fields) != 1 {
return nil, fmt.Errorf("expected a single field but got: %s", fields)
}

f := p.Files[0]
assignPipelineToData(f)

f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody()...)
f.Body = append(f.Body, t.generateFluxASTBody(fields[0])...)

return p, nil
}
Expand All @@ -146,7 +151,7 @@ func (t Threshold) getSelectedField() (string, error) {
if kv.Key == "_field" && len(kv.Values) != 1 {
return "", fmt.Errorf("expect there to be a single field value in builder config")
}
if kv.Key == "_field" && len(kv.Values) == 1 {
if kv.Key == "_field" {
return kv.Values[0], nil
}
}
Expand Down Expand Up @@ -222,6 +227,22 @@ func removeAggregateWindow(pkg *ast.Package) {
})
}

func getFields(pkg *ast.Package) []string {
var fields []string
ast.Visit(pkg, func(n ast.Node) {
if fn, ok := n.(*ast.BinaryExpression); ok {
if me, ok := fn.Left.(*ast.MemberExpression); ok {
if me.Property.Key() == "_field" {
if str, ok := fn.Right.(*ast.StringLiteral); ok {
fields = append(fields, str.Value)
}
}
}
}
})
return fields
}

func assignPipelineToData(f *ast.File) error {
if len(f.Body) != 1 {
return fmt.Errorf("expected there to be a single statement in the flux script body, recieved %d", len(f.Body))
Expand Down Expand Up @@ -249,11 +270,11 @@ func assignPipelineToData(f *ast.File) error {
return nil
}

func (t Threshold) generateFluxASTBody() []ast.Statement {
func (t Threshold) generateFluxASTBody(field string) []ast.Statement {
var statements []ast.Statement
statements = append(statements, t.generateTaskOption())
statements = append(statements, t.generateFluxASTCheckDefinition("threshold"))
statements = append(statements, t.generateFluxASTThresholdFunctions()...)
statements = append(statements, t.generateFluxASTThresholdFunctions(field)...)
statements = append(statements, t.generateFluxASTMessageFunction())
statements = append(statements, t.generateFluxASTChecksFunction())
return statements
Expand All @@ -280,16 +301,9 @@ func (t Threshold) generateFluxASTChecksCall() *ast.CallExpression {
return flux.Call(flux.Member("monitor", "check"), flux.Object(objectProps...))
}

func (t Threshold) generateFluxASTThresholdFunctions() []ast.Statement {
func (t Threshold) generateFluxASTThresholdFunctions(field string) []ast.Statement {
thresholdStatements := make([]ast.Statement, len(t.Thresholds))

field, err := t.getSelectedField()
if err != nil {
// the error here should never happen since it should be validated before this
// function is ever called.
panic(err)
}

// This assumes that the ThresholdConfigs we've been provided do not have duplicates.
for k, v := range t.Thresholds {
thresholdStatements[k] = v.generateFluxASTThresholdFunction(field)
Expand Down
Loading