Skip to content

Commit

Permalink
storage: Write PreventStartupFile on Node SSTFile Corruption
Browse files Browse the repository at this point in the history
Currently, if a node encounters sstable corruption, it crashes and tries
to restart automatically. Since the node is likely to crash again, we
would like to prevent it from attempting to restart itself. To that end,
this PR creates a `PreventStartupFile` when a node experiences sstable
corruption.

Fixes: cockroachdb#103899
Release note: None
  • Loading branch information
raggar committed Aug 2, 2023
1 parent e9add29 commit b45b084
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ type EngineIterator interface {
// CloneContext is an opaque type encapsulating sufficient context to construct
// a clone of an existing iterator.
type CloneContext struct {
rawIter pebbleiter.Iterator
statsReporter iterStatsReporter
rawIter pebbleiter.Iterator
engine *Pebble
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down
28 changes: 24 additions & 4 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,30 @@ func (p *Pebble) async(fn func()) {
}()
}

// writePreventStartupFile writes a marker file, at the path returned by
// base.PreventedStartupFile for the engine's auxiliary directory, that is
// intended to stop the node from automatically restarting after it has
// experienced sstable corruption. The file's contents are a short,
// human-readable explanation that includes the file's own path.
//
// The write is best-effort: a failure is logged as a warning and is not
// reported to the caller.
func (p *Pebble) writePreventStartupFile(ctx context.Context) {
	path := base.PreventedStartupFile(p.GetAuxiliaryDir())

	contents := []byte(fmt.Sprintf(`ATTENTION:
this node is terminating because of sstable corruption.
Corruption may be a consequence of a hardware error.
A file preventing this node from restarting was placed at:
%s`, path))

	// The marker is written through the engine's unencryptedFS handle.
	err := fs.WriteFile(p.unencryptedFS, path, contents)
	if err == nil {
		return
	}
	log.Warningf(ctx, "%v", err)
}

func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener {
return pebble.EventListener{
BackgroundError: func(err error) {
if errors.Is(err, pebble.ErrCorruption) {
p.writePreventStartupFile(ctx)
log.Fatalf(ctx, "local corruption detected: %v", err)
}
},
Expand Down Expand Up @@ -2362,8 +2382,8 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.parent,
rawIter: p.iter,
engine: p.parent,
}, opts, p.durability)
}

Expand Down Expand Up @@ -2395,8 +2415,8 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.parent,
rawIter: p.iter,
engine: p.parent,
}, opts, p.durability)
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type pebbleBatch struct {
// scratch space for wrappedIntentWriter.
scratch []byte

iterStatsReporter iterStatsReporter
parent *Pebble
batchStatsReporter batchStatsReporter
settings *cluster.Settings
mayWriteSizedDeletes bool
Expand All @@ -82,7 +82,7 @@ func newPebbleBatch(
batch *pebble.Batch,
writeOnly bool,
settings *cluster.Settings,
iterStatsReporter iterStatsReporter,
parent *Pebble,
batchStatsReporter batchStatsReporter,
) *pebbleBatch {
pb := pebbleBatchPool.Get().(*pebbleBatch)
Expand Down Expand Up @@ -111,7 +111,7 @@ func newPebbleBatch(
reusable: true,
},
writeOnly: writeOnly,
iterStatsReporter: iterStatsReporter,
parent: parent,
batchStatsReporter: batchStatsReporter,
settings: settings,
// NB: We do not use settings.Version.IsActive because we do not
Expand Down Expand Up @@ -201,15 +201,15 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.iterStatsReporter,
rawIter: p.iter,
engine: p.parent,
}, opts, StandardDurability)
}

if iter.iter != nil {
iter.setOptions(opts, StandardDurability)
} else {
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down Expand Up @@ -237,15 +237,15 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.iterStatsReporter,
rawIter: p.iter,
engine: p.parent,
}, opts, StandardDurability)
}

