Skip to content

Commit

Permalink
refactor ickp impl phase 1 (#20866)
Browse files Browse the repository at this point in the history
refactor ickp implementation phase 1

Approved by: @LeftHandCold
  • Loading branch information
XuPeng-SH authored Dec 23, 2024
1 parent 28cc306 commit e098406
Show file tree
Hide file tree
Showing 13 changed files with 890 additions and 191 deletions.
134 changes: 128 additions & 6 deletions pkg/vm/engine/tae/db/checkpoint/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)

type Intent interface {
String() string
Wait() <-chan struct{}
}

type EntryOption func(*CheckpointEntry)

func WithEndEntryOption(end types.TS) EntryOption {
return func(e *CheckpointEntry) {
e.end = end
}
}

func WithStateEntryOption(state State) EntryOption {
return func(e *CheckpointEntry) {
e.state = state
}
}

func WithCheckedEntryOption(policyChecked, flushedChecked bool) EntryOption {
return func(e *CheckpointEntry) {
e.policyChecked = policyChecked
e.flushChecked = flushedChecked
}
}

type CheckpointEntry struct {
sync.RWMutex
sid string
Expand All @@ -43,35 +69,112 @@ type CheckpointEntry struct {
ckpLSN uint64
truncateLSN uint64

policyChecked bool
flushChecked bool

// only for new entry logic procedure
bornTime time.Time
refreshCnt uint32

doneC chan struct{}
}

func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *CheckpointEntry {
return &CheckpointEntry{
func NewCheckpointEntry(
sid string, start, end types.TS, typ EntryType, opts ...EntryOption,
) *CheckpointEntry {
e := &CheckpointEntry{
sid: sid,
start: start,
end: end,
state: ST_Pending,
entryType: typ,
version: logtail.CheckpointCurrentVersion,
bornTime: time.Now(),
doneC: make(chan struct{}),
}
for _, opt := range opts {
opt(e)
}
return e
}

func InheritCheckpointEntry(
from *CheckpointEntry,
replaceOpts ...EntryOption,
) *CheckpointEntry {
from.RLock()
defer from.RUnlock()
e := &CheckpointEntry{
sid: from.sid,
start: from.start,
end: from.end,
state: from.state,
entryType: from.entryType,
version: from.version,
bornTime: from.bornTime,
refreshCnt: from.refreshCnt,
policyChecked: from.policyChecked,
flushChecked: from.flushChecked,
doneC: from.doneC,
}
for _, opt := range replaceOpts {
opt(e)
}
return e
}

func (e *CheckpointEntry) Wait() <-chan struct{} {
return e.doneC
}

func (e *CheckpointEntry) Done() {
close(e.doneC)
}

// e.start >= o.end
func (e *CheckpointEntry) AllGE(o *CheckpointEntry) bool {
return e.start.GE(&o.end)
}

func (e *CheckpointEntry) SetPolicyChecked() {
e.Lock()
defer e.Unlock()
e.policyChecked = true
}

func (e *CheckpointEntry) IsPolicyChecked() bool {
e.RLock()
defer e.RUnlock()
return e.policyChecked
}

func (e *CheckpointEntry) SetFlushChecked() {
e.Lock()
defer e.Unlock()
e.flushChecked = true
}

func (e *CheckpointEntry) IsFlushChecked() bool {
e.RLock()
defer e.RUnlock()
return e.flushChecked
}

func (e *CheckpointEntry) AllChecked() bool {
e.RLock()
defer e.RUnlock()
return e.policyChecked && e.flushChecked
}

func (e *CheckpointEntry) SetVersion(version uint32) {
e.Lock()
defer e.Unlock()
e.version = version
}

func (e *CheckpointEntry) SetLSN(ckpLSN, truncateLSN uint64) {
e.Lock()
defer e.Unlock()
e.ckpLSN = ckpLSN
e.truncateLSN = truncateLSN
}
Expand All @@ -85,13 +188,21 @@ func (e *CheckpointEntry) Age() time.Duration {
defer e.RUnlock()
return time.Since(e.bornTime)
}
func (e *CheckpointEntry) ResetAge() {
e.Lock()
defer e.Unlock()
e.bornTime = time.Now()
e.refreshCnt = 0
}
func (e *CheckpointEntry) TooOld() bool {
e.RLock()
defer e.RUnlock()
return time.Since(e.bornTime) > time.Minute*4*time.Duration(e.refreshCnt+1)
return time.Since(e.bornTime) > time.Minute*3*time.Duration(e.refreshCnt+1)
}
func (e *CheckpointEntry) LSNString() string {
return fmt.Sprintf("ckp %d, truncate %d", e.ckpLSN, e.truncateLSN)
e.RLock()
defer e.RUnlock()
return fmt.Sprintf("%d-%d", e.ckpLSN, e.truncateLSN)
}

func (e *CheckpointEntry) LSN() uint64 {
Expand Down Expand Up @@ -195,7 +306,16 @@ func (e *CheckpointEntry) String() string {
t = "G"
}
state := e.GetState()
return fmt.Sprintf("CKP[%s][%v][%s](%s->%s)", t, state, e.LSNString(), e.start.ToString(), e.end.ToString())
return fmt.Sprintf(
"CKP[%s][%v][%v:%v][%s](%s->%s)",
t,
state,
e.IsPolicyChecked(),
e.IsFlushChecked(),
e.LSNString(),
e.start.ToString(),
e.end.ToString(),
)
}

func (e *CheckpointEntry) Prefetch(
Expand Down Expand Up @@ -259,7 +379,9 @@ func (e *CheckpointEntry) ReadMetaIdx(
return data.ReadTNMetaBatch(ctx, e.version, e.tnLocation, reader)
}

func (e *CheckpointEntry) GetByTableID(ctx context.Context, fs *objectio.ObjectFS, tid uint64) (ins, del, dataObject, tombstoneObject *api.Batch, err error) {
func (e *CheckpointEntry) GetTableByID(
ctx context.Context, fs *objectio.ObjectFS, tid uint64,
) (ins, del, dataObject, tombstoneObject *api.Batch, err error) {
reader, err := blockio.NewObjectReader(e.sid, fs.Service, e.cnLocation)
if err != nil {
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/checkpoint/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ func (flusher *flushImpl) triggerJob(ctx context.Context) {
request.tree = entry
flusher.flushRequestQ.Enqueue(request)
}
_, endTS := entry.GetTimeRange()
flusher.checkpointSchduler.TryScheduleCheckpoint(endTS)
_, ts := entry.GetTimeRange()
flusher.checkpointSchduler.TryScheduleCheckpoint(ts, false)
}

func (flusher *flushImpl) onFlushRequest(items ...any) {
Expand Down
Loading

0 comments on commit e098406

Please sign in to comment.