Skip to content

Commit

Permalink
feat: flux query profiler (#19359)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanyzhang authored and Christopher Wolff committed Sep 11, 2020
1 parent 4aaecf9 commit 5c330a4
Showing 1 changed file with 190 additions and 1 deletion.
191 changes: 190 additions & 1 deletion cmd/influxd/launcher/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
Expand Down Expand Up @@ -221,7 +223,7 @@ func queryPoints(ctx context.Context, t *testing.T, l *launcher.TestLauncher, op
if d.verbose {
t.Logf("query:\n%s", qs)
}
pkg, err := flux.Parse(qs)
pkg, err := runtime.ParseToJSON(qs)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -751,6 +753,193 @@ from(bucket: "%s")
}
}

type TestQueryProfiler struct{
start int64
}

func (s TestQueryProfiler) Name() string {
return fmt.Sprintf("query%d", s.start)
}

func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
groupKey := execute.NewGroupKey(
[]flux.ColMeta{
{
Label: "_measurement",
Type: flux.TString,
},
},
[]values.Value{
values.NewString(fmt.Sprintf("profiler/query%d", s.start)),
},
)
b := execute.NewColListTableBuilder(groupKey, alloc)
colMeta := []flux.ColMeta{
{
Label: "_measurement",
Type: flux.TString,
},
{
Label: "TotalDuration",
Type: flux.TInt,
},
{
Label: "CompileDuration",
Type: flux.TInt,
},
{
Label: "QueueDuration",
Type: flux.TInt,
},
{
Label: "PlanDuration",
Type: flux.TInt,
},
{
Label: "RequeueDuration",
Type: flux.TInt,
},
{
Label: "ExecuteDuration",
Type: flux.TInt,
},
{
Label: "Concurrency",
Type: flux.TInt,
},
{
Label: "MaxAllocated",
Type: flux.TInt,
},
{
Label: "TotalAllocated",
Type: flux.TInt,
},
{
Label: "RuntimeErrors",
Type: flux.TString,
},
{
Label: "influxdb/scanned-bytes",
Type: flux.TInt,
},
{
Label: "influxdb/scanned-values",
Type: flux.TInt,
},
{
Label: "flux/query-plan",
Type: flux.TString,
},
}
colData := []interface{} {
fmt.Sprintf("profiler/query%d", s.start),
s.start,
s.start + 1,
s.start + 2,
s.start + 3,
s.start + 4,
s.start + 5,
s.start + 6,
s.start + 7,
s.start + 8,
"error1\nerror2",
s.start + 9,
s.start + 10,
"query plan",
}
for _, col := range colMeta {
if _, err := b.AddCol(col); err != nil {
return nil, err
}
}
for i := 0; i < len(colData); i++ {
if intValue, ok := colData[i].(int64); ok {
b.AppendInt(i, intValue)
} else {
b.AppendString(i, colData[i].(string))
}
}
tbl, err := b.Table()
if err != nil {
return nil, err
}
return tbl, nil
}

func TestFluxProfiler(t *testing.T) {
testcases := []struct {
name string
data []string
query string
want string
}{
{
name: "range last single point start time",
data: []string{
"m,tag=a f=1i 1",
},
query: `
option profiler.enabledProfilers = ["query0", "query100", "query100", "NonExistentProfiler"]
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z)
|> last()
`,
want: `
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#group,false,false,true,true,false,false,true,true,true
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
#default,_profiler,,,,,,,,,,,,,,,
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
,,0,profiler/query0,0,1,2,3,4,5,6,7,8,"error1
error2","query plan",9,10
,,1,profiler/query100,100,101,102,103,104,105,106,107,108,"error1
error2","query plan",109,110
`,
},
}
execute.RegisterProfilers(&TestQueryProfiler{}, &TestQueryProfiler{start: 100})
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil)

l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)

l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))

queryStr := "import \"profiler\"\nv = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query
req := &query.Request{
Authorization: l.Auth,
OrganizationID: l.Org.ID,
Compiler: lang.FluxCompiler{
Query: queryStr,
},
}
if got, err := l.FluxQueryService().Query(ctx, req); err != nil {
t.Error(err)
} else {
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want)))
if err != nil {
t.Fatal(err)
}
defer want.Release()

if err := executetest.EqualResultIterators(want, got); err != nil {
t.Fatal(err)
}
}
})
}
}

func TestQueryPushDowns(t *testing.T) {
t.Skip("Not supported yet")
testcases := []struct {
Expand Down

0 comments on commit 5c330a4

Please sign in to comment.