Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tsm1): "snapshot in progress" error during backup #19869

Merged
merged 3 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Engine interface {

LoadMetadataIndex(shardID uint64, index Index) error

CreateSnapshot() (string, error)
CreateSnapshot(skipCacheOk bool) (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Expand Down
34 changes: 14 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,26 +912,16 @@ func (e *Engine) Free() error {
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still acively getting writes, this
// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to backup, increasing memory usage and eventually rejecting writes.
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
var err error
var path string
for i := 0; i < 3; i++ {
path, err = e.CreateSnapshot()
if err != nil {
switch err {
case ErrSnapshotInProgress:
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
default:
return err
}
}
}
if err == ErrSnapshotInProgress {
e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
path, err = e.CreateSnapshot(true)
if err != nil {
return err
}

// Remove the temporary snapshot dir
defer func() {
if err := os.RemoveAll(path); err != nil {
Expand Down Expand Up @@ -998,7 +988,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}

func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
path, err := e.CreateSnapshot()
path, err := e.CreateSnapshot(false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1947,9 +1937,13 @@ func (e *Engine) WriteSnapshot() (err error) {
}

dgnorton marked this conversation as resolved.
Show resolved Hide resolved
// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files.
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
if err := e.WriteSnapshot(); (err == ErrSnapshotInProgress) && skipCacheOk {
e.logger.Warn("Snapshotter busy: proceeding without cache contents.")
} else if err != nil {
return "", err
}

Expand Down
116 changes: 116 additions & 0 deletions tsdb/engine/tsm1/engine_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package tsm1

import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"
)

func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping on windows")
}

tmpDir, err := ioutil.TempDir("", "shard_test")
if err != nil {
t.Fatalf("error creating temporary directory: %s", err.Error())
}
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")

sfile := NewSeriesFile(tmpDir)
defer sfile.Close()

opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile)
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})

sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()

points := make([]models.Point, 0, 10000)
for i := 0; i < cap(points); i++ {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
}
err = sh.WritePoints(points)
if err != nil {
t.Fatalf(err.Error())
}

engineInterface, err := sh.Engine()
if err != nil {
t.Fatalf("error retrieving shard.Engine(): %s", err.Error())
}

// Get the struct underlying the interface. Not a recommended practice.
realEngineStruct, ok := (engineInterface).(*Engine)
if !ok {
t.Log("Engine type does not permit simulating Cache race conditions")
return
}
// fake a race condition in snapshotting the cache.
realEngineStruct.Cache.snapshotting = true
defer func() {
realEngineStruct.Cache.snapshotting = false
}()

snapshotFunc := func(skipCacheOk bool) {
if f, err := sh.CreateSnapshot(skipCacheOk); err == nil {
if err = os.RemoveAll(f); err != nil {
t.Fatalf("Failed to clean up in TestEngine_ConcurrentShardSnapshots: %s", err.Error())
}
} else if err == ErrSnapshotInProgress {
if skipCacheOk {
t.Fatalf("failing to ignore this error,: %s", err.Error())
}
} else if err != nil {
t.Fatalf("error creating shard snapshot: %s", err.Error())
}
}

// Permit skipping cache in the snapshot
snapshotFunc(true)
// do not permit skipping the cache in the snapshot
snapshotFunc(false)
realEngineStruct.Cache.snapshotting = false
}

// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-")
if err != nil {
panic(err)
}
f := tsdb.NewSeriesFile(dir)
f.Logger = logger.New(os.Stdout)
if err := f.Open(); err != nil {
panic(err)
}
return f
}

type seriesIDSets []*tsdb.SeriesIDSet

func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
for _, v := range a {
f(v)
}
return nil
}
4 changes: 2 additions & 2 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,12 +1153,12 @@ func (s *Shard) Restore(r io.Reader, basePath string) error {

// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files.
func (s *Shard) CreateSnapshot() (string, error) {
func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) {
engine, err := s.Engine()
if err != nil {
return "", err
}
return engine.CreateSnapshot()
return engine.CreateSnapshot(skipCacheOk)
}

// ForEachMeasurementName iterates over each measurement in the shard.
Expand Down
4 changes: 2 additions & 2 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}

Expand All @@ -472,7 +472,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,13 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en

// CreateShardSnapShot will create a hard link to the underlying shard and return a path.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) {
sh := s.Shard(id)
if sh == nil {
return "", ErrShardNotFound
}

return sh.CreateSnapshot()
return sh.CreateSnapshot(skipCacheOk)
}

// SetShardEnabled enables or disables a shard for read and writes.
Expand Down
2 changes: 1 addition & 1 deletion tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
t.Fatalf("expected shard")
}

dir, e := s.CreateShardSnapshot(1)
dir, e := s.CreateShardSnapshot(1, false)
if e != nil {
t.Fatal(e)
}
Expand Down