Skip to content

Commit

Permalink
disttae, logtailreplay: fix missing Vector.Free and naming fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
reusee committed Jul 24, 2024
1 parent d5dcae8 commit 0b11b6a
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 135 deletions.
37 changes: 19 additions & 18 deletions pkg/vm/engine/disttae/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func (e *Engine) init(ctx context.Context) error {
e.Lock()
defer e.Unlock()
m := e.mp
m := e.memoryPool

e.catalog = cache.NewCatalog()
e.partitions = make(map[[2]uint64]*logtailreplay.Partition)
Expand Down Expand Up @@ -70,7 +70,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF, packer, m)
done()
e.catalog.InsertDatabase(bat)
bat.Clean(m)
Expand All @@ -97,7 +97,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, m)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, m)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand All @@ -157,7 +157,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, m)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -191,7 +191,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, m)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand All @@ -217,7 +217,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, m)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, m)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand Down Expand Up @@ -282,8 +282,8 @@ func (e *Engine) loadSnapCkpForTable(
tblName,
did,
dbName,
e.mp,
e.fs,
e.memoryPool,
e.fileService,
)
if err != nil {
return err
Expand Down Expand Up @@ -317,7 +317,7 @@ func (e *Engine) getOrCreateSnapCatalogCache(
snapCata := cache.NewCatalog()
//TODO:: insert mo_tables, or mo_colunms, or mo_database, mo_catalog into snapCata.
// ref to engine.init.
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.service, e.fs, ts, 0, nil)
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.service, e.fileService, ts, 0, nil)
if ckps == nil {
return nil, moerr.NewInternalErrorNoCtx("No checkpoints for snapshot read")
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (e *Engine) getOrCreateSnapPart(
snap := logtailreplay.NewPartition(e.service)
//TODO::if tableId is mo_tables, or mo_colunms, or mo_database,
// we should init the partition,ref to engine.init
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.service, e.fs, ts, tbl.tableId, nil)
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.service, e.fileService, ts, tbl.tableId, nil)
if err != nil {
return nil, err
}
Expand All @@ -449,8 +449,8 @@ func (e *Engine) getOrCreateSnapPart(
tbl.tableName,
tbl.db.databaseId,
tbl.db.databaseName,
e.mp,
e.fs)
e.memoryPool,
e.fileService)
if err != nil {
return err
}
Expand All @@ -466,7 +466,8 @@ func (e *Engine) getOrCreateSnapPart(
e,
nil,
state,
entry); err != nil {
entry,
); err != nil {
return err
}
}
Expand Down Expand Up @@ -541,8 +542,8 @@ func (e *Engine) LazyLoadLatestCkp(
tbl.tableName,
tbl.db.databaseId,
tbl.db.databaseName,
tbl.getTxn().engine.mp,
tbl.getTxn().engine.fs)
tbl.getTxn().engine.memoryPool,
tbl.getTxn().engine.fileService)
if err != nil {
return err
}
Expand All @@ -569,5 +570,5 @@ func (e *Engine) UpdateOfPush(
ctx context.Context,
databaseId,
tableId uint64, ts timestamp.Timestamp) error {
return e.pClient.TryToSubscribeTable(ctx, databaseId, tableId)
return e.pushClient.TryToSubscribeTable(ctx, databaseId, tableId)
}
60 changes: 30 additions & 30 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ func New(
}

e := &Engine{
service: service,
mp: mp,
fs: fs,
ls: ls.(lockservice.LockService),
hakeeper: hakeeper,
cli: cli,
idGen: hakeeper,
tnID: tnID,
service: service,
memoryPool: mp,
fileService: fs,
lockService: ls.(lockservice.LockService),
hakeeper: hakeeper,
txnClient: cli,
idGenerator: hakeeper,
tnID: tnID,
packerPool: fileservice.NewPool(
128,
func() *types.Packer {
Expand Down Expand Up @@ -134,7 +134,7 @@ func New(
panic(err)
}

e.pClient.LogtailRPCClientFactory = DefaultNewRpcStreamToTnLogTailService
e.pushClient.LogtailRPCClientFactory = DefaultNewRpcStreamToTnLogTailService
return e
}

Expand Down Expand Up @@ -492,7 +492,7 @@ func (e *Engine) GetRelationById(ctx context.Context, op client.TxnOperator, tab
}

func (e *Engine) AllocateIDByKey(ctx context.Context, key string) (uint64, error) {
return e.idGen.AllocateIDByKey(ctx, key)
return e.idGenerator.AllocateIDByKey(ctx, key)
}

func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) (err error) {
Expand Down Expand Up @@ -586,14 +586,14 @@ func (e *Engine) New(ctx context.Context, op client.TxnOperator) error {
logDebugf(op.Txn(), "Engine.New")
proc := process.New(
ctx,
e.mp,
e.cli,
e.memoryPool,
e.txnClient,
op,
e.fs,
e.ls,
e.qc,
e.fileService,
e.lockService,
e.queryClient,
e.hakeeper,
e.us,
e.udfService,
nil,
)

Expand All @@ -604,7 +604,7 @@ func (e *Engine) New(ctx context.Context, op client.TxnOperator) error {
proc: proc,
engine: e,
//meta: op.TxnRef(),
idGen: e.idGen,
idGen: e.idGenerator,
tnStores: e.getTNServices(),
tableCache: struct {
cachedIndex int
Expand All @@ -631,7 +631,7 @@ func (e *Engine) New(ctx context.Context, op client.TxnOperator) error {
cnBlkId_Pos: map[types.Blockid]Pos{},
batchSelectList: make(map[*batch.Batch][]int64),
toFreeBatches: make(map[tableKey][]*batch.Batch),
syncCommittedTSCount: e.cli.GetSyncLatestCommitTSTimes(),
syncCommittedTSCount: e.txnClient.GetSyncLatestCommitTSTimes(),
}

txn.blockId_tn_delete_metaLoc_batch = struct {
Expand All @@ -644,7 +644,7 @@ func (e *Engine) New(ctx context.Context, op client.TxnOperator) error {
colexec.Get().PutCnSegment(id, colexec.TxnWorkSpaceIdType)
op.AddWorkspace(txn)

e.pClient.validLogTailMustApplied(txn.op.SnapshotTS())
e.pushClient.validLogTailMustApplied(txn.op.SnapshotTS())
return nil
}

Expand Down Expand Up @@ -738,7 +738,7 @@ func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Times
//FIXME::why set blk.EntryState = false ?
blk.EntryState = false
rds[i] = newBlockReader(
ctx, tblDef, ts, []*objectio.BlockInfo{blk}, expr, blockReadPKFilter, e.fs, proc.(*process.Process),
ctx, tblDef, ts, []*objectio.BlockInfo{blk}, expr, blockReadPKFilter, e.fileService, proc.(*process.Process),
)
}
for j := len(blkInfos); j < num; j++ {
Expand All @@ -748,7 +748,7 @@ func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Times
}

infos, steps := groupBlocksToObjects(blkInfos, num)
fs, err := fileservice.Get[fileservice.FileService](e.fs, defines.SharedFileServiceName)
fs, err := fileservice.Get[fileservice.FileService](e.fileService, defines.SharedFileServiceName)
if err != nil {
return nil, err
}
Expand All @@ -770,25 +770,25 @@ func (e *Engine) setPushClientStatus(ready bool) {
defer e.Unlock()

if ready {
e.cli.Resume()
e.txnClient.Resume()
} else {
e.cli.Pause()
e.txnClient.Pause()
}

e.pClient.receivedLogTailTime.ready.Store(ready)
if e.pClient.subscriber != nil {
e.pushClient.receivedLogTailTime.ready.Store(ready)
if e.pushClient.subscriber != nil {
if ready {
e.pClient.subscriber.setReady()
e.pushClient.subscriber.setReady()
} else {
e.pClient.subscriber.setNotReady()
e.pushClient.subscriber.setNotReady()
}
}
}

func (e *Engine) abortAllRunningTxn() {
e.Lock()
defer e.Unlock()
e.cli.AbortAllRunningTxn()
e.txnClient.AbortAllRunningTxn()
}

func (e *Engine) cleanMemoryTableWithTable(dbId, tblId uint64) {
Expand All @@ -806,7 +806,7 @@ func (e *Engine) cleanMemoryTableWithTable(dbId, tblId uint64) {
}

func (e *Engine) PushClient() *PushClient {
return &e.pClient
return &e.pushClient
}

// TryToSubscribeTable implements the LogtailEngine interface.
Expand All @@ -828,5 +828,5 @@ func (e *Engine) GetMessageCenter() any {
}

func (e *Engine) FS() fileservice.FileService {
return e.fs
return e.fileService
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func consumeEntry(

if state != nil {
t0 := time.Now()
state.HandleLogtailEntry(ctx, engine.fs, e, primarySeqnum, packer)
state.HandleLogtailEntry(ctx, engine.fileService, e, primarySeqnum, packer, engine.memoryPool)
v2.LogtailUpdatePartitonConsumeLogtailOneEntryLogtailReplayDurationHistogram.Observe(time.Since(t0).Seconds())
}

Expand Down
Loading

0 comments on commit 0b11b6a

Please sign in to comment.