Skip to content

Commit

Permalink
fix range checkpoint (#18799)
Browse files (browse the repository at this point in the history)
1. Fix range checkpoint.
2. Add TRACE-WAL-TRUNCATE logging (with timings) along the WAL checkpoint and truncate path.

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
jiangxinmeng1 authored Sep 14, 2024
1 parent 2da268f commit 42a7c45
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 13 deletions.
20 changes: 10 additions & 10 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ var ErrDriverLsnNotFound = moerr.NewInternalErrorNoCtx("driver info: driver lsn
var ErrRetryTimeOut = moerr.NewInternalErrorNoCtx("driver info: retry time out")

type driverInfo struct {
addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate
validLsn *roaring64.Bitmap
addrMu sync.RWMutex
driverLsn uint64 //
syncing uint64
synced uint64
syncedMu sync.RWMutex
driverLsnMu sync.RWMutex

addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate
validLsn *roaring64.Bitmap
addrMu sync.RWMutex
driverLsn uint64 //
syncing uint64
synced uint64
syncedMu sync.RWMutex
driverLsnMu sync.RWMutex
truncating atomic.Uint64 //
truncatedLogserviceLsn uint64 //

Expand Down Expand Up @@ -96,7 +95,7 @@ func (info *driverInfo) onReplayRecordEntry(lsn uint64, driverLsns *common.Close
func (info *driverInfo) getNextValidLogserviceLsn(lsn uint64) uint64 {
info.addrMu.Lock()
defer info.addrMu.Unlock()
if info.validLsn.GetCardinality() == 0 {
if info.validLsn.IsEmpty() {
return 0
}
max := info.validLsn.Maximum()
Expand Down Expand Up @@ -209,6 +208,7 @@ func (info *driverInfo) gcAddr(logserviceLsn uint64) {
lsnToDelete = append(lsnToDelete, serviceLsn)
}
}
info.validLsn.RemoveRange(0, logserviceLsn)
for _, lsn := range lsnToDelete {
delete(info.addr, lsn)
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package logservicedriver

import (
"context"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"go.uber.org/zap"
// "time"
)

// driver lsn -> entry lsn
func (d *LogServiceDriver) Truncate(lsn uint64) error {
logutil.Info("TRACE-WAL-TRUNCATE", zap.Uint64(" driver start truncate", lsn))
if lsn > d.truncating.Load() {
d.truncating.Store(lsn)
}
Expand All @@ -44,19 +47,33 @@ func (d *LogServiceDriver) onTruncate(items ...any) {
}

func (d *LogServiceDriver) doTruncate() {
t0 := time.Now()
target := d.truncating.Load()
lastServiceLsn := d.truncatedLogserviceLsn
lsn := lastServiceLsn
//TODO use valid lsn
next := d.getNextValidLogserviceLsn(lsn)
loopCount := 0
for d.isToTruncate(next, target) {
loopCount++
lsn = next
next = d.getNextValidLogserviceLsn(lsn)
if next <= lsn {
break
}
}
d.addrMu.RLock()
min := d.validLsn.Minimum()
max := d.validLsn.Maximum()
d.addrMu.RUnlock()
logutil.Info("TRACE-WAL-TRUNCATE-Get LogService lsn",
zap.Int("loop count", loopCount),
zap.Uint64("driver lsn", target),
zap.Uint64("min", min),
zap.Uint64("max", max),
zap.String("duration", time.Since(t0).String()))
if lsn == lastServiceLsn {
logutil.Info("LogService Driver: retrun because logservice is small")
return
}
d.truncateLogservice(lsn)
Expand All @@ -65,7 +82,8 @@ func (d *LogServiceDriver) doTruncate() {
}

func (d *LogServiceDriver) truncateLogservice(lsn uint64) {
logutil.Infof("LogService Driver: Start Truncate %d", lsn)
logutil.Info("TRACE-WAL-TRUNCATE-Start Truncate", zap.Uint64("lsn", lsn))
t0 := time.Now()
client, err := d.clientPool.Get()
if err == ErrClientPoolClosed {
return
Expand Down Expand Up @@ -101,7 +119,10 @@ func (d *LogServiceDriver) truncateLogservice(lsn uint64) {
panic(err)
}
}
logutil.Infof("LogService Driver: Truncate %d successfully", lsn)
logutil.Info("TRACE-WAL-TRUNCATE-Truncate successfully",
zap.Uint64("lsn", lsn),
zap.String("duration",
time.Since(t0).String()))
}
func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) {
client, err := d.clientPool.Get()
Expand All @@ -127,6 +148,6 @@ func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) {
panic(err)
}
}
logutil.Infof("Logservice Driver: Get Truncate %d", lsn)
logutil.Infof("TRACE-WAL-TRUNCATE-Get Truncate %d", lsn)
return
}
21 changes: 21 additions & 0 deletions pkg/vm/engine/tae/logstore/store/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package store

import (
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
driverEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"go.uber.org/zap"
)

func (w *StoreImpl) RangeCheckpoint(gid uint32, start, end uint64) (ckpEntry entry.Entry, err error) {
logutil.Info("TRACE-WAL-TRUNCATE-RangeCheckpoint", zap.Uint32("group", gid), zap.Uint64("lsn", end))
ckpEntry = w.makeRangeCheckpointEntry(gid, start, end)
drentry, _, err := w.doAppend(GroupCKP, ckpEntry)
if err == sm.ErrClose {
Expand Down Expand Up @@ -58,6 +63,9 @@ func (w *StoreImpl) onLogCKPInfoQueue(items ...any) {
if err != nil {
panic(err)
}
logutil.Info("TRACE-WAL-TRUNCATE-CKP-Entry",
zap.Uint32("group", e.Info.Checkpoints[0].Group),
zap.Uint64("lsn", e.Info.Checkpoints[0].Ranges.GetMax()))
w.logCheckpointInfo(e.Info)
}
w.onCheckpoint()
Expand All @@ -69,6 +77,7 @@ func (w *StoreImpl) onCheckpoint() {
}

func (w *StoreImpl) ckpCkp() {
t0 := time.Now()
e := w.makeInternalCheckpointEntry()
driverEntry, _, err := w.doAppend(GroupInternal, e)
if err == sm.ErrClose {
Expand All @@ -77,6 +86,8 @@ func (w *StoreImpl) ckpCkp() {
if err != nil {
panic(err)
}
logutil.Info("TRACE-WAL-TRUNCATE-Internal-Entry",
zap.String("duration", time.Since(t0).String()))
w.truncatingQueue.Enqueue(driverEntry)
err = e.WaitDone()
if err != nil {
Expand All @@ -86,6 +97,7 @@ func (w *StoreImpl) ckpCkp() {
}

func (w *StoreImpl) onTruncatingQueue(items ...any) {
t0 := time.Now()
for _, item := range items {
e := item.(*driverEntry.Entry)
err := e.WaitDone()
Expand All @@ -94,7 +106,14 @@ func (w *StoreImpl) onTruncatingQueue(items ...any) {
}
w.logCheckpointInfo(e.Info)
}
tTruncateEntry := time.Since(t0)
t0 = time.Now()
gid, driverLsn := w.getDriverCheckpointed()
tGetDriverEntry := time.Since(t0)
logutil.Info("TRACE-WAL-TRUNCATE",
zap.String("wait truncating entry takes", tTruncateEntry.String()),
zap.String("get driver lsn takes", tGetDriverEntry.String()),
zap.Uint64("driver lsn", driverLsn))
if gid == 0 {
return
}
Expand All @@ -113,7 +132,9 @@ func (w *StoreImpl) onTruncateQueue(items ...any) {
lsn = w.driverCheckpointing.Load()
err = w.driver.Truncate(lsn)
}
t := time.Now()
w.gcWalDriverLsnMap(lsn)
logutil.Info("TRACE-WAL-TRUNCATE-GC-Store", zap.String("duration", time.Since(t).String()))
w.driverCheckpointed = lsn
}
}

0 comments on commit 42a7c45

Please sign in to comment.