diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 19f9666b5bdc..e3745244d76a 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -398,8 +398,8 @@ type EngineIterator interface { // CloneContext is an opaque type encapsulating sufficient context to construct // a clone of an existing iterator. type CloneContext struct { - rawIter pebbleiter.Iterator - statsReporter iterStatsReporter + rawIter pebbleiter.Iterator + engine *Pebble } // IterOptions contains options used to create an {MVCC,Engine}Iterator. diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index f89d2c945fb9..fb937ce9d855 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1282,10 +1282,32 @@ func (p *Pebble) async(fn func()) { }() } +// writePreventStartupFile creates a file that will prevent nodes from automatically restarting after +// experiencing sstable corruption. +func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError error) { + auxDir := p.GetAuxiliaryDir() + path := base.PreventedStartupFile(auxDir) + + preventStartupMsg := fmt.Sprintf(`ATTENTION: + + this node is terminating because of sstable corruption. + Corruption may be a consequence of a hardware error. + + Error: %s + + A file preventing this node from restarting was placed at: + %s`, corruptionError.Error(), path) + + if err := fs.WriteFile(p.unencryptedFS, path, []byte(preventStartupMsg)); err != nil { + log.Warningf(ctx, "%v", err) + } +} + func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener { return pebble.EventListener{ BackgroundError: func(err error) { if errors.Is(err, pebble.ErrCorruption) { + p.writePreventStartupFile(ctx, err) log.Fatalf(ctx, "local corruption detected: %v", err) } }, @@ -2446,8 +2468,8 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions } if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ - rawIter: p.iter, - statsReporter: p.parent, + rawIter: p.iter, + engine: p.parent, }, opts, p.durability) } @@ -2479,8 +2501,8 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { } if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ - rawIter: p.iter, - statsReporter: p.parent, + rawIter: p.iter, + engine: p.parent, }, opts, p.durability) } diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 3dfc6022172d..2bcbd0519ead 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -57,7 +57,7 @@ type pebbleBatch struct { // scratch space for wrappedIntentWriter. scratch []byte - iterStatsReporter iterStatsReporter + parent *Pebble batchStatsReporter batchStatsReporter settings *cluster.Settings mayWriteSizedDeletes bool @@ -83,7 +83,7 @@ func newPebbleBatch( batch *pebble.Batch, writeOnly bool, settings *cluster.Settings, - iterStatsReporter iterStatsReporter, + parent *Pebble, batchStatsReporter batchStatsReporter, ) *pebbleBatch { pb := pebbleBatchPool.Get().(*pebbleBatch) @@ -112,7 +112,7 @@ func newPebbleBatch( reusable: true, }, writeOnly: writeOnly, - iterStatsReporter: iterStatsReporter, + parent: parent, batchStatsReporter: batchStatsReporter, settings: settings, // NB: We do not use settings.Version.IsActive because we do not @@ -202,15 +202,15 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M } if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ - rawIter: p.iter, - statsReporter: p.iterStatsReporter, + rawIter: p.iter, + engine: p.parent, }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -238,15 +238,15 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { } if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ - rawIter: p.iter, - statsReporter: p.iterStatsReporter, + rawIter: p.iter, + engine: p.parent, }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent) if p.iter == nil { // For future cloning. p.iter = iter.iter diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index b77b72a663d1..a0a526bfa391 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -12,6 +12,7 @@ package storage import ( "bytes" + "context" "math" "sync" @@ -46,10 +47,9 @@ type pebbleIterator struct { // Buffer used to store MVCCRangeKeyVersions returned by RangeKeys(). Lazily // initialized the first time an iterator's RangeKeys() method is called. mvccRangeKeyVersions []MVCCRangeKeyVersion - // statsReporter is used to sum iterator stats across all the iterators - // during the lifetime of the Engine when the iterator is closed or its - // stats reset. It's intended to be used with (*Pebble). It must not be nil. - statsReporter iterStatsReporter + + // parent is a pointer to the Engine from which the iterator was constructed. + parent *Pebble // Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips // SSTables based on MVCC/Engine key when true. @@ -75,16 +75,6 @@ type pebbleIterator struct { mvccDone bool } -type iterStatsReporter interface { - aggregateIterStats(IteratorStats) -} - -var noopStatsReporter = noopStatsReporterImpl{} - -type noopStatsReporterImpl struct{} - -func (noopStatsReporterImpl) aggregateIterStats(IteratorStats) {} - var _ MVCCIterator = &pebbleIterator{} var _ EngineIterator = &pebbleIterator{} @@ -96,14 +86,11 @@ var pebbleIterPool = sync.Pool{ // newPebbleIterator creates a new Pebble iterator for the given Pebble reader. func newPebbleIterator( - handle pebble.Reader, - opts IterOptions, - durability DurabilityRequirement, - statsReporter iterStatsReporter, + handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, parent *Pebble, ) *pebbleIterator { p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, durability, statsReporter) + p.init(nil, opts, durability, parent) p.iter = pebbleiter.MaybeWrap(handle.NewIter(&p.options)) return p } @@ -116,7 +103,7 @@ func newPebbleIteratorByCloning( var err error p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, durability, cloneCtx.statsReporter) + p.init(nil, opts, durability, cloneCtx.engine) p.iter, err = cloneCtx.rawIter.Clone(pebble.CloneOptions{ IterOptions: &p.options, RefreshBatchView: true, @@ -134,7 +121,7 @@ func newPebbleSSTIterator( ) (*pebbleIterator, error) { p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, StandardDurability, noopStatsReporter) + p.init(nil, opts, StandardDurability, nil) var externalIterOpts []pebble.ExternalIterOption if forwardOnly { @@ -158,7 +145,7 @@ func (p *pebbleIterator) init( iter pebbleiter.Iterator, opts IterOptions, durability DurabilityRequirement, - statsReporter iterStatsReporter, + statsReporter *Pebble, ) { *p = pebbleIterator{ iter: iter, @@ -166,7 +153,7 @@ func (p *pebbleIterator) init( lowerBoundBuf: p.lowerBoundBuf, upperBoundBuf: p.upperBoundBuf, rangeKeyMaskingBuf: p.rangeKeyMaskingBuf, - statsReporter: statsReporter, + parent: statsReporter, reusable: p.reusable, } p.setOptions(opts, durability) @@ -185,7 +172,7 @@ func (p *pebbleIterator) initReuseOrCreate( clone bool, opts IterOptions, durability DurabilityRequirement, - statsReporter iterStatsReporter, + statsReporter *Pebble, ) { if iter != nil && !clone { p.init(iter, opts, durability, statsReporter) @@ -315,8 +302,8 @@ func (p *pebbleIterator) Close() { // Report the iterator's stats so they can be accumulated and exposed // through time-series metrics. - if p.iter != nil { - p.statsReporter.aggregateIterStats(p.Stats()) + if p.iter != nil && p.parent != nil { + p.parent.aggregateIterStats(p.Stats()) } if p.reusable { @@ -922,7 +909,7 @@ func (p *pebbleIterator) IsPrefix() bool { // CloneContext is part of the EngineIterator interface. func (p *pebbleIterator) CloneContext() CloneContext { - return CloneContext{rawIter: p.iter, statsReporter: p.statsReporter} + return CloneContext{rawIter: p.iter, engine: p.parent} } func (p *pebbleIterator) getBlockPropertyFilterMask() pebble.BlockPropertyFilterMask { @@ -955,7 +942,11 @@ func (p *pebbleIterator) destroy() { // // NB: The panic is omitted if the error is encountered on an external // iterator which is iterating over uncommitted sstables. + if err := p.iter.Close(); !p.external && errors.Is(err, pebble.ErrCorruption) { + if p.parent != nil { + p.parent.writePreventStartupFile(context.Background(), err) + } panic(err) } p.iter = nil diff --git a/pkg/storage/pebble_iterator_test.go b/pkg/storage/pebble_iterator_test.go index 29b957fc322e..c2bff73d42ed 100644 --- a/pkg/storage/pebble_iterator_test.go +++ b/pkg/storage/pebble_iterator_test.go @@ -20,6 +20,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -72,7 +73,7 @@ func TestPebbleIterator_Corruption(t *testing.T) { LowerBound: []byte("a"), UpperBound: []byte("z"), } - iter := newPebbleIterator(p.db, iterOpts, StandardDurability, noopStatsReporter) + iter := newPebbleIterator(p.db, iterOpts, StandardDurability, p) // Seeking into the table catches the corruption. ok, err := iter.SeekEngineKeyGE(ek) @@ -81,6 +82,10 @@ func TestPebbleIterator_Corruption(t *testing.T) { // Closing the iter results in a panic due to the corruption. require.Panics(t, func() { iter.Close() }) + + // Should have laid down marker file to prevent startup. + _, err = p.Stat(base.PreventedStartupFile(p.GetAuxiliaryDir())) + require.NoError(t, err) } func randStr(fill []byte, rng *rand.Rand) {