Skip to content

Commit

Permalink
Merge #107828
Browse files Browse the repository at this point in the history
107828: storage: Write PreventStartupFile on Node SSTFile Corruption r=RahulAggarwal1016 a=RahulAggarwal1016

Currently if a node faces sstable corruption, that node will crash and try to automatically restart. Since it is likely that the node may crash again, we would like to prevent the node from attempting to restart itself. As a result, this pr created a `PreventStartupFile` when a node experiences sstable corruption.

Note: There is another place where a node crashes due to sstable corruption: https://github.com/cockroachdb/cockroach/blob/ba053dd3ff75dbca557e3b21195ed4cdff250660/pkg/storage/pebble_iterator.go#L958 however I am not quite sure how to access a `Pebble` instance without it getting a little messy.  

Fixes: #103899
Release-note: None

Co-authored-by: Rahul Aggarwal <[email protected]>
  • Loading branch information
craig[bot] and raggar committed Aug 14, 2023
2 parents 7cd6a33 + bbfd162 commit 236d3b7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 43 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ type EngineIterator interface {
// CloneContext is an opaque type encapsulating sufficient context to construct
// a clone of an existing iterator.
type CloneContext struct {
rawIter pebbleiter.Iterator
statsReporter iterStatsReporter
rawIter pebbleiter.Iterator
engine *Pebble
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down
30 changes: 26 additions & 4 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,10 +1282,32 @@ func (p *Pebble) async(fn func()) {
}()
}

// writePreventStartupFile creates a file that will prevent nodes from automatically restarting after
// experiencing sstable corruption.
func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError error) {
auxDir := p.GetAuxiliaryDir()
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
this node is terminating because of sstable corruption.
Corruption may be a consequence of a hardware error.
Error: %s
A file preventing this node from restarting was placed at:
%s`, corruptionError.Error(), path)

if err := fs.WriteFile(p.unencryptedFS, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}
}

func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener {
return pebble.EventListener{
BackgroundError: func(err error) {
if errors.Is(err, pebble.ErrCorruption) {
p.writePreventStartupFile(ctx, err)
log.Fatalf(ctx, "local corruption detected: %v", err)
}
},
Expand Down Expand Up @@ -2446,8 +2468,8 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.parent,
rawIter: p.iter,
engine: p.parent,
}, opts, p.durability)
}

Expand Down Expand Up @@ -2479,8 +2501,8 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.parent,
rawIter: p.iter,
engine: p.parent,
}, opts, p.durability)
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type pebbleBatch struct {
// scratch space for wrappedIntentWriter.
scratch []byte

iterStatsReporter iterStatsReporter
parent *Pebble
batchStatsReporter batchStatsReporter
settings *cluster.Settings
mayWriteSizedDeletes bool
Expand All @@ -83,7 +83,7 @@ func newPebbleBatch(
batch *pebble.Batch,
writeOnly bool,
settings *cluster.Settings,
iterStatsReporter iterStatsReporter,
parent *Pebble,
batchStatsReporter batchStatsReporter,
) *pebbleBatch {
pb := pebbleBatchPool.Get().(*pebbleBatch)
Expand Down Expand Up @@ -112,7 +112,7 @@ func newPebbleBatch(
reusable: true,
},
writeOnly: writeOnly,
iterStatsReporter: iterStatsReporter,
parent: parent,
batchStatsReporter: batchStatsReporter,
settings: settings,
// NB: We do not use settings.Version.IsActive because we do not
Expand Down Expand Up @@ -202,15 +202,15 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.iterStatsReporter,
rawIter: p.iter,
engine: p.parent,
}, opts, StandardDurability)
}

if iter.iter != nil {
iter.setOptions(opts, StandardDurability)
} else {
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down Expand Up @@ -238,15 +238,15 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
}
if iter.inuse {
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
statsReporter: p.iterStatsReporter,
rawIter: p.iter,
engine: p.parent,
}, opts, StandardDurability)
}

if iter.iter != nil {
iter.setOptions(opts, StandardDurability)
} else {
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.parent)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down
45 changes: 18 additions & 27 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package storage

import (
"bytes"
"context"
"math"
"sync"

Expand Down Expand Up @@ -46,10 +47,9 @@ type pebbleIterator struct {
// Buffer used to store MVCCRangeKeyVersions returned by RangeKeys(). Lazily
// initialized the first time an iterator's RangeKeys() method is called.
mvccRangeKeyVersions []MVCCRangeKeyVersion
// statsReporter is used to sum iterator stats across all the iterators
// during the lifetime of the Engine when the iterator is closed or its
// stats reset. It's intended to be used with (*Pebble). It must not be nil.
statsReporter iterStatsReporter

// parent is a pointer to the Engine from which the iterator was constructed.
parent *Pebble

// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC/Engine key when true.
Expand All @@ -75,16 +75,6 @@ type pebbleIterator struct {
mvccDone bool
}

type iterStatsReporter interface {
aggregateIterStats(IteratorStats)
}

var noopStatsReporter = noopStatsReporterImpl{}

type noopStatsReporterImpl struct{}

func (noopStatsReporterImpl) aggregateIterStats(IteratorStats) {}

var _ MVCCIterator = &pebbleIterator{}
var _ EngineIterator = &pebbleIterator{}

Expand All @@ -96,14 +86,11 @@ var pebbleIterPool = sync.Pool{

// newPebbleIterator creates a new Pebble iterator for the given Pebble reader.
func newPebbleIterator(
handle pebble.Reader,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, parent *Pebble,
) *pebbleIterator {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, durability, statsReporter)
p.init(nil, opts, durability, parent)
p.iter = pebbleiter.MaybeWrap(handle.NewIter(&p.options))
return p
}
Expand All @@ -116,7 +103,7 @@ func newPebbleIteratorByCloning(
var err error
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, durability, cloneCtx.statsReporter)
p.init(nil, opts, durability, cloneCtx.engine)
p.iter, err = cloneCtx.rawIter.Clone(pebble.CloneOptions{
IterOptions: &p.options,
RefreshBatchView: true,
Expand All @@ -134,7 +121,7 @@ func newPebbleSSTIterator(
) (*pebbleIterator, error) {
p := pebbleIterPool.Get().(*pebbleIterator)
p.reusable = false // defensive
p.init(nil, opts, StandardDurability, noopStatsReporter)
p.init(nil, opts, StandardDurability, nil)

var externalIterOpts []pebble.ExternalIterOption
if forwardOnly {
Expand All @@ -158,15 +145,15 @@ func (p *pebbleIterator) init(
iter pebbleiter.Iterator,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
*p = pebbleIterator{
iter: iter,
keyBuf: p.keyBuf,
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
rangeKeyMaskingBuf: p.rangeKeyMaskingBuf,
statsReporter: statsReporter,
parent: statsReporter,
reusable: p.reusable,
}
p.setOptions(opts, durability)
Expand All @@ -185,7 +172,7 @@ func (p *pebbleIterator) initReuseOrCreate(
clone bool,
opts IterOptions,
durability DurabilityRequirement,
statsReporter iterStatsReporter,
statsReporter *Pebble,
) {
if iter != nil && !clone {
p.init(iter, opts, durability, statsReporter)
Expand Down Expand Up @@ -315,8 +302,8 @@ func (p *pebbleIterator) Close() {

// Report the iterator's stats so they can be accumulated and exposed
// through time-series metrics.
if p.iter != nil {
p.statsReporter.aggregateIterStats(p.Stats())
if p.iter != nil && p.parent != nil {
p.parent.aggregateIterStats(p.Stats())
}

if p.reusable {
Expand Down Expand Up @@ -922,7 +909,7 @@ func (p *pebbleIterator) IsPrefix() bool {

// CloneContext is part of the EngineIterator interface.
func (p *pebbleIterator) CloneContext() CloneContext {
return CloneContext{rawIter: p.iter, statsReporter: p.statsReporter}
return CloneContext{rawIter: p.iter, engine: p.parent}
}

func (p *pebbleIterator) getBlockPropertyFilterMask() pebble.BlockPropertyFilterMask {
Expand Down Expand Up @@ -955,7 +942,11 @@ func (p *pebbleIterator) destroy() {
//
// NB: The panic is omitted if the error is encountered on an external
// iterator which is iterating over uncommitted sstables.

if err := p.iter.Close(); !p.external && errors.Is(err, pebble.ErrCorruption) {
if p.parent != nil {
p.parent.writePreventStartupFile(context.Background(), err)
}
panic(err)
}
p.iter = nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/pebble_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestPebbleIterator_Corruption(t *testing.T) {
LowerBound: []byte("a"),
UpperBound: []byte("z"),
}
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, noopStatsReporter)
iter := newPebbleIterator(p.db, iterOpts, StandardDurability, p)

// Seeking into the table catches the corruption.
ok, err := iter.SeekEngineKeyGE(ek)
Expand All @@ -81,6 +82,10 @@ func TestPebbleIterator_Corruption(t *testing.T) {

// Closing the iter results in a panic due to the corruption.
require.Panics(t, func() { iter.Close() })

// Should have laid down marker file to prevent startup.
_, err = p.Stat(base.PreventedStartupFile(p.GetAuxiliaryDir()))
require.NoError(t, err)
}

func randStr(fill []byte, rng *rand.Rand) {
Expand Down

0 comments on commit 236d3b7

Please sign in to comment.