Skip to content

Commit

Permalink
feat(query): add a feature flag for query tracing (#19437)
Browse files Browse the repository at this point in the history
I reapplied this via cherry-pick after it was reverted by
the OSS storage split.
  • Loading branch information
Christopher M. Wolff committed Sep 9, 2020
1 parent 965ef89 commit 83b8fce
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 7 deletions.
6 changes: 0 additions & 6 deletions cmd/influxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/cmd/influxd/upgrade"
Expand Down Expand Up @@ -43,11 +42,6 @@ func main() {
},
)

// TODO: this should be removed in the future: https://github.com/influxdata/influxdb/issues/16220
if os.Getenv("QUERY_TRACING") == "1" {
flux.EnableExperimentalTracing()
}

if err := rootCmd.Execute(); err != nil {
os.Exit(1)
}
Expand Down
7 changes: 7 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@
contact: Query Team
lifetime: temporary

- name: Query Tracing
description: Turn on query tracing for queries that are sampled
key: queryTracing
default: false
contact: Query Team
lifetime: permanent

- name: Simple Task Options Extraction
description: Simplified task options extraction to avoid undefined functions when saving tasks
key: simpleTaskOptionsExtraction
Expand Down
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions query/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/influxdata/flux/runtime"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/tracing"
influxlogger "github.com/influxdata/influxdb/v2/logger"
Expand Down Expand Up @@ -208,6 +209,10 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
for _, dep := range c.dependencies {
ctx = dep.Inject(ctx)
}
// Add per-transformation spans if the feature flag is set.
if feature.QueryTracing().Enabled(ctx) {
ctx = flux.WithExperimentalTracingEnabled(ctx)
}
q, err := c.query(ctx, req.Compiler)
if err != nil {
return q, err
Expand Down
100 changes: 100 additions & 0 deletions query/control/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/influxdb/v2/kit/feature"
pmock "github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/control"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -1289,6 +1293,102 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
validateUnusedMemory(t, reg, config)
}

func TestController_QueryTracing(t *testing.T) {
// temporarily install a mock tracer to see which spans are created.
oldTracer := opentracing.GlobalTracer()
defer opentracing.SetGlobalTracer(oldTracer)
mockTracer := mocktracer.New()
opentracing.SetGlobalTracer(mockTracer)

const memoryBytesQuotaPerQuery = 64
config := config
config.MemoryBytesQuotaPerQuery = memoryBytesQuotaPerQuery
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)

flagger := pmock.NewFlagger(map[feature.Flag]interface{}{
feature.QueryTracing(): true,
})
plainCtx := context.Background()
withFlagger, err := feature.Annotate(plainCtx, flagger)
if err != nil {
t.Fatal(err)
}
tcs := []struct {
name string
ctx context.Context
doNotWantSpan string
wantSpan string
}{
{
name: "feature flag off",
ctx: plainCtx,
doNotWantSpan: "*executetest.AllocatingFromProcedureSpec",
},
{
name: "feature flag on",
ctx: withFlagger,
wantSpan: "*executetest.AllocatingFromProcedureSpec",
},
}
for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mockTracer.Reset()

compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
// Return a program that will allocate one more byte than is allowed.
pts := plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("allocating-from-test", &executetest.AllocatingFromProcedureSpec{
ByteCount: 16,
}),
plan.CreatePhysicalNode("yield", &universe.YieldProcedureSpec{Name: "_result"}),
},
Edges: [][2]int{
{0, 1},
},
Resources: flux.ResourceManagement{
ConcurrencyQuota: 1,
},
}

ps := plantest.CreatePlanSpec(&pts)
prog := &lang.Program{
Logger: zaptest.NewLogger(t),
PlanSpec: ps,
}

return prog, nil
},
}

// Depending on how the feature flag is set in the context,
// we may or may not do query tracing here.
q, err := ctrl.Query(tc.ctx, makeRequest(compiler))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

consumeResults(t, q)
gotSpans := make(map[string]struct{})
for _, span := range mockTracer.FinishedSpans() {
gotSpans[span.OperationName] = struct{}{}
}
if _, found := gotSpans[tc.doNotWantSpan]; tc.doNotWantSpan != "" && found {
t.Fatalf("did not want to find span %q but it was there", tc.doNotWantSpan)
}
if _, found := gotSpans[tc.wantSpan]; tc.wantSpan != "" && !found {
t.Fatalf("wanted to find span %q but it was not there", tc.wantSpan)
}
})
}
}

func consumeResults(tb testing.TB, q flux.Query) {
tb.Helper()
for res := range q.Results() {
Expand Down
2 changes: 1 addition & 1 deletion query/stdlib/influxdata/influxdb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Source) Run(ctx context.Context) {
labelValues := s.m.getLabelValues(ctx, s.orgID, s.op)
start := time.Now()
var err error
if flux.IsExperimentalTracingEnabled() {
if flux.IsExperimentalTracingEnabled(ctx) {
span, ctxWithSpan := tracing.StartSpanFromContextWithOperationName(ctx, "source-"+s.op)
err = s.runner.run(ctxWithSpan)
span.Finish()
Expand Down

0 comments on commit 83b8fce

Please sign in to comment.