Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online check GC correctness #16242

Merged
merged 77 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
59c749b
GC tombstone object
LeftHandCold May 14, 2024
9d56e39
Update code
LeftHandCold May 14, 2024
26cf51d
Update code
LeftHandCold May 14, 2024
7656fa8
Update code
LeftHandCold May 14, 2024
9e147a4
Update code
LeftHandCold May 14, 2024
2ed92e3
Update code
LeftHandCold May 14, 2024
9ea7d1b
Update code
LeftHandCold May 14, 2024
260dae1
Update code
LeftHandCold May 14, 2024
6c1d544
Update code
LeftHandCold May 14, 2024
a9f6e90
Update code
LeftHandCold May 14, 2024
074755c
Update code
LeftHandCold May 14, 2024
40eccba
Update code
LeftHandCold May 14, 2024
ea9926c
Update code
LeftHandCold May 14, 2024
00113c9
Update code
LeftHandCold May 14, 2024
59539a6
Merge branch 'main' into gc_tombstone
mergify[bot] May 14, 2024
0b7252d
Update code
LeftHandCold May 14, 2024
448a6fd
Merge branch 'gc_tombstone' of github.com:LeftHandCold/matrixone into…
LeftHandCold May 14, 2024
7e45eee
Update code
LeftHandCold May 14, 2024
1e5fa63
Update code
LeftHandCold May 14, 2024
96354e5
Update code
LeftHandCold May 14, 2024
6df3baf
Fix bug
LeftHandCold May 14, 2024
57064d4
Update code
LeftHandCold May 14, 2024
160ab2b
Update codd
LeftHandCold May 14, 2024
790c5ab
Fix bug
LeftHandCold May 14, 2024
e2d2daf
Update code
LeftHandCold May 14, 2024
2760e5f
Update code
LeftHandCold May 14, 2024
b48d0bb
Merge branch 'main' into gc_tombstone
LeftHandCold May 14, 2024
60e4b6a
Merge branch 'main' into gc_tombstone
LeftHandCold May 15, 2024
de8a45d
Update log
LeftHandCold May 15, 2024
b5c1240
Merge branch 'main' into gc_tombstone
LeftHandCold May 15, 2024
a1dfb80
Update log
LeftHandCold May 15, 2024
be7d7b0
Merge branch 'gc_tombstone' of github.com:LeftHandCold/matrixone into…
LeftHandCold May 15, 2024
5b6a897
Update code
LeftHandCold May 15, 2024
ccea4d6
Fix ut
LeftHandCold May 16, 2024
914bb88
Add GC Check
LeftHandCold May 16, 2024
9f7bf1b
Update code
LeftHandCold May 16, 2024
39c0f7f
Update code
LeftHandCold May 16, 2024
60adc36
Update code
LeftHandCold May 16, 2024
03a2211
Update code
LeftHandCold May 17, 2024
37670cf
Fix bug
LeftHandCold May 17, 2024
231bac5
Update code
LeftHandCold May 17, 2024
4440aa4
Update code
LeftHandCold May 17, 2024
ab3f557
Update code
LeftHandCold May 17, 2024
71b6060
Update code
LeftHandCold May 17, 2024
5a1c940
Update code
LeftHandCold May 17, 2024
291e8af
Update code
LeftHandCold May 17, 2024
3825ae3
Update code
LeftHandCold May 17, 2024
8a265d2
Update code
LeftHandCold May 17, 2024
46b8311
Update code
LeftHandCold May 18, 2024
ad4ef4e
Update code
LeftHandCold May 18, 2024
a8b4f9e
Update code
LeftHandCold May 18, 2024
111c2e6
Update code
LeftHandCold May 18, 2024
81965e0
Update code
LeftHandCold May 19, 2024
436be80
Update code
LeftHandCold May 19, 2024
47eebb1
Update code
LeftHandCold May 19, 2024
71cedb0
Update code
LeftHandCold May 19, 2024
8c4ee98
Update code
LeftHandCold May 19, 2024
46de5df
Update code
LeftHandCold May 19, 2024
b965d45
Update code
LeftHandCold May 19, 2024
215c423
Update code
LeftHandCold May 19, 2024
1c811cc
Update code
LeftHandCold May 19, 2024
ad817aa
update code
LeftHandCold May 20, 2024
504fc83
Update code
LeftHandCold May 20, 2024
87f2e6d
Update code
LeftHandCold May 20, 2024
506c592
Update code
LeftHandCold May 20, 2024
3888566
Merge branch 'main' into online_gc_check_main
LeftHandCold May 20, 2024
071e4c7
Update code
LeftHandCold May 20, 2024
bb13f82
Update code
LeftHandCold May 20, 2024
5cb461e
Update code
LeftHandCold May 20, 2024
229cf4e
Update code
LeftHandCold May 20, 2024
e92dccf
Update code
LeftHandCold May 20, 2024
9e008f0
Fix mpool leak
LeftHandCold May 21, 2024
a244f6f
Merge branch 'main' into online_gc_check_main
mergify[bot] May 21, 2024
244a2ab
Update log
LeftHandCold May 21, 2024
c0cd9e2
Merge branch 'online_gc_check_main' of github.com:LeftHandCold/matrix…
LeftHandCold May 21, 2024
f5f522e
Update log
LeftHandCold May 21, 2024
a28b11e
Merge branch 'main' into online_gc_check_main
mergify[bot] May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/tnservice/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Config struct {
GCTTL toml.Duration `toml:"gc-ttl"`
ScanGCInterval toml.Duration `toml:"scan-gc-interval"`
DisableGC bool `toml:"disable-gc"`
CheckGC bool `toml:"check-gc"`
}

