Skip to content

Commit

Permalink
fix(func): analytic depending on alias must run alias firstly
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Nov 22, 2022
1 parent 6d3cbaa commit 5fa2a51
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
25 changes: 25 additions & 0 deletions internal/topo/planner/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt,
walkErr = validate(s)
// Collect all analytic function calls so that we can let them run firstly
ast.WalkFunc(s, func(n ast.Node) bool {
switch f := n.(type) {
case ast.Fields:
return false
case *ast.Call:
if function.IsAnalyticFunc(f.Name) {
f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)
f.Cached = true
analyticFuncs = append(analyticFuncs, &ast.Call{
Name: f.Name,
FuncId: f.FuncId,
FuncType: f.FuncType,
Args: f.Args,
CachedField: f.CachedField,
Partition: f.Partition,
})
}
}
return true
})
if walkErr != nil {
return nil, nil, walkErr
}
// walk sources at last to let them run firstly
// because other clause may depend on the alias defined here
ast.WalkFunc(s.Fields, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
if function.IsAnalyticFunc(f.Name) {
Expand Down
28 changes: 19 additions & 9 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ func Test_createLogicalPlan(t *testing.T) {
sendMeta: false,
}.Init(),
}, { // 13 analytic function plan
sql: `SELECT lag(name), id1 FROM src1 WHERE lag(temp) > temp`,
sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
p: ProjectPlan{
baseLogicalPlan: baseLogicalPlan{
children: []LogicalPlan{
Expand Down Expand Up @@ -1135,29 +1135,33 @@ func Test_createLogicalPlan(t *testing.T) {
},
funcs: []*ast.Call{
{
Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
}, {
Name: "lag",
FuncId: 1,
CachedField: "$$a_lag_1",
FuncId: 2,
CachedField: "$$a_lag_2",
Args: []ast.Expr{&ast.FieldRef{
Name: "temp",
StreamName: "src1",
}},
},
{
Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
},
{
Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
},
},
}.Init(),
},
},
condition: &ast.BinaryExpr{
LHS: &ast.Call{
Name: "lag",
FuncId: 1,
FuncId: 2,
Args: []ast.Expr{&ast.FieldRef{
Name: "temp",
StreamName: "src1",
}},
CachedField: "$$a_lag_1",
CachedField: "$$a_lag_2",
Cached: true,
},
OP: ast.GT,
Expand All @@ -1171,8 +1175,14 @@ func Test_createLogicalPlan(t *testing.T) {
},
fields: []ast.Field{
{
Expr: &ast.Call{Name: "lag", FuncId: 0, FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}, CachedField: "$$a_lag_0", Cached: true},
Name: "lag",
Expr: &ast.Call{
Name: "latest",
FuncId: 1,
Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
CachedField: "$$a_latest_1",
Cached: true,
},
Name: "latest",
}, {
Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
Name: "id1",
Expand Down

0 comments on commit 5fa2a51

Please sign in to comment.