Skip to content

Commit

Permalink
feat: support push projection (#3001)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Jul 12, 2024
1 parent d660988 commit 2b506df
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 130 deletions.
6 changes: 4 additions & 2 deletions internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,10 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
suite.r.ServeHTTP(w1, req1)
returnVal, _ = io.ReadAll(w1.Result().Body) //nolint
returnStr := string(returnVal)
expect = "{\"type\":\"ProjectPlan\",\"info\":\"Fields:[ * ]\",\"id\":0,\"children\":[1]}\n\n {\"type\":\"DataSourcePlan\",\"info\":\"StreamName: alert\",\"id\":1,\"children\":null}\n\n"
assert.Equal(suite.T(), expect, returnStr)
expect = `
{"type":"ProjectPlan","info":"Fields:[ * ]","id":0,"children":[1]}
{"type":"DataSourcePlan","info":"StreamName: alert","id":1}`
assert.Equal(suite.T(), strings.Trim(expect, "\n"), returnStr)

req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/rules/rule32211/explain", bytes.NewBufferString("any"))
w1 = httptest.NewRecorder()
Expand Down
30 changes: 15 additions & 15 deletions internal/topo/planner/explainInfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestDataSourcePlanExplainInfo(t *testing.T) {
"field3": {},
},
},
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test1, Fields:[ field1, field2, field3 ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test1, Fields:[ field1, field2, field3 ]\",\"id\":0}\n",
t: "DataSourcePlan",
},
{
Expand Down Expand Up @@ -53,19 +53,19 @@ func TestDataSourcePlanExplainInfo(t *testing.T) {
"s3": {},
},
},
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test3, Fields:[ column1, column2, id ], StreamFields:[ s1, s2, s3 ]\",\"id\":2,\"children\":null}\n",
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test3, Fields:[ column1, column2, id ], StreamFields:[ s1, s2, s3 ]\",\"id\":2}\n",
t: "DataSourcePlan",
},
{
p: &DataSourcePlan{
name: "test4",
},
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test4\",\"id\":3,\"children\":null}\n",
res: "{\"type\":\"DataSourcePlan\",\"info\":\"StreamName: test4\",\"id\":3}\n",
t: "DataSourcePlan",
},
{
p: &DataSourcePlan{},
res: "{\"type\":\"DataSourcePlan\",\"info\":\"\",\"id\":4,\"children\":null}\n",
res: "{\"type\":\"DataSourcePlan\",\"info\":\"\",\"id\":4}\n",
t: "DataSourcePlan",
},
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestAggregatePlanExplainInfo(t *testing.T) {
},
},
},
res: "{\"type\":\"AggregatePlan\",\"info\":\"Dimension:{ Call:{ name:lpad, args:[$$default.name, 1] } }\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"AggregatePlan\",\"info\":\"Dimension:{ Call:{ name:lpad, args:[$$default.name, 1] } }\",\"id\":0}\n",
t: "AggregatePlan",
},
{
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestAggregatePlanExplainInfo(t *testing.T) {
ast.Dimension{Expr: &ast.FieldRef{Name: "department", StreamName: ast.DefaultStream}},
},
},
res: "{\"type\":\"AggregatePlan\",\"info\":\"Dimension:{ $$default.department }\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"AggregatePlan\",\"info\":\"Dimension:{ $$default.department }\",\"id\":0}\n",
t: "AggregatePlan",
},
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestAnalyticFuncsPlanExplainInfo(t *testing.T) {
},
},
},
res: "{\"type\":\"AnalyticFuncsPlan\",\"info\":\"Funcs:[ Call:{ name:lag, args:[src1.temp] } ], FieldFuncs:[ Call:{ name:lag, args:[src1.name] }, Call:{ name:latest, args:[Call:{ name:lag, args:[src1.name] }] } ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"AnalyticFuncsPlan\",\"info\":\"Funcs:[ Call:{ name:lag, args:[src1.temp] } ], FieldFuncs:[ Call:{ name:lag, args:[src1.name] }, Call:{ name:latest, args:[Call:{ name:lag, args:[src1.name] }] } ]\",\"id\":0}\n",
t: "AnalyticFuncsPlan",
},
{
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestFilterPlanExplainInfo(t *testing.T) {
RHS: &ast.StringLiteral{Val: "v1"},
},
},
res: "{\"type\":\"FilterPlan\",\"info\":\"Condition:{ binaryExpr:{ src1.name = v1 } }, \",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"FilterPlan\",\"info\":\"Condition:{ binaryExpr:{ src1.name = v1 } }, \",\"id\":0}\n",
t: "FilterPlan",
},
{
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestHavingPlanExplainInfo(t *testing.T) {
RHS: &ast.IntegerLiteral{Val: 2},
},
},
res: "{\"type\":\"HavingPlan\",\"info\":\"Condition:{ binaryExpr:{ Call:{ name:count, args:[*] } > 2 } }, \",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"HavingPlan\",\"info\":\"Condition:{ binaryExpr:{ Call:{ name:count, args:[*] } > 2 } }, \",\"id\":0}\n",
t: "HavingPlan",
},
{
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestJoinAlignPlanExplainInfo(t *testing.T) {
p: &JoinAlignPlan{
Emitters: []string{"tableInPlanner"},
},
res: "{\"type\":\"JoinAlignPlan\",\"info\":\"Emitters:[ tableInPlanner ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"JoinAlignPlan\",\"info\":\"Emitters:[ tableInPlanner ]\",\"id\":0}\n",
t: "JoinAlignPlan",
},
{
Expand Down Expand Up @@ -478,7 +478,7 @@ func TestJoinPlanExplainInfo(t *testing.T) {
},
}},
},
res: "{\"type\":\"JoinPlan\",\"info\":\"Joins:[ { joinType:INNER_JOIN, binaryExpr:{ binaryExpr:{ binaryExpr:{ src1.temp > 20 } OR binaryExpr:{ src2.hum > 60 } } AND binaryExpr:{ src1.id1 = src2.id2 } } } ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"JoinPlan\",\"info\":\"Joins:[ { joinType:INNER_JOIN, binaryExpr:{ binaryExpr:{ binaryExpr:{ src1.temp > 20 } OR binaryExpr:{ src2.hum > 60 } } AND binaryExpr:{ src1.id1 = src2.id2 } } } ]\",\"id\":0}\n",
t: "JoinPlan",
},
{
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestLookupPlanExplainInfo(t *testing.T) {
},
},
},
res: "{\"type\":\"LookupPlan\",\"info\":\"Join:{ joinType:LEFT_JOIN, expr:binaryExpr:{ left.device_id = good.id } }\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"LookupPlan\",\"info\":\"Join:{ joinType:LEFT_JOIN, expr:binaryExpr:{ left.device_id = good.id } }\",\"id\":0}\n",
t: "LookupPlan",
},
{
Expand Down Expand Up @@ -907,7 +907,7 @@ func TestWatermarkPlanExplainInfo(t *testing.T) {
Emitters: []string{"id", "grade"},
SendWatermark: false,
},
res: "{\"type\":\"WatermarkPlan\",\"info\":\"Emitters:[ id, grade ], SendWatermark:false\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"WatermarkPlan\",\"info\":\"Emitters:[ id, grade ], SendWatermark:false\",\"id\":0}\n",
t: "WatermarkPlan",
},
{
Expand Down Expand Up @@ -969,7 +969,7 @@ func TestWindowFuncPlanExplainInfo(t *testing.T) {
},
},
},
res: "{\"type\":\"WindowFuncPlan\",\"info\":\"windowFuncFields:[ {name:AName, expr:}, {name:AName, expr:}, {name:Name}, {name:AName} ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"WindowFuncPlan\",\"info\":\"windowFuncFields:[ {name:AName, expr:}, {name:AName, expr:}, {name:Name}, {name:AName} ]\",\"id\":0}\n",
t: "WindowFuncPlan",
},
{
Expand Down Expand Up @@ -1015,7 +1015,7 @@ func TestOrderPlanExplainInfo(t *testing.T) {
p: &OrderPlan{
SortFields: []ast.SortField{{Uname: "name", Name: "name", Ascending: false, FieldExpr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}}},
},
res: "{\"type\":\"OrderPlan\",\"info\":\"SortFields:[ sortField:{ name:name, ascending:false, fieldExpr:{ $$default.name } } ]\",\"id\":0,\"children\":null}\n",
res: "{\"type\":\"OrderPlan\",\"info\":\"SortFields:[ sortField:{ name:name, ascending:false, fieldExpr:{ $$default.name } } ]\",\"id\":0}\n",
t: "OrderPlan",
},
{
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/logicalPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type PlanExplainInfo struct {
T PlanType `json:"type"`
Info string `json:"info"`
ID int64 `json:"id"`
Children []int64 `json:"children"`
Children []int64 `json:"children,omitempty"`
}

func (p *baseLogicalPlan) Explain() string {
Expand Down
1 change: 1 addition & 0 deletions internal/topo/planner/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package planner
var optRuleList = []logicalOptRule{
&columnPruner{},
&predicatePushDown{},
&pushProjectionPlan{},
}

func optimize(p LogicalPlan) (LogicalPlan, error) {
Expand Down
12 changes: 9 additions & 3 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package planner
import (
"errors"
"fmt"
"strings"
"time"

"github.com/lf-edge/ekuiper/v2/internal/conf"
Expand Down Expand Up @@ -147,6 +148,10 @@ func GetExplainInfoFromLogicalPlan(rule *def.Rule) (string, error) {
if err != nil {
return "", err
}
return ExplainFromLogicalPlan(lp, rule.Id)
}

func ExplainFromLogicalPlan(lp LogicalPlan, ruleID string) (string, error) {
var setId func(p LogicalPlan, id int64)
setId = func(p LogicalPlan, id int64) {
p.SetID(id)
Expand All @@ -162,15 +167,16 @@ func GetExplainInfoFromLogicalPlan(rule *def.Rule) (string, error) {
tmp := ""
res := ""
for i := 0; i < level; i++ {
tmp += " "
tmp += "\t"
}
p.BuildExplainInfo()
if info, ok := p.(RuleRuntimeInfo); ok {
info.BuildSchemaInfo(rule.Id)
info.BuildSchemaInfo(ruleID)
}
// Build the explainInfo of the current layer
res += tmp + p.Explain() + "\n"
res += tmp + strings.TrimSuffix(p.Explain(), "\n")
if len(p.Children()) != 0 {
res += "\n"
for _, v := range p.Children() {
res += tmp + getExplainInfo(v, level+1)
}
Expand Down
109 changes: 0 additions & 109 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4863,112 +4863,3 @@ func TestTransformSourceNode(t *testing.T) {
})
}
}

func TestGetLogicalPlanForExplain(t *testing.T) {
kv, err := store.GetKV("stream")
if err != nil {
t.Error(err)
return
}
streamSqls := map[string]string{
"src1": `CREATE STREAM src1 (
id1 BIGINT,
temp BIGINT,
name string,
myarray array(string)
) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
"src2": `CREATE STREAM src2 (
id2 BIGINT,
hum BIGINT
) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
"tableInPlanner": `CREATE TABLE tableInPlanner (
id BIGINT,
name STRING,
value STRING,
hum BIGINT
) WITH (TYPE="file");`,
}
types := map[string]ast.StreamType{
"src1": ast.TypeStream,
"src2": ast.TypeStream,
"tableInPlanner": ast.TypeTable,
}
for name, sql := range streamSqls {
s, err := json.Marshal(&xsql.StreamInfo{
StreamType: types[name],
Statement: sql,
})
if err != nil {
t.Error(err)
t.Fail()
}
err = kv.Set(name, string(s))
if err != nil {
t.Error(err)
t.Fail()
}
}
streams := make(map[string]*ast.StreamStmt)
for n := range streamSqls {
streamStmt, err := xsql.GetDataSource(kv, n)
if err != nil {
t.Errorf("fail to get stream %s, please check if stream is created", n)
return
}
streams[n] = streamStmt
}

ref := &ast.AliasRef{
Expression: &ast.Call{
Name: "row_number",
FuncType: ast.FuncTypeWindow,
},
}
ref.SetRefSource([]string{})

tests := []struct {
rule *def.Rule
res string
err string
}{
{
rule: &def.Rule{
Triggered: false,
Id: "test",
Sql: "select name, row_number() as index from src1",
Actions: []map[string]interface{}{
{
"log": map[string]interface{}{},
},
},
Options: defaultOption,
},
res: "{\"type\":\"ProjectPlan\",\"info\":\"Fields:[ $$alias.index, src1.name ]\",\"id\":0,\"children\":[1]}\n\n {\"type\":\"WindowFuncPlan\",\"info\":\"windowFuncFields:[ {name:index, expr:$$alias.index} ]\",\"id\":1,\"children\":[2]}\n\n {\"type\":\"DataSourcePlan\",\"info\":\"StreamName: src1, StreamFields:[ name ]\",\"id\":2,\"children\":null}\n\n",
},
{
rule: &def.Rule{
Triggered: false,
Id: "test",
Sql: "select name, row_number() from src1",
Actions: []map[string]interface{}{
{
"log": map[string]interface{}{},
},
},
Options: defaultOption,
},
res: "{\"type\":\"ProjectPlan\",\"info\":\"Fields:[ src1.name, Call:{ name:row_number } ]\",\"id\":0,\"children\":[1]}\n\n {\"type\":\"WindowFuncPlan\",\"info\":\"windowFuncFields:[ {name:row_number, expr:Call:{ name:row_number }} ]\",\"id\":1,\"children\":[2]}\n\n {\"type\":\"DataSourcePlan\",\"info\":\"StreamName: src1, StreamFields:[ name ]\",\"id\":2,\"children\":null}\n\n",
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))

for i, tt := range tests {
explain, err := GetExplainInfoFromLogicalPlan(tt.rule)
if err != nil {
t.Errorf(err.Error())
}
if !reflect.DeepEqual(explain, tt.res) {
t.Errorf("case %d: expect validate %v but got %v", i, tt.res, explain)
}
}
}
Loading

0 comments on commit 2b506df

Please sign in to comment.