Merge struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/tnservice/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (s *store) newTAEStorage(ctx context.Context, shard metadata.TNShard, facto
GCTTL: s.cfg.GCCfg.GCTTL.Duration,
ScanGCInterval: s.cfg.GCCfg.ScanGCInterval.Duration,
DisableGC: s.cfg.GCCfg.DisableGC,
CheckGC: s.cfg.GCCfg.CheckGC,
}

mergeCfg := &options.MergeConfig{
Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"time"
)
Expand All @@ -32,6 +33,7 @@ type RunnerReader interface {
MaxGlobalCheckpoint() *CheckpointEntry
GetStage() types.TS
MaxLSN() uint64
GetCatalog() *catalog.Catalog
}

func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch {
Expand Down Expand Up @@ -102,6 +104,10 @@ func (r *runner) MaxCheckpoint() *CheckpointEntry {
return entry
}

func (r *runner) GetCatalog() *catalog.Catalog {
return r.catalog
}

func (r *runner) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry {
r.storage.Lock()
tree := r.storage.entries.Copy()
Expand Down
214 changes: 214 additions & 0 deletions pkg/vm/engine/tae/db/gc/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import (
catalog2 "github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"time"
)

const NotFoundLimit = 10

type checker struct {
cleaner *checkpointCleaner
}

func (c *checker) getObjects() (map[string]struct{}, error) {
dirs, err := c.cleaner.fs.ListDir("")
if err != nil {
return nil, err
}
objects := make(map[string]struct{})
for _, entry := range dirs {
if entry.IsDir {
continue
}
objects[entry.Name] = struct{}{}
}
return objects, nil
}

func (c *checker) Check() error {
now := time.Now()
c.cleaner.inputs.RLock()
defer c.cleaner.inputs.RUnlock()
gcTables := c.cleaner.GetGCTables()

// Collect the objects and tombstones that the disk cleaner has already consumed
gcTable := NewGCTable()
for _, table := range gcTables {
gcTable.Merge(table)
}
objects := gcTable.objects
tombstones := gcTable.tombstones
entry := c.cleaner.GetMaxConsumed()
maxTs := entry.GetEnd()

// Collect the objects and tombstones that the disk cleaner has not consumed
checkpoints := c.cleaner.ckpClient.ICKPSeekLT(entry.GetEnd(), 40)
unconsumedTable := NewGCTable()
for _, ckp := range checkpoints {
_, data, err := logtail.LoadCheckpointEntriesFromKey(c.cleaner.ctx, c.cleaner.fs.Service,
ckp.GetLocation(), ckp.GetVersion(), nil, &types.TS{})
if err != nil {
logutil.Errorf("load checkpoint failed: %v", err)
continue
}
defer data.Close()
unconsumedTable.UpdateTable(data)
end := ckp.GetEnd()
if end.Greater(&maxTs) {
maxTs = ckp.GetEnd()
}
}
unconsumedObjects := unconsumedTable.objects
unconsumedTombstones := unconsumedTable.tombstones

// Collect all objects
allObjects, err := c.getObjects()
if err != nil {
return err
}

// Collect all checkpoint files
ckpfiles, _, err := checkpoint.ListSnapshotMeta(c.cleaner.ctx, c.cleaner.fs.Service, entry.GetStart(), nil)
if err != nil {
return err
}
// The number of checkpoint files is ckpObjectCount
ckpObjectCount := len(ckpfiles) * 2
allCount := len(allObjects)
for name := range allObjects {
isfound := false
if _, ok := objects[name]; ok {
isfound = true
delete(objects, name)
}
if _, ok := tombstones[name]; ok {
isfound = true
delete(tombstones, name)
}
if _, ok := unconsumedObjects[name]; ok {
isfound = true
delete(unconsumedObjects, name)
}
if _, ok := unconsumedTombstones[name]; ok {
isfound = true
delete(unconsumedTombstones, name)
}
if isfound {
delete(allObjects, name)
}
}

// Collect all objects in memory
catalog := c.cleaner.ckpClient.GetCatalog()
it := catalog.MakeDBIt(true)
bat := makeRespBatchFromSchema(logtail.BlkMetaSchema, common.DebugAllocator)
defer bat.Close()
end := types.BuildTS(time.Now().UnixNano(), 0)
for ; it.Valid(); it.Next() {
db := it.Get().GetPayload()
itTable := db.MakeTableIt(true)
for itTable.Valid() {
table := itTable.Get().GetPayload()
itObject := table.MakeObjectIt(true)
for itObject.Valid() {
objectEntry := itObject.Get().GetPayload()
stats := objectEntry.GetObjectStats()
delete(allObjects, stats.ObjectName().String())
itObject.Next()
}
it2 := table.GetDeleteList().Items()
for _, itt := range it2 {
_, _, _, err = itt.VisitDeletes(c.cleaner.ctx, maxTs, end, bat, nil, true)
if err != nil {
logutil.Errorf("visit deletes failed: %v", err)
continue
}
}
itTable.Next()
}
}
for i := 0; i < bat.Length(); i++ {
deltaLoc := objectio.Location(bat.GetVectorByName(catalog2.BlockMeta_DeltaLoc).Get(i).([]byte))
delete(allObjects, deltaLoc.Name().String())
}

if len(objects) != 0 || len(tombstones) != 0 || len(unconsumedObjects) != 0 || len(unconsumedTombstones) != 0 {
for name := range objects {
logutil.Errorf("[Check GC]lost object %s,", name)
}

for name := range tombstones {
logutil.Errorf("[Check GC]lost tombstone %s,", name)
}

for name := range unconsumedObjects {
logutil.Errorf("[Check GC]lost unconsumed object %s,", name)
}

for name := range unconsumedTombstones {
logutil.Errorf("[Check GC]lost unconsumed tombstone %s,", name)
}
}

if len(allObjects) > ckpObjectCount+NotFoundLimit {
for name := range allObjects {
logutil.Infof("[Check GC]not found object %s,", name)
}
logutil.Warnf("[Check GC]GC abnormal!!! const: %v, all objects: %d, not found: %d, checkpoint file: %d",
time.Since(now), allCount, len(allObjects)-ckpObjectCount, ckpObjectCount)
} else {
logutil.Infof("[Check GC]Check end!!! const: %v, all objects: %d, not found: %d",
time.Since(now), allCount, len(allObjects)-ckpObjectCount)
}
return nil
}

func makeRespBatchFromSchema(schema *catalog.Schema, mp *mpool.MPool) *containers.Batch {
bat := containers.NewBatch()

bat.AddVector(
catalog.AttrRowID,
containers.MakeVector(types.T_Rowid.ToType(), mp),
)
bat.AddVector(
catalog.AttrCommitTs,
containers.MakeVector(types.T_TS.ToType(), mp),
)
// Types() is not used, then empty schema can also be handled here
typs := schema.AllTypes()
attrs := schema.AllNames()
for i, attr := range attrs {
if attr == catalog.PhyAddrColumnName {
continue
}
bat.AddVector(
attr,
containers.MakeVector(typs[i], mp),
)
}
return bat
}
Loading
Loading