89813: kvserver: remove legacy snapshot and diff code r=erikgrinaker a=pavelkalinnikov

This commit removes the proto and code for `RaftSnapshotData`. This type was used for reporting replica snapshots and computing diffs, but that functionality has since been replaced by storage checkpoints and offline tooling.

Part of #21128
Epic: none

Release note: None
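
For a sense of what replaces the removed machinery, here is a minimal standalone sketch (plain Go, not the kvserver API; the names are illustrative): rather than materializing `RaftSnapshotData` per store and diffing the key-value pairs, the updated consistency test computes a SHA-512 checksum per store and compares the digests, as the test diff further down shows.

package main

import (
	"bytes"
	"crypto/sha512"
	"fmt"
)

// checksumStore stands in for hashing a replica's KV data on one store; the
// real code streams keys and values through the hasher rather than holding
// them in memory.
func checksumStore(kvs []string) []byte {
	h := sha512.New()
	for _, kv := range kvs {
		h.Write([]byte(kv))
	}
	return h.Sum(nil)
}

func main() {
	s1 := checksumStore([]string{"a=1", "b=2"})
	s2 := checksumStore([]string{"a=1", "b=2", "e=extra"}) // diverged replica
	s3 := checksumStore([]string{"a=1", "b=2"})

	fmt.Println("s1 == s3:", bytes.Equal(s1, s3)) // true: the majority agrees
	fmt.Println("s1 == s2:", bytes.Equal(s1, s2)) // false: s2 diverged
}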

90006: roachtest: Introduce a test to overwhelm nodes r=irfansharif a=andrewbaptist

This test checks the behavior of the system when the SQL load greatly exceeds what the nodes are able to handle. In the future we want to evaluate how this overload is handled, but today it will cause nodes to OOM.

Informs #89142.

Release note: None

90063: roachtest: de-flake admission-control/tpcc-olap r=irfansharif a=irfansharif

Fixes #89600. This test makes use of `workload querybench`, which is only available in the standalone `workload` binary. It started failing after #89482, where we attempted to replace all uses of `./workload` with `./cockroach workload`.

Release note: None
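
A sketch of the pattern this fix restores, written as a fragment of a roachtest `Run` function (the `c.Put` call mirrors the diff below; the querybench flags are illustrative placeholders, not the test's actual invocation):

// Sketch only: querybench ships in the standalone workload binary, so stage
// that binary explicitly instead of relying on `./cockroach workload`.
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
// Placeholder flags for illustration; the real test assembles its own command.
c.Run(ctx, c.Node(workloadNode),
	"./workload run querybench --query-file=queries.sql {pgurl:1}")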

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
4 people committed Oct 17, 2022
4 parents 8970edb + bbdd88c + d8976d9 + dcd3273 commit c68e94a
Showing 13 changed files with 78 additions and 453 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -32,6 +32,7 @@ func registerAdmission(r registry.Registry) {
registerMultiStoreOverload(r)
registerSnapshotOverload(r)
registerTPCCOverload(r)
registerTPCCSevereOverload(r)

// TODO(irfansharif): Once registerMultiTenantFairness is unskipped and
// observed to be non-flaky for 3-ish months, transfer ownership to the AC
41 changes: 41 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
@@ -17,9 +17,11 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -51,6 +53,10 @@ func (s tpccOLAPSpec) run(ctx context.Context, t test.Test, c cluster.Cluster) {
ctx, t, c, tpccOptions{
Warehouses: s.Warehouses, SetupType: usingImport,
})
// We make use of querybench below, only available through the `workload`
// binary.
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)

const queryFileName = "queries.sql"
// querybench expects the entire query to be on a single line.
queryLine := `"` + strings.Replace(tpccOlapQuery, "\n", " ", -1) + `"`
@@ -144,3 +150,38 @@ func registerTPCCOverload(r registry.Registry) {
})
}
}

// This test begins a ramping TPCC workload that will overwhelm the CRDB nodes.
// There is no way to "pass" this test since the 6 nodes can't possibly handle
// 10K warehouses. If they could handle this load, then the test should be
// changed to increase that count. The purpose of the test is to make sure that
// the nodes don't fail under unsustainable overload. As of today (v22.2), the
// CRDB nodes will eventually OOM around 3-4 hours through the ramp period.
func registerTPCCSevereOverload(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "admission-control/tpcc-severe-overload",
Owner: registry.OwnerAdmissionControl,
// TODO(abaptist): This test will require a lot of admission control work
// to pass. Just putting it here to make it easy to run at any time.
Skip: "#89142",
Cluster: r.MakeClusterSpec(7, spec.CPU(8)),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
roachNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Spec().NodeCount

c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), roachNodes)

t.Status("initializing (~1h)")
c.Run(ctx, c.Node(workloadNode), "./cockroach workload fixtures import tpcc --checks=false --warehouses=10000 {pgurl:1}")

// This run passes through 4 "phases"
// 1) No admission control, low latencies (up to ~1500 warehouses).
// 2) Admission control delays, growing latencies (up to ~3000 warehouses).
// 3) High latencies (100s+), queues building (up to ~4500 warehouses).
// 4) Unbounded memory and goroutine growth with eventual node crashes (up to ~6000 warehouses).
t.Status("running workload (fails in ~3-4 hours)")
c.Run(ctx, c.Node(workloadNode), "./cockroach workload run tpcc --ramp=6h --tolerate-errors --warehouses=10000 '{pgurl:1-6}'")
},
})
}
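
For context on the timeline, a small standalone calculation (assuming the 6h ramp to 10,000 warehouses is roughly linear, which is an assumption about the workload rather than something this diff states) shows why the failure lands in the "~3-4 hours" window: the ~6000-warehouse crash phase is reached about 3.6 hours into the ramp.

package main

import "fmt"

func main() {
	const (
		rampHours  = 6.0     // --ramp=6h
		warehouses = 10000.0 // --warehouses=10000
	)
	phases := []struct {
		name       string
		warehouses float64
	}{
		{"admission control delays begin", 1500},
		{"latencies grow", 3000},
		{"queues build", 4500},
		{"memory growth, node crashes", 6000},
	}
	// Assuming a linear ramp, the time at which each phase starts is
	// proportional to its warehouse count.
	for _, p := range phases {
		fmt.Printf("%-32s ~%.1fh into the ramp\n", p.name, p.warehouses/warehouses*rampHours)
	}
}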
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/BUILD.bazel
@@ -39,7 +39,6 @@ go_library(
"replica_closedts.go",
"replica_command.go",
"replica_consistency.go",
"replica_consistency_diff.go",
"replica_corruption.go",
"replica_destroy.go",
"replica_eval_context.go",
@@ -168,7 +167,6 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/contextutil",
28 changes: 9 additions & 19 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -347,9 +347,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Contains(t, resp.Result[0].Detail, `[minority]`)
assert.Contains(t, resp.Result[0].Detail, `stats`)

// Checkpoints should have been created on all stores. Load replica snapshots
// from them.
snaps := make([]roachpb.RaftSnapshotData, numStores)
// Checkpoints should have been created on all stores.
hashes := make([][]byte, numStores)
for i := 0; i < numStores; i++ {
cps := onDiskCheckpointPaths(i)
require.Len(t, cps, 1)
@@ -379,23 +378,14 @@
}))
require.NotNil(t, desc)

// Load the content of the problematic range.
snap, err := kvserver.LoadRaftSnapshotDataForTesting(context.Background(), *desc, cpEng)
// Compute a checksum over the content of the problematic range.
hash, err := kvserver.ChecksumRange(context.Background(), *desc, cpEng)
require.NoError(t, err)
snaps[i] = snap
}

assert.Empty(t, kvserver.DiffRange(&snaps[0], &snaps[2])) // s1 and s3 agree
diff := kvserver.DiffRange(&snaps[0], &snaps[1])
diff[0].Timestamp = hlc.Timestamp{Logical: 987, WallTime: 123} // for determinism
wantDiff := `--- leaseholder
+++ follower
+0.000000123,987 "e"
+ ts:1970-01-01 00:00:00.000000123 +0000 UTC
+ value:"\x00\x00\x00\x00\x01T"
+ raw mvcc_key/value: 6500000000000000007b000003db0d 000000000154
`
assert.Equal(t, wantDiff, diff.String())
hashes[i] = hash
}

assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree
assert.NotEqual(t, hashes[0], hashes[1]) // s2 diverged

// A death rattle should have been written on s2 (store index 1).
eng := store1.Engine()
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -550,3 +550,15 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// ChecksumRange returns a checksum over the KV data of the given range.
func ChecksumRange(
ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader,
) ([]byte, error) {
lim := quotapool.NewRateLimiter("test", 1<<30, 1<<30)
res, err := replicaSHA512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim)
if err != nil {
return nil, err
}
return res.SHA512[:], nil
}
64 changes: 6 additions & 58 deletions pkg/kv/kvserver/replica_consistency.go
@@ -19,7 +19,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -30,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -482,34 +480,19 @@ type replicaHash struct {
PersistedMS, RecomputedMS enginepb.MVCCStats
}

// LoadRaftSnapshotDataForTesting returns all the KV data of the given range.
// Only for testing.
func LoadRaftSnapshotDataForTesting(
ctx context.Context, rd roachpb.RangeDescriptor, store storage.Reader,
) (roachpb.RaftSnapshotData, error) {
var r *Replica
var snap roachpb.RaftSnapshotData
lim := quotapool.NewRateLimiter("test", 1<<20, 1<<20)
if _, err := r.sha512(ctx, rd, store, &snap, roachpb.ChecksumMode_CHECK_FULL, lim); err != nil {
return roachpb.RaftSnapshotData{}, err
}
return snap, nil
}

// sha512 computes the SHA512 hash of all the replica data at the snapshot.
// It will dump all the kv data into snapshot if it is provided.
func (*Replica) sha512(
// replicaSHA512 computes the SHA512 hash of the replica data at the given
// snapshot. Either the full replicated state is taken into account, or only
// RangeAppliedState (which includes MVCC stats), depending on the mode.
func replicaSHA512(
ctx context.Context,
desc roachpb.RangeDescriptor,
snap storage.Reader,
snapshot *roachpb.RaftSnapshotData,
mode roachpb.ChecksumMode,
limiter *quotapool.RateLimiter,
) (*replicaHash, error) {
statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS

// Iterate over all the data in the range.
var alloc bufalloc.ByteAllocator
var intBuf [8]byte
var legacyTimestamp hlc.LegacyTimestamp
var timestampBuf []byte
@@ -520,17 +503,6 @@ func (*Replica) sha512(
if err := limiter.WaitN(ctx, int64(len(unsafeKey.Key)+len(unsafeValue))); err != nil {
return err
}

if snapshot != nil {
// Add (a copy of) the kv pair into the debug message.
kv := roachpb.RaftSnapshotData_KeyValue{
Timestamp: unsafeKey.Timestamp,
}
alloc, kv.Key = alloc.Copy(unsafeKey.Key, 0)
alloc, kv.Value = alloc.Copy(unsafeValue, 0)
snapshot.KV = append(snapshot.KV, kv)
}

// Encode the length of the key and value.
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeKey.Key)))
if _, err := hasher.Write(intBuf[:]); err != nil {
@@ -566,18 +538,6 @@ func (*Replica) sha512(
if err != nil {
return err
}

if snapshot != nil {
// Add (a copy of) the range key into the debug message.
rkv := roachpb.RaftSnapshotData_RangeKeyValue{
Timestamp: rangeKV.RangeKey.Timestamp,
}
alloc, rkv.StartKey = alloc.Copy(rangeKV.RangeKey.StartKey, 0)
alloc, rkv.EndKey = alloc.Copy(rangeKV.RangeKey.EndKey, 0)
alloc, rkv.Value = alloc.Copy(rangeKV.Value, 0)
snapshot.RangeKV = append(snapshot.RangeKV, rkv)
}

// Encode the length of the start key and end key.
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.StartKey)))
if _, err := hasher.Write(intBuf[:]); err != nil {
@@ -639,19 +599,6 @@ func (*Replica) sha512(
if err != nil {
return nil, err
}
if snapshot != nil {
// Add LeaseAppliedState to the snapshot.
kv := roachpb.RaftSnapshotData_KeyValue{
Timestamp: hlc.Timestamp{},
}
kv.Key = keys.RangeAppliedStateKey(desc.RangeID)
var v roachpb.Value
if err := v.SetProto(rangeAppliedState); err != nil {
return nil, err
}
kv.Value = v.RawBytes
snapshot.KV = append(snapshot.KV, kv)
}
if _, err := hasher.Write(b); err != nil {
return nil, err
}
@@ -725,6 +672,7 @@ func (r *Replica) computeChecksumPostApply(
defer taskCancel()
defer snap.Close()
defer cleanup()

// Wait until the CollectChecksum request handler joins in and learns about
// the starting computation, and then start it.
if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckSyncTimeout,
@@ -743,7 +691,7 @@ func (r *Replica) computeChecksumPostApply(
); err != nil {
log.Errorf(ctx, "checksum collection did not join: %v", err)
} else {
result, err := r.sha512(ctx, desc, snap, nil /* snapshot */, cc.Mode, r.store.consistencyLimiter)
result, err := replicaSHA512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
result = nil
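
To make the surviving hashing scheme concrete, here is a minimal standalone sketch of the framing visible in the hunks above (simplified: the real `replicaSHA512` also folds in MVCC timestamps, range keys, and the range applied state): key and value lengths are written little-endian ahead of the raw bytes, so different key/value splits of the same byte stream cannot produce the same digest.

package main

import (
	"crypto/sha512"
	"encoding/binary"
	"fmt"
	"hash"
)

// hashKV writes one key/value pair into the hasher with length-prefixed
// framing: lengths first, then the raw bytes, so ("ab","c") and ("a","bc")
// hash differently.
func hashKV(h hash.Hash, key, value []byte) {
	var intBuf [8]byte
	binary.LittleEndian.PutUint64(intBuf[:], uint64(len(key)))
	h.Write(intBuf[:])
	binary.LittleEndian.PutUint64(intBuf[:], uint64(len(value)))
	h.Write(intBuf[:])
	h.Write(key)
	h.Write(value)
}

func main() {
	h1 := sha512.New()
	hashKV(h1, []byte("ab"), []byte("c"))
	h2 := sha512.New()
	hashKV(h2, []byte("a"), []byte("bc"))
	fmt.Printf("digests differ: %t\n",
		fmt.Sprintf("%x", h1.Sum(nil)) != fmt.Sprintf("%x", h2.Sum(nil)))
}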