Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: add synctest #31187

Merged
merged 5 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cli/debug"
"github.com/cockroachdb/cockroach/pkg/cli/synctest"
"github.com/cockroachdb/cockroach/pkg/cli/syncbench"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -99,9 +99,23 @@ func parseRangeID(arg string) (roachpb.RangeID, error) {
return roachpb.RangeID(rangeIDInt), nil
}

// OpenEngineOptions tunes the behavior of OpenEngine.
type OpenEngineOptions struct {
ReadOnly bool
MustExist bool
}

// OpenExistingStore opens the rocksdb engine rooted at 'dir'.
// If 'readOnly' is true, opens the store in read-only mode.
func OpenExistingStore(dir string, stopper *stop.Stopper, readOnly bool) (*engine.RocksDB, error) {
return OpenEngine(dir, stopper, OpenEngineOptions{ReadOnly: readOnly, MustExist: true})
}

// OpenEngine opens the RocksDB engine at 'dir'. Depending on the supplied options,
// an empty engine might be initialized.
func OpenEngine(
dir string, stopper *stop.Stopper, opts OpenEngineOptions,
) (*engine.RocksDB, error) {
cache := engine.NewRocksDBCache(server.DefaultCacheSize)
defer cache.Release()
maxOpenFiles, err := server.SetOpenFileLimitForOneStore()
Expand All @@ -113,8 +127,8 @@ func OpenExistingStore(dir string, stopper *stop.Stopper, readOnly bool) (*engin
Settings: serverCfg.Settings,
Dir: dir,
MaxOpenFiles: maxOpenFiles,
MustExist: true,
ReadOnly: readOnly,
MustExist: opts.MustExist,
ReadOnly: opts.ReadOnly,
}

if PopulateRocksDBConfigHook != nil {
Expand Down Expand Up @@ -1011,28 +1025,28 @@ func runTimeSeriesDump(cmd *cobra.Command, args []string) error {
}
}

var debugSyncTestCmd = &cobra.Command{
Use: "synctest [directory]",
var debugSyncBenchCmd = &cobra.Command{
Use: "syncbench [directory]",
Short: "Run a performance test for WAL sync speed",
Long: `
`,
Args: cobra.MaximumNArgs(1),
Hidden: true,
RunE: MaybeDecorateGRPCError(runDebugSyncTest),
RunE: MaybeDecorateGRPCError(runDebugSyncBench),
}

var syncTestOpts = synctest.Options{
var syncBenchOpts = syncbench.Options{
Concurrency: 1,
Duration: 10 * time.Second,
LogOnly: true,
}

func runDebugSyncTest(cmd *cobra.Command, args []string) error {
syncTestOpts.Dir = "./testdb"
func runDebugSyncBench(cmd *cobra.Command, args []string) error {
syncBenchOpts.Dir = "./testdb"
if len(args) == 1 {
syncTestOpts.Dir = args[0]
syncBenchOpts.Dir = args[0]
}
return synctest.Run(syncTestOpts)
return syncbench.Run(syncBenchOpts)
}

var debugUnsafeRemoveDeadReplicasCmd = &cobra.Command{
Expand Down Expand Up @@ -1207,12 +1221,12 @@ func removeDeadReplicas(
func init() {
DebugCmd.AddCommand(debugCmds...)

f := debugSyncTestCmd.Flags()
f.IntVarP(&syncTestOpts.Concurrency, "concurrency", "c", syncTestOpts.Concurrency,
f := debugSyncBenchCmd.Flags()
f.IntVarP(&syncBenchOpts.Concurrency, "concurrency", "c", syncBenchOpts.Concurrency,
"number of concurrent writers")
f.DurationVarP(&syncTestOpts.Duration, "duration", "d", syncTestOpts.Duration,
f.DurationVarP(&syncBenchOpts.Duration, "duration", "d", syncBenchOpts.Duration,
"duration to run the test for")
f.BoolVarP(&syncTestOpts.LogOnly, "log-only", "l", syncTestOpts.LogOnly,
f.BoolVarP(&syncBenchOpts.LogOnly, "log-only", "l", syncBenchOpts.LogOnly,
"only write to the WAL, not to sstables")

f = debugUnsafeRemoveDeadReplicasCmd.Flags()
Expand Down Expand Up @@ -1243,6 +1257,7 @@ var debugCmds = append(DebugCmdsForRocksDB,
debugSSTDumpCmd,
debugGossipValuesCmd,
debugTimeSeriesDumpCmd,
debugSyncBenchCmd,
debugSyncTestCmd,
debugUnsafeRemoveDeadReplicasCmd,
debugEnvCmd,
Expand Down
246 changes: 246 additions & 0 deletions pkg/cli/debug_synctest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package cli

import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

var debugSyncTestCmd = &cobra.Command{
Use: "synctest <empty-dir> <nemesis-script>",
Short: "Run a log-like workload that can help expose filesystem anomalies",
Long: `
synctest is a tool to verify filesystem consistency in the presence of I/O errors.
It takes a directory (required to be initially empty and created on demand) into
which data will be written and a nemesis script which receives a single argument
that is either "on" or "off".

The nemesis script will be called with a parameter of "on" when the filesystem
underlying the given directory should be "disturbed". It is called with "off"
to restore the undisturbed state (note that "off" must be idempotent).

synctest will run run across multiple "epochs", each terminated by an I/O error
injected by the nemesis. After each epoch, the nemesis is turned off and the
written data is reopened, checked for data loss, new data is written, and
the nemesis turned back on. In the absence of unexpected error or user interrupt,
this process continues indefinitely.
`,
Args: cobra.ExactArgs(2),
RunE: runDebugSyncTest,
}

type scriptNemesis string

func (sn scriptNemesis) exec(arg string) error {
b, err := exec.Command(string(sn), arg).CombinedOutput()
if err != nil {
return errors.Wrap(err, string(b))
}
fmt.Fprintf(stderr, "%s %s: %s", sn, arg, b)
return nil
}

func (sn scriptNemesis) On() error {
return sn.exec("on")
}

func (sn scriptNemesis) Off() error {
return sn.exec("off")
}

func runDebugSyncTest(cmd *cobra.Command, args []string) error {
// TODO(tschottdorf): make this a flag.
duration := 10 * time.Minute

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

nem := scriptNemesis(args[1])
if err := nem.Off(); err != nil {
return errors.Wrap(err, "unable to disable nemesis at beginning of run")
}

var generation int
var lastSeq int64
for {
dir := filepath.Join(args[0], strconv.Itoa(generation))
curLastSeq, err := runSyncer(ctx, dir, lastSeq, nem)
if err != nil {
return err
}
lastSeq = curLastSeq
if curLastSeq == 0 {
if ctx.Err() != nil {
// Clean shutdown.
return nil
}
// RocksDB dir got corrupted.
generation++
continue
}
}
}

type nemesisI interface {
On() error
Off() error
}

func runSyncer(
ctx context.Context, dir string, expSeq int64, nemesis nemesisI,
) (lastSeq int64, _ error) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

db, err := OpenEngine(dir, stopper, OpenEngineOptions{})
if err != nil {
if expSeq == 0 {
// Failed on first open, before we tried to corrupt anything. Hard stop.
return 0, err
}
fmt.Fprintln(stderr, "RocksDB directory", dir, "corrupted:", err)
return 0, nil // trigger reset
}

buf := make([]byte, 128)
var seq int64
key := func() roachpb.Key {
seq++
return encoding.EncodeUvarintAscending(buf[:0:0], uint64(seq))
}

check := func(kv engine.MVCCKeyValue) (bool, error) {
expKey := key()
if !bytes.Equal(kv.Key.Key, expKey) {
return false, errors.Errorf(
"found unexpected key %q (expected %q)", kv.Key.Key, expKey,
)
}
return false, nil // want more
}

fmt.Fprintf(stderr, "verifying existing sequence numbers...")
keyMin := engine.MakeMVCCMetadataKey(roachpb.KeyMin)
if err := db.Iterate(keyMin, engine.MVCCKeyMax, check); err != nil {
return 0, err
}
// We must not lose writes, but sometimes we get extra ones (i.e. we caught an
// error but the write actually went through).
if expSeq != 0 && seq < expSeq {
return 0, errors.Errorf("highest persisted sequence number is %d, but expected at least %d", seq, expSeq)
}
fmt.Fprintf(stderr, "done (seq=%d).\nWriting new entries:\n", seq)

waitFailure := time.After(time.Duration(rand.Int63n(5 * time.Second.Nanoseconds())))

stopper.RunWorker(ctx, func(ctx context.Context) {
<-waitFailure
if err := nemesis.On(); err != nil {
panic(err)
}
defer func() {
if err := nemesis.Off(); err != nil {
panic(err)
}
}()
<-stopper.ShouldQuiesce()
})

ch := make(chan os.Signal, 1)
signal.Notify(ch, drainSignals...)

write := func() (_ int64, err error) {
defer func() {
// Catch any RocksDB NPEs. They do occur when enough
// faults are being injected.
if r := recover(); r != nil {
if err == nil {
err = errors.New("recovered panic on write")
}
err = errors.Wrapf(err, "%v", r)
}
}()

k, v := engine.MakeMVCCMetadataKey(key()), []byte("payload")
switch seq % 2 {
case 0:
if err := db.Put(k, v); err != nil {
seq--
return seq, err
}
if err := db.Flush(); err != nil {
seq--
return seq, err
}
default:
b := db.NewBatch()
if err := b.Put(k, v); err != nil {
seq--
return seq, err
}
if err := b.Commit(true /* sync */); err != nil {
seq--
return seq, err
}
}
return seq, nil
}

for {
if lastSeq, err := write(); err != nil {
// Exercise three cases:
// 1. no more writes after first failure
// 2. one more attempt, failure persists
// 3. two more attempts, file system healed for last attempt
for n := rand.Intn(3); n >= 0; n-- {
if n == 1 {
if err := nemesis.Off(); err != nil {
return 0, err
}
}
fmt.Fprintf(stderr, "error after seq %d (trying %d additional writes): %v\n", lastSeq, n, err)
lastSeq, err = write()
}
fmt.Fprintf(stderr, "error after seq %d: %v\n", lastSeq, err)
// Intentionally swallow the error to get into the next epoch.
return lastSeq, nil
}
select {
case sig := <-ch:
return seq, errors.Errorf("interrupted (%v)", sig)
case <-ctx.Done():
return 0, nil
default:
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package synctest
package syncbench

import (
"context"
Expand Down Expand Up @@ -127,6 +127,9 @@ type Options struct {
}

// Run a test of writing synchronously to the RocksDB WAL.
//
// TODO(tschottdorf): this should receive a RocksDB instance so that the caller
// in cli can use OpenEngine (which in turn allows to use encryption, etc).
func Run(opts Options) error {
// Check if the directory exists.
_, err := os.Stat(opts.Dir)
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func registerTests(r *registry) {
registerRoachmart(r)
registerScaleData(r)
registerSchemaChange(r)
registerSyncTest(r)
registerTPCC(r)
registerUpgrade(r)
registerVersion(r)
Expand Down
Loading