forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
kvserver: add TestCreateManyUnappliedProbes
This is the test used for cockroachdb#102953. Epic: none Release note: none
- Loading branch information
Showing
2 changed files
with
272 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,270 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package kvserver_test | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/base" | ||
"github.com/cockroachdb/cockroach/pkg/keys" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvpb" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/settings/cluster" | ||
"github.com/cockroachdb/cockroach/pkg/storage" | ||
"github.com/cockroachdb/cockroach/pkg/storage/enginepb" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/skip" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/metric" | ||
"github.com/cockroachdb/cockroach/pkg/util/stop" | ||
"github.com/stretchr/testify/require" | ||
"go.etcd.io/raft/v3" | ||
"go.etcd.io/raft/v3/raftpb" | ||
) | ||
|
||
// TestCreateManyUnappliedProbes is a (by default skipped) test that writes | ||
// a very large unapplied raft log consisting entirely of probes. | ||
// | ||
// It's a toy example for #75729 but has been useful to validate improvements | ||
// in the raft pipeline, so it is checked in to allow for future re-use for | ||
// similar purposes. | ||
func TestCreateManyUnappliedProbes(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
skip.UnderShort(t, "takes ~4s") | ||
|
||
ctx := context.Background() | ||
|
||
// NB: I used this as follows: | ||
// | ||
// p := os.ExpandEnv("$HOME/go/src/github.com/cockroachdb/cockroach/cockroach-data") | ||
// const entsPerBatch = 100000 | ||
// const batches = 1000 | ||
// | ||
// ./dev build && rm -rf cockroach-data && timeout 10 ./cockroach start-single-node --logtostderr --insecure ; \ | ||
// go test ./pkg/kv/kvserver/ -v --run TestCreateManyUnappliedProbes && sleep 3 && \ | ||
// (./cockroach start-single-node --logtostderr=INFO --insecure | grep -F r10/) | ||
// | ||
// Then wait and watch the `raft.commandsapplied` metric to see r10 apply the entries. | ||
p := filepath.Join(t.TempDir(), "cockroach-data") | ||
const entsPerBatch = 10 | ||
const batches = 3 | ||
rangeID := roachpb.RangeID(10) // system.settings | ||
|
||
if _, err := os.Stat(p); err != nil { | ||
args := base.TestClusterArgs{ | ||
ServerArgs: base.TestServerArgs{StoreSpecs: []base.StoreSpec{ | ||
{Path: p}, | ||
}}, | ||
ReplicationMode: base.ReplicationManual, | ||
} | ||
tc := testcluster.StartTestCluster(t, 1, args) | ||
// Reload system.settings' rangeID just in case it changes. | ||
require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT | ||
range_id | ||
FROM | ||
[SHOW RANGES FROM TABLE system.settings] | ||
ORDER BY | ||
range_id ASC | ||
LIMIT | ||
1;`).Scan(&rangeID)) | ||
tc.Stopper().Stop(ctx) | ||
|
||
defer func() { | ||
if t.Failed() { | ||
return | ||
} | ||
tc := testcluster.StartTestCluster(t, 1, args) | ||
defer tc.Stopper().Stop(ctx) | ||
require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT count(1) FROM system.settings`).Err()) | ||
t.Log("read system.settings") | ||
}() | ||
|
||
} | ||
|
||
st := cluster.MakeTestingClusterSettings() | ||
eng, err := storage.Open(ctx, storage.Filesystem(p), st) | ||
require.NoError(t, err) | ||
defer eng.Close() | ||
|
||
// Determine LastIndex, LastTerm, and next MaxLeaseIndex by scanning | ||
// existing log. | ||
it := raftlog.NewIterator(rangeID, eng, raftlog.IterOptions{}) | ||
defer it.Close() | ||
rsl := logstore.NewStateLoader(rangeID) | ||
lastIndex, err := rsl.LoadLastIndex(ctx, eng) | ||
require.NoError(t, err) | ||
t.Logf("loaded LastIndex: %d", lastIndex) | ||
ok, err := it.SeekGE(lastIndex) | ||
require.NoError(t, err) | ||
require.True(t, ok) | ||
|
||
var lai kvpb.LeaseAppliedIndex | ||
var lastTerm uint64 | ||
require.NoError(t, raftlog.Visit(eng, rangeID, lastIndex, math.MaxUint64, func(entry raftpb.Entry) error { | ||
ent, err := raftlog.NewEntry(it.Entry()) | ||
require.NoError(t, err) | ||
if lai < ent.Cmd.MaxLeaseIndex { | ||
lai = ent.Cmd.MaxLeaseIndex | ||
} | ||
lastTerm = ent.Term | ||
return nil | ||
})) | ||
|
||
sl := stateloader.Make(rangeID) | ||
lease, err := sl.LoadLease(ctx, eng) | ||
require.NoError(t, err) | ||
|
||
for batchIdx := 0; batchIdx < batches; batchIdx++ { | ||
t.Logf("batch %d", batchIdx+1) | ||
b := storage.NewOpLoggerBatch(eng.NewBatch()) | ||
defer b.Batch.Close() | ||
|
||
var ents []raftpb.Entry | ||
for i := 0; i < entsPerBatch; i++ { | ||
lai++ | ||
c := &kvpb.ProbeRequest{} | ||
resp := &kvpb.ProbeResponse{} | ||
c.Key = keys.LocalMax | ||
|
||
cmd, ok := batcheval.LookupCommand(c.Method()) | ||
require.True(t, ok) | ||
// NB: this should really operate on a BatchRequest. We need to librarize | ||
// evaluateBatch: | ||
// https://github.com/cockroachdb/cockroach/blob/9c09473ec9da9d458869abb3fe08a9db251c9291/pkg/kv/kvserver/replica_evaluate.go#L141-L153 | ||
// The good news is, this is already in good shape! Just needs to be moved | ||
// to a leaf package, like `batcheval`. | ||
// To use the "true" logic we want this for sure, and probably even the | ||
// caller of evaluateBatch and a few more levels. Pulling it out until | ||
// there's nothing left basically. | ||
|
||
evalCtx := &batcheval.MockEvalCtx{} | ||
args := batcheval.CommandArgs{ | ||
EvalCtx: evalCtx.EvalContext(), | ||
Header: kvpb.Header{}, | ||
Args: c, | ||
Stats: &enginepb.MVCCStats{}, | ||
Uncertainty: uncertainty.Interval{}, | ||
} | ||
res, err := cmd.EvalRW(ctx, b, args, resp) | ||
require.NoError(t, err) | ||
// TODO: there's more stuff in evaluateProposal that would need to be lifted | ||
// here: | ||
// https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L842-L869 | ||
res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()} | ||
res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()} | ||
|
||
// End of evaluation. Start of "proposing". | ||
|
||
// TODO: the "requires consensus" logic is not reusable, make it so: | ||
// https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L827-L840 | ||
|
||
raftCmd := kvserverpb.RaftCommand{ | ||
// Propose under latest lease, this isn't necessarily what you want (in a | ||
// test) but it reflects the steady state when proposing under the leader. | ||
// To also support proposing leases itself, we need this whole chunk of code | ||
// to be reusable: | ||
// https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219 | ||
// Really we probably just want to librarize the relevant parts of | ||
// evalAndPropose and requestToProposal. | ||
ProposerLeaseSequence: lease.Sequence, | ||
MaxLeaseIndex: lai, | ||
// Rest was determined by evaluation. | ||
ReplicatedEvalResult: res.Replicated, | ||
WriteBatch: res.WriteBatch, | ||
LogicalOpLog: res.LogicalOpLog, | ||
} | ||
|
||
idKey := raftlog.MakeCmdIDKey() | ||
payload, err := raftlog.RaftCmdToPayload(ctx, &raftCmd, idKey) | ||
require.NoError(t, err) | ||
ents = append(ents, raftpb.Entry{ | ||
Term: lastTerm, | ||
Index: uint64(lastIndex) + uint64(len(ents)) + 1, | ||
Type: raftpb.EntryNormal, | ||
Data: payload, | ||
}) | ||
} | ||
|
||
stats := &logstore.AppendStats{} | ||
|
||
msgApp := raftpb.Message{ | ||
Type: raftpb.MsgStorageAppend, | ||
To: raft.LocalAppendThread, | ||
Term: lastTerm, | ||
LogTerm: lastTerm, | ||
Index: uint64(lastIndex), | ||
Entries: ents, | ||
Commit: uint64(lastIndex) + uint64(len(ents)), | ||
Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync | ||
} | ||
|
||
fakeMeta := metric.Metadata{ | ||
Name: "fake.meta", | ||
} | ||
swl := logstore.NewSyncWaiterLoop() | ||
stopper := stop.NewStopper() | ||
defer stopper.Stop(ctx) | ||
swl.Start(ctx, stopper) | ||
ls := logstore.LogStore{ | ||
RangeID: rangeID, | ||
Engine: eng, | ||
Sideload: nil, | ||
StateLoader: rsl, | ||
SyncWaiter: swl, | ||
EntryCache: raftentry.NewCache(1024), | ||
Settings: st, | ||
Metrics: logstore.Metrics{ | ||
RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{ | ||
Mode: metric.HistogramModePrometheus, | ||
Metadata: fakeMeta, | ||
Duration: time.Millisecond, | ||
Buckets: metric.NetworkLatencyBuckets, | ||
}), | ||
}, | ||
} | ||
|
||
wg := &sync.WaitGroup{} | ||
wg.Add(1) | ||
_, err = ls.StoreEntries(ctx, logstore.RaftState{ | ||
LastIndex: lastIndex, | ||
LastTerm: kvpb.RaftTerm(lastTerm), | ||
}, logstore.MakeMsgStorageAppend(msgApp), (*wgSyncCallback)(wg), stats) | ||
require.NoError(t, err) | ||
wg.Wait() | ||
|
||
lastIndex = kvpb.RaftIndex(ents[len(ents)-1].Index) | ||
} | ||
|
||
t.Logf("LastIndex is now: %d", lastIndex) | ||
} | ||
|
||
type wgSyncCallback sync.WaitGroup | ||
|
||
func (w *wgSyncCallback) OnLogSync( | ||
ctx context.Context, messages []raftpb.Message, stats storage.BatchCommitStats, | ||
) { | ||
(*sync.WaitGroup)(w).Done() | ||
} |