From a5c552ef57a594f2ba3d4c90e4a13cdf59dd6763 Mon Sep 17 00:00:00 2001 From: qingxinhome <70939751+qingxinhome@users.noreply.github.com> Date: Tue, 11 Jun 2024 10:22:38 +0800 Subject: [PATCH] handle Restore Duplicate Entry-1.2-dev (#16743) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SQL执行时将事务WriteOffset与当前语句绑定,解决读数据万圣节问题 MO Checkin Regression test susccess: https://github.com/matrixorigin/ci-test/actions/runs/9412877126 Approved by: @badboynt1, @ouyuanning, @aunjgr, @m-schen, @daviszhen, @triump2020, @reusee, @zhangxu19830126, @XuPeng-SH, @sukki37 --- pkg/frontend/session_test.go | 2 +- pkg/frontend/test/engine_mock.go | 16 ++--- pkg/frontend/util_test.go | 4 +- pkg/sql/colexec/index_metadata_test.go | 8 +-- pkg/sql/compile/compile.go | 13 +++- pkg/sql/compile/scope.go | 62 ++++++++++------- pkg/sql/compile/types.go | 5 ++ pkg/sql/plan/function/func_mo.go | 6 +- pkg/util/test/cron_task_test.go | 2 +- pkg/vm/engine/disttae/filter.go | 5 +- pkg/vm/engine/disttae/partition_reader.go | 16 +++-- pkg/vm/engine/disttae/reader.go | 14 +++- pkg/vm/engine/disttae/txn_table.go | 79 ++++++++++++++++------ pkg/vm/engine/disttae/types.go | 5 +- pkg/vm/engine/memoryengine/table_reader.go | 5 +- pkg/vm/engine/types.go | 15 +++- 16 files changed, 179 insertions(+), 78 deletions(-) diff --git a/pkg/frontend/session_test.go b/pkg/frontend/session_test.go index 213715f8ba56e..e993f613e4a24 100644 --- a/pkg/frontend/session_test.go +++ b/pkg/frontend/session_test.go @@ -337,7 +337,7 @@ func TestSession_TxnCompilerContext(t *testing.T) { db.EXPECT().Relations(gomock.Any()).Return(nil, nil).AnyTimes() table := mock_frontend.NewMockRelation(ctrl) - table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().TableDefs(gomock.Any()).Return(nil, nil).AnyTimes() 
table.EXPECT().GetTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes() table.EXPECT().CopyTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes() diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index 5093f18dd8dfa..8a046d1dfe241 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -559,18 +559,18 @@ func (mr *MockRelationMockRecorder) MergeObjects(ctx, objstats, policyName, targ } // NewReader mocks base method. -func (m *MockRelation) NewReader(arg0 context.Context, arg1 int, arg2 *plan.Expr, arg3 []byte, arg4 bool) ([]engine.Reader, error) { +func (m *MockRelation) NewReader(arg0 context.Context, arg1 int, arg2 *plan.Expr, arg3 []byte, arg4 bool, arg5 int) ([]engine.Reader, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewReader", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "NewReader", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].([]engine.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } // NewReader indicates an expected call of NewReader. -func (mr *MockRelationMockRecorder) NewReader(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockRelationMockRecorder) NewReader(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewReader", reflect.TypeOf((*MockRelation)(nil).NewReader), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewReader", reflect.TypeOf((*MockRelation)(nil).NewReader), arg0, arg1, arg2, arg3, arg4, arg5) } // PrimaryKeysMayBeModified mocks base method. @@ -589,18 +589,18 @@ func (mr *MockRelationMockRecorder) PrimaryKeysMayBeModified(ctx, from, to, keyV } // Ranges mocks base method. 
-func (m *MockRelation) Ranges(arg0 context.Context, arg1 []*plan.Expr) (engine.Ranges, error) { +func (m *MockRelation) Ranges(arg0 context.Context, arg1 []*plan.Expr, arg2 int) (engine.Ranges, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ranges", arg0, arg1) + ret := m.ctrl.Call(m, "Ranges", arg0, arg1, arg2) ret0, _ := ret[0].(engine.Ranges) ret1, _ := ret[1].(error) return ret0, ret1 } // Ranges indicates an expected call of Ranges. -func (mr *MockRelationMockRecorder) Ranges(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockRelationMockRecorder) Ranges(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ranges", reflect.TypeOf((*MockRelation)(nil).Ranges), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ranges", reflect.TypeOf((*MockRelation)(nil).Ranges), arg0, arg1, arg2) } // Rows mocks base method. diff --git a/pkg/frontend/util_test.go b/pkg/frontend/util_test.go index d1e35c4fca3fa..ad14cd3b50197 100644 --- a/pkg/frontend/util_test.go +++ b/pkg/frontend/util_test.go @@ -603,8 +603,8 @@ func TestGetExprValue(t *testing.T) { binary.LittleEndian.PutUint64(id, 1) ranges.Append(id) - table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(&ranges, nil).AnyTimes() - table.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, moerr.NewInvalidInputNoCtx("new reader failed")).AnyTimes() + table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ranges, nil).AnyTimes() + table.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, moerr.NewInvalidInputNoCtx("new reader failed")).AnyTimes() eng.EXPECT().Database(gomock.Any(), gomock.Any(), gomock.Any()).Return(db, nil).AnyTimes() eng.EXPECT().Hints().Return(engine.Hints{ diff --git a/pkg/sql/colexec/index_metadata_test.go b/pkg/sql/colexec/index_metadata_test.go index 
6f209cd868eef..b8b0f61f87421 100644 --- a/pkg/sql/colexec/index_metadata_test.go +++ b/pkg/sql/colexec/index_metadata_test.go @@ -48,7 +48,7 @@ func TestInsertIndexMetadata(t *testing.T) { mockEngine.EXPECT().Database(gomock.Any(), catalog.MO_CATALOG, txnOperator).Return(catalog_database, nil).AnyTimes() indexes_relation := mock_frontend.NewMockRelation(ctrl) - indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() indexes_relation.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() indexes_relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -78,7 +78,7 @@ func TestInsertIndexMetadata(t *testing.T) { }).AnyTimes() reader.EXPECT().Close().Return(nil).AnyTimes() - indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes() + indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes() catalog_database.EXPECT().Relation(gomock.Any(), catalog.MO_INDEXES, gomock.Any()).Return(indexes_relation, nil).AnyTimes() //--------------------------------------------------------------------------------------------------------------------------- mock_emp_Relation := mock_frontend.NewMockRelation(ctrl) @@ -140,7 +140,7 @@ func TestInsertOneIndexMetadata(t *testing.T) { mockEngine.EXPECT().Database(gomock.Any(), catalog.MO_CATALOG, txnOperator).Return(catalog_database, nil).AnyTimes() indexes_relation := mock_frontend.NewMockRelation(ctrl) - indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + indexes_relation.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() indexes_relation.EXPECT().Delete(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes() indexes_relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -170,7 +170,7 @@ func TestInsertOneIndexMetadata(t *testing.T) { }).AnyTimes() reader.EXPECT().Close().Return(nil).AnyTimes() - indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes() + indexes_relation.EXPECT().NewReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]engine.Reader{reader}, nil).AnyTimes() catalog_database.EXPECT().Relation(gomock.Any(), catalog.MO_INDEXES, gomock.Any()).Return(indexes_relation, nil).AnyTimes() //--------------------------------------------------------------------------------------------------------------------------- diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index e446440a25c66..23857772e2db2 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -130,7 +130,13 @@ func NewCompile( c.startAt = startAt c.disableRetry = false if c.proc.TxnOperator != nil { + // TODO: The action of updating the WriteOffset logic should be executed in the `func (c *Compile) Run(_ uint64)` method. + // However, considering that the delay ranges are not completed yet, the UpdateSnapshotWriteOffset() and + // the assignment of `Compile.TxnOffset` should be moved into the `func (c *Compile) Run(_ uint64)` method in the later stage. 
c.proc.TxnOperator.GetWorkspace().UpdateSnapshotWriteOffset() + c.TxnOffset = c.proc.TxnOperator.GetWorkspace().GetSnapshotWriteOffset() + } else { + c.TxnOffset = 0 } return c } @@ -2223,6 +2229,7 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) { func (c *Compile) compileTableScanWithNode(n *plan.Node, node engine.Node) (*Scope, error) { s := newScope(Remote) s.NodeInfo = node + s.TxnOffset = c.TxnOffset s.DataSource = &Source{ node: n, } @@ -3794,7 +3801,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis if err != nil { return nil, err } - ranges, err = rel.Ranges(ctx, blockFilterList) + ranges, err = rel.Ranges(ctx, blockFilterList, c.TxnOffset) if err != nil { return nil, err } @@ -3807,7 +3814,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis if err != nil { return nil, err } - subranges, err := subrelation.Ranges(ctx, n.BlockFilterList) + subranges, err := subrelation.Ranges(ctx, n.BlockFilterList, c.TxnOffset) if err != nil { return nil, err } @@ -3829,7 +3836,7 @@ func (c *Compile) expandRanges(n *plan.Node, rel engine.Relation, blockFilterLis if err != nil { return nil, err } - subranges, err := subrelation.Ranges(ctx, n.BlockFilterList) + subranges, err := subrelation.Ranges(ctx, n.BlockFilterList, c.TxnOffset) if err != nil { return nil, err } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 89c76a8d5b68a..2ebbad6abf00b 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -17,33 +17,27 @@ package compile import ( "context" "fmt" - "strings" - "hash/crc32" goruntime "runtime" "runtime/debug" + "strings" "sync" - "github.com/matrixorigin/matrixone/pkg/catalog" - "github.com/matrixorigin/matrixone/pkg/common/bitmap" - "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/pb/timestamp" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/right" - 
"github.com/matrixorigin/matrixone/pkg/sql/colexec/rightanti" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi" - "github.com/matrixorigin/matrixone/pkg/sql/util" - "github.com/matrixorigin/matrixone/pkg/vm/engine" - "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" - "github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/cnservice/cnclient" + "github.com/matrixorigin/matrixone/pkg/common/bitmap" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/objectio" pbpipeline "github.com/matrixorigin/matrixone/pkg/pb/pipeline" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/colexec/connector" "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" @@ -55,15 +49,19 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergetop" "github.com/matrixorigin/matrixone/pkg/sql/colexec/offset" "github.com/matrixorigin/matrixone/pkg/sql/colexec/restrict" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/right" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/rightanti" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi" "github.com/matrixorigin/matrixone/pkg/sql/colexec/sample" "github.com/matrixorigin/matrixone/pkg/sql/colexec/top" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" + "github.com/matrixorigin/matrixone/pkg/sql/util" "github.com/matrixorigin/matrixone/pkg/vm" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" + 
"github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine" "github.com/matrixorigin/matrixone/pkg/vm/pipeline" "github.com/matrixorigin/matrixone/pkg/vm/process" - - "github.com/panjf2000/ants/v2" - "go.uber.org/zap" ) func newScope(magic magicType) *Scope { @@ -525,7 +523,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { scanUsedCpuNumber = 1 } - readers, err = s.NodeInfo.Rel.NewReader(c.ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, s.NodeInfo.Data, len(s.DataSource.OrderBy) > 0) + readers, err = s.NodeInfo.Rel.NewReader(c.ctx, + scanUsedCpuNumber, + s.DataSource.FilterExpr, + s.NodeInfo.Data, + len(s.DataSource.OrderBy) > 0, + s.TxnOffset) if err != nil { return nil, err } @@ -594,8 +597,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { } if rel.GetEngineType() == engine.Memory || s.DataSource.PartitionRelationNames == nil { - mainRds, err1 := rel.NewReader( - ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, s.NodeInfo.Data, len(s.DataSource.OrderBy) > 0) + mainRds, err1 := rel.NewReader(ctx, + scanUsedCpuNumber, + s.DataSource.FilterExpr, + s.NodeInfo.Data, + len(s.DataSource.OrderBy) > 0, + s.TxnOffset) if err1 != nil { return nil, err1 } @@ -622,9 +629,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { if len(cleanRanges) > 0 { // create readers for reading clean blocks from the main table. 
- mainRds, err1 := rel.NewReader( - ctx, - scanUsedCpuNumber, s.DataSource.FilterExpr, cleanRanges, len(s.DataSource.OrderBy) > 0) + mainRds, err1 := rel.NewReader(ctx, + scanUsedCpuNumber, + s.DataSource.FilterExpr, + cleanRanges, + len(s.DataSource.OrderBy) > 0, + s.TxnOffset) if err1 != nil { return nil, err1 } @@ -636,7 +646,12 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { if err1 != nil { return nil, err1 } - memRds, err2 := subRel.NewReader(ctx, scanUsedCpuNumber, s.DataSource.FilterExpr, dirtyRanges[num], len(s.DataSource.OrderBy) > 0) + memRds, err2 := subRel.NewReader(ctx, + scanUsedCpuNumber, + s.DataSource.FilterExpr, + dirtyRanges[num], + len(s.DataSource.OrderBy) > 0, + s.TxnOffset) if err2 != nil { return nil, err2 } @@ -683,6 +698,7 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { AccountId: s.DataSource.AccountId, } readerScopes[i].Proc = process.NewWithAnalyze(s.Proc, c.ctx, 0, c.anal.Nodes()) + readerScopes[i].TxnOffset = s.TxnOffset } mergeFromParallelScanScope, errNew := newParallelScope(c, s, readerScopes) diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index e5f24142d07f0..508995c3109ea 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/uuid" + "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -121,6 +122,8 @@ type Scope struct { PreScopes []*Scope // NodeInfo contains the information about the remote node. NodeInfo engine.Node + // TxnOffset represents the transaction's write offset, specifying the starting position for reading data. + TxnOffset int // Instructions contains command list of this scope. Instructions vm.Instructions // Proc contains the execution context. @@ -249,6 +252,8 @@ type Compile struct { ctx context.Context // proc stores the execution context. 
proc *process.Process + // TxnOffset read starting offset position within the transaction during the execute current statement + TxnOffset int MessageBoard *process.MessageBoard diff --git a/pkg/sql/plan/function/func_mo.go b/pkg/sql/plan/function/func_mo.go index 07901d8b5caf4..e3c642e77ee51 100644 --- a/pkg/sql/plan/function/func_mo.go +++ b/pkg/sql/plan/function/func_mo.go @@ -23,6 +23,8 @@ import ( "strings" "time" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/bytejson" @@ -34,7 +36,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/plan/function/functionUtil" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" - "go.uber.org/zap" ) const ( @@ -434,7 +435,8 @@ func moTableColMaxMinImpl(fnName string, parameters []*vector.Vector, result vec return err } - ranges, err := rel.Ranges(ctx, nil) + //ranges, err := rel.Ranges(ctx, nil) + ranges, err := rel.Ranges(ctx, nil, 0) if err != nil { return err } diff --git a/pkg/util/test/cron_task_test.go b/pkg/util/test/cron_task_test.go index e1d0c5714ccc6..40a4e227620ad 100644 --- a/pkg/util/test/cron_task_test.go +++ b/pkg/util/test/cron_task_test.go @@ -76,7 +76,7 @@ func TestCalculateStorageUsage(t *testing.T) { txnClient := mock_frontend.NewMockTxnClient(ctrl) txnClient.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOperator, nil).AnyTimes() table := mock_frontend.NewMockRelation(ctrl) - table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + table.EXPECT().Ranges(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().TableDefs(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().GetPrimaryKeys(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().GetHideKeys(gomock.Any()).Return(nil, nil).AnyTimes() diff --git a/pkg/vm/engine/disttae/filter.go 
b/pkg/vm/engine/disttae/filter.go index 2b92d956cd5c5..f08895a441d97 100644 --- a/pkg/vm/engine/disttae/filter.go +++ b/pkg/vm/engine/disttae/filter.go @@ -992,6 +992,7 @@ func CompileFilterExpr( func TryFastFilterBlocks( ctx context.Context, tbl *txnTable, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. snapshotTS timestamp.Timestamp, tableDef *plan.TableDef, exprs []*plan.Expr, @@ -1010,6 +1011,7 @@ func TryFastFilterBlocks( err = ExecuteBlockFilter( ctx, tbl, + txnOffset, snapshotTS, fastFilterOp, loadOp, @@ -1030,6 +1032,7 @@ func TryFastFilterBlocks( func ExecuteBlockFilter( ctx context.Context, tbl *txnTable, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. snapshotTS timestamp.Timestamp, fastFilterOp FastFilterOp, loadOp LoadOp, @@ -1070,7 +1073,7 @@ func ExecuteBlockFilter( }() if !highSelectivityHint { - *dirtyBlocks = tbl.collectDirtyBlocks(snapshot, uncommittedObjects) + *dirtyBlocks = tbl.collectDirtyBlocks(snapshot, uncommittedObjects, txnOffset) } err = ForeachSnapshotObjects( diff --git a/pkg/vm/engine/disttae/partition_reader.go b/pkg/vm/engine/disttae/partition_reader.go index 51643485a06b4..931e675f2d824 100644 --- a/pkg/vm/engine/disttae/partition_reader.go +++ b/pkg/vm/engine/disttae/partition_reader.go @@ -19,9 +19,10 @@ import ( "github.com/matrixorigin/matrixone/pkg/objectio" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/testutil" - "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -34,8 +35,9 @@ import ( ) type PartitionReader struct { - table *txnTable - prepared bool + table *txnTable + txnOffset int // Transaction writes offset used to specify the starting position for reading data. + prepared bool // inserted rows comes from txn.writes. 
inserts []*batch.Batch //deleted rows comes from txn.writes or partitionState.rows. @@ -73,8 +75,14 @@ func (p *PartitionReader) prepare() error { inserts = make([]*batch.Batch, 0) deletes = make(map[types.Rowid]uint8) //load inserts and deletes from txn.writes. + + txnOffset := p.txnOffset + if p.table.db.op.IsSnapOp() { + txnOffset = p.table.getTxn().GetSnapshotWriteOffset() + } + p.table.getTxn().forEachTableWrites(p.table.db.databaseId, p.table.tableId, - p.table.getTxn().GetSnapshotWriteOffset(), func(entry Entry) { + txnOffset, func(entry Entry) { if entry.typ == INSERT || entry.typ == INSERT_TXN { if entry.bat == nil || entry.bat.IsEmpty() { return diff --git a/pkg/vm/engine/disttae/reader.go b/pkg/vm/engine/disttae/reader.go index 3920517712160..7a55b1460fb4c 100644 --- a/pkg/vm/engine/disttae/reader.go +++ b/pkg/vm/engine/disttae/reader.go @@ -19,6 +19,8 @@ import ( "sort" "time" + "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -41,7 +43,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "github.com/matrixorigin/matrixone/pkg/vm/process" - "go.uber.org/zap" ) // ----------------------------------------------------------------- @@ -477,11 +478,13 @@ func newBlockMergeReader( ts timestamp.Timestamp, dirtyBlks []*objectio.BlockInfo, filterExpr *plan.Expr, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. 
fs fileservice.FileService, proc *process.Process, ) *blockMergeReader { r := &blockMergeReader{ - table: txnTable, + table: txnTable, + txnOffset: txnOffset, blockReader: newBlockReader( ctx, txnTable.GetTableDef(ctx), @@ -608,12 +611,17 @@ func (r *blockMergeReader) loadDeletes(ctx context.Context, cols []string) error iter.Close() + txnOffset := r.txnOffset + if r.table.db.op.IsSnapOp() { + txnOffset = r.table.getTxn().GetSnapshotWriteOffset() + } + //TODO:: if r.table.writes is a map , the time complexity could be O(1) //load deletes from txn.writes for the specified block r.table.getTxn().forEachTableWrites( r.table.db.databaseId, r.table.tableId, - r.table.getTxn().GetSnapshotWriteOffset(), func(entry Entry) { + txnOffset, func(entry Entry) { if entry.isGeneratedByTruncate() { return } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 9178bc6d588b0..4abd56ce471e2 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -23,6 +23,8 @@ import ( "time" "unsafe" + "go.uber.org/zap" + "github.com/docker/go-units" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -57,7 +59,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" - "go.uber.org/zap" ) const ( @@ -630,8 +631,12 @@ func (tbl *txnTable) resetSnapshot() { tbl._partState.Store(nil) } -// return all unmodified blocks -func (tbl *txnTable) Ranges(ctx context.Context, exprs []*plan.Expr) (ranges engine.Ranges, err error) { +// Ranges returns all unmodified blocks from the table. +// Parameters: +// - ctx: Context used to control the lifecycle of the request. +// - exprs: A slice of expressions used to filter data. +// - txnOffset: Transaction offset used to specify the starting position for reading data. 
+func (tbl *txnTable) Ranges(ctx context.Context, exprs []*plan.Expr, txnOffset int) (ranges engine.Ranges, err error) { start := time.Now() seq := tbl.db.op.NextSequence() trace.GetService().AddTxnDurationAction( @@ -683,6 +688,7 @@ func (tbl *txnTable) Ranges(ctx context.Context, exprs []*plan.Expr) (ranges eng exprs, &blocks, tbl.proc.Load(), + txnOffset, ); err != nil { return } @@ -717,16 +723,17 @@ func (tbl *txnTable) rangesOnePart( exprs []*plan.Expr, // filter expression outBlocks *objectio.BlockInfoSlice, // output marshaled block list after filtering proc *process.Process, // process of this transaction + txnOffset int, ) (err error) { - var done bool // collect dirty blocks lazily var dirtyBlks map[types.Blockid]struct{} - uncommittedObjects := tbl.collectUnCommittedObjects() + uncommittedObjects := tbl.collectUnCommittedObjects(txnOffset) if done, err = TryFastFilterBlocks( ctx, tbl, + txnOffset, tbl.db.op.SnapshotTS(), tbl.tableDef, exprs, @@ -752,7 +759,7 @@ func (tbl *txnTable) rangesOnePart( } if dirtyBlks == nil { - tbl.collectDirtyBlocks(state, uncommittedObjects) + tbl.collectDirtyBlocks(state, uncommittedObjects, txnOffset) } // for dynamic parameter, substitute param ref and const fold cast expression here to improve performance @@ -906,12 +913,20 @@ func (tbl *txnTable) rangesOnePart( return } -func (tbl *txnTable) collectUnCommittedObjects() []objectio.ObjectStats { +// Parameters: +// - txnOffset: Transaction writes offset used to specify the starting position for reading data. +// Note: when the operation is a snapshot read (IsSnapOp), txnOffset is overridden by the snapshot write offset. 
+func (tbl *txnTable) collectUnCommittedObjects(txnOffset int) []objectio.ObjectStats { var unCommittedObjects []objectio.ObjectStats + + if tbl.db.op.IsSnapOp() { + txnOffset = tbl.getTxn().GetSnapshotWriteOffset() + } + tbl.getTxn().forEachTableWrites( tbl.db.databaseId, tbl.tableId, - tbl.getTxn().GetSnapshotWriteOffset(), + txnOffset, func(entry Entry) { stats := objectio.ObjectStats{} if entry.bat == nil || entry.bat.IsEmpty() { @@ -936,7 +951,9 @@ func (tbl *txnTable) collectUnCommittedObjects() []objectio.ObjectStats { func (tbl *txnTable) collectDirtyBlocks( state *logtailreplay.PartitionState, - uncommittedObjects []objectio.ObjectStats) map[types.Blockid]struct{} { + uncommittedObjects []objectio.ObjectStats, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. +) map[types.Blockid]struct{} { dirtyBlks := make(map[types.Blockid]struct{}) //collect partitionState.dirtyBlocks which may be invisible to this txn into dirtyBlks. { @@ -964,10 +981,14 @@ func (tbl *txnTable) collectDirtyBlocks( }, uncommittedObjects...) } + if tbl.db.op.IsSnapOp() { + txnOffset = tbl.getTxn().GetSnapshotWriteOffset() + } + tbl.getTxn().forEachTableWrites( tbl.db.databaseId, tbl.tableId, - tbl.getTxn().GetSnapshotWriteOffset(), + txnOffset, func(entry Entry) { // the CN workspace can only handle `INSERT` and `DELETE` operations. Other operations will be skipped, // TODO Adjustments will be made here in the future @@ -1691,8 +1712,21 @@ func (tbl *txnTable) GetDBID(ctx context.Context) uint64 { return tbl.db.databaseId } +// NewReader creates a new list of Readers to read data from the table. +// Parameters: +// - ctx: Context used to control the lifecycle of the request. +// - num: The number of Readers to create. +// - expr: Expression used to filter data. +// - ranges: Byte array representing the data range to read. +// - orderedScan: Whether to scan the data in order. 
+// - txnOffset: Transaction offset used to specify the starting position for reading data. func (tbl *txnTable) NewReader( - ctx context.Context, num int, expr *plan.Expr, ranges []byte, orderedScan bool, + ctx context.Context, + num int, + expr *plan.Expr, + ranges []byte, + orderedScan bool, + txnOffset int, ) ([]engine.Reader, error) { pkFilter := tbl.tryExtractPKFilter(expr) blkArray := objectio.BlockInfoSlice(ranges) @@ -1700,10 +1734,10 @@ func (tbl *txnTable) NewReader( return []engine.Reader{new(emptyReader)}, nil } if blkArray.Len() == 0 { - return tbl.newMergeReader(ctx, num, expr, pkFilter, nil) + return tbl.newMergeReader(ctx, num, expr, pkFilter, nil, txnOffset) } if blkArray.Len() == 1 && engine.IsMemtable(blkArray.GetBytes(0)) { - return tbl.newMergeReader(ctx, num, expr, pkFilter, nil) + return tbl.newMergeReader(ctx, num, expr, pkFilter, nil, txnOffset) } if blkArray.Len() > 1 && engine.IsMemtable(blkArray.GetBytes(0)) { rds := make([]engine.Reader, num) @@ -1720,7 +1754,7 @@ func (tbl *txnTable) NewReader( } dirtyBlks = append(dirtyBlks, blkInfo) } - rds0, err := tbl.newMergeReader(ctx, num, expr, pkFilter, dirtyBlks) + rds0, err := tbl.newMergeReader(ctx, num, expr, pkFilter, dirtyBlks, txnOffset) if err != nil { return nil, err } @@ -1790,6 +1824,7 @@ func (tbl *txnTable) newMergeReader( expr *plan.Expr, pkFilter PKFilter, dirtyBlks []*objectio.BlockInfo, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. ) ([]engine.Reader, error) { rds := make([]engine.Reader, num) mrds := make([]mergeReader, num) @@ -1798,7 +1833,8 @@ func (tbl *txnTable) newMergeReader( num, pkFilter, expr, - dirtyBlks) + dirtyBlks, + txnOffset) if err != nil { return nil, err } @@ -1909,6 +1945,7 @@ func (tbl *txnTable) newReader( pkFilter PKFilter, expr *plan.Expr, dirtyBlks []*objectio.BlockInfo, + txnOffset int, // Transaction writes offset used to specify the starting position for reading data. 
) ([]engine.Reader, error) { txn := tbl.getTxn() ts := txn.op.SnapshotTS() @@ -1949,10 +1986,11 @@ func (tbl *txnTable) newReader( } partReader := &PartitionReader{ - table: tbl, - iter: iter, - seqnumMp: seqnumMp, - typsMap: mp, + table: tbl, + txnOffset: txnOffset, + iter: iter, + seqnumMp: seqnumMp, + typsMap: mp, } //tbl.Lock() @@ -1972,6 +2010,7 @@ func (tbl *txnTable) newReader( ts, []*objectio.BlockInfo{dirtyBlks[i]}, expr, + txnOffset, fs, proc, ), @@ -1989,6 +2028,7 @@ func (tbl *txnTable) newReader( ts, []*objectio.BlockInfo{dirtyBlks[i]}, expr, + txnOffset, fs, proc, ) @@ -2018,6 +2058,7 @@ func (tbl *txnTable) newReader( bmr := &blockMergeReader{ blockReader: blockReaders[i], table: tbl, + txnOffset: txnOffset, pkFilter: pkFilter, deletaLocs: make(map[string][]objectio.Location), } diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 1b7661c3d8dae..63c1bdca08a36 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -789,8 +789,9 @@ type blockReader struct { type blockMergeReader struct { *blockReader - table *txnTable - pkFilter PKFilter + table *txnTable + txnOffset int // Transaction writes offset used to specify the starting position for reading data. 
+ pkFilter PKFilter //for perfetch deletes loaded bool pkidx int diff --git a/pkg/vm/engine/memoryengine/table_reader.go b/pkg/vm/engine/memoryengine/table_reader.go index daab32841d67b..8f234e62431e7 100644 --- a/pkg/vm/engine/memoryengine/table_reader.go +++ b/pkg/vm/engine/memoryengine/table_reader.go @@ -17,6 +17,7 @@ package memoryengine import ( "context" "encoding/binary" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -41,7 +42,7 @@ type IterInfo struct { IterID ID } -func (t *Table) NewReader(ctx context.Context, parallel int, expr *plan.Expr, bytes []byte, _ bool) (readers []engine.Reader, err error) { +func (t *Table) NewReader(ctx context.Context, parallel int, expr *plan.Expr, bytes []byte, _ bool, txnOffset int) (readers []engine.Reader, err error) { readers = make([]engine.Reader, parallel) shardIDs := ShardIdSlice(bytes) @@ -210,7 +211,7 @@ func (t *Table) GetEngineType() engine.EngineType { return engine.Memory } -func (t *Table) Ranges(_ context.Context, _ []*plan.Expr) (engine.Ranges, error) { +func (t *Table) Ranges(_ context.Context, _ []*plan.Expr, _ int) (engine.Ranges, error) { // return encoded shard ids nodes := getTNServices(t.engine.cluster) shards := make(ShardIdSlice, 0, len(nodes)*8) diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index 5bb957a8b415b..98f799ac7b56d 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -593,7 +593,11 @@ var _ Ranges = (*objectio.BlockInfoSlice)(nil) type Relation interface { Statistics - Ranges(context.Context, []*plan.Expr) (Ranges, error) + // Ranges Parameters: + // first parameter: Context + // second parameter: Slice of expressions used to filter the data. + // third parameter: Transaction offset used to specify the starting position for reading data. 
+	Ranges(context.Context, []*plan.Expr, int) (Ranges, error) TableDefs(context.Context) ([]TableDef, error) @@ -630,8 +634,13 @@ type Relation interface { GetDBID(context.Context) uint64 - // second argument is the number of reader, third argument is the filter extend, foruth parameter is the payload required by the engine - NewReader(context.Context, int, *plan.Expr, []byte, bool) ([]Reader, error) + // NewReader Parameters: + // second parameter is the number of reader, + // third parameter is the filter extend, + // fourth parameter is the payload required by the engine + // fifth parameter is data blocks + // sixth parameter is transaction offset used to specify the starting position for reading data. + NewReader(context.Context, int, *plan.Expr, []byte, bool, int) ([]Reader, error) TableColumns(ctx context.Context) ([]*Attribute, error)