roachtest: Add sstable-corruption roachtest

Adds a new roachtest, sstable-corruption/table, that imports
TPCC, finds sstables containing table keys (which are
replicated) on one node, and calls `dd` to write random bytes
into the middle of those files. It then checks that the
corrupted node either crashes on restart, or crashes once the
TPCC workload is run against it.

Informs #67568.

Release note: None.
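
For context, the corruption step this test automates boils down to two shell
commands run against the corrupted node's store directory (a rough sketch:
$STORE_DIR and the sstable number 000123 are illustrative placeholders, not
values from this commit):

# List sstables whose keys all fall within the table keyspace.
./cockroach debug pebble manifest dump $STORE_DIR/MANIFEST-* | grep -v added | grep -v deleted | grep "\[/Table"
# Overwrite 128 bytes of one of those sstables with random data, starting at
# byte offset 256, without truncating the rest of the file.
dd if=/dev/urandom of=$STORE_DIR/000123.sst seek=256 count=128 bs=1 conv=notrunc
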
itsbilal committed Sep 28, 2021
1 parent bdb4c1a commit d2bfa7a
Showing 4 changed files with 131 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/cluster/monitor_interface.go
@@ -17,6 +17,7 @@ type Monitor interface {
ExpectDeath()
ExpectDeaths(count int32)
ResetDeaths()
NumExpectedDeaths() int32
Go(fn func(context.Context) error)
WaitE() error
Wait()
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/monitor.go
@@ -78,6 +78,11 @@ func (m *monitorImpl) ResetDeaths() {
atomic.StoreInt32(&m.expDeaths, 0)
}

// NumExpectedDeaths is the number of expected deaths that have yet to happen.
func (m *monitorImpl) NumExpectedDeaths() int32 {
return atomic.LoadInt32(&m.expDeaths)
}

var errTestFatal = errors.New("t.Fatal() was called")

func (m *monitorImpl) Go(fn func(context.Context) error) {
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
@@ -99,6 +99,7 @@ func RegisterTests(r registry.Registry) {
registerSequelize(r)
registerSQLAlchemy(r)
registerSQLSmith(r)
registerSstableCorruption(r)
registerSyncTest(r)
registerSysbench(r)
registerTLP(r)
124 changes: 124 additions & 0 deletions pkg/cmd/roachtest/tests/sstable_corruption.go
@@ -0,0 +1,124 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
)

func runSstableCorruption(ctx context.Context, t test.Test, c cluster.Cluster) {
crdbNodes := c.Range(1, c.Spec().NodeCount)
workloadNode := c.Node(1)
const corruptNode = 3

t.Status("installing cockroach")
c.Put(ctx, t.Cockroach(), "./cockroach", crdbNodes)
c.Start(ctx, crdbNodes)

// We don't really need TPCC; we just need a good amount of data, enough to
// create multiple ranges and some sstables that contain only table keys.
t.Status("importing tpcc fixture")
c.Run(ctx, workloadNode,
"./cockroach workload fixtures import tpcc --warehouses=100 --fks=false --checks=false")

m := c.NewMonitor(ctx, crdbNodes)
signalChan := make(chan bool)

m.Go(func(ctx context.Context) error {
// Wait for a signal from the goroutine below before running the workload.
// The signal is only sent once the cluster has been restarted with corrupted
// sstables on one node.
select {
case proceed := <-signalChan:
if !proceed {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
_ = c.RunE(ctx, workloadNode, "./cockroach workload run tpcc --warehouses=100 "+
fmt.Sprintf("--tolerate-errors --duration=%s", 5*time.Minute))
return nil
})

m.Go(func(ctx context.Context) error {
m.ExpectDeaths(3)
c.Stop(ctx, crdbNodes)

tableSSTs, err := c.RunWithBuffer(ctx, t.L(), c.Node(corruptNode),
"./cockroach debug pebble manifest dump {store-dir}/MANIFEST-* | grep -v added | grep -v deleted | grep \"\\[/Table\"")
if err != nil {
return err
}
// Trim the trailing newline from the command output so we don't try to
// parse an empty line below.
strTableSSTs := strings.Split(strings.TrimSpace(string(tableSSTs)), "\n")
if len(strTableSSTs) == 0 || strTableSSTs[0] == "" {
t.Fatal("expected at least one sst containing table keys only, got none")
}
// Corrupt up to 6 SSTs containing table keys.
corruptedFiles := 0
for _, sstLine := range strTableSSTs {
sstLine = strings.TrimSpace(sstLine)
firstFileIdx := strings.Index(sstLine, ":")
_, err = strconv.Atoi(sstLine[:firstFileIdx])
if err != nil {
t.Fatal("error when converting %s to int: %s", sstLine[:firstFileIdx], err.Error())
}

t.Status(fmt.Sprintf("corrupting sstable %s on node %d", sstLine[:firstFileIdx], corruptNode))
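// dd copies `count` input blocks of `bs` bytes each from /dev/urandom,
// skipping `seek` output blocks first; with bs=1 this overwrites 128 bytes
// of the sstable starting at byte offset 256. conv=notrunc keeps dd from
// truncating the rest of the file.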
c.Run(ctx, c.Node(corruptNode), fmt.Sprintf("dd if=/dev/urandom of={store-dir}/%s.sst seek=256 count=128 bs=1 conv=notrunc", sstLine[:firstFileIdx]))
corruptedFiles++
if corruptedFiles >= 6 {
break
}
}

m.ExpectDeath()
if err := c.StartE(ctx, crdbNodes); err != nil {
// Node detected corruption on start and crashed. This is good. No need
// to run workload.
signalChan <- false
return nil
}
// Start the workload. This should cause the node to crash.
signalChan <- true
time.Sleep(2 * time.Minute)

if num := m.NumExpectedDeaths(); num > 0 {
t.Fatalf("expected node death to have occurred, still waiting on %d deaths", num)
}
// Reset the expected death count and restart the corrupted node as a clean
// node, so that the test does not fail during cleanup.
m.ResetDeaths()
_ = c.WipeE(ctx, t.L(), c.Node(corruptNode))
c.Start(ctx, c.Node(corruptNode))
return nil
})
m.Wait()
}

func registerSstableCorruption(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "sstable-corruption/table",
Owner: registry.OwnerStorage,
Cluster: r.MakeClusterSpec(3),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runSstableCorruption(ctx, t, c)
},
})
}
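
Assuming a locally built roachtest binary, the new test could then be invoked
by name in the usual way (exact cluster and cloud flags depend on the
environment):

roachtest run 'sstable-corruption/table'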
