Skip to content

Commit

Permalink
flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
aptend committed Dec 18, 2024
1 parent a05ead1 commit 08a4f0a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 33 deletions.
10 changes: 1 addition & 9 deletions pkg/vm/engine/tae/db/checkpoint/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,15 +765,7 @@ func (flusher *flushImpl) FlushTable(
func (flusher *flushImpl) IsAllChangesFlushed(
start, end types.TS, doPrint bool,
) bool {
tree := flusher.sourcer.ScanInRangePruned(start, end)
tree.GetTree().Compact()
if doPrint && !tree.IsEmpty() {
logutil.Info(
"IsAllChangesFlushed",
zap.String("dirty-tree", tree.String()),
)
}
return tree.IsEmpty()
return IsAllDirtyFlushed(flusher.sourcer, flusher.catalogCache, start, end, doPrint)
}

func (flusher *flushImpl) Start() {
Expand Down
55 changes: 31 additions & 24 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,35 @@ func (r *runner) tryScheduleIncrementalCheckpoint(start, end types.TS) {
r.tryAddNewIncrementalCheckpointEntry(entry)
}

func isTableTailFlushed(table *catalog.TableEntry, start, end types.TS, isTombstone bool) (bool, *catalog.ObjectEntry) {
func IsAllDirtyFlushed(source logtail.Collector, cata *catalog.Catalog, start, end types.TS, doPrint bool) bool {
tree, _ := source.ScanInRange(start, end)
ready := true
var notFlushed *catalog.ObjectEntry
for _, table := range tree.GetTree().Tables {
db, err := cata.GetDatabaseByID(table.DbID)
if err != nil {
continue
}
table, err := db.GetTableEntryByID(table.ID)
if err != nil {
continue
}
ready, notFlushed = IsTableTailFlushed(table, start, end, false)
if !ready {
break
}
ready, notFlushed = IsTableTailFlushed(table, start, end, true)
if !ready {
break
}
}
if !ready && doPrint {
logutil.Infof("waiting for dirty tree %s", notFlushed.StringWithLevel(2))
}
return ready
}

func IsTableTailFlushed(table *catalog.TableEntry, start, end types.TS, isTombstone bool) (bool, *catalog.ObjectEntry) {
var it btree.IterG[*catalog.ObjectEntry]
if isTombstone {
table.WaitTombstoneObjectCommitted(end)
Expand Down Expand Up @@ -796,29 +824,8 @@ func (r *runner) TryScheduleCheckpoint(endts types.TS) {
if entry.IsPendding() {
check := func() (done bool) {
start, end := entry.GetStart(), entry.GetEnd()
tree, _ := r.source.ScanInRange(start, end)
ready := true
var notFlushed *catalog.ObjectEntry
for _, table := range tree.GetTree().Tables {
db, err := r.catalog.GetDatabaseByID(table.DbID)
if err != nil {
continue
}
table, err := db.GetTableEntryByID(table.ID)
if err != nil {
continue
}
ready, notFlushed = isTableTailFlushed(table, start, end, false)
if !ready {
break
}
ready, notFlushed = isTableTailFlushed(table, start, end, true)
if !ready {
break
}
}
if !ready && entry.TooOld() {
logutil.Infof("waiting for dirty tree %s", notFlushed.StringWithLevel(2))
ready := IsAllDirtyFlushed(r.source, r.catalog, start, end, entry.TooOld())
if !ready {
entry.DeferRetirement()
}
return ready
Expand Down

0 comments on commit 08a4f0a

Please sign in to comment.