diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 1c06fbbe7b66..f6a9380d54a1 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -238,6 +238,7 @@ go_test( "below_raft_protos_test.go", "client_atomic_membership_change_test.go", "client_lease_test.go", + "client_manual_proposal_test.go", "client_merge_test.go", "client_metrics_test.go", "client_migration_test.go", @@ -389,6 +390,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/kv/kvserver/protectedts/ptutil", + "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_manual_proposal_test.go b/pkg/kv/kvserver/client_manual_proposal_test.go new file mode 100644 index 000000000000..676d0bacdb8f --- /dev/null +++ b/pkg/kv/kvserver/client_manual_proposal_test.go @@ -0,0 +1,270 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "math" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +// TestCreateManyUnappliedProbes is a (by default skipped) test that writes +// a very large unapplied raft log consisting entirely of probes. +// +// It's a toy example for #75729 but has been useful to validate improvements +// in the raft pipeline, so it is checked in to allow for future re-use for +// similar purposes. +func TestCreateManyUnappliedProbes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderShort(t, "takes ~4s") + + ctx := context.Background() + + // NB: I used this as follows: + // + // p := os.ExpandEnv("$HOME/go/src/github.com/cockroachdb/cockroach/cockroach-data") + // const entsPerBatch = 100000 + // const batches = 1000 + // + // ./dev build && rm -rf cockroach-data && timeout 10 ./cockroach start-single-node --logtostderr --insecure ; \ + // go test ./pkg/kv/kvserver/ -v --run TestCreateManyUnappliedProbes && sleep 3 && \ + // (./cockroach start-single-node --logtostderr=INFO --insecure | grep -F r10/) + // + // Then wait and watch the `raft.commandsapplied` metric to see r10 apply the entries. + p := filepath.Join(t.TempDir(), "cockroach-data") + const entsPerBatch = 10 + const batches = 3 + rangeID := roachpb.RangeID(10) // system.settings + + if _, err := os.Stat(p); err != nil { + args := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{StoreSpecs: []base.StoreSpec{ + {Path: p}, + }}, + ReplicationMode: base.ReplicationManual, + } + tc := testcluster.StartTestCluster(t, 1, args) + // Reload system.settings' rangeID just in case it changes. + require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT + range_id +FROM + [SHOW RANGES FROM TABLE system.settings] +ORDER BY + range_id ASC +LIMIT + 1;`).Scan(&rangeID)) + tc.Stopper().Stop(ctx) + + defer func() { + if t.Failed() { + return + } + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT count(1) FROM system.settings`).Err()) + t.Log("read system.settings") + }() + + } + + st := cluster.MakeTestingClusterSettings() + eng, err := storage.Open(ctx, storage.Filesystem(p), st) + require.NoError(t, err) + defer eng.Close() + + // Determine LastIndex, LastTerm, and next MaxLeaseIndex by scanning + // existing log. + it := raftlog.NewIterator(rangeID, eng, raftlog.IterOptions{}) + defer it.Close() + rsl := logstore.NewStateLoader(rangeID) + lastIndex, err := rsl.LoadLastIndex(ctx, eng) + require.NoError(t, err) + t.Logf("loaded LastIndex: %d", lastIndex) + ok, err := it.SeekGE(lastIndex) + require.NoError(t, err) + require.True(t, ok) + + var lai kvpb.LeaseAppliedIndex + var lastTerm uint64 + require.NoError(t, raftlog.Visit(eng, rangeID, lastIndex, math.MaxUint64, func(entry raftpb.Entry) error { + ent, err := raftlog.NewEntry(it.Entry()) + require.NoError(t, err) + if lai < ent.Cmd.MaxLeaseIndex { + lai = ent.Cmd.MaxLeaseIndex + } + lastTerm = ent.Term + return nil + })) + + sl := stateloader.Make(rangeID) + lease, err := sl.LoadLease(ctx, eng) + require.NoError(t, err) + + for batchIdx := 0; batchIdx < batches; batchIdx++ { + t.Logf("batch %d", batchIdx+1) + b := storage.NewOpLoggerBatch(eng.NewBatch()) + defer b.Batch.Close() + + var ents []raftpb.Entry + for i := 0; i < entsPerBatch; i++ { + lai++ + c := &kvpb.ProbeRequest{} + resp := &kvpb.ProbeResponse{} + c.Key = keys.LocalMax + + cmd, ok := batcheval.LookupCommand(c.Method()) + require.True(t, ok) + // NB: this should really operate on a BatchRequest. We need to librarize + // evaluateBatch: + // https://github.com/cockroachdb/cockroach/blob/9c09473ec9da9d458869abb3fe08a9db251c9291/pkg/kv/kvserver/replica_evaluate.go#L141-L153 + // The good news is, this is already in good shape! Just needs to be moved + // to a leaf package, like `batcheval`. + // To use the "true" logic we want this for sure, and probably even the + // caller of evaluateBatch and a few more levels. Pulling it out until + // there's nothing left basically. + + evalCtx := &batcheval.MockEvalCtx{} + args := batcheval.CommandArgs{ + EvalCtx: evalCtx.EvalContext(), + Header: kvpb.Header{}, + Args: c, + Stats: &enginepb.MVCCStats{}, + Uncertainty: uncertainty.Interval{}, + } + res, err := cmd.EvalRW(ctx, b, args, resp) + require.NoError(t, err) + // TODO: there's more stuff in evaluateProposal that would need to be lifted + // here: + // https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L842-L869 + res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()} + res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()} + + // End of evaluation. Start of "proposing". + + // TODO: the "requires consensus" logic is not reusable, make it so: + // https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L827-L840 + + raftCmd := kvserverpb.RaftCommand{ + // Propose under latest lease, this isn't necessarily what you want (in a + // test) but it reflects the steady state when proposing under the leader. + // To also support proposing leases itself, we need this whole chunk of code + // to be reusable: + // https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219 + // Really we probably just want to librarize the relevant parts of + // evalAndPropose and requestToProposal. + ProposerLeaseSequence: lease.Sequence, + MaxLeaseIndex: lai, + // Rest was determined by evaluation. + ReplicatedEvalResult: res.Replicated, + WriteBatch: res.WriteBatch, + LogicalOpLog: res.LogicalOpLog, + } + + idKey := raftlog.MakeCmdIDKey() + payload, err := raftlog.RaftCmdToPayload(ctx, &raftCmd, idKey) + require.NoError(t, err) + ents = append(ents, raftpb.Entry{ + Term: lastTerm, + Index: uint64(lastIndex) + uint64(len(ents)) + 1, + Type: raftpb.EntryNormal, + Data: payload, + }) + } + + stats := &logstore.AppendStats{} + + msgApp := raftpb.Message{ + Type: raftpb.MsgStorageAppend, + To: raft.LocalAppendThread, + Term: lastTerm, + LogTerm: lastTerm, + Index: uint64(lastIndex), + Entries: ents, + Commit: uint64(lastIndex) + uint64(len(ents)), + Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync + } + + fakeMeta := metric.Metadata{ + Name: "fake.meta", + } + swl := logstore.NewSyncWaiterLoop() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + swl.Start(ctx, stopper) + ls := logstore.LogStore{ + RangeID: rangeID, + Engine: eng, + Sideload: nil, + StateLoader: rsl, + SyncWaiter: swl, + EntryCache: raftentry.NewCache(1024), + Settings: st, + Metrics: logstore.Metrics{ + RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{ + Mode: metric.HistogramModePrometheus, + Metadata: fakeMeta, + Duration: time.Millisecond, + Buckets: metric.NetworkLatencyBuckets, + }), + }, + } + + wg := &sync.WaitGroup{} + wg.Add(1) + _, err = ls.StoreEntries(ctx, logstore.RaftState{ + LastIndex: lastIndex, + LastTerm: kvpb.RaftTerm(lastTerm), + }, logstore.MakeMsgStorageAppend(msgApp), (*wgSyncCallback)(wg), stats) + require.NoError(t, err) + wg.Wait() + + lastIndex = kvpb.RaftIndex(ents[len(ents)-1].Index) + } + + t.Logf("LastIndex is now: %d", lastIndex) +} + +type wgSyncCallback sync.WaitGroup + +func (w *wgSyncCallback) OnLogSync( + ctx context.Context, messages []raftpb.Message, stats storage.BatchCommitStats, +) { + (*sync.WaitGroup)(w).Done() +}