diff --git a/pkg/vm/engine/tae/catalog/catalog.go b/pkg/vm/engine/tae/catalog/catalog.go index f0d9da7501c1a..f62d4eb66dd95 100644 --- a/pkg/vm/engine/tae/catalog/catalog.go +++ b/pkg/vm/engine/tae/catalog/catalog.go @@ -56,7 +56,6 @@ const ( type DataFactory interface { MakeTableFactory() TableDataFactory - MakeSegmentFactory() SegmentDataFactory MakeBlockFactory() BlockDataFactory } @@ -528,7 +527,6 @@ func (catalog *Catalog) onReplayUpdateSegment( seg = NewReplaySegmentEntry() seg.ID = *cmd.ID.SegmentID() seg.table = tbl - seg.segData = dataFactory.MakeSegmentFactory()(seg) seg.Insert(un) seg.SegmentNode = cmd.node tbl.AddEntryLocked(seg) @@ -596,7 +594,6 @@ func (catalog *Catalog) onReplayCreateSegment( seg.SegmentNode = segNode seg.table = rel seg.ID = *segid - seg.segData = dataFactory.MakeSegmentFactory()(seg) rel.AddEntryLocked(seg) un := &MVCCNode[*MetadataMVCCNode]{ EntryMVCCNode: &EntryMVCCNode{ diff --git a/pkg/vm/engine/tae/catalog/catalog_test.go b/pkg/vm/engine/tae/catalog/catalog_test.go index 0d0714d64cf85..06b4dcdc2bf3d 100644 --- a/pkg/vm/engine/tae/catalog/catalog_test.go +++ b/pkg/vm/engine/tae/catalog/catalog_test.go @@ -368,7 +368,7 @@ func TestSegment1(t *testing.T) { schema.Name = tbName tb, err := db.CreateTableEntry(schema, txn1, nil) assert.Nil(t, err) - seg1, err := tb.CreateSegment(txn1, ES_Appendable, nil, nil) + seg1, err := tb.CreateSegment(txn1, ES_Appendable, nil) assert.Nil(t, err) err = txn1.Commit(context.Background()) assert.Nil(t, err) diff --git a/pkg/vm/engine/tae/catalog/segment.go b/pkg/vm/engine/tae/catalog/segment.go index 6418c6cab5cf8..e0336ab0a5c40 100644 --- a/pkg/vm/engine/tae/catalog/segment.go +++ b/pkg/vm/engine/tae/catalog/segment.go @@ -28,13 +28,10 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) -type SegmentDataFactory = func(meta *SegmentEntry) data.Segment - type SegmentEntry struct { ID objectio.Segmentid Stat SegStat @@ -44,7 +41,6 @@ type SegmentEntry struct { //link.head and tail is nil when new a segmentEntry object. link *common.GenericSortedDList[*BlockEntry] *SegmentNode - segData data.Segment } type SegStat struct { @@ -167,7 +163,7 @@ func (s *SegStat) String(composeSortKey bool) string { ) } -func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncTxn, state EntryState, dataFactory SegmentDataFactory) *SegmentEntry { +func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncTxn, state EntryState) *SegmentEntry { e := &SegmentEntry{ ID: *id, BaseEntryImpl: NewBaseEntry( @@ -181,9 +177,6 @@ func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncT }, } e.CreateWithTxn(txn, &MetadataMVCCNode{}) - if dataFactory != nil { - e.segData = dataFactory(e) - } return e } @@ -548,15 +541,6 @@ func (entry *SegmentEntry) AsCommonID() *common.ID { func (entry *SegmentEntry) GetCatalog() *Catalog { return entry.table.db.catalog } -func (entry *SegmentEntry) InitData(factory DataFactory) { - if factory == nil { - return - } - dataFactory := factory.MakeSegmentFactory() - entry.segData = dataFactory(entry) -} -func (entry *SegmentEntry) GetSegmentData() data.Segment { return entry.segData } - func (entry *SegmentEntry) deleteEntryLocked(block *BlockEntry) error { if n, ok := entry.entries[block.ID]; !ok { return moerr.GetOkExpectedEOB() diff --git a/pkg/vm/engine/tae/catalog/table.go b/pkg/vm/engine/tae/catalog/table.go index 212385806fe83..fde48fbd3147b 100644 --- a/pkg/vm/engine/tae/catalog/table.go +++ b/pkg/vm/engine/tae/catalog/table.go @@ -185,8 +185,8 @@ func (entry *TableEntry) MakeSegmentIt(reverse bool) *common.GenericSortedDListI func (entry *TableEntry) CreateSegment( txn txnif.AsyncTxn, state EntryState, - dataFactory SegmentDataFactory, - opts *objectio.CreateSegOpt) (created *SegmentEntry, err error) { + opts *objectio.CreateSegOpt, +) (created *SegmentEntry, err error) { entry.Lock() defer entry.Unlock() var id *objectio.Segmentid @@ -195,7 +195,7 @@ func (entry *TableEntry) CreateSegment( } else { id = objectio.NewSegmentid() } - created = NewSegmentEntry(entry, id, txn, state, dataFactory) + created = NewSegmentEntry(entry, id, txn, state) entry.AddEntryLocked(created) return } diff --git a/pkg/vm/engine/tae/db/replay.go b/pkg/vm/engine/tae/db/replay.go index d392a9cad7ce1..51a515d427256 100644 --- a/pkg/vm/engine/tae/db/replay.go +++ b/pkg/vm/engine/tae/db/replay.go @@ -77,7 +77,6 @@ func (replayer *Replayer) PreReplayWal() { if dropCommit != nil && dropCommit.DeleteBefore(replayer.ckpedTS) { return moerr.GetOkStopCurrRecur() } - entry.InitData(replayer.DataFactory) return } if err := replayer.db.Catalog.RecurLoop(processor); err != nil { diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 369486253a41a..f7bcac4d6d118 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -8053,7 +8053,6 @@ func TestDeduplication(t *testing.T) { seg, err := tbl.CreateSegment( txn, catalog.ES_Appendable, - dataFactory.MakeSegmentFactory(), new(objectio.CreateSegOpt).WithId(segmentIDs[0])) assert.NoError(t, err) blk, err := seg.CreateBlock(txn, catalog.ES_Appendable, dataFactory.MakeBlockFactory(), nil) diff --git a/pkg/vm/engine/tae/db/test/hidden_test.go b/pkg/vm/engine/tae/db/test/hidden_test.go index 65555697c67a7..9e6ee8c724907 100644 --- a/pkg/vm/engine/tae/db/test/hidden_test.go +++ b/pkg/vm/engine/tae/db/test/hidden_test.go @@ -25,6 +25,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" "github.com/stretchr/testify/assert" @@ -150,8 +151,9 @@ func TestHiddenWithPK1(t *testing.T) { assert.NoError(t, txn.Commit(context.Background())) { - seg := segMeta.GetSegmentData() - factory, taskType, scopes, err := seg.BuildCompactionTaskFactory() + factory, taskType, scopes, err := tables.BuildSegmentCompactionTaskFactory( + segMeta, tae.Runtime, + ) assert.NoError(t, err) task, err := tae.Runtime.Scheduler.ScheduleMultiScopedTxnTask(tasks.WaitableCtx, taskType, scopes, factory) assert.NoError(t, err) @@ -313,14 +315,14 @@ func TestHidden2(t *testing.T) { txn, rel = testutil.GetDefaultRelation(t, tae, schema.Name) { it := rel.MakeSegmentIt() - segs := make([]data.Segment, 0) + segs := make([]*catalog.SegmentEntry, 0) for it.Valid() { - seg := it.GetSegment().GetMeta().(*catalog.SegmentEntry).GetSegmentData() + seg := it.GetSegment().GetMeta().(*catalog.SegmentEntry) segs = append(segs, seg) it.Next() } for _, seg := range segs { - factory, taskType, scopes, err := seg.BuildCompactionTaskFactory() + factory, taskType, scopes, err := tables.BuildSegmentCompactionTaskFactory(seg, tae.Runtime) assert.NoError(t, err) if factory == nil { continue diff --git a/pkg/vm/engine/tae/iface/data/segment.go b/pkg/vm/engine/tae/iface/data/segment.go deleted file mode 100644 index 95f79ce67a200..0000000000000 --- a/pkg/vm/engine/tae/iface/data/segment.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data - -import ( - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" -) - -type Segment interface { - CheckpointUnit - GetID() uint64 - BatchDedup(txn txnif.AsyncTxn, pks containers.Vector) error - Destroy() error -} diff --git a/pkg/vm/engine/tae/tables/base.go b/pkg/vm/engine/tae/tables/base.go index 5ed952595955b..f9fccd1dca55a 100644 --- a/pkg/vm/engine/tae/tables/base.go +++ b/pkg/vm/engine/tae/tables/base.go @@ -19,7 +19,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" "github.com/RoaringBitmap/roaring" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -54,7 +53,6 @@ type baseBlock struct { rt *dbutils.Runtime meta *catalog.BlockEntry mvcc *updates.MVCCHandle - ttl time.Time impl data.Block node atomic.Pointer[Node] @@ -69,7 +67,6 @@ func newBaseBlock( impl: impl, rt: rt, meta: meta, - ttl: time.Now(), } blk.mvcc = updates.NewMVCCHandle(meta) blk.RWMutex = blk.mvcc.RWMutex @@ -205,25 +202,6 @@ func (blk *baseBlock) LoadPersistedCommitTS() (vec containers.Vector, err error) return } -// func (blk *baseBlock) LoadPersistedData() (bat *containers.Batch, err error) { -// schema := blk.meta.GetSchema() -// bat = containers.NewBatch() -// defer func() { -// if err != nil { -// bat.Close() -// } -// }() -// var vec containers.Vector -// for i, col := range schema.ColDefs { -// vec, err = blk.LoadPersistedColumnData(i) -// if err != nil { -// return -// } -// bat.AddVector(col.Name, vec) -// } -// return -// } - func (blk *baseBlock) LoadPersistedColumnData( ctx context.Context, schema *catalog.Schema, colIdx int, mp *mpool.MPool, ) (vec containers.Vector, err error) { diff --git a/pkg/vm/engine/tae/tables/factory.go b/pkg/vm/engine/tae/tables/factory.go index 6ce73b8f4c50d..9b797a852f787 100644 --- a/pkg/vm/engine/tae/tables/factory.go +++ b/pkg/vm/engine/tae/tables/factory.go @@ -40,12 +40,6 @@ func (factory *DataFactory) MakeTableFactory() catalog.TableDataFactory { } } -func (factory *DataFactory) MakeSegmentFactory() catalog.SegmentDataFactory { - return func(meta *catalog.SegmentEntry) data.Segment { - return newSegment(meta, factory.dir, factory.rt) - } -} - func (factory *DataFactory) MakeBlockFactory() catalog.BlockDataFactory { return func(meta *catalog.BlockEntry) data.Block { if meta.IsAppendable() { diff --git a/pkg/vm/engine/tae/tables/segment.go b/pkg/vm/engine/tae/tables/segment.go index ff0f4550f2311..405d3e6ac17b5 100644 --- a/pkg/vm/engine/tae/tables/segment.go +++ b/pkg/vm/engine/tae/tables/segment.go @@ -15,70 +15,37 @@ package tables import ( - "time" - - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" ) -type dataSegment struct { - meta *catalog.SegmentEntry - rt *dbutils.Runtime -} - -func newSegment( - meta *catalog.SegmentEntry, dir string, rt *dbutils.Runtime, -) *dataSegment { - seg := &dataSegment{ - meta: meta, - rt: rt, +func BuildSegmentCompactionTaskFactory(meta *catalog.SegmentEntry, rt *dbutils.Runtime) ( + factory tasks.TxnTaskFactory, taskType tasks.TaskType, scopes []common.ID, err error, +) { + if !meta.IsAppendable() { + return } - return seg -} - -func (segment *dataSegment) Destroy() (err error) { - return -} - -func (segment *dataSegment) GetID() uint64 { panic("not support") } - -func (segment *dataSegment) BatchDedup(txn txnif.AsyncTxn, pks containers.Vector) (err error) { - return moerr.GetOkExpectedPossibleDup() -} - -func (segment *dataSegment) MutationInfo() string { return "" } - -func (segment *dataSegment) RunCalibration() int { return 0 } -func (segment *dataSegment) EstimateScore(interval time.Duration, force bool) int { return 0 } - -func (segment *dataSegment) BuildCompactionTaskFactory() (factory tasks.TxnTaskFactory, taskType tasks.TaskType, scopes []common.ID, err error) { - if segment.meta.IsAppendable() { - segment.meta.RLock() - dropped := segment.meta.HasDropCommittedLocked() - inTxn := segment.meta.IsCreatingOrAborted() - segment.meta.RUnlock() - if dropped || inTxn { - return - } - filter := catalog.NewComposedFilter() - filter.AddBlockFilter(catalog.NonAppendableBlkFilter) - filter.AddCommitFilter(catalog.ActiveWithNoTxnFilter) - blks := segment.meta.CollectBlockEntries(filter.FilteCommit, filter.FilteBlock) - if len(blks) < int(segment.meta.GetTable().GetLastestSchema().SegmentMaxBlocks) { - return - } - for _, blk := range blks { - scopes = append(scopes, *blk.AsCommonID()) - } - factory = jobs.CompactSegmentTaskFactory(blks, segment.rt) - taskType = tasks.DataCompactionTask + meta.RLock() + dropped := meta.HasDropCommittedLocked() + inTxn := meta.IsCreatingOrAborted() + meta.RUnlock() + if dropped || inTxn { + return + } + filter := catalog.NewComposedFilter() + filter.AddBlockFilter(catalog.NonAppendableBlkFilter) + filter.AddCommitFilter(catalog.ActiveWithNoTxnFilter) + blks := meta.CollectBlockEntries(filter.FilteCommit, filter.FilteBlock) + if len(blks) < int(meta.GetTable().GetLastestSchema().SegmentMaxBlocks) { return } + for _, blk := range blks { + scopes = append(scopes, *blk.AsCommonID()) + } + factory = jobs.CompactSegmentTaskFactory(blks, rt) + taskType = tasks.DataCompactionTask return } diff --git a/pkg/vm/engine/tae/tables/updates/chain_test.go b/pkg/vm/engine/tae/tables/updates/chain_test.go index e222531fa7eb3..1a8b8c7b2f554 100644 --- a/pkg/vm/engine/tae/tables/updates/chain_test.go +++ b/pkg/vm/engine/tae/tables/updates/chain_test.go @@ -46,7 +46,7 @@ func TestDeleteChain1(t *testing.T) { db, _ := c.CreateDBEntry("db", "", "", nil) table, _ := db.CreateTableEntry(schema, nil, nil) - seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil) + seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil) blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil) controller := NewMVCCHandle(blk) diff --git a/pkg/vm/engine/tae/tables/updates/command_test.go b/pkg/vm/engine/tae/tables/updates/command_test.go index cf0e7cf264f7c..6240d1e3d5888 100644 --- a/pkg/vm/engine/tae/tables/updates/command_test.go +++ b/pkg/vm/engine/tae/tables/updates/command_test.go @@ -35,7 +35,7 @@ func TestCompactBlockCmd(t *testing.T) { db, _ := c.CreateDBEntry("db", "", "", nil) table, _ := db.CreateTableEntry(schema, nil, nil) - seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil) + seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil) blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil) controller := NewMVCCHandle(blk) @@ -67,7 +67,7 @@ func TestDeleteNodeCmd(t *testing.T) { db, _ := c.CreateDBEntry("db", "", "", nil) table, _ := db.CreateTableEntry(schema, nil, nil) - seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil) + seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil) blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil) controller := NewMVCCHandle(blk) diff --git a/pkg/vm/engine/tae/tables/updates/mvcc.go b/pkg/vm/engine/tae/tables/updates/mvcc.go index 8d0dece9ee2f4..6e439c708a03c 100644 --- a/pkg/vm/engine/tae/tables/updates/mvcc.go +++ b/pkg/vm/engine/tae/tables/updates/mvcc.go @@ -52,8 +52,7 @@ func init() { type MVCCHandle struct { *sync.RWMutex - deletes atomic.Pointer[DeleteChain] - // deletes *DeleteChain + deletes atomic.Pointer[DeleteChain] meta *catalog.BlockEntry appends *txnbase.MVCCSlice[*AppendNode] changes atomic.Uint32 @@ -103,10 +102,6 @@ func (n *MVCCHandle) EstimateMemSizeLocked() (asize int, dsize int) { // *************** All deletes related APIs ***************** // ========================================================== -func (n *MVCCHandle) GetDeletesPersistedTSInMVCCChain() types.TS { - return n.persistedTS -} - func (n *MVCCHandle) UpgradeDeleteChainByTS(flushed types.TS) { n.Lock() if n.persistedTS.Equal(flushed) { diff --git a/pkg/vm/engine/tae/txn/txnimpl/command_test.go b/pkg/vm/engine/tae/txn/txnimpl/command_test.go index eeea573052dd9..06df3ccb941a6 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/command_test.go +++ b/pkg/vm/engine/tae/txn/txnimpl/command_test.go @@ -46,7 +46,7 @@ func TestComposedCmd(t *testing.T) { assert.Nil(t, err) composed.AddCmd(tblCmd) - seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil) + seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil) segCmd, err := seg.MakeCommand(1) assert.Nil(t, err) composed.AddCmd(segCmd) diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index 3676bd32ce588..27a68b72efd84 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -445,11 +445,7 @@ func (tbl *txnTable) CreateNonAppendableSegment(is1PC bool, opts *objectio.Creat func (tbl *txnTable) createSegment(state catalog.EntryState, is1PC bool, opts *objectio.CreateSegOpt) (seg handle.Segment, err error) { var meta *catalog.SegmentEntry - var factory catalog.SegmentDataFactory - if tbl.store.dataFactory != nil { - factory = tbl.store.dataFactory.MakeSegmentFactory() - } - if meta, err = tbl.entry.CreateSegment(tbl.store.txn, state, factory, opts); err != nil { + if meta, err = tbl.entry.CreateSegment(tbl.store.txn, state, opts); err != nil { return } seg = newSegment(tbl, meta) @@ -1259,15 +1255,6 @@ func (tbl *txnTable) DoPrecommitDedupByPK(pks containers.Vector, pksZM index.ZM) continue } } - segData := seg.GetSegmentData() - // TODO: Add a new batch dedup method later - if err = segData.BatchDedup(tbl.store.txn, pks); moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) { - return - } - if err == nil { - segIt.Next() - continue - } var shouldSkip bool err = nil blkIt := seg.MakeBlockIt(false) @@ -1344,7 +1331,6 @@ func (tbl *txnTable) DoPrecommitDedupByNode(ctx context.Context, node InsertNode continue } } - segData := seg.GetSegmentData() //TODO::load ZM/BF index first, then load PK column if necessary. if pks == nil { @@ -1356,14 +1342,6 @@ func (tbl *txnTable) DoPrecommitDedupByNode(ctx context.Context, node InsertNode pks = colV.Orphan() defer pks.Close() } - // TODO: Add a new batch dedup method later - if err = segData.BatchDedup(tbl.store.txn, pks); moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) { - return err - } - if err == nil { - segIt.Next() - continue - } var shouldSkip bool err = nil blkIt := seg.MakeBlockIt(false)