Skip to content

Commit

Permalink
storage: Write PreventStartupFile on Node SSTFile Corruption
Browse files Browse the repository at this point in the history
Currently, if a node encounters sstable corruption, it crashes and then
tries to restart automatically. Because the node is likely to crash
again, we would like to prevent it from attempting to restart
itself. To that end, this PR writes a `PreventStartupFile` when a node
experiences sstable corruption.

Fixes: cockroachdb#103899
Release-note: None
  • Loading branch information
raggar committed Aug 1, 2023
1 parent e9add29 commit 4c8ce7c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ type EngineIterator interface {
// a clone of an existing iterator.
type CloneContext struct {
rawIter pebbleiter.Iterator
statsReporter iterStatsReporter
statsReporter *Pebble
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,29 @@ func (p *Pebble) async(fn func()) {
}()
}

// writePreventStartupFile creates a file that will prevent nodes from
// automatically restarting after experiencing sstable corruption.
//
// The file is written through p.unencryptedFS at the path returned by
// base.PreventedStartupFile for the engine's auxiliary directory. The
// operation is best-effort: a failure to create the directory or to write the
// file is logged as a warning and never returned, because the callers visible
// in this change invoke it right before log.Fatalf or panic.
func (p *Pebble) writePreventStartupFile(ctx context.Context) {
	auxDir := p.GetAuxiliaryDir()
	// Make sure the auxiliary directory exists. A failure is surfaced in the
	// log instead of being silently dropped; the write below is still
	// attempted and reports its own error if it cannot succeed.
	if err := p.MkdirAll(auxDir, os.ModePerm); err != nil {
		log.Warningf(ctx, "unable to create auxiliary directory %s: %v", auxDir, err)
	}
	path := base.PreventedStartupFile(auxDir)

	// The message ends with the location of the file itself.
	preventStartupMsg := fmt.Sprintf(`ATTENTION:
this node is terminating because of sstable corruption.
Corruption may be a consequence of a hardware error.
%s`, path)

	if err := fs.WriteFile(p.unencryptedFS, path, []byte(preventStartupMsg)); err != nil {
		log.Warningf(ctx, "%v", err)
	}
}

func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener {
return pebble.EventListener{
BackgroundError: func(err error) {
if errors.Is(err, pebble.ErrCorruption) {
p.writePreventStartupFile(ctx)
log.Fatalf(ctx, "local corruption detected: %v", err)
}
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type pebbleBatch struct {
// scratch space for wrappedIntentWriter.
scratch []byte

iterStatsReporter iterStatsReporter
iterStatsReporter *Pebble
batchStatsReporter batchStatsReporter
settings *cluster.Settings
mayWriteSizedDeletes bool
Expand All @@ -82,7 +82,7 @@ func newPebbleBatch(
batch *pebble.Batch,
writeOnly bool,
settings *cluster.Settings,
iterStatsReporter iterStatsReporter,
iterStatsReporter *Pebble,
batchStatsReporter batchStatsReporter,
) *pebbleBatch {
pb := pebbleBatchPool.Get().(*pebbleBatch)
Expand Down
29 changes: 10 additions & 19 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package storage

import (
"bytes"
"context"
"math"
"sync"

Expand Down Expand Up @@ -49,7 +50,7 @@ type pebbleIterator struct {
// statsReporter is used to sum iterator stats across all the iterators
// during the lifetime of the Engine when the iterator is closed or its
// stats reset. It's intended to be used with (*Pebble). It must not be nil.
statsReporter iterStatsReporter
statsReporter *Pebble

// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC/Engine key when true.
Expand All @@ -75,16 +76,6 @@ type pebbleIterator struct {
mvccDone bool
}

type iterStatsReporter interface {
aggregateIterStats(IteratorStats)
}

var noopStatsReporter = noopStatsReporterImpl{}

type noopStatsReporterImpl struct{}

func (noopStatsReporterImpl) aggregateIterStats(IteratorStats) {}

var _ MVCCIterator = &pebbleIterator{}
var _ EngineIterator = &pebbleIterator{}

Expand All @@ -96,10 +87,7 @@ var pebbleIterPool = sync.Pool{

// newPebbleIterator creates a new Pebble iterator for the given Pebble reader.
func newPebbleIterator(
handle pebble.Reader,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, statsReporter *Pebble,
) *pebbleIterator {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
Expand Down Expand Up @@ -134,7 +122,7 @@ func newPebbleSSTIterator(
) (*pebbleIterator, error) {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, StandardDurability, noopStatsReporter)
p.init(nil, opts, StandardDurability, nil)

var externalIterOpts []pebble.ExternalIterOption
if forwardOnly {
Expand All @@ -158,7 +146,7 @@ func (p *pebbleIterator) init(
iter pebbleiter.Iterator,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
*p = pebbleIterator{
iter: iter,
Expand All @@ -185,7 +173,7 @@ func (p *pebbleIterator) initReuseOrCreate(
clone bool,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
if iter != nil && !clone {
p.init(iter, opts, durability, statsReporter)
Expand Down Expand Up @@ -315,7 +303,7 @@ func (p *pebbleIterator) Close() {

// Report the iterator's stats so they can be accumulated and exposed
// through time-series metrics.
if p.iter != nil {
if p.iter != nil && p.statsReporter != nil {
p.statsReporter.aggregateIterStats(p.Stats())
}

Expand Down Expand Up @@ -956,6 +944,9 @@ func (p *pebbleIterator) destroy() {
// NB: The panic is omitted if the error is encountered on an external
// iterator which is iterating over uncommitted sstables.
if err := p.iter.Close(); !p.external && errors.Is(err, pebble.ErrCorruption) {
if p.statsReporter != nil {
p.statsReporter.writePreventStartupFile(context.Background())
}
panic(err)
}
p.iter = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/pebble_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestPebbleIterator_Corruption(t *testing.T) {
LowerBound: []byte("a"),
UpperBound: []byte("z"),
}
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, noopStatsReporter)
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, nil /* statsReporter */)

// Seeking into the table catches the corruption.
ok, err := iter.SeekEngineKeyGE(ek)
Expand Down

0 comments on commit 4c8ce7c

Please sign in to comment.