96667: roachtest,loqrecovery: half-online loss of quorum recovery r=erikgrinaker a=aliher1911

This commit adds roachtests exercising the loss of quorum recovery
tools in half-online mode.
The added tests:
```
loqrecovery/half-online/workload=movr/rangeSize=2mb [replication]
loqrecovery/half-online/workload=movr/rangeSize=default [replication]
loqrecovery/half-online/workload=tpcc/rangeSize=16mb [replication]
loqrecovery/half-online/workload=tpcc/rangeSize=default [replication]
```
are similar to the ones without the `half-online` infix, but use the
half-online tools, which collect recovery info from the running nodes
and restart only the affected nodes via a rolling restart.
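
For context, the half-online flow these tests exercise roughly follows the
command sequence below (flags are taken from the test code in this commit;
the address placeholder is illustrative):
```
# Collect replica info from the surviving nodes and build a recovery plan.
./cockroach debug recover make-plan --confirm y --insecure --host <live-node-addr> -o recover-plan.json

# Stage the plan on the surviving nodes via a live node.
./cockroach debug recover apply-plan --confirm y --insecure --host <live-node-addr> recover-plan.json

# After a rolling restart of the surviving nodes, poll until the plan has
# been applied everywhere.
./cockroach debug recover verify --insecure --host <live-node-addr> recover-plan.json
```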

Release note: None

Fixes #93055 

96982: bulk: fix incorrect ingestion throughput aggregation r=rhu713 a=adityamaru

IngestionPerformanceStats are emitted on completion of every flush in the SSTBatcher. A tracing aggregator on each restore processor listens for these events and maintains a running aggregate. This aggregate is then used to generate interesting metrics, one of which is the throughput, in MB/sec, at which the processor flushes and ingests bytes.

Previously, we naively computed this throughput as aggregatedBytes / aggregatedDuration. This is incorrect because each processor handles several flushes concurrently, so the sum of per-flush durations overstates the elapsed time. To account for this, we now maintain the earliest start time and the latest end time across the aggregated requests, and use the difference between these two timestamps as the denominator when computing throughput.
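
The scheme amounts to tracking a running byte total plus the earliest start and latest end timestamps across flushes, as in the minimal sketch below (illustrative names and types, not the actual SSTBatcher/aggregator code):
```go
package main

import (
	"fmt"
	"time"
)

// flushStats describes a single completed flush (illustrative type).
type flushStats struct {
	bytes      int64
	start, end time.Time
}

// aggregate keeps a running byte total plus the earliest start and latest
// end across all flushes seen so far, including concurrent ones.
type aggregate struct {
	bytes         int64
	earliestStart time.Time
	latestEnd     time.Time
}

func (a *aggregate) combine(f flushStats) {
	a.bytes += f.bytes
	if a.earliestStart.IsZero() || f.start.Before(a.earliestStart) {
		a.earliestStart = f.start
	}
	if f.end.After(a.latestEnd) {
		a.latestEnd = f.end
	}
}

// throughputMBps divides by wall-clock elapsed time rather than by the sum
// of per-flush durations, which overcounts when flushes run concurrently.
func (a *aggregate) throughputMBps() float64 {
	elapsed := a.latestEnd.Sub(a.earliestStart).Seconds()
	if elapsed <= 0 {
		return 0
	}
	return float64(a.bytes) / (1 << 20) / elapsed
}

func main() {
	var a aggregate
	t0 := time.Now()
	// Two concurrent 64 MiB flushes over the same 2s window should report
	// ~64 MB/s, not the ~32 MB/s a summed-duration denominator would give.
	a.combine(flushStats{bytes: 64 << 20, start: t0, end: t0.Add(2 * time.Second)})
	a.combine(flushStats{bytes: 64 << 20, start: t0, end: t0.Add(2 * time.Second)})
	fmt.Printf("%.1f MB/s\n", a.throughputMBps())
}
```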

Fixes: #89579

Release note: None

97029: kvserver: bump raftLogQueue concurrency to 16 r=erikgrinaker a=pavelkalinnikov

Details in #93534

Fixes #93534
Epic: none
Release note (ops change): added the COCKROACH_RAFT_LOG_QUEUE_CONCURRENCY environment variable, which controls the number of parallel workers performing Raft log truncations. It can be used to make in-memory log truncations more aggressive and reduce the amount of Raft log data flushed to disk.
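
As a rough illustration of how such an override is consumed (a standalone sketch; the helper mirrors CockroachDB's envutil.EnvOrDefaultInt, and the exact call site in the raftLogQueue is an assumption):
```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOrDefaultInt returns the integer value of the named environment
// variable, falling back to def if it is unset or unparsable.
func envOrDefaultInt(name string, def int) int {
	if s, ok := os.LookupEnv(name); ok {
		if v, err := strconv.Atoi(s); err == nil {
			return v
		}
	}
	return def
}

func main() {
	// Hypothetical wiring: the queue would size its worker pool from this.
	workers := envOrDefaultInt("COCKROACH_RAFT_LOG_QUEUE_CONCURRENCY", 16)
	fmt.Printf("raft log queue concurrency: %d\n", workers)
}
```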

Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
4 people committed Feb 15, 2023
4 parents de09310 + ccb4a39 + 10d97b3 + db02e39 commit a24b178
Showing 8 changed files with 340 additions and 28 deletions.
258 changes: 252 additions & 6 deletions pkg/cmd/roachtest/tests/loss_of_quorum_recovery.go
@@ -43,11 +43,19 @@ type loqTestSpec struct {
rangeSizeMB int64
}

func (s loqTestSpec) String() string {
// testName builds a test name based on mode and workload. The mode argument
// is optional and is omitted from the name if empty. This preserves the names
// of the offline tests, whose success history is stored as roachperf
// artifacts; if we renamed them, those artifacts would have to be checked
// manually.
// TODO: remove this exception once offline recovery is deprecated.
func (s loqTestSpec) testName(mode string) string {
sizeName := "default"
if s.rangeSizeMB > 0 {
sizeName = fmt.Sprintf("%dmb", s.rangeSizeMB)
}
if len(mode) > 0 {
return fmt.Sprintf("loqrecovery/%s/workload=%s/rangeSize=%s", mode, s.wl, sizeName)
}
return fmt.Sprintf("loqrecovery/workload=%s/rangeSize=%s", s.wl, sizeName)
}

@@ -61,7 +69,7 @@ func registerLOQRecovery(r registry.Registry) {
} {
testSpec := s
r.Add(registry.TestSpec{
Name: s.String(),
Name: s.testName(""),
Owner: registry.OwnerReplication,
Tags: []string{`default`},
Cluster: spec,
@@ -70,6 +78,16 @@ func registerLOQRecovery(r registry.Registry) {
runRecoverLossOfQuorum(ctx, t, c, testSpec)
},
})
r.Add(registry.TestSpec{
Name: s.testName("half-online"),
Owner: registry.OwnerReplication,
Tags: []string{`default`},
Cluster: spec,
NonReleaseBlocker: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runHalfOnlineRecoverLossOfQuorum(ctx, t, c, testSpec)
},
})
}
}

@@ -147,9 +165,8 @@ func runRecoverLossOfQuorum(ctx context.Context, t test.Test, c cluster.Cluster,
workloadHistogramFile := "restored.json"

c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
startOpts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
c.Start(ctx, t.L(), startOpts, settings, nodes)
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, nodes)

// Cleanup stale files generated during recovery. We do this for the case
// where the cluster is reused and cli would refuse to overwrite files
@@ -232,13 +249,12 @@ func runRecoverLossOfQuorum(ctx context.Context, t test.Test, c cluster.Cluster,
applyCommand := "./cockroach debug recover apply-plan --store={store-dir} --confirm y " + planName
c.Run(ctx, c.Nodes(remaining...), applyCommand)

startOpts.RoachprodOpts.SkipInit = true
// Ignore node deaths because nodes could fail if recovered ranges
// generate panics. We don't want the test to fail in that case; we
// rely on query and workload failures to expose such problems.
m.ExpectDeaths(int32(len(remaining)))
settings.Env = append(settings.Env, "COCKROACH_SCAN_INTERVAL=10s")
c.Start(ctx, t.L(), startOpts, settings, c.Nodes(remaining...))
c.Start(ctx, t.L(), option.DefaultStartSingleNodeOpts(), settings, c.Nodes(remaining...))

t.L().Printf("waiting for nodes to restart")
if err = contextutil.RunWithTimeout(ctx, "wait-for-restart", time.Minute,
@@ -335,6 +351,236 @@ func runRecoverLossOfQuorum(ctx context.Context, t test.Test, c cluster.Cluster,
}
}

func runHalfOnlineRecoverLossOfQuorum(
ctx context.Context, t test.Test, c cluster.Cluster, s loqTestSpec,
) {
// To debug or analyze recovery failures, enable this option. The test will
// then fail every time recovery is not successful, letting you collect the
// logs and plans and check whether a node panic happened as a result of
// recovery.
// The debug state is taken from the --debug option, but it could be
// overridden here to run multiple stress iterations and collect details
// without keeping clusters running.
debugFailures := t.IsDebug()

// Number of cockroach cluster nodes.
maxNode := c.Spec().NodeCount - 1
nodes := c.Range(1, maxNode)
// Controller node runs offline procedures and workload.
controller := c.Spec().NodeCount
// Nodes that we plan to keep after simulated failure.
remaining := []int{1, 4, 5}
killed := []int{2, 3}
killedNodes := c.Nodes(killed...)
planName := "recover-plan.json"
pgURL := fmt.Sprintf("{pgurl:1-%d}", c.Spec().NodeCount-1)
dbName := "test_db"
workloadHistogramFile := "restored.json"

c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
settings := install.MakeClusterSettings()
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, nodes)

// Cleanup stale files generated during recovery. We do this for the case
// where the cluster is reused and cli would refuse to overwrite files
// blindly.
c.Run(ctx, c.All(), "rm", "-f", planName)

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

t.L().Printf("started cluster")

// Lower the dead node detection threshold to make decommissioning nodes
// faster. 1m30s is currently the smallest allowed value; otherwise we would
// go as low as possible to improve test time.
_, err := db.Exec("SET CLUSTER SETTING server.time_until_store_dead = '1m30s'")
require.NoError(t, err, "failed to set dead store timeout")
if debugFailures {
// For debug runs, enable statement tracing for better visibility into which
// queries failed.
_, err = db.Exec("SET CLUSTER SETTING sql.trace.stmt.enable_threshold = '30s'")
require.NoError(t, err, "failed to enable statement tracing")
}

m := c.NewMonitor(ctx, nodes)
m.Go(func(ctx context.Context) error {
t.L().Printf("initializing workload")

c.Run(ctx, c.Node(controller), s.wl.initCmd(pgURL, dbName))

if s.rangeSizeMB > 0 {
err = setDBRangeLimits(ctx, db, dbName, s.rangeSizeMB*(1<<20))
require.NoError(t, err, "failed to set range limits configuration")
}

// Lower default statement timeout so that we can detect if workload gets
// stuck. We don't do it earlier because init could have long-running
// queries.
_, err = db.Exec("SET CLUSTER SETTING sql.defaults.statement_timeout = '60s'")
require.NoError(t, err, "failed to set default statement timeout")

t.L().Printf("running workload")
c.Run(ctx, c.Node(controller), s.wl.runCmd(pgURL, dbName, ifLocal(c, "10s", "30s"), ""))
t.L().Printf("workload finished")

m.ExpectDeaths(int32(len(killed)))
stopOpts := option.DefaultStopOpts()
c.Stop(ctx, t.L(), stopOpts, killedNodes)

t.L().Printf("running plan creation")
addrs, err := c.ExternalAddr(ctx, t.L(), c.Node(1))
require.NoError(t, err, "infra failure, can't get IP addr of cluster node")
require.NotEmpty(t, addrs, "infra failure, can't get IP addr of cluster node")
addr := addrs[0]
planCmd := "./cockroach debug recover make-plan --confirm y --insecure --host " + addr + " -o " + planName

if err = c.RunE(ctx, c.Node(controller), planCmd); err != nil {
t.L().Printf("failed to create plan, test can't proceed assuming unrecoverable cluster: %s",
err)
return &recoveryImpossibleError{testOutcome: planCantBeCreated}
}

if err := c.Get(ctx, t.L(), planName, path.Join(t.ArtifactsDir(), planName),
c.Node(controller)); err != nil {
t.Fatalf("failed to collect plan %s from controller node %d: %s", planName, controller, err)
}

t.L().Printf("staging recovery plan")
applyCommand := "./cockroach debug recover apply-plan --confirm y --insecure --host " + addr + " " + planName
c.Run(ctx, c.Nodes(controller), applyCommand)

// Ignore node deaths because nodes could fail if recovered ranges
// generate panics. We don't want the test to fail in that case; we
// rely on query and workload failures to expose such problems.
m.ExpectDeaths(int32(len(remaining)))
settings.Env = append(settings.Env, "COCKROACH_SCAN_INTERVAL=10s")

t.L().Printf("performing rolling restart of surviving nodes")
for _, id := range remaining {
c.Stop(ctx, t.L(), stopOpts, c.Node(id))
c.Start(ctx, t.L(), option.DefaultStartSingleNodeOpts(), settings, c.Node(id))
}

t.L().Printf("waiting for nodes to process recovery")
verifyCommand := "./cockroach debug recover verify --insecure --host " + addr + " " + planName
if err = contextutil.RunWithTimeout(ctx, "wait-for-restart", 2*time.Minute,
func(ctx context.Context) error {
for {
res, err := c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(controller), verifyCommand)
if res.RemoteExitStatus == 0 {
if ctx.Err() != nil {
return &recoveryImpossibleError{testOutcome: restartFailed}
}
if err != nil {
// No remote exit code but an error means a roachprod failure (ssh etc.).
return err
}
break
}
t.L().Printf("recovery is not finished: %s", res.Stderr)
time.Sleep(10 * time.Second)
}
var err error
for {
if ctx.Err() != nil {
return &recoveryImpossibleError{testOutcome: restartFailed}
}
db, err = c.ConnE(ctx, t.L(), remaining[len(remaining)-1])
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
// Restore range limits if they were changed to improve recovery
// times.
for {
if ctx.Err() != nil {
return &recoveryImpossibleError{testOutcome: restartFailed}
}
err = setDBRangeLimits(ctx, db, dbName, 0 /* zero restores default size */)
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
return nil
}); err != nil {
return err
}

t.L().Printf("resuming workload")
if err = c.RunE(ctx, c.Node(controller),
s.wl.runCmd(
fmt.Sprintf("{pgurl:1,4-%d}", maxNode), dbName, ifLocal(c, "30s", "3m"),
workloadHistogramFile)); err != nil {
return &recoveryImpossibleError{testOutcome: workloadFailed}
}
t.L().Printf("workload finished")

t.L().Printf("ensuring nodes are decommissioned")
if err := setSnapshotRate(ctx, db, 512); err != nil {
// setSnapshotRate executes a SQL query against the cluster; if the query
// failed, the cluster is not healthy after recovery, which means the
// restart failed.
return &recoveryImpossibleError{testOutcome: restartFailed}
}
// In half-online mode, nodes update the dead nodes' status upon
// restart. Check that this actually happened. We also need to retry
// since decommissioning is done in the background with retries.
if err = contextutil.RunWithTimeout(ctx, "wait-for-decommission", 5*time.Minute,
func(ctx context.Context) error {
// Keep querying until either we get no rows (all nodes are
// decommissioned or removed) or the task times out. In the timeout case,
// the test error will be the cause of the timeout, thus exposing the
// error code.
res := workloadFailed
for ; ; time.Sleep(5 * time.Second) {
rows, err := db.QueryContext(
ctx,
"select node_id from crdb_internal.kv_node_liveness where node_id in ($1, $2) and membership = 'active'",
killed[0],
killed[1])
if ctx.Err() != nil {
return &recoveryImpossibleError{testOutcome: res}
}
if err != nil {
continue
}
if rows.Next() {
res = decommissionFailed
continue
}
return nil
}
}); err != nil {
return err
}

return nil
})

testOutcome := success
if err = m.WaitE(); err != nil {
testOutcome = restartFailed
if recErr := (*recoveryImpossibleError)(nil); errors.As(err, &recErr) {
testOutcome = recErr.testOutcome
} else {
t.L().Printf("restart failed with: %s", err)
}
}

recordOutcome, buffer := initPerfCapture()
recordOutcome(testOutcome)
buffer.upload(ctx, t, c)

if testOutcome == success {
t.L().Printf("recovery succeeded 🍾 \U0001F973")
} else {
t.L().Printf("recovery failed with error %s(%d)", outcomeNames[testOutcome], testOutcome)
if debugFailures && testOutcome > 0 {
t.Fatalf("test failed with error %s(%d)", outcomeNames[testOutcome], testOutcome)
}
}
}

func setDBRangeLimits(ctx context.Context, db *gosql.DB, dbName string, size int64) error {
query := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING range_max_bytes=%d, range_min_bytes=1024",
dbName, size)
11 changes: 9 additions & 2 deletions pkg/kv/bulk/bulkpb/BUILD.bazel
@@ -8,7 +8,10 @@ proto_library(
srcs = ["bulkpb.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
deps = [
"//pkg/util/hlc:hlc_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
],
)

go_proto_library(
@@ -17,7 +20,10 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb",
proto = ":bulkpb_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
deps = [
"//pkg/util/hlc",
"@com_github_gogo_protobuf//gogoproto",
],
)

go_library(
@@ -29,6 +35,7 @@ go_library(
deps = [
"//pkg/roachpb",
"//pkg/util/bulk",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"@com_github_cockroachdb_redact//:redact",
18 changes: 15 additions & 3 deletions pkg/kv/bulk/bulkpb/bulkpb.proto
@@ -14,12 +14,16 @@ package cockroach.kv.bulk.bulkpb;
option go_package = "bulkpb";

import "gogoproto/gogo.proto";
import "util/hlc/timestamp.proto";

// IngestionPerformanceStats is a message containing information about the
// creation of SSTables by an SSTBatcher or BufferingAdder.
message IngestionPerformanceStats {
// DataSize is the total byte size of all the SST files ingested.
int64 data_size = 1;
// LogicalDataSize is the total byte size of all the KVs ingested.
int64 logical_data_size = 1;

// SSTDataSize is the total byte size of the SSTs sent to KV for ingestion.
int64 sst_data_size = 23 [(gogoproto.customname) = "SSTDataSize"];

// Buffer Flushes is the number of buffer flushes.
int64 buffer_flushes = 2;
@@ -72,9 +76,17 @@ message IngestionPerformanceStats {
// CommitWait is the time spent waiting for commit timestamps.
int64 commit_wait = 18 [(gogoproto.casttype) = "time.Duration"];

// Duration is the total ingestion time.
// Duration is the difference between the CurrentFlushTime and the
// LastFlushTime.
int64 duration = 19 [(gogoproto.casttype) = "time.Duration"];

// LastFlushTime is the timestamp at which we completed the flush prior to the
// current flush.
util.hlc.Timestamp last_flush_time = 21 [(gogoproto.nullable) = false];

// CurrentFlushTime is the timestamp at which we finished the current flush.
util.hlc.Timestamp current_flush_time = 22 [(gogoproto.nullable) = false];

// SendWaitByStore is the time spent sending batches to each store.
map<int32, int64> send_wait_by_store = 20 [(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID", (gogoproto.castvalue) = "time.Duration"];
}
