Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a panic in multi-CN DEDUP join #20461

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,225 changes: 722 additions & 503 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,13 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Result = append(op.Result, t.Result...)
op.Result = t.Result
op.LeftTypes = t.LeftTypes
op.RightTypes = append(op.RightTypes, t.RightTypes...)
op.Conditions = append(op.Conditions, t.Conditions...)
op.RightTypes = t.RightTypes
op.Conditions = t.Conditions
op.IsShuffle = t.IsShuffle
op.ShuffleIdx = t.ShuffleIdx
op.RuntimeFilterSpecs = append(op.RuntimeFilterSpecs, t.RuntimeFilterSpecs...)
op.RuntimeFilterSpecs = t.RuntimeFilterSpecs
op.JoinMapTag = t.JoinMapTag
op.OnDuplicateAction = t.OnDuplicateAction
op.DedupColName = t.DedupColName
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,10 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
RuntimeFilterSpec: t.RuntimeFilterSpec,
}
case *dedupjoin.DedupJoin:
relList, colList := getRelColList(t.Result)
in.DedupJoin = &pipeline.DedupJoin{
RelList: relList,
ColList: colList,
LeftCond: t.Conditions[0],
RightCond: t.Conditions[1],
RuntimeFilterBuildList: t.RuntimeFilterSpecs,
Expand Down Expand Up @@ -1341,6 +1344,9 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
case vm.DedupJoin:
arg := dedupjoin.NewArgument()
t := opr.GetDedupJoin()
arg.Result = convertToResultPos(t.RelList, t.ColList)
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.Conditions = [][]*plan.Expr{t.LeftCond, t.RightCond}
arg.RuntimeFilterSpecs = t.RuntimeFilterBuildList
arg.IsShuffle = t.IsShuffle
Expand All @@ -1349,8 +1355,6 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
arg.OnDuplicateAction = t.OnDuplicateAction
arg.DedupColName = t.DedupColName
arg.DedupColTypes = t.DedupColTypes
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.UpdateColIdxList = t.UpdateColIdxList
arg.UpdateColExprList = t.UpdateColExprList
op = arg
Expand Down
28 changes: 15 additions & 13 deletions proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,21 @@ message MarkJoin {
}

message DedupJoin {
repeated plan.Expr left_cond = 1;
repeated plan.Expr right_cond = 2;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 3;
bool is_shuffle = 4;
int32 join_map_tag = 5;
int32 shuffle_idx = 6;
plan.Node.OnDuplicateAction on_duplicate_action = 7;
string dedup_col_name = 8;
repeated plan.Type dedup_col_types = 9 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 10 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 11 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 12;
repeated plan.Expr update_col_expr_list = 13;
repeated int32 rel_list = 1;
repeated int32 col_list = 2;
repeated plan.Expr left_cond = 3;
repeated plan.Expr right_cond = 4;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 5;
bool is_shuffle = 6;
int32 join_map_tag = 7;
int32 shuffle_idx = 8;
plan.Node.OnDuplicateAction on_duplicate_action = 9;
string dedup_col_name = 10;
repeated plan.Type dedup_col_types = 11 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 12 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 13 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 14;
repeated plan.Expr update_col_expr_list = 15;
}

message Product {
Expand Down
9 changes: 9 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.result
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,12 @@ c1 c2
45 45
55 55
222 222
create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
count(*)
10000000
select count(*) from insert_ignore_09 where c1 != c2;
count(*)
7
6 changes: 6 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.sql
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ insert into insert_ignore_08 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222
insert ignore into insert_ignore_08 select * from insert_ignore_07;
select count(*) from insert_ignore_08;
select * from insert_ignore_08 where c2 in (45,55,22,1,222,19);

create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
select count(*) from insert_ignore_09 where c1 != c2;
Loading