if iter.iter != nil {
iter.setOptions(opts, StandardDurability)
} else {
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down
42 changes: 16 additions & 26 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package storage

import (
"bytes"
"context"
"math"
"sync"

Expand Down Expand Up @@ -46,10 +47,9 @@ type pebbleIterator struct {
// Buffer used to store MVCCRangeKeyVersions returned by RangeKeys(). Lazily
// initialized the first time an iterator's RangeKeys() method is called.
mvccRangeKeyVersions []MVCCRangeKeyVersion
// statsReporter is used to sum iterator stats across all the iterators
// during the lifetime of the Engine when the iterator is closed or its
// stats reset. It's intended to be used with (*Pebble). It must not be nil.
statsReporter iterStatsReporter

// parent is a pointer to the Engine from which the iterator was constructed.
parent *Pebble

// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC/Engine key when true.
Expand All @@ -75,16 +75,6 @@ type pebbleIterator struct {
mvccDone bool
}

type iterStatsReporter interface {
aggregateIterStats(IteratorStats)
}

var noopStatsReporter = noopStatsReporterImpl{}

type noopStatsReporterImpl struct{}

func (noopStatsReporterImpl) aggregateIterStats(IteratorStats) {}

var _ MVCCIterator = &pebbleIterator{}
var _ EngineIterator = &pebbleIterator{}

Expand All @@ -96,10 +86,7 @@ var pebbleIterPool = sync.Pool{

// newPebbleIterator creates a new Pebble iterator for the given Pebble reader.
func newPebbleIterator(
handle pebble.Reader,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, statsReporter *Pebble,
) *pebbleIterator {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
Expand All @@ -116,7 +103,7 @@ func newPebbleIteratorByCloning(
var err error
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, durability, cloneCtx.statsReporter)
p.init(nil, opts, durability, cloneCtx.engine)
p.iter, err = cloneCtx.rawIter.Clone(pebble.CloneOptions{
IterOptions: &p.options,
RefreshBatchView: true,
Expand All @@ -134,7 +121,7 @@ func newPebbleSSTIterator(
) (*pebbleIterator, error) {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, StandardDurability, noopStatsReporter)
p.init(nil, opts, StandardDurability, nil)

var externalIterOpts []pebble.ExternalIterOption
if forwardOnly {
Expand All @@ -158,15 +145,15 @@ func (p *pebbleIterator) init(
iter pebbleiter.Iterator,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
*p = pebbleIterator{
iter: iter,
keyBuf: p.keyBuf,
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
rangeKeyMaskingBuf: p.rangeKeyMaskingBuf,
statsReporter: statsReporter,
parent: statsReporter,
reusable: p.reusable,
}
p.setOptions(opts, durability)
Expand All @@ -185,7 +172,7 @@ func (p *pebbleIterator) initReuseOrCreate(
clone bool,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
if iter != nil && !clone {
p.init(iter, opts, durability, statsReporter)
Expand Down Expand Up @@ -315,8 +302,8 @@ func (p *pebbleIterator) Close() {

// Report the iterator's stats so they can be accumulated and exposed
// through time-series metrics.
if p.iter != nil {
p.statsReporter.aggregateIterStats(p.Stats())
if p.iter != nil && p.parent != nil {
p.parent.aggregateIterStats(p.Stats())
}

if p.reusable {
Expand Down Expand Up @@ -922,7 +909,7 @@ func (p *pebbleIterator) IsPrefix() bool {

// CloneContext is part of the EngineIterator interface.
func (p *pebbleIterator) CloneContext() CloneContext {
return CloneContext{rawIter: p.iter, statsReporter: p.statsReporter}
return CloneContext{rawIter: p.iter, engine: p.parent}
}

func (p *pebbleIterator) getBlockPropertyFilterMask() pebble.BlockPropertyFilterMask {
Expand Down Expand Up @@ -956,6 +943,9 @@ func (p *pebbleIterator) destroy() {
// NB: The panic is omitted if the error is encountered on an external
// iterator which is iterating over uncommitted sstables.
if err := p.iter.Close(); !p.external && errors.Is(err, pebble.ErrCorruption) {
if p.parent != nil {
p.parent.writePreventStartupFile(context.Background())
}
panic(err)
}
p.iter = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/pebble_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestPebbleIterator_Corruption(t *testing.T) {
LowerBound: []byte("a"),
UpperBound: []byte("z"),
}
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, noopStatsReporter)
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, nil /* statsReporter */)

// Seeking into the table catches the corruption.
ok, err := iter.SeekEngineKeyGE(ek)
Expand Down

0 comments on commit b45b084

Please sign in to comment.