Skip to content

Commit

Permalink
lightning: fix check disk quota routine block when some engine is imp…
Browse files Browse the repository at this point in the history
…orting (#44877) (#44880)

close #44867
  • Loading branch information
ti-chi-bot authored Jun 30, 2023
1 parent 69948ed commit ae39052
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 23 deletions.
37 changes: 22 additions & 15 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sort"
"sync"
"time"
"unsafe"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
Expand Down Expand Up @@ -93,8 +94,9 @@ func (r *syncedRanges) reset() {

type Engine struct {
engineMeta
closed atomic.Bool
db *pebble.DB
closed atomic.Bool
// db is an atomic pointer to pebble.DB.
db atomic.UnsafePointer
UUID uuid.UUID
localWriters sync.Map

Expand Down Expand Up @@ -143,13 +145,19 @@ func (e *Engine) setError(err error) {
}
}

func (e *Engine) getDB() *pebble.DB {
return (*pebble.DB)(e.db.Load())
}

// Close closes the engine and release all resources.
func (e *Engine) Close() error {
log.L().Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack"))
if e.db == nil {
db := e.getDB()
if db == nil {
return nil
}
err := errors.Trace(e.db.Close())
e.db = nil
err := errors.Trace(db.Close())
e.db.Store(unsafe.Pointer(nil))
return err
}

Expand Down Expand Up @@ -426,9 +434,7 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter)
}

func (e *Engine) getEngineFileSize() backend.EngineFileSize {
e.mutex.RLock()
db := e.db
e.mutex.RUnlock()
db := e.getDB()

var total pebble.LevelMetrics
if db != nil {
Expand Down Expand Up @@ -834,7 +840,7 @@ func (e *Engine) flushEngineWithoutLock(ctx context.Context) error {
return err
}

flushFinishedCh, err := e.db.AsyncFlush()
flushFinishedCh, err := e.getDB().AsyncFlush()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -862,11 +868,11 @@ func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error {
func (e *Engine) saveEngineMeta() error {
log.L().Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()),
zap.Int64("size", e.TotalSize.Load()))
return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.db))
return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB()))
}

func (e *Engine) loadEngineMeta() error {
jsonBytes, closer, err := e.db.Get(engineMetaKey)
jsonBytes, closer, err := e.getDB().Get(engineMetaKey)
if err != nil {
if err == pebble.ErrNotFound {
log.L().Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err))
Expand Down Expand Up @@ -958,13 +964,13 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
opts = &newOpts
}
if !e.duplicateDetection {
return pebbleIter{Iterator: e.db.NewIter(opts)}
return pebbleIter{Iterator: e.getDB().NewIter(opts)}
}
logger := log.With(
zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
zap.Int64("tableID", e.tableInfo.ID),
zap.Stringer("engineUUID", e.UUID))
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
return newDupDetectIter(ctx, e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger)
}

type sstMeta struct {
Expand Down Expand Up @@ -1478,8 +1484,9 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error {
for _, m := range metas {
paths = append(paths, m.path)
}
if i.e.db == nil {
db := i.e.getDB()
if db == nil {
return errorEngineClosed
}
return i.e.db.Ingest(paths)
return db.Ingest(paths)
}
51 changes: 50 additions & 1 deletion br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"os"
"path"
"path/filepath"
"sync"
"testing"
"unsafe"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
Expand All @@ -31,6 +33,53 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend"
)

func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)
return db, tmpPath
}

func TestGetEngineSizeWhenImport(t *testing.T) {
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction
L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction
DisableWAL: true,
ReadOnly: false,
}
db, tmpPath := makePebbleDB(t, opt)

_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
}
f.db.Store(unsafe.Pointer(db))
// simulate import
f.lock(importMutexStateImport)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
engineFileSize := f.getEngineFileSize()
require.Equal(t, f.UUID, engineFileSize.UUID)
require.True(t, engineFileSize.IsImporting)
}()
wg.Wait()
f.unlock()
require.NoError(t, f.Close())
}

func TestIngestSSTWithClosedEngine(t *testing.T) {
dir := t.TempDir()
opt := &pebble.Options{
Expand All @@ -50,14 +99,14 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
}
f.db.Store(unsafe.Pointer(db))
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
file, err := os.Create(sstPath)
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/cockroachdb/pebble"
"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -619,7 +620,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
keyAdapter: local.keyAdapter,
})
engine := e.(*Engine)
engine.db = db
engine.db.Store(unsafe.Pointer(db))
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -658,14 +659,14 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
}
engine := &Engine{
UUID: engineUUID,
db: db,
sstMetasChan: make(chan metaOrFlush),
tableInfo: cfg.TableInfo,
keyAdapter: local.keyAdapter,
duplicateDetection: local.duplicateDetection,
duplicateDB: local.duplicateDB,
errorMgr: local.errorMgr,
}
engine.db.Store(unsafe.Pointer(db))
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return err
Expand Down Expand Up @@ -1023,7 +1024,7 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, r
}

logger := log.With(zap.Stringer("engine", engine.UUID))
sizeProps, err := getSizeProperties(logger, engine.db, local.keyAdapter)
sizeProps, err := getSizeProperties(logger, engine.getDB(), local.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1587,7 +1588,7 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
}
db, err := local.openEngineDB(engineUUID, false)
if err == nil {
localEngine.db = db
localEngine.db.Store(unsafe.Pointer(db))
localEngine.engineMeta = engineMeta{}
if !common.IsDirExists(localEngine.sstDir) {
if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"sync/atomic"
"testing"
"unsafe"

"github.com/cockroachdb/pebble"
"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -337,14 +338,14 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
}
f.db.Store(unsafe.Pointer(db))
f.sstIngester = dbSSTIngester{e: f}
f.wg.Add(1)
go f.ingestSSTLoop()
Expand Down Expand Up @@ -594,7 +595,6 @@ func TestLocalIngestLoop(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := Engine{
db: db,
UUID: engineUUID,
sstDir: "",
ctx: engineCtx,
Expand All @@ -606,6 +606,7 @@ func TestLocalIngestLoop(t *testing.T) {
CompactConcurrency: 4,
},
}
f.db.Store(unsafe.Pointer(db))
f.sstIngester = testIngester{}
f.wg.Add(1)
go f.ingestSSTLoop()
Expand Down Expand Up @@ -811,7 +812,6 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
engineCtx, cancel := context.WithCancel(context.Background())

f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
Expand All @@ -823,6 +823,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
CompactConcurrency: 4,
},
}
f.db.Store(unsafe.Pointer(db))

createSSTWriter := func() (*sstWriter, error) {
path := filepath.Join(f.sstDir, uuid.New().String()+".sst")
Expand Down

0 comments on commit ae39052

Please sign in to comment.