Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix range checkpoint #18799

Merged
merged 2 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ var ErrDriverLsnNotFound = moerr.NewInternalErrorNoCtx("driver info: driver lsn
var ErrRetryTimeOut = moerr.NewInternalErrorNoCtx("driver info: retry time out")

type driverInfo struct {
addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate
validLsn *roaring64.Bitmap
addrMu sync.RWMutex
driverLsn uint64 //
syncing uint64
synced uint64
syncedMu sync.RWMutex
driverLsnMu sync.RWMutex

addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate
validLsn *roaring64.Bitmap
addrMu sync.RWMutex
driverLsn uint64 //
syncing uint64
synced uint64
syncedMu sync.RWMutex
driverLsnMu sync.RWMutex
truncating atomic.Uint64 //
truncatedLogserviceLsn uint64 //

Expand Down Expand Up @@ -96,7 +95,7 @@ func (info *driverInfo) onReplayRecordEntry(lsn uint64, driverLsns *common.Close
func (info *driverInfo) getNextValidLogserviceLsn(lsn uint64) uint64 {
info.addrMu.Lock()
defer info.addrMu.Unlock()
if info.validLsn.GetCardinality() == 0 {
if info.validLsn.IsEmpty() {
return 0
}
max := info.validLsn.Maximum()
Expand Down Expand Up @@ -209,6 +208,7 @@ func (info *driverInfo) gcAddr(logserviceLsn uint64) {
lsnToDelete = append(lsnToDelete, serviceLsn)
}
}
info.validLsn.RemoveRange(0, logserviceLsn)
for _, lsn := range lsnToDelete {
delete(info.addr, lsn)
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package logservicedriver

import (
"context"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"go.uber.org/zap"
// "time"
)

// driver lsn -> entry lsn
func (d *LogServiceDriver) Truncate(lsn uint64) error {
logutil.Info("TRACE-WAL-TRUNCATE", zap.Uint64(" driver start truncate", lsn))
if lsn > d.truncating.Load() {
d.truncating.Store(lsn)
}
Expand All @@ -44,19 +47,33 @@ func (d *LogServiceDriver) onTruncate(items ...any) {
}

func (d *LogServiceDriver) doTruncate() {
t0 := time.Now()
target := d.truncating.Load()
lastServiceLsn := d.truncatedLogserviceLsn
lsn := lastServiceLsn
//TODO use valid lsn
next := d.getNextValidLogserviceLsn(lsn)
loopCount := 0
for d.isToTruncate(next, target) {
loopCount++
lsn = next
next = d.getNextValidLogserviceLsn(lsn)
if next <= lsn {
break
}
}
d.addrMu.RLock()
min := d.validLsn.Minimum()
max := d.validLsn.Maximum()
d.addrMu.RUnlock()
logutil.Info("TRACE-WAL-TRUNCATE-Get LogService lsn",
zap.Int("loop count", loopCount),
zap.Uint64("driver lsn", target),
zap.Uint64("min", min),
zap.Uint64("max", max),
zap.String("duration", time.Since(t0).String()))
if lsn == lastServiceLsn {
logutil.Info("LogService Driver: retrun because logservice is small")
return
}
d.truncateLogservice(lsn)
Expand All @@ -65,7 +82,8 @@ func (d *LogServiceDriver) doTruncate() {
}

func (d *LogServiceDriver) truncateLogservice(lsn uint64) {
logutil.Infof("LogService Driver: Start Truncate %d", lsn)
logutil.Info("TRACE-WAL-TRUNCATE-Start Truncate", zap.Uint64("lsn", lsn))
t0 := time.Now()
client, err := d.clientPool.Get()
if err == ErrClientPoolClosed {
return
Expand Down Expand Up @@ -101,7 +119,10 @@ func (d *LogServiceDriver) truncateLogservice(lsn uint64) {
panic(err)
}
}
logutil.Infof("LogService Driver: Truncate %d successfully", lsn)
logutil.Info("TRACE-WAL-TRUNCATE-Truncate successfully",
zap.Uint64("lsn", lsn),
zap.String("duration",
time.Since(t0).String()))
}
func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) {
client, err := d.clientPool.Get()
Expand All @@ -127,6 +148,6 @@ func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) {
panic(err)
}
}
logutil.Infof("Logservice Driver: Get Truncate %d", lsn)
logutil.Infof("TRACE-WAL-TRUNCATE-Get Truncate %d", lsn)
return
}
21 changes: 21 additions & 0 deletions pkg/vm/engine/tae/logstore/store/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package store

import (
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
driverEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"go.uber.org/zap"
)

func (w *StoreImpl) RangeCheckpoint(gid uint32, start, end uint64) (ckpEntry entry.Entry, err error) {
logutil.Info("TRACE-WAL-TRUNCATE-RangeCheckpoint", zap.Uint32("group", gid), zap.Uint64("lsn", end))
ckpEntry = w.makeRangeCheckpointEntry(gid, start, end)
drentry, _, err := w.doAppend(GroupCKP, ckpEntry)
if err == sm.ErrClose {
Expand Down Expand Up @@ -58,6 +63,9 @@ func (w *StoreImpl) onLogCKPInfoQueue(items ...any) {
if err != nil {
panic(err)
}
logutil.Info("TRACE-WAL-TRUNCATE-CKP-Entry",
zap.Uint32("group", e.Info.Checkpoints[0].Group),
zap.Uint64("lsn", e.Info.Checkpoints[0].Ranges.GetMax()))
w.logCheckpointInfo(e.Info)
}
w.onCheckpoint()
Expand All @@ -69,6 +77,7 @@ func (w *StoreImpl) onCheckpoint() {
}

func (w *StoreImpl) ckpCkp() {
t0 := time.Now()
e := w.makeInternalCheckpointEntry()
driverEntry, _, err := w.doAppend(GroupInternal, e)
if err == sm.ErrClose {
Expand All @@ -77,6 +86,8 @@ func (w *StoreImpl) ckpCkp() {
if err != nil {
panic(err)
}
logutil.Info("TRACE-WAL-TRUNCATE-Internal-Entry",
zap.String("duration", time.Since(t0).String()))
w.truncatingQueue.Enqueue(driverEntry)
err = e.WaitDone()
if err != nil {
Expand All @@ -86,6 +97,7 @@ func (w *StoreImpl) ckpCkp() {
}

func (w *StoreImpl) onTruncatingQueue(items ...any) {
t0 := time.Now()
for _, item := range items {
e := item.(*driverEntry.Entry)
err := e.WaitDone()
Expand All @@ -94,7 +106,14 @@ func (w *StoreImpl) onTruncatingQueue(items ...any) {
}
w.logCheckpointInfo(e.Info)
}
tTruncateEntry := time.Since(t0)
t0 = time.Now()
gid, driverLsn := w.getDriverCheckpointed()
tGetDriverEntry := time.Since(t0)
logutil.Info("TRACE-WAL-TRUNCATE",
zap.String("wait truncating entry takes", tTruncateEntry.String()),
zap.String("get driver lsn takes", tGetDriverEntry.String()),
zap.Uint64("driver lsn", driverLsn))
if gid == 0 {
return
}
Expand All @@ -113,7 +132,9 @@ func (w *StoreImpl) onTruncateQueue(items ...any) {
lsn = w.driverCheckpointing.Load()
err = w.driver.Truncate(lsn)
}
t := time.Now()
w.gcWalDriverLsnMap(lsn)
logutil.Info("TRACE-WAL-TRUNCATE-GC-Store", zap.String("duration", time.Since(t).String()))
w.driverCheckpointed = lsn
}
}
Loading