diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 82ebc4c4c3e65..5db851f338ec9 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -27,6 +27,7 @@ import ( "sort" "sync" "time" + "unsafe" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/sstable" @@ -93,8 +94,9 @@ func (r *syncedRanges) reset() { type Engine struct { engineMeta - closed atomic.Bool - db *pebble.DB + closed atomic.Bool + // db is an atomic pointer to pebble.DB. + db atomic.UnsafePointer UUID uuid.UUID localWriters sync.Map @@ -143,13 +145,19 @@ func (e *Engine) setError(err error) { } } +func (e *Engine) getDB() *pebble.DB { + return (*pebble.DB)(e.db.Load()) +} + +// Close closes the engine and release all resources. func (e *Engine) Close() error { log.L().Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack")) - if e.db == nil { + db := e.getDB() + if db == nil { return nil } - err := errors.Trace(e.db.Close()) - e.db = nil + err := errors.Trace(db.Close()) + e.db.Store(unsafe.Pointer(nil)) return err } @@ -426,9 +434,7 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter) } func (e *Engine) getEngineFileSize() backend.EngineFileSize { - e.mutex.RLock() - db := e.db - e.mutex.RUnlock() + db := e.getDB() var total pebble.LevelMetrics if db != nil { @@ -834,7 +840,7 @@ func (e *Engine) flushEngineWithoutLock(ctx context.Context) error { return err } - flushFinishedCh, err := e.db.AsyncFlush() + flushFinishedCh, err := e.getDB().AsyncFlush() if err != nil { return errors.Trace(err) } @@ -862,11 +868,11 @@ func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error { func (e *Engine) saveEngineMeta() error { log.L().Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()), zap.Int64("size", e.TotalSize.Load())) - return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.db)) + return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB())) } func (e *Engine) loadEngineMeta() error { - jsonBytes, closer, err := e.db.Get(engineMetaKey) + jsonBytes, closer, err := e.getDB().Get(engineMetaKey) if err != nil { if err == pebble.ErrNotFound { log.L().Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err)) @@ -958,13 +964,13 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter { opts = &newOpts } if !e.duplicateDetection { - return pebbleIter{Iterator: e.db.NewIter(opts)} + return pebbleIter{Iterator: e.getDB().NewIter(opts)} } logger := log.With( zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)), zap.Int64("tableID", e.tableInfo.ID), zap.Stringer("engineUUID", e.UUID)) - return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger) + return newDupDetectIter(ctx, e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger) } type sstMeta struct { @@ -1478,8 +1484,9 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error { for _, m := range metas { paths = append(paths, m.path) } - if i.e.db == nil { + db := i.e.getDB() + if db == nil { return errorEngineClosed } - return i.e.db.Ingest(paths) + return db.Ingest(paths) } diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index d78aa29ee1c36..421d1fc886dba 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -21,7 +21,9 @@ import ( "os" "path" "path/filepath" + "sync" "testing" + "unsafe" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/sstable" @@ -31,6 +33,53 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" ) +func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) { + dir := t.TempDir() + db, err := pebble.Open(path.Join(dir, "test"), opt) + require.NoError(t, err) + tmpPath := filepath.Join(dir, "test.sst") + err = os.Mkdir(tmpPath, 0o755) + require.NoError(t, err) + return db, tmpPath +} + +func TestGetEngineSizeWhenImport(t *testing.T) { + opt := &pebble.Options{ + MemTableSize: 1024 * 1024, + MaxConcurrentCompactions: 16, + L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction + L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction + DisableWAL: true, + ReadOnly: false, + } + db, tmpPath := makePebbleDB(t, opt) + + _, engineUUID := backend.MakeUUID("ww", 0) + engineCtx, cancel := context.WithCancel(context.Background()) + f := &Engine{ + UUID: engineUUID, + sstDir: tmpPath, + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + keyAdapter: noopKeyAdapter{}, + } + f.db.Store(unsafe.Pointer(db)) + // simulate import + f.lock(importMutexStateImport) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + engineFileSize := f.getEngineFileSize() + require.Equal(t, f.UUID, engineFileSize.UUID) + require.True(t, engineFileSize.IsImporting) + }() + wg.Wait() + f.unlock() + require.NoError(t, f.Close()) +} + func TestIngestSSTWithClosedEngine(t *testing.T) { dir := t.TempDir() opt := &pebble.Options{ @@ -50,7 +99,6 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -58,6 +106,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { sstMetasChan: make(chan metaOrFlush, 64), keyAdapter: noopKeyAdapter{}, } + f.db.Store(unsafe.Pointer(db)) f.sstIngester = dbSSTIngester{e: f} sstPath := path.Join(tmpPath, uuid.New().String()+".sst") file, err := os.Create(sstPath) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 6e86849858d4e..dc2a734f1c82a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/cockroachdb/pebble" "github.com/coreos/go-semver/semver" @@ -619,7 +620,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e keyAdapter: local.keyAdapter, }) engine := e.(*Engine) - engine.db = db + engine.db.Store(unsafe.Pointer(db)) engine.sstIngester = dbSSTIngester{e: engine} if err = engine.loadEngineMeta(); err != nil { return errors.Trace(err) @@ -658,7 +659,6 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, } engine := &Engine{ UUID: engineUUID, - db: db, sstMetasChan: make(chan metaOrFlush), tableInfo: cfg.TableInfo, keyAdapter: local.keyAdapter, @@ -666,6 +666,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, duplicateDB: local.duplicateDB, errorMgr: local.errorMgr, } + engine.db.Store(unsafe.Pointer(db)) engine.sstIngester = dbSSTIngester{e: engine} if err = engine.loadEngineMeta(); err != nil { return err @@ -1023,7 +1024,7 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, r } logger := log.With(zap.Stringer("engine", engine.UUID)) - sizeProps, err := getSizeProperties(logger, engine.db, local.keyAdapter) + sizeProps, err := getSizeProperties(logger, engine.getDB(), local.keyAdapter) if err != nil { return nil, errors.Trace(err) } @@ -1587,7 +1588,7 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error } db, err := local.openEngineDB(engineUUID, false) if err == nil { - localEngine.db = db + localEngine.db.Store(unsafe.Pointer(db)) localEngine.engineMeta = engineMeta{} if !common.IsDirExists(localEngine.sstDir) { if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil { diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 5e7f21bdb3ca9..f0a0294d82d2b 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -28,6 +28,7 @@ import ( "sync" "sync/atomic" "testing" + "unsafe" "github.com/cockroachdb/pebble" "github.com/coreos/go-semver/semver" @@ -337,7 +338,6 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -345,6 +345,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { sstMetasChan: make(chan metaOrFlush, 64), keyAdapter: noopKeyAdapter{}, } + f.db.Store(unsafe.Pointer(db)) f.sstIngester = dbSSTIngester{e: f} f.wg.Add(1) go f.ingestSSTLoop() @@ -594,7 +595,6 @@ func TestLocalIngestLoop(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := Engine{ - db: db, UUID: engineUUID, sstDir: "", ctx: engineCtx, @@ -606,6 +606,7 @@ func TestLocalIngestLoop(t *testing.T) { CompactConcurrency: 4, }, } + f.db.Store(unsafe.Pointer(db)) f.sstIngester = testIngester{} f.wg.Add(1) go f.ingestSSTLoop() @@ -811,7 +812,6 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -823,6 +823,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { CompactConcurrency: 4, }, } + f.db.Store(unsafe.Pointer(db)) createSSTWriter := func() (*sstWriter, error) { path := filepath.Join(f.sstDir, uuid.New().String()+".sst")