Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor tn migration 7 #20821

Merged
merged 12 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/vm/engine/tae/db/checkpoint/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *Checkpo
}
}

// e.start >= o.end
func (e *CheckpointEntry) AllGE(o *CheckpointEntry) bool {
return e.start.GE(&o.end)
}

func (e *CheckpointEntry) SetVersion(version uint32) {
e.Lock()
defer e.Unlock()
Expand Down Expand Up @@ -111,8 +116,8 @@ func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool {
}
return true
}
func (e *CheckpointEntry) LessEq(ts types.TS) bool {
return e.end.LE(&ts)
func (e *CheckpointEntry) LessEq(ts *types.TS) bool {
return e.end.LE(ts)
}
func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location) {
e.Lock()
Expand Down
288 changes: 39 additions & 249 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,43 @@ package checkpoint

import (
"context"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"time"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
)

type RunnerWriter interface {
RemoveCheckpointMetaFile(string)
AddCheckpointMetaFile(string)
UpdateCompacted(entry *CheckpointEntry)
}

type RunnerReader interface {
String() string
GetAllIncrementalCheckpoints() []*CheckpointEntry
GetAllGlobalCheckpoints() []*CheckpointEntry
GetPenddingIncrementalCount() int
GetGlobalCheckpointCount() int
CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
MaxGlobalCheckpoint() *CheckpointEntry
GetLowWaterMark() types.TS
MaxLSN() uint64
GetCatalog() *catalog.Catalog
GetCheckpointMetaFiles() map[string]struct{}
RemoveCheckpointMetaFile(string)
AddCheckpointMetaFile(string)
ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry
GetCompacted() *CheckpointEntry
UpdateCompacted(entry *CheckpointEntry)
GetDriver() wal.Driver

// for test, delete in next phase
GetAllCheckpoints() []*CheckpointEntry
GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry

MaxGlobalCheckpoint() *CheckpointEntry
MaxIncrementalCheckpoint() *CheckpointEntry
GetDirtyCollector() logtail.Collector
}

func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch {
Expand Down Expand Up @@ -80,301 +90,81 @@ func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncate
return bat
}
func (r *runner) GetAllIncrementalCheckpoints() []*CheckpointEntry {
r.storage.Lock()
snapshot := r.storage.incrementals.Copy()
r.storage.Unlock()
return snapshot.Items()
return r.store.GetAllIncrementalCheckpoints()
}

func (r *runner) GetAllGlobalCheckpoints() []*CheckpointEntry {
r.storage.Lock()
snapshot := r.storage.globals.Copy()
r.storage.Unlock()
return snapshot.Items()
return r.store.GetAllGlobalCheckpoints()
}

func (r *runner) MaxLSN() uint64 {
endTs := types.BuildTS(time.Now().UTC().UnixNano(), 0)
return r.source.GetMaxLSN(types.TS{}, endTs)
}

func (r *runner) MaxLSNInRange(end types.TS) uint64 {
return r.source.GetMaxLSN(types.TS{}, end)
}

func (r *runner) MaxGlobalCheckpoint() *CheckpointEntry {
r.storage.RLock()
defer r.storage.RUnlock()
global, _ := r.storage.globals.Max()
return global
return r.store.MaxGlobalCheckpoint()
}

func (r *runner) MaxIncrementalCheckpoint() *CheckpointEntry {
r.storage.RLock()
defer r.storage.RUnlock()
entry, _ := r.storage.incrementals.Max()
return entry
return r.store.MaxIncrementalCheckpoint()
}

func (r *runner) GetCatalog() *catalog.Catalog {
return r.catalog
}

func (r *runner) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry {
r.storage.Lock()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
it := tree.Iter()
ok := it.Seek(NewCheckpointEntry(r.rt.SID(), ts, ts, ET_Incremental))
incrementals := make([]*CheckpointEntry, 0)
if ok {
for len(incrementals) < cnt {
e := it.Item()
if !e.IsFinished() {
break
}
if e.start.LT(&ts) {
if !it.Next() {
break
}
continue
}
incrementals = append(incrementals, e)
if !it.Next() {
break
}
}
}
return incrementals
return r.store.ICKPSeekLT(ts, cnt)
}

func (r *runner) ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry {
r.storage.Lock()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
it := tree.Iter()
ok := it.Seek(NewCheckpointEntry(r.rt.SID(), *start, *start, ET_Incremental))
incrementals := make([]*CheckpointEntry, 0)
if ok {
for len(incrementals) < cnt {
e := it.Item()
if !e.IsFinished() {
break
}
if e.start.GE(start) && e.start.LT(end) {
incrementals = append(incrementals, e)
}
if !it.Next() {
break
}
}
}
return incrementals
return r.store.ICKPRange(start, end, cnt)
}

func (r *runner) GetCompacted() *CheckpointEntry {
r.storage.Lock()
defer r.storage.Unlock()
return r.storage.compacted.Load()
return r.store.GetCompacted()
}

func (r *runner) UpdateCompacted(entry *CheckpointEntry) {
r.storage.Lock()
defer r.storage.Unlock()
r.storage.compacted.Store(entry)
r.store.UpdateCompacted(entry)
}

// this API returns the min ts of all checkpoints
// the start of global checkpoint is always 0
func (r *runner) GetLowWaterMark() types.TS {
r.storage.RLock()
defer r.storage.RUnlock()
global, okG := r.storage.globals.Min()
incremental, okI := r.storage.incrementals.Min()
if !okG && !okI {
return types.TS{}
}
if !okG {
return incremental.start
}
if !okI {
return global.start
}
if global.end.LT(&incremental.start) {
return global.end
}
return incremental.start
return r.store.GetLowWaterMark()
}

func (r *runner) GetPenddingIncrementalCount() int {
entries := r.GetAllIncrementalCheckpoints()
global := r.MaxGlobalCheckpoint()

count := 0
for i := len(entries) - 1; i >= 0; i-- {
if global != nil && entries[i].end.LE(&global.end) {
break
}
if !entries[i].IsFinished() {
continue
}
count++
}
return count
return r.store.GetPenddingIncrementalCount()
}

func (r *runner) GetGlobalCheckpointCount() int {
r.storage.RLock()
defer r.storage.RUnlock()
return r.storage.globals.Len()
}

func (r *runner) getLastFinishedGlobalCheckpointLocked() *CheckpointEntry {
g, ok := r.storage.globals.Max()
if !ok {
return nil
}
if g.IsFinished() {
return g
}
it := r.storage.globals.Iter()
it.Seek(g)
defer it.Release()
if !it.Prev() {
return nil
}
return it.Item()
return r.store.GetGlobalCheckpointCount()
}

func (r *runner) GetAllCheckpoints() []*CheckpointEntry {
ckps := make([]*CheckpointEntry, 0)
var ts types.TS
r.storage.Lock()
g := r.getLastFinishedGlobalCheckpointLocked()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
if g != nil {
ts = g.GetEnd()
ckps = append(ckps, g)
}
pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental)
iter := tree.Iter()
defer iter.Release()
if ok := iter.Seek(pivot); ok {
for {
e := iter.Item()
if !e.IsFinished() {
break
}
ckps = append(ckps, e)
if !iter.Next() {
break
}
}
}
return ckps
return r.store.GetAllCheckpoints()
}

func (r *runner) GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry {
ckps := make([]*CheckpointEntry, 0)
var ts types.TS
if compact != nil {
ts = compact.GetEnd()
ckps = append(ckps, compact)
}
r.storage.Lock()
g := r.getLastFinishedGlobalCheckpointLocked()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
if g != nil {
if ts.IsEmpty() {
ts = g.GetEnd()
}
ckps = append(ckps, g)
}
pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental)
iter := tree.Iter()
defer iter.Release()
if ok := iter.Seek(pivot); ok {
for {
e := iter.Item()
if !e.IsFinished() {
break
}
ckps = append(ckps, e)
if !iter.Next() {
break
}
}
}
return ckps

return r.store.GetAllCheckpointsForBackup(compact)
}

func (r *runner) GCByTS(ctx context.Context, ts types.TS) error {
prev := r.gcTS.Load()
if prev == nil {
r.gcTS.Store(ts)
} else {
prevTS := prev.(types.TS)
if prevTS.LT(&ts) {
r.gcTS.Store(ts)
}
}
logutil.Debugf("GC %v", ts.ToString())
r.gcCheckpointQueue.Enqueue(struct{}{})
return nil
}

func (r *runner) getGCTS() types.TS {
prev := r.gcTS.Load()
if prev == nil {
return types.TS{}
}
return prev.(types.TS)
}

func (r *runner) getGCedTS() types.TS {
r.storage.RLock()
minGlobal, _ := r.storage.globals.Min()
minIncremental, _ := r.storage.incrementals.Min()
r.storage.RUnlock()
if minGlobal == nil {
return types.TS{}
}
if minIncremental == nil {
return minGlobal.end
}
if minIncremental.start.GE(&minGlobal.end) {
return minGlobal.end
}
return minIncremental.start
}

func (r *runner) getTSToGC() types.TS {
maxGlobal := r.MaxGlobalCheckpoint()
if maxGlobal == nil {
return types.TS{}
}
if maxGlobal.IsFinished() {
return maxGlobal.end.Prev()
}
globals := r.GetAllGlobalCheckpoints()
if len(globals) == 1 {
return types.TS{}
if _, updated := r.store.UpdateGCIntent(&ts); !updated {
// TODO
return nil
}
maxGlobal = globals[len(globals)-1]
return maxGlobal.end.Prev()
_, err := r.gcCheckpointQueue.Enqueue(struct{}{})
return err
}

func (r *runner) ExistPendingEntryToGC() bool {
_, needGC := r.getTSTOGC()
return needGC
}

func (r *runner) IsTSStale(ts types.TS) bool {
gcts := r.getGCTS()
if gcts.IsEmpty() {
return false
}
minValidTS := gcts.Physical() - r.options.globalVersionInterval.Nanoseconds()
return ts.Physical() < minValidTS
func (r *runner) GCNeeded() bool {
return r.store.GCNeeded()
}
Loading
Loading