Skip to content

Commit

Permalink
Merge branch '2.0-dev' into skip_partition_shard_embed_tests_2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sukki37 authored Nov 29, 2024
2 parents 1b53195 + e0a261c commit 529dfa2
Show file tree
Hide file tree
Showing 14 changed files with 889 additions and 565 deletions.
10 changes: 10 additions & 0 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (l *localLockTable) doLock(
c *lockContext,
blocked bool) {
var old *waiter
var oldOffset int
var err error
table := l.bind.Table
for {
Expand Down Expand Up @@ -129,6 +130,14 @@ func (l *localLockTable) doLock(
return
}

if oldOffset != c.offset {
if old != nil {
old.disableNotify()
old.close("doLock, lock next row", l.logger)
}
c.txn.clearBlocked(old, l.logger)
}

// we handle remote lock on current rpc io read goroutine, so we can not wait here, otherwise
// the rpc will be blocked.
if c.opts.async {
Expand All @@ -139,6 +148,7 @@ func (l *localLockTable) doLock(
// txn is locked by service.lock or service_remote.lock. We must unlock here. And lock again after
// wait result. Because during wait, deadlock detection may be triggered, and need call txn.fetchWhoWaitingMe,
// or other concurrent txn method.
oldOffset = c.offset
oldTxnID := c.txn.txnID
old = c.w
c.txn.Unlock()
Expand Down
76 changes: 76 additions & 0 deletions pkg/lockservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,82 @@ func TestLockResultWithConflictAndTxnAborted(t *testing.T) {
)
}

func TestIssue19913(t *testing.T) {
runLockServiceTests(
t,
[]string{"s1"},
func(alloc *lockTableAllocator, s []*service) {
err := os.Setenv("mo_reuse_enable_checker", "true")
require.NoError(t, err)
l1 := s[0]

ctx, cancel := context.WithTimeout(
context.Background(),
time.Second*10)
defer cancel()
option := pb.LockOptions{
Granularity: pb.Granularity_Row,
Mode: pb.LockMode_Exclusive,
Policy: pb.WaitPolicy_Wait,
}

_, err = l1.Lock(
ctx,
0,
[][]byte{{1}},
[]byte("txn1"),
option)
require.NoError(t, err)

_, err = l1.Lock(
ctx,
0,
[][]byte{{2}},
[]byte("txn2"),
option)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// blocked by txn1
_, err := l1.Lock(
ctx,
0,
newTestRows(1, 2),
[]byte("txn3"),
option)
require.NoError(t, err)
}()

waitWaiters(t, l1, 0, []byte{1}, 1)

w := l1.activeTxnHolder.getActiveTxn([]byte("txn3"), false, "").blockedWaiters[0]

require.NoError(t, l1.Unlock(
ctx,
[]byte("txn1"),
timestamp.Timestamp{}))

waitWaiters(t, l1, 0, []byte{2}, 1)

require.NoError(t, l1.Unlock(
ctx,
[]byte("txn2"),
timestamp.Timestamp{}))
wg.Wait()

require.NoError(t, l1.Unlock(
ctx,
[]byte("txn3"),
timestamp.Timestamp{}))

require.Less(t, w.refCount.Load(), int32(2))
},
)
}

func TestRowLockWithConflictAndUnlock(t *testing.T) {
table := uint64(0)
getRunner(false)(
Expand Down
1,224 changes: 722 additions & 502 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,10 +1014,6 @@ func lockTalbeIfLockCountIsZero(
}
for idx := 0; idx < len(lockOp.targets); idx++ {
target := lockOp.targets[idx]
// do not lock table or rows at the end for hidden table
if !target.lockTableAtTheEnd {
continue
}
if target.lockRows != nil {
vec, free, err := colexec.GetReadonlyResultFromNoColumnExpression(proc, target.lockRows)
if err != nil {
Expand All @@ -1039,6 +1035,9 @@ func lockTalbeIfLockCountIsZero(
return err
}
} else {
if !target.lockTableAtTheEnd {
continue
}
err := LockTable(lockOp.engine, proc, target.tableID, target.primaryColumnType, false)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,13 +561,13 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Result = append(op.Result, t.Result...)
op.Result = t.Result
op.LeftTypes = t.LeftTypes
op.RightTypes = append(op.RightTypes, t.RightTypes...)
op.Conditions = append(op.Conditions, t.Conditions...)
op.RightTypes = t.RightTypes
op.Conditions = t.Conditions
op.IsShuffle = t.IsShuffle
op.ShuffleIdx = t.ShuffleIdx
op.RuntimeFilterSpecs = append(op.RuntimeFilterSpecs, t.RuntimeFilterSpecs...)
op.RuntimeFilterSpecs = t.RuntimeFilterSpecs
op.JoinMapTag = t.JoinMapTag
op.OnDuplicateAction = t.OnDuplicateAction
op.DedupColName = t.DedupColName
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,10 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
RuntimeFilterSpec: t.RuntimeFilterSpec,
}
case *dedupjoin.DedupJoin:
relList, colList := getRelColList(t.Result)
in.DedupJoin = &pipeline.DedupJoin{
RelList: relList,
ColList: colList,
LeftCond: t.Conditions[0],
RightCond: t.Conditions[1],
RuntimeFilterBuildList: t.RuntimeFilterSpecs,
Expand Down Expand Up @@ -1334,6 +1337,9 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
case vm.DedupJoin:
arg := dedupjoin.NewArgument()
t := opr.GetDedupJoin()
arg.Result = convertToResultPos(t.RelList, t.ColList)
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.Conditions = [][]*plan.Expr{t.LeftCond, t.RightCond}
arg.RuntimeFilterSpecs = t.RuntimeFilterBuildList
arg.IsShuffle = t.IsShuffle
Expand All @@ -1342,8 +1348,6 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
arg.OnDuplicateAction = t.OnDuplicateAction
arg.DedupColName = t.DedupColName
arg.DedupColTypes = t.DedupColTypes
arg.LeftTypes = convertToTypes(t.LeftTypes)
arg.RightTypes = convertToTypes(t.RightTypes)
arg.UpdateColIdxList = t.UpdateColIdxList
arg.UpdateColExprList = t.UpdateColExprList
op = arg
Expand Down
54 changes: 28 additions & 26 deletions pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,33 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"

"github.com/matrixorigin/matrixone/pkg/sql/colexec"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/apply"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/indexbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/postdml"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shufflebuild"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/reuse"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/sql/colexec/source"

mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/anti"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/apply"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/dedupjoin"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/dispatch"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/external"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/filter"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/group"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/indexbuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/insert"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/intersect"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/intersectall"
Expand All @@ -75,6 +63,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/offset"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/onduplicatekey"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/order"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/postdml"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/preinsert"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/preinsertunique"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/product"
Expand All @@ -84,12 +73,18 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/semi"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shuffle"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/shufflebuild"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/single"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/source"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_function"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/top"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
)

func Test_EncodeProcessInfo(t *testing.T) {
Expand Down Expand Up @@ -245,7 +240,13 @@ func Test_convertToPipelineInstruction(t *testing.T) {
&source.Source{},
&apply.Apply{TableFunction: &table_function.TableFunction{}},
&postdml.PostDml{
PostDmlCtx: &postdml.PostDmlCtx{FullText: &postdml.PostDmlFullTextCtx{}}},
PostDmlCtx: &postdml.PostDmlCtx{
FullText: &postdml.PostDmlFullTextCtx{},
},
},
&dedupjoin.DedupJoin{
Conditions: [][]*plan.Expr{nil, nil},
},
}
ctx := &scopeContext{
id: 1,
Expand Down Expand Up @@ -321,6 +322,7 @@ func Test_convertToVmInstruction(t *testing.T) {
{Op: int32(vm.IndexBuild), IndexBuild: &pipeline.Indexbuild{}},
{Op: int32(vm.Apply), Apply: &pipeline.Apply{}, TableFunction: &pipeline.TableFunction{}},
{Op: int32(vm.PostDml), PostDml: &pipeline.PostDml{}},
{Op: int32(vm.DedupJoin), DedupJoin: &pipeline.DedupJoin{}},
}
for _, instruction := range instructions {
_, err := convertToVmOperator(instruction, ctx, nil)
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/plan/opt_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,9 +1155,6 @@ func (builder *QueryBuilder) lockTableIfLockNoRowsAtTheEndForDelAndUpdate() (err
return
}
tableDef := baseNode.TableDef
if !getLockTableAtTheEnd(tableDef) {
return
}
objRef := baseNode.ObjRef
tableIDs := make(map[uint64]bool)
tableIDs[tableDef.TblId] = true
Expand Down Expand Up @@ -1229,7 +1226,7 @@ func (builder *QueryBuilder) lockTableIfLockNoRowsAtTheEndForDelAndUpdate() (err
}

lockTarget.LockRows = lockRows
lockTarget.LockTableAtTheEnd = true
lockTarget.LockTableAtTheEnd = false
}

return
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/plan/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2709,7 +2709,6 @@ func (builder *QueryBuilder) bindSelect(stmt *tree.Select, ctx *BindContext, isR
PrimaryColTyp: pkTyp,
Block: true,
RefreshTsIdxInBat: -1, //unsupport now
LockTableAtTheEnd: getLockTableAtTheEnd(tableDef),
}
if tableDef.Partition != nil {
partTableIDs, _ := getPartTableIdsAndNames(builder.compCtx, objRef, tableDef)
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/plan/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2711,11 +2711,12 @@ func offsetToString(offset int) string {
return fmt.Sprintf("+%02d:%02d", hours, minutes)
}

func getLockTableAtTheEnd(tableDef *TableDef) bool {
if tableDef.Pkey.PkeyColName == catalog.FakePrimaryKeyColName || //fake pk, skip
tableDef.Partition != nil || // unsupport partition table
len(tableDef.Pkey.Names) > 1 { // unsupport multi-column primary key
return false
}
return !strings.HasPrefix(tableDef.Name, catalog.IndexTableNamePrefix)
}
// do not lock table if lock no rows now.
// if need to lock table, uncomment these codes
// func getLockTableAtTheEnd(tableDef *TableDef) bool {
// if tableDef.Pkey.PkeyColName == catalog.FakePrimaryKeyColName || //fake pk, skip
// tableDef.Partition != nil { // unsupport partition table
// return false
// }
// return !strings.HasPrefix(tableDef.Name, catalog.IndexTableNamePrefix)
// }
28 changes: 15 additions & 13 deletions proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,21 @@ message MarkJoin {
}

message DedupJoin {
repeated plan.Expr left_cond = 1;
repeated plan.Expr right_cond = 2;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 3;
bool is_shuffle = 4;
int32 join_map_tag = 5;
int32 shuffle_idx = 6;
plan.Node.OnDuplicateAction on_duplicate_action = 7;
string dedup_col_name = 8;
repeated plan.Type dedup_col_types = 9 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 10 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 11 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 12;
repeated plan.Expr update_col_expr_list = 13;
repeated int32 rel_list = 1;
repeated int32 col_list = 2;
repeated plan.Expr left_cond = 3;
repeated plan.Expr right_cond = 4;
repeated plan.RuntimeFilterSpec runtime_filter_build_list = 5;
bool is_shuffle = 6;
int32 join_map_tag = 7;
int32 shuffle_idx = 8;
plan.Node.OnDuplicateAction on_duplicate_action = 9;
string dedup_col_name = 10;
repeated plan.Type dedup_col_types = 11 [(gogoproto.nullable) = false];
repeated plan.Type left_types = 12 [(gogoproto.nullable) = false];
repeated plan.Type right_types = 13 [(gogoproto.nullable) = false];
repeated int32 update_col_idx_list = 14;
repeated plan.Expr update_col_expr_list = 15;
}

message Product {
Expand Down
9 changes: 9 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.result
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,12 @@ c1 c2
45 45
55 55
222 222
create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
count(*)
10000000
select count(*) from insert_ignore_09 where c1 != c2;
count(*)
7
6 changes: 6 additions & 0 deletions test/distributed/cases/dml/insert/insert_ignore.sql
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ insert into insert_ignore_08 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222
insert ignore into insert_ignore_08 select * from insert_ignore_07;
select count(*) from insert_ignore_08;
select * from insert_ignore_08 where c2 in (45,55,22,1,222,19);

create table insert_ignore_09(c1 int primary key, c2 int);
insert into insert_ignore_09 values(20,45),(21,55),(1,45),(6,22),(5,1),(1000,222),(99999,19);
insert ignore into insert_ignore_09 select result, result from generate_series(1,10000000) g;
select count(*) from insert_ignore_09;
select count(*) from insert_ignore_09 where c1 != c2;
Loading

0 comments on commit 529dfa2

Please sign in to comment.