Skip to content

Commit

Permalink
fix a panic in multi-CN DEDUP join (#20461)
Browse files Browse the repository at this point in the history
include dedupjoin.Result in protobuf message

Approved by: @ouyuanning, @badboynt1, @m-schen, @heni02
  • Loading branch information
aunjgr authored Nov 29, 2024
1 parent 084d4fc commit 83bc719
Show file tree
Hide file tree
Showing 7 changed files with 790 additions and 548 deletions.
1,225 changes: 722 additions & 503 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,13 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Result = append(op.Result, t.Result...)
op.Result = t.Result
op.LeftTypes = t.LeftTypes
op.RightTypes = append(op.RightTypes, t.RightTypes...)
op.Conditions = append(op.Conditions, t.Conditions...)
op.RightTypes = t.RightTypes
op.Conditions = t.Conditions
op.IsShuffle = t.IsShuffle
op.ShuffleIdx = t.ShuffleIdx
op.RuntimeFilterSpecs = append(op.RuntimeFilterSpecs, t.RuntimeFilterSpecs...)
op.RuntimeFilterSpecs = t.RuntimeFilterSpecs
op.JoinMapTag = t.JoinMapTag
op.OnDuplicateAction = t.OnDuplicateAction
op.DedupColName = t.DedupColName
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,10 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
RuntimeFilterSpec: t.RuntimeFilterSpec,
}
case *dedupjoin.DedupJoin:
relList, colList := getRelColList(t.Result)
in.DedupJoin = &pipeline.DedupJoin{
RelList: relList,
ColList: colList,
LeftCond: t.Conditions[0],
RightCond: t.Conditions[1],
RuntimeFilterBuildList: t.RuntimeFilterSpecs,
Expand Down Expand Up @@ -1341,6 +1344,9 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
case vm.DedupJoin:
arg := dedupjoin.NewArgument()
t := opr.GetDedupJoin()
arg.Result = convertToResultPos(t.RelList, t.ColList)
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.Conditions = [][]*plan.Expr{t.LeftCond, t.RightCond}
arg.RuntimeFilterSpecs = t.RuntimeFilterBuildList
arg.IsShuffle = t.IsShuffle
Expand All @@ -1349,8 +1355,6 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
arg.OnDuplicateAction = t.OnDuplicateAction
arg.DedupColName = t.DedupColName
arg.DedupColTypes = t.DedupColTypes
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.UpdateColIdxList = t.UpdateColIdxList
arg.UpdateColExprList = t.UpdateColExprList
op = arg
Expand Down
54 changes: 28 additions & 26 deletions pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,33 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"

"github.com/matrixorigin/matrixone/pkg/sql/colexec"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/apply"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/indexbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/postdml"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shufflebuild"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/reuse"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/sql/colexec/source"

mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/anti"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/apply"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/dedupjoin"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/dispatch"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/external"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/filter"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/group"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/indexbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/insert"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/intersect"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/intersectall"
Expand All @@ -75,6 +63,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/offset"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/onduplicatekey"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/order"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/postdml"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/preinsert"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/preinsertunique"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/product"
Expand All @@ -84,12 +73,18 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/semi"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shuffle"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shufflebuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/single"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/source"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_function"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/top"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
)

func Test_EncodeProcessInfo(t *testing.T) {
Expand Down Expand Up @@ -245,7 +240,13 @@ func Test_convertToPipelineInstruction(t *testing.T) {
&source.Source{},
&apply.Apply{TableFunction: &table_function.TableFunction{}},
&postdml.PostDml{
PostDmlCtx: &postdml.PostDmlCtx{FullText: &postdml.PostDmlFullTextCtx{}}},
PostDmlCtx: &postdml.PostDmlCtx{
FullText: &postdml.PostDmlFullTextCtx{},
},
},
&dedupjoin.DedupJoin{
Conditions: [][]*plan.Expr{nil, nil},
},
}
ctx := &scopeContext{
id: 1,
Expand Down Expand Up @@ -321,6 +322,7 @@ func Test_convertToVmInstruction(t *testing.T) {
{Op: int32(vm.IndexBuild), IndexBuild: &pipeline.Indexbuild{}},
{Op: int32(vm.Apply), Apply: &pipeline.Apply{}, TableFunction: &pipeline.TableFunction{}},
{Op: int32(vm.PostDml), PostDml: &pipeline.PostDml{}},
{Op: int32(vm.DedupJoin), DedupJoin: &pipeline.DedupJoin{}},
}
for _, instruction := range instructions {
_, err := convertToVmOperator(instruction, ctx, nil)
Expand Down
28 changes: 15 additions & 13 deletions proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,21 @@ message MarkJoin {
}

message DedupJoin {
repeated plan.Expr left_cond = 1;
repeated plan.Expr right_cond = 2;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 3;
bool is_shuffle = 4;
int32 join_map_tag = 5;
int32 shuffle_idx = 6;
plan.Node.OnDuplicateAction on_duplicate_action = 7;
string dedup_col_name = 8;
repeated plan.Type dedup_col_types = 9 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 10 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 11 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 12;
repeated plan.Expr update_col_expr_list = 13;
repeated int32 rel_list = 1;
repeated int32 col_list = 2;
repeated plan.Expr left_cond = 3;
repeated plan.Expr right_cond = 4;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 5;
bool is_shuffle = 6;
int32 join_map_tag = 7;
int32 shuffle_idx = 8;
plan.Node.OnDuplicateAction on_duplicate_action = 9;
string dedup_col_name = 10;
repeated plan.Type dedup_col_types = 11 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 12 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 13 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 14;
repeated plan.Expr update_col_expr_list = 15;
}

message Product {
Expand Down
9 changes: 9 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.result
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,12 @@ c1 c2
45 45
55 55
222 222
create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
count(*)
10000000
select count(*) from insert_ignore_09 where c1 != c2;
count(*)
7
6 changes: 6 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.sql
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ insert into insert_ignore_08 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222
insert ignore into insert_ignore_08 select * from insert_ignore_07;
select count(*) from insert_ignore_08;
select * from insert_ignore_08 where c2 in (45,55,22,1,222,19);

create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
select count(*) from insert_ignore_09 where c1 != c2;

0 comments on commit 83bc719

Please sign in to comment.