Skip to content

Commit

Permalink
remove segment data related code (matrixorigin#12933)
Browse files Browse the repository at this point in the history
Remove segment data related code

Approved by: @LeftHandCold
  • Loading branch information
XuPeng-SH authored Nov 23, 2023
1 parent 7909602 commit 91da863
Show file tree
Hide file tree
Showing 16 changed files with 40 additions and 174 deletions.
3 changes: 0 additions & 3 deletions pkg/vm/engine/tae/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const (

type DataFactory interface {
MakeTableFactory() TableDataFactory
MakeSegmentFactory() SegmentDataFactory
MakeBlockFactory() BlockDataFactory
}

Expand Down Expand Up @@ -528,7 +527,6 @@ func (catalog *Catalog) onReplayUpdateSegment(
seg = NewReplaySegmentEntry()
seg.ID = *cmd.ID.SegmentID()
seg.table = tbl
seg.segData = dataFactory.MakeSegmentFactory()(seg)
seg.Insert(un)
seg.SegmentNode = cmd.node
tbl.AddEntryLocked(seg)
Expand Down Expand Up @@ -596,7 +594,6 @@ func (catalog *Catalog) onReplayCreateSegment(
seg.SegmentNode = segNode
seg.table = rel
seg.ID = *segid
seg.segData = dataFactory.MakeSegmentFactory()(seg)
rel.AddEntryLocked(seg)
un := &MVCCNode[*MetadataMVCCNode]{
EntryMVCCNode: &EntryMVCCNode{
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func TestSegment1(t *testing.T) {
schema.Name = tbName
tb, err := db.CreateTableEntry(schema, txn1, nil)
assert.Nil(t, err)
seg1, err := tb.CreateSegment(txn1, ES_Appendable, nil, nil)
seg1, err := tb.CreateSegment(txn1, ES_Appendable, nil)
assert.Nil(t, err)
err = txn1.Commit(context.Background())
assert.Nil(t, err)
Expand Down
18 changes: 1 addition & 17 deletions pkg/vm/engine/tae/catalog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ import (

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
)

type SegmentDataFactory = func(meta *SegmentEntry) data.Segment

type SegmentEntry struct {
ID objectio.Segmentid
Stat SegStat
Expand All @@ -44,7 +41,6 @@ type SegmentEntry struct {
//link.head and tail is nil when new a segmentEntry object.
link *common.GenericSortedDList[*BlockEntry]
*SegmentNode
segData data.Segment
}

type SegStat struct {
Expand Down Expand Up @@ -167,7 +163,7 @@ func (s *SegStat) String(composeSortKey bool) string {
)
}

func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncTxn, state EntryState, dataFactory SegmentDataFactory) *SegmentEntry {
func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncTxn, state EntryState) *SegmentEntry {
e := &SegmentEntry{
ID: *id,
BaseEntryImpl: NewBaseEntry(
Expand All @@ -181,9 +177,6 @@ func NewSegmentEntry(table *TableEntry, id *objectio.Segmentid, txn txnif.AsyncT
},
}
e.CreateWithTxn(txn, &MetadataMVCCNode{})
if dataFactory != nil {
e.segData = dataFactory(e)
}
return e
}

Expand Down Expand Up @@ -548,15 +541,6 @@ func (entry *SegmentEntry) AsCommonID() *common.ID {

func (entry *SegmentEntry) GetCatalog() *Catalog { return entry.table.db.catalog }

func (entry *SegmentEntry) InitData(factory DataFactory) {
if factory == nil {
return
}
dataFactory := factory.MakeSegmentFactory()
entry.segData = dataFactory(entry)
}
func (entry *SegmentEntry) GetSegmentData() data.Segment { return entry.segData }

func (entry *SegmentEntry) deleteEntryLocked(block *BlockEntry) error {
if n, ok := entry.entries[block.ID]; !ok {
return moerr.GetOkExpectedEOB()
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func (entry *TableEntry) MakeSegmentIt(reverse bool) *common.GenericSortedDListI
func (entry *TableEntry) CreateSegment(
txn txnif.AsyncTxn,
state EntryState,
dataFactory SegmentDataFactory,
opts *objectio.CreateSegOpt) (created *SegmentEntry, err error) {
opts *objectio.CreateSegOpt,
) (created *SegmentEntry, err error) {
entry.Lock()
defer entry.Unlock()
var id *objectio.Segmentid
Expand All @@ -195,7 +195,7 @@ func (entry *TableEntry) CreateSegment(
} else {
id = objectio.NewSegmentid()
}
created = NewSegmentEntry(entry, id, txn, state, dataFactory)
created = NewSegmentEntry(entry, id, txn, state)
entry.AddEntryLocked(created)
return
}
Expand Down
1 change: 0 additions & 1 deletion pkg/vm/engine/tae/db/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (replayer *Replayer) PreReplayWal() {
if dropCommit != nil && dropCommit.DeleteBefore(replayer.ckpedTS) {
return moerr.GetOkStopCurrRecur()
}
entry.InitData(replayer.DataFactory)
return
}
if err := replayer.db.Catalog.RecurLoop(processor); err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8053,7 +8053,6 @@ func TestDeduplication(t *testing.T) {
seg, err := tbl.CreateSegment(
txn,
catalog.ES_Appendable,
dataFactory.MakeSegmentFactory(),
new(objectio.CreateSegOpt).WithId(segmentIDs[0]))
assert.NoError(t, err)
blk, err := seg.CreateBlock(txn, catalog.ES_Appendable, dataFactory.MakeBlockFactory(), nil)
Expand Down
12 changes: 7 additions & 5 deletions pkg/vm/engine/tae/db/test/hidden_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -150,8 +151,9 @@ func TestHiddenWithPK1(t *testing.T) {

assert.NoError(t, txn.Commit(context.Background()))
{
seg := segMeta.GetSegmentData()
factory, taskType, scopes, err := seg.BuildCompactionTaskFactory()
factory, taskType, scopes, err := tables.BuildSegmentCompactionTaskFactory(
segMeta, tae.Runtime,
)
assert.NoError(t, err)
task, err := tae.Runtime.Scheduler.ScheduleMultiScopedTxnTask(tasks.WaitableCtx, taskType, scopes, factory)
assert.NoError(t, err)
Expand Down Expand Up @@ -313,14 +315,14 @@ func TestHidden2(t *testing.T) {
txn, rel = testutil.GetDefaultRelation(t, tae, schema.Name)
{
it := rel.MakeSegmentIt()
segs := make([]data.Segment, 0)
segs := make([]*catalog.SegmentEntry, 0)
for it.Valid() {
seg := it.GetSegment().GetMeta().(*catalog.SegmentEntry).GetSegmentData()
seg := it.GetSegment().GetMeta().(*catalog.SegmentEntry)
segs = append(segs, seg)
it.Next()
}
for _, seg := range segs {
factory, taskType, scopes, err := seg.BuildCompactionTaskFactory()
factory, taskType, scopes, err := tables.BuildSegmentCompactionTaskFactory(seg, tae.Runtime)
assert.NoError(t, err)
if factory == nil {
continue
Expand Down
27 changes: 0 additions & 27 deletions pkg/vm/engine/tae/iface/data/segment.go

This file was deleted.

22 changes: 0 additions & 22 deletions pkg/vm/engine/tae/tables/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -54,7 +53,6 @@ type baseBlock struct {
rt *dbutils.Runtime
meta *catalog.BlockEntry
mvcc *updates.MVCCHandle
ttl time.Time
impl data.Block

node atomic.Pointer[Node]
Expand All @@ -69,7 +67,6 @@ func newBaseBlock(
impl: impl,
rt: rt,
meta: meta,
ttl: time.Now(),
}
blk.mvcc = updates.NewMVCCHandle(meta)
blk.RWMutex = blk.mvcc.RWMutex
Expand Down Expand Up @@ -205,25 +202,6 @@ func (blk *baseBlock) LoadPersistedCommitTS() (vec containers.Vector, err error)
return
}

// func (blk *baseBlock) LoadPersistedData() (bat *containers.Batch, err error) {
// schema := blk.meta.GetSchema()
// bat = containers.NewBatch()
// defer func() {
// if err != nil {
// bat.Close()
// }
// }()
// var vec containers.Vector
// for i, col := range schema.ColDefs {
// vec, err = blk.LoadPersistedColumnData(i)
// if err != nil {
// return
// }
// bat.AddVector(col.Name, vec)
// }
// return
// }

func (blk *baseBlock) LoadPersistedColumnData(
ctx context.Context, schema *catalog.Schema, colIdx int, mp *mpool.MPool,
) (vec containers.Vector, err error) {
Expand Down
6 changes: 0 additions & 6 deletions pkg/vm/engine/tae/tables/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ func (factory *DataFactory) MakeTableFactory() catalog.TableDataFactory {
}
}

func (factory *DataFactory) MakeSegmentFactory() catalog.SegmentDataFactory {
return func(meta *catalog.SegmentEntry) data.Segment {
return newSegment(meta, factory.dir, factory.rt)
}
}

func (factory *DataFactory) MakeBlockFactory() catalog.BlockDataFactory {
return func(meta *catalog.BlockEntry) data.Block {
if meta.IsAppendable() {
Expand Down
77 changes: 22 additions & 55 deletions pkg/vm/engine/tae/tables/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,70 +15,37 @@
package tables

import (
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
)

type dataSegment struct {
meta *catalog.SegmentEntry
rt *dbutils.Runtime
}

func newSegment(
meta *catalog.SegmentEntry, dir string, rt *dbutils.Runtime,
) *dataSegment {
seg := &dataSegment{
meta: meta,
rt: rt,
func BuildSegmentCompactionTaskFactory(meta *catalog.SegmentEntry, rt *dbutils.Runtime) (
factory tasks.TxnTaskFactory, taskType tasks.TaskType, scopes []common.ID, err error,
) {
if !meta.IsAppendable() {
return
}
return seg
}

func (segment *dataSegment) Destroy() (err error) {
return
}

func (segment *dataSegment) GetID() uint64 { panic("not support") }

func (segment *dataSegment) BatchDedup(txn txnif.AsyncTxn, pks containers.Vector) (err error) {
return moerr.GetOkExpectedPossibleDup()
}

func (segment *dataSegment) MutationInfo() string { return "" }

func (segment *dataSegment) RunCalibration() int { return 0 }
func (segment *dataSegment) EstimateScore(interval time.Duration, force bool) int { return 0 }

func (segment *dataSegment) BuildCompactionTaskFactory() (factory tasks.TxnTaskFactory, taskType tasks.TaskType, scopes []common.ID, err error) {
if segment.meta.IsAppendable() {
segment.meta.RLock()
dropped := segment.meta.HasDropCommittedLocked()
inTxn := segment.meta.IsCreatingOrAborted()
segment.meta.RUnlock()
if dropped || inTxn {
return
}
filter := catalog.NewComposedFilter()
filter.AddBlockFilter(catalog.NonAppendableBlkFilter)
filter.AddCommitFilter(catalog.ActiveWithNoTxnFilter)
blks := segment.meta.CollectBlockEntries(filter.FilteCommit, filter.FilteBlock)
if len(blks) < int(segment.meta.GetTable().GetLastestSchema().SegmentMaxBlocks) {
return
}
for _, blk := range blks {
scopes = append(scopes, *blk.AsCommonID())
}
factory = jobs.CompactSegmentTaskFactory(blks, segment.rt)
taskType = tasks.DataCompactionTask
meta.RLock()
dropped := meta.HasDropCommittedLocked()
inTxn := meta.IsCreatingOrAborted()
meta.RUnlock()
if dropped || inTxn {
return
}
filter := catalog.NewComposedFilter()
filter.AddBlockFilter(catalog.NonAppendableBlkFilter)
filter.AddCommitFilter(catalog.ActiveWithNoTxnFilter)
blks := meta.CollectBlockEntries(filter.FilteCommit, filter.FilteBlock)
if len(blks) < int(meta.GetTable().GetLastestSchema().SegmentMaxBlocks) {
return
}
for _, blk := range blks {
scopes = append(scopes, *blk.AsCommonID())
}
factory = jobs.CompactSegmentTaskFactory(blks, rt)
taskType = tasks.DataCompactionTask
return
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/tables/updates/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDeleteChain1(t *testing.T) {

db, _ := c.CreateDBEntry("db", "", "", nil)
table, _ := db.CreateTableEntry(schema, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil)
blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil)

controller := NewMVCCHandle(blk)
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/tables/updates/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestCompactBlockCmd(t *testing.T) {

db, _ := c.CreateDBEntry("db", "", "", nil)
table, _ := db.CreateTableEntry(schema, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil)
blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil)

controller := NewMVCCHandle(blk)
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestDeleteNodeCmd(t *testing.T) {

db, _ := c.CreateDBEntry("db", "", "", nil)
table, _ := db.CreateTableEntry(schema, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil, nil)
seg, _ := table.CreateSegment(nil, catalog.ES_Appendable, nil)
blk, _ := seg.CreateBlock(nil, catalog.ES_Appendable, nil, nil)

controller := NewMVCCHandle(blk)
Expand Down
7 changes: 1 addition & 6 deletions pkg/vm/engine/tae/tables/updates/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func init() {

type MVCCHandle struct {
*sync.RWMutex
deletes atomic.Pointer[DeleteChain]
// deletes *DeleteChain
deletes atomic.Pointer[DeleteChain]
meta *catalog.BlockEntry
appends *txnbase.MVCCSlice[*AppendNode]
changes atomic.Uint32
Expand Down Expand Up @@ -103,10 +102,6 @@ func (n *MVCCHandle) EstimateMemSizeLocked() (asize int, dsize int) {
// *************** All deletes related APIs *****************
// ==========================================================

func (n *MVCCHandle) GetDeletesPersistedTSInMVCCChain() types.TS {
return n.persistedTS
}

func (n *MVCCHandle) UpgradeDeleteChainByTS(flushed types.TS) {
n.Lock()
if n.persistedTS.Equal(flushed) {
Expand Down
Loading

0 comments on commit 91da863

Please sign in to comment.