Skip to content

Commit

Permalink
handle Restore Duplicate Entry-1.2-dev (#16743)
Browse files Browse the repository at this point in the history
SQL执行时将事务WriteOffset与当前语句绑定,解决读数据万圣节问题

MO Checkin Regression test susccess:
https://github.com/matrixorigin/ci-test/actions/runs/9412877126

Approved by: @badboynt1, @ouyuanning, @aunjgr, @m-schen, @daviszhen, @triump2020, @reusee, @zhangxu19830126, @XuPeng-SH, @sukki37
  • Loading branch information
qingxinhome authored Jun 11, 2024
1 parent 6391c36 commit a5c552e
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 78 deletions.
2 changes: 1 addition & 1 deletion pkg/frontend/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestSession_TxnCompilerContext(t *testing.T) {
db.EXPECT().Relations(gomock.Any()).Return(nil, nil).AnyTimes()

table := mock_frontend.NewMockRelation(ctrl)
table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().TableDefs(gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().GetTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes()
table.EXPECT().CopyTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes()
Expand Down
16 changes: 8 additions & 8 deletions pkg/frontend/test/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/frontend/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ func TestGetExprValue(t *testing.T) {
binary.LittleEndian.PutUint64(id, 1)
ranges.Append(id)

table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(&ranges, nil).AnyTimes()
table.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, moerr.NewInvalidInputNoCtx("new reader failed")).AnyTimes()
table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ranges, nil).AnyTimes()
table.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, moerr.NewInvalidInputNoCtx("new reader failed")).AnyTimes()

eng.EXPECT().Database(gomock.Any(), gomock.Any(), gomock.Any()).Return(db, nil).AnyTimes()
eng.EXPECT().Hints().Return(engine.Hints{
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/index_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestInsertIndexMetadata(t *testing.T) {
mockEngine.EXPECT().Database(gomock.Any(), catalog.MO_CATALOG, txnOperator).Return(catalog_database, nil).AnyTimes()

indexes_relation := mock_frontend.NewMockRelation(ctrl)
indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
indexes_relation.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
indexes_relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

Expand Down Expand Up @@ -78,7 +78,7 @@ func TestInsertIndexMetadata(t *testing.T) {
}).AnyTimes()
reader.EXPECT().Close().Return(nil).AnyTimes()

indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes()
indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes()
catalog_database.EXPECT().Relation(gomock.Any(), catalog.MO_INDEXES, gomock.Any()).Return(indexes_relation, nil).AnyTimes()
//---------------------------------------------------------------------------------------------------------------------------
mock_emp_Relation := mock_frontend.NewMockRelation(ctrl)
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestInsertOneIndexMetadata(t *testing.T) {
mockEngine.EXPECT().Database(gomock.Any(), catalog.MO_CATALOG, txnOperator).Return(catalog_database, nil).AnyTimes()

indexes_relation := mock_frontend.NewMockRelation(ctrl)
indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
indexes_relation.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
indexes_relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

Expand Down Expand Up @@ -170,7 +170,7 @@ func TestInsertOneIndexMetadata(t *testing.T) {
}).AnyTimes()
reader.EXPECT().Close().Return(nil).AnyTimes()

indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes()
indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes()
catalog_database.EXPECT().Relation(gomock.Any(), catalog.MO_INDEXES, gomock.Any()).Return(indexes_relation, nil).AnyTimes()
//---------------------------------------------------------------------------------------------------------------------------

Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func NewCompile(
c.startAt = startAt
c.disableRetry = false
if c.proc.TxnOperator != nil {
// TODO: The action of updating the WriteOffset logic should be executed in the `func (c *Compile) Run(_ uint64)` method.
// However, considering that the delay ranges are not completed yet, the UpdateSnapshotWriteOffset() and
// the assignment of `Compile.TxnOffset` should be moved into the `func (c *Compile) Run(_ uint64)` method in the later stage.
c.proc.TxnOperator.GetWorkspace().UpdateSnapshotWriteOffset()
c.TxnOffset = c.proc.TxnOperator.GetWorkspace().GetSnapshotWriteOffset()
} else {
c.TxnOffset = 0
}
return c
}
Expand Down Expand Up @@ -2223,6 +2229,7 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) {
func (c *Compile) compileTableScanWithNode(n *plan.Node, node engine.Node) (*Scope, error) {
s := newScope(Remote)
s.NodeInfo = node
s.TxnOffset = c.TxnOffset
s.DataSource = &Source{
node: n,
}
Expand Down Expand Up @@ -3794,7 +3801,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis
if err != nil {
return nil, err
}
ranges, err = rel.Ranges(ctx, blockFilterList)
ranges, err = rel.Ranges(ctx, blockFilterList, c.TxnOffset)
if err != nil {
return nil, err
}
Expand All @@ -3807,7 +3814,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis
if err != nil {
return nil, err
}
subranges, err := subrelation.Ranges(ctx, n.BlockFilterList)
subranges, err := subrelation.Ranges(ctx, n.BlockFilterList, c.TxnOffset)
if err != nil {
return nil, err
}
Expand All @@ -3829,7 +3836,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis
if err != nil {
return nil, err
}
subranges, err := subrelation.Ranges(ctx, n.BlockFilterList)
subranges, err := subrelation.Ranges(ctx, n.BlockFilterList, c.TxnOffset)
if err != nil {
return nil, err
}
Expand Down
62 changes: 39 additions & 23 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,27 @@ package compile
import (
"context"
"fmt"
"strings"

"hash/crc32"
goruntime "runtime"
"runtime/debug"
"strings"
"sync"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/right"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightanti"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi"
"github.com/matrixorigin/matrixone/pkg/sql/util"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/objectio"
pbpipeline "github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/group"
Expand All @@ -55,15 +49,19 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/mergetop"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/offset"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/restrict"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/right"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightanti"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/sample"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/top"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/util"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine"
"github.com/matrixorigin/matrixone/pkg/vm/pipeline"
"github.com/matrixorigin/matrixone/pkg/vm/process"

"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

func newScope(magic magicType) *Scope {
Expand Down Expand Up @@ -525,7 +523,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {
scanUsedCpuNumber = 1
}

readers, err = s.NodeInfo.Rel.NewReader(c.ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, s.NodeInfo.Data, len(s.DataSource.OrderBy) > 0)
readers, err = s.NodeInfo.Rel.NewReader(c.ctx,
scanUsedCpuNumber,
s.DataSource.FilterExpr,
s.NodeInfo.Data,
len(s.DataSource.OrderBy) > 0,
s.TxnOffset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -594,8 +597,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {
}

if rel.GetEngineType() == engine.Memory || s.DataSource.PartitionRelationNames == nil {
mainRds, err1 := rel.NewReader(
ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, s.NodeInfo.Data, len(s.DataSource.OrderBy) > 0)
mainRds, err1 := rel.NewReader(ctx,
scanUsedCpuNumber,
s.DataSource.FilterExpr,
s.NodeInfo.Data,
len(s.DataSource.OrderBy) > 0,
s.TxnOffset)
if err1 != nil {
return nil, err1
}
Expand All @@ -622,9 +629,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {

if len(cleanRanges) > 0 {
// create readers for reading clean blocks from the main table.
mainRds, err1 := rel.NewReader(
ctx,
scanUsedCpuNumber, s.DataSource.FilterExpr, cleanRanges, len(s.DataSource.OrderBy) > 0)
mainRds, err1 := rel.NewReader(ctx,
scanUsedCpuNumber,
s.DataSource.FilterExpr,
cleanRanges,
len(s.DataSource.OrderBy) > 0,
s.TxnOffset)
if err1 != nil {
return nil, err1
}
Expand All @@ -636,7 +646,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {
if err1 != nil {
return nil, err1
}
memRds, err2 := subRel.NewReader(ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, dirtyRanges[num], len(s.DataSource.OrderBy) > 0)
memRds, err2 := subRel.NewReader(ctx,
scanUsedCpuNumber,
s.DataSource.FilterExpr,
dirtyRanges[num],
len(s.DataSource.OrderBy) > 0,
s.TxnOffset)
if err2 != nil {
return nil, err2
}
Expand Down Expand Up @@ -683,6 +698,7 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {
AccountId: s.DataSource.AccountId,
}
readerScopes[i].Proc = process.NewWithAnalyze(s.Proc, c.ctx, 0, c.anal.Nodes())
readerScopes[i].TxnOffset = s.TxnOffset
}

mergeFromParallelScanScope, errNew := newParallelScope(c, s, readerScopes)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/compile/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -121,6 +122,8 @@ type Scope struct {
PreScopes []*Scope
// NodeInfo contains the information about the remote node.
NodeInfo engine.Node
// TxnOffset represents the transaction's write offset, specifying the starting position for reading data.
TxnOffset int
// Instructions contains command list of this scope.
Instructions vm.Instructions
// Proc contains the execution context.
Expand Down Expand Up @@ -249,6 +252,8 @@ type Compile struct {
ctx context.Context
// proc stores the execution context.
proc *process.Process
// TxnOffset read starting offset position within the transaction during the execute current statement
TxnOffset int

MessageBoard *process.MessageBoard

Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/plan/function/func_mo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"strings"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
Expand All @@ -34,7 +36,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/functionUtil"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -434,7 +435,8 @@ func moTableColMaxMinImpl(fnName string, parameters []*vector.Vector, result vec
return err
}

ranges, err := rel.Ranges(ctx, nil)
//ranges, err := rel.Ranges(ctx, nil)
ranges, err := rel.Ranges(ctx, nil, 0)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/test/cron_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCalculateStorageUsage(t *testing.T) {
txnClient := mock_frontend.NewMockTxnClient(ctrl)
txnClient.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOperator, nil).AnyTimes()
table := mock_frontend.NewMockRelation(ctrl)
table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().TableDefs(gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().GetPrimaryKeys(gomock.Any()).Return(nil, nil).AnyTimes()
table.EXPECT().GetHideKeys(gomock.Any()).Return(nil, nil).AnyTimes()
Expand Down
5 changes: 4 additions & 1 deletion pkg/vm/engine/disttae/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ func CompileFilterExpr(
func TryFastFilterBlocks(
ctx context.Context,
tbl *txnTable,
txnOffset int, // Transaction writes offset used to specify the starting position for reading data.
snapshotTS timestamp.Timestamp,
tableDef *plan.TableDef,
exprs []*plan.Expr,
Expand All @@ -1010,6 +1011,7 @@ func TryFastFilterBlocks(
err = ExecuteBlockFilter(
ctx,
tbl,
txnOffset,
snapshotTS,
fastFilterOp,
loadOp,
Expand All @@ -1030,6 +1032,7 @@ func TryFastFilterBlocks(
func ExecuteBlockFilter(
ctx context.Context,
tbl *txnTable,
txnOffset int, // Transaction writes offset used to specify the starting position for reading data.
snapshotTS timestamp.Timestamp,
fastFilterOp FastFilterOp,
loadOp LoadOp,
Expand Down Expand Up @@ -1070,7 +1073,7 @@ func ExecuteBlockFilter(
}()

if !highSelectivityHint {
*dirtyBlocks = tbl.collectDirtyBlocks(snapshot, uncommittedObjects)
*dirtyBlocks = tbl.collectDirtyBlocks(snapshot, uncommittedObjects, txnOffset)
}

err = ForeachSnapshotObjects(
Expand Down
Loading

0 comments on commit a5c552e

Please sign in to comment.