diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 167590c11634..1ca59797867d 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -127,6 +127,7 @@ go_library( "//pkg/keys", "//pkg/kv/kvserver", "//pkg/kv/kvserver/gc", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/loqrecovery", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", diff --git a/pkg/cli/debug_check_store.go b/pkg/cli/debug_check_store.go index 68de28c9061f..5936dc051c71 100644 --- a/pkg/cli/debug_check_store.go +++ b/pkg/cli/debug_check_store.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cli/clierrorplus" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -166,7 +166,7 @@ func checkStoreRangeStats( } go func() { - if err := kvserver.IterateRangeDescriptorsFromDisk(ctx, eng, + if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, eng, func(desc roachpb.RangeDescriptor) error { inCh <- checkInput{eng: eng, desc: &desc, sl: stateloader.Make(desc.RangeID)} return nil diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index ae645a3af6a4..f627ca144ad1 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -19,7 +19,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -465,7 +465,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { defer store.Close() defer batch.Close() - storeIdent, err := kvserver.ReadStoreIdent(cmd.Context(), store) + storeIdent, err := kvstorage.ReadStoreIdent(cmd.Context(), store) if err != nil { return err } diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 51f5c8e2bd1b..eaeeae55bc28 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -134,6 +134,7 @@ go_library( "//pkg/kv/kvserver/kvadmission", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/logstore", @@ -354,6 +355,7 @@ go_test( "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/logstore", diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 9d20c34f536f..00a870da1ba0 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil" @@ -2701,7 +2702,7 @@ func TestSystemZoneConfigs(t *testing.T) { waitForReplicas := func() error { replicas := make(map[roachpb.RangeID]roachpb.RangeDescriptor) for _, s := range tc.Servers { - if err := kvserver.IterateRangeDescriptorsFromDisk(ctx, s.Engines()[0], func(desc roachpb.RangeDescriptor) error { + if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, s.Engines()[0], func(desc roachpb.RangeDescriptor) error { if len(desc.Replicas().LearnerDescriptors()) > 0 { return fmt.Errorf("descriptor contains learners: %v", desc) } diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index 7d4b9adcb627..5ff6eabe92b0 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" @@ -370,7 +371,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // Find the problematic range in the storage. var desc *roachpb.RangeDescriptor - require.NoError(t, kvserver.IterateRangeDescriptorsFromDisk(context.Background(), cpEng, + require.NoError(t, kvstorage.IterateRangeDescriptorsFromDisk(context.Background(), cpEng, func(rd roachpb.RangeDescriptor) error { if rd.RangeID == resp.Result[0].RangeID { desc = &rd diff --git a/pkg/kv/kvserver/kvstorage/BUILD.bazel b/pkg/kv/kvserver/kvstorage/BUILD.bazel index a17f50ad0b76..110f33cf9416 100644 --- a/pkg/kv/kvserver/kvstorage/BUILD.bazel +++ b/pkg/kv/kvserver/kvstorage/BUILD.bazel @@ -3,9 +3,24 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "kvstorage", - srcs = ["doc.go"], + srcs = [ + "doc.go", + "init.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage", visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvserver/logstore", + "//pkg/roachpb", + "//pkg/storage", + "//pkg/util/buildutil", + "//pkg/util/hlc", + "//pkg/util/iterutil", + "//pkg/util/log", + "//pkg/util/protoutil", + "@com_github_cockroachdb_errors//:errors", + ], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvstorage/init.go b/pkg/kv/kvserver/kvstorage/init.go new file mode 100644 index 000000000000..9a7e96771148 --- /dev/null +++ b/pkg/kv/kvserver/kvstorage/init.go @@ -0,0 +1,283 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstorage + +import ( + "bytes" + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such +// as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in +// principle exist at any RangeID, and this helper efficiently discovers all the +// keys of the desired type (as specified by the supplied `keyFn`) and, for each +// key-value pair discovered, unmarshals it into `msg` and then invokes `f`. +// +// Iteration stops on the first error (and will pass through that error). +func IterateIDPrefixKeys( + ctx context.Context, + reader storage.Reader, + keyFn func(roachpb.RangeID) roachpb.Key, + msg protoutil.Message, + f func(_ roachpb.RangeID) error, +) error { + rangeID := roachpb.RangeID(1) + // NB: Range-ID local keys have no versions and no intents. + iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + UpperBound: keys.LocalRangeIDPrefix.PrefixEnd().AsRawKey(), + }) + defer iter.Close() + + for { + bumped := false + mvccKey := storage.MakeMVCCMetadataKey(keyFn(rangeID)) + iter.SeekGE(mvccKey) + + if ok, err := iter.Valid(); !ok { + return err + } + + unsafeKey := iter.UnsafeKey() + + if !bytes.HasPrefix(unsafeKey.Key, keys.LocalRangeIDPrefix) { + // Left the local keyspace, so we're done. + return nil + } + + curRangeID, _, _, _, err := keys.DecodeRangeIDKey(unsafeKey.Key) + if err != nil { + return err + } + + if curRangeID > rangeID { + // `bumped` is always `false` here, but let's be explicit. + if !bumped { + rangeID = curRangeID + bumped = true + } + mvccKey = storage.MakeMVCCMetadataKey(keyFn(rangeID)) + } + + if !unsafeKey.Key.Equal(mvccKey.Key) { + if !bumped { + // Don't increment the rangeID if it has already been incremented + // above, or we could skip past a value we ought to see. + rangeID++ + bumped = true // for completeness' sake; continuing below anyway + } + continue + } + + ok, err := storage.MVCCGetProto( + ctx, reader, unsafeKey.Key, hlc.Timestamp{}, msg, storage.MVCCGetOptions{}) + if err != nil { + return err + } + if !ok { + return errors.Errorf("unable to unmarshal %s into %T", unsafeKey.Key, msg) + } + + if err := f(rangeID); err != nil { + return iterutil.Map(err) + } + rangeID++ + } +} + +// ReadStoreIdent reads the StoreIdent from the store. +// It returns *NotBootstrappedError if the ident is missing (meaning that the +// store needs to be bootstrapped). +func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent, error) { + var ident roachpb.StoreIdent + ok, err := storage.MVCCGetProto( + ctx, eng, keys.StoreIdentKey(), hlc.Timestamp{}, &ident, storage.MVCCGetOptions{}) + if err != nil { + return roachpb.StoreIdent{}, err + } else if !ok { + return roachpb.StoreIdent{}, &NotBootstrappedError{} + } + return ident, err +} + +// IterateRangeDescriptorsFromDisk discovers the initialized replicas and calls +// the provided function with each such descriptor from the provided Engine. The +// return values of this method and fn have semantics similar to +// storage.MVCCIterate. +func IterateRangeDescriptorsFromDisk( + ctx context.Context, reader storage.Reader, fn func(desc roachpb.RangeDescriptor) error, +) error { + log.Event(ctx, "beginning range descriptor iteration") + // MVCCIterator over all range-local key-based data. + start := keys.RangeDescriptorKey(roachpb.RKeyMin) + end := keys.RangeDescriptorKey(roachpb.RKeyMax) + + allCount := 0 + matchCount := 0 + bySuffix := make(map[string]int) + kvToDesc := func(kv roachpb.KeyValue) error { + allCount++ + // Only consider range metadata entries; ignore others. + startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) + if err != nil { + return err + } + bySuffix[string(suffix)]++ + if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) { + return nil + } + var desc roachpb.RangeDescriptor + if err := kv.Value.GetProto(&desc); err != nil { + return err + } + // Descriptor for range `[a,z)` must be found at `/rdsc/a`. + if !startKey.Equal(desc.StartKey.AsRawKey()) { + return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s", + kv.Key, desc.StartKey) + } + if !desc.IsInitialized() { + return errors.AssertionFailedf("uninitialized descriptor: %s", desc) + } + matchCount++ + err = fn(desc) + if err == nil { + return nil + } + if iterutil.Map(err) == nil { + return iterutil.StopIteration() + } + return err + } + + _, err := storage.MVCCIterate(ctx, reader, start, end, hlc.MaxTimestamp, + storage.MVCCScanOptions{Inconsistent: true}, kvToDesc) + log.Eventf(ctx, "iterated over %d keys to find %d range descriptors (by suffix: %v)", + allCount, matchCount, bySuffix) + return err +} + +type EngineReplicas struct { + Uninitialized map[storage.FullReplicaID]struct{} + Initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor +} + +// loadFullReplicaIDsFromDisk discovers all Replicas on this Store. +// There will only be one entry in the map for a given RangeID. +// +// TODO(sep-raft-log): the reader here is for the log engine. +func loadFullReplicaIDsFromDisk( + ctx context.Context, reader storage.Reader, +) (map[storage.FullReplicaID]struct{}, error) { + m := map[storage.FullReplicaID]struct{}{} + var msg roachpb.RaftReplicaID + if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key { + return keys.RaftReplicaIDKey(rangeID) + }, &msg, func(rangeID roachpb.RangeID) error { + m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{} + return nil + }); err != nil { + return nil, err + } + + // TODO(sep-raft-log): if there is any other data that we mandate is present here + // (like a HardState), validate that here. + + return m, nil +} + +// LoadAndReconcileReplicas loads the Replicas present on this +// store. It reconciles inconsistent state and runs validation checks. +// +// TOOD(sep-raft-log): consider a callback-visitor pattern here. +func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineReplicas, error) { + ident, err := ReadStoreIdent(ctx, eng) + if err != nil { + return nil, err + } + + initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{} + // INVARIANT: the latest visible committed version of the RangeDescriptor + // (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting + // the state of the Replica. + // INVARIANT: the descriptor for range [a,z) is located at RangeDescriptorKey(a). + // This is checked in IterateRangeDescriptorsFromDisk. + if err := IterateRangeDescriptorsFromDisk( + ctx, eng, func(desc roachpb.RangeDescriptor) error { + // INVARIANT: a Replica's RangeDescriptor always contains the local Store, + // i.e. a Store is a member of all of its local Replicas. + repDesc, found := desc.GetReplicaDescriptor(ident.StoreID) + if !found { + return errors.AssertionFailedf( + "RangeDescriptor does not contain local s%d: %s", + ident.StoreID, desc) + } + + initM[storage.FullReplicaID{ + RangeID: desc.RangeID, + ReplicaID: repDesc.ReplicaID, + }] = &desc + return nil + }); err != nil { + return nil, err + } + + allM, err := loadFullReplicaIDsFromDisk(ctx, eng) + if err != nil { + return nil, err + } + + for id := range initM { + if _, ok := allM[id]; !ok { + // INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk"). + // + // This invariant is true for replicas created in 22.2, but no migration + // was ever written. So we backfill the replicaID here (as of 23.1) and + // remove this code in the future (the follow-up release, assuming it is + // forced to migrate through 23.1, otherwise later). + if buildutil.CrdbTestBuild { + return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id]) + } + if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil { + return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID) + } + log.Eventf(ctx, "backfilled replicaID for %s", id) + } + // `allM` will be our map of uninitialized replicas. + // + // A replica is "uninitialized" if it's not in initM (i.e. is at log position + // zero and has no visible RangeDescriptor). + delete(allM, id) + } + + return &EngineReplicas{ + Initialized: initM, + Uninitialized: allM, // NB: init'ed ones were deleted earlier + }, nil +} + +// A NotBootstrappedError indicates that an engine has not yet been +// bootstrapped due to a store identifier not being present. +type NotBootstrappedError struct{} + +// Error formats error. +func (e *NotBootstrappedError) Error() string { + return "store has not been bootstrapped" +} diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index b9d7029b6b55..f7355709e72e 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -14,8 +14,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", - "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/stateloader", @@ -49,6 +49,7 @@ go_test( "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/stateloader", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 2c9154838c87..4ba2a87b6618 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -14,7 +14,7 @@ import ( "context" "math" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -34,11 +34,11 @@ func CollectReplicaInfo( var replicas []loqrecoverypb.ReplicaInfo for _, reader := range stores { - storeIdent, err := kvserver.ReadStoreIdent(ctx, reader) + storeIdent, err := kvstorage.ReadStoreIdent(ctx, reader) if err != nil { return loqrecoverypb.NodeReplicaInfo{}, err } - if err = kvserver.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error { + if err = kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error { rsl := stateloader.Make(desc.RangeID) rstate, err := rsl.Load(ctx, reader, &desc) if err != nil { diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index bc46986b6bf6..1ceb2611a564 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -20,9 +20,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -540,7 +540,7 @@ func (e *quorumRecoveryEnv) handleDumpStore(t *testing.T, d datadriven.TestData) var descriptorViews []storeDescriptorView var localDataViews []localDataView store := e.stores[storeID] - err := kvserver.IterateRangeDescriptorsFromDisk(ctx, store.engine, + err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, store.engine, func(desc roachpb.RangeDescriptor) error { descriptorViews = append(descriptorViews, descriptorView(desc)) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index bf37867ef01e..93f8d94d8806 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -43,9 +43,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -65,7 +65,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -76,7 +75,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/shuffle" @@ -347,15 +345,6 @@ func verifyKeys(start, end roachpb.Key, checkEndKey bool) error { return nil } -// A NotBootstrappedError indicates that an engine has not yet been -// bootstrapped due to a store identifier not being present. -type NotBootstrappedError struct{} - -// Error formats error. -func (e *NotBootstrappedError) Error() string { - return "store has not been bootstrapped" -} - // A storeReplicaVisitor calls a visitor function for each of a store's // initialized Replicas (in unspecified order). It provides an option // to visit replicas in increasing RangeID order. @@ -1772,261 +1761,13 @@ func (s *Store) IsStarted() bool { return atomic.LoadInt32(&s.started) == 1 } -// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such -// as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in -// principle exist at any RangeID, and this helper efficiently discovers all the -// keys of the desired type (as specified by the supplied `keyFn`) and, for each -// key-value pair discovered, unmarshals it into `msg` and then invokes `f`. -// -// Iteration stops on the first error (and will pass through that error). -func IterateIDPrefixKeys( - ctx context.Context, - reader storage.Reader, - keyFn func(roachpb.RangeID) roachpb.Key, - msg protoutil.Message, - f func(_ roachpb.RangeID) error, -) error { - rangeID := roachpb.RangeID(1) - // NB: Range-ID local keys have no versions and no intents. - iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ - UpperBound: keys.LocalRangeIDPrefix.PrefixEnd().AsRawKey(), - }) - defer iter.Close() - - for { - bumped := false - mvccKey := storage.MakeMVCCMetadataKey(keyFn(rangeID)) - iter.SeekGE(mvccKey) - - if ok, err := iter.Valid(); !ok { - return err - } - - unsafeKey := iter.UnsafeKey() - - if !bytes.HasPrefix(unsafeKey.Key, keys.LocalRangeIDPrefix) { - // Left the local keyspace, so we're done. - return nil - } - - curRangeID, _, _, _, err := keys.DecodeRangeIDKey(unsafeKey.Key) - if err != nil { - return err - } - - if curRangeID > rangeID { - // `bumped` is always `false` here, but let's be explicit. - if !bumped { - rangeID = curRangeID - bumped = true - } - mvccKey = storage.MakeMVCCMetadataKey(keyFn(rangeID)) - } - - if !unsafeKey.Key.Equal(mvccKey.Key) { - if !bumped { - // Don't increment the rangeID if it has already been incremented - // above, or we could skip past a value we ought to see. - rangeID++ - bumped = true // for completeness' sake; continuing below anyway - } - continue - } - - ok, err := storage.MVCCGetProto( - ctx, reader, unsafeKey.Key, hlc.Timestamp{}, msg, storage.MVCCGetOptions{}) - if err != nil { - return err - } - if !ok { - return errors.Errorf("unable to unmarshal %s into %T", unsafeKey.Key, msg) - } - - if err := f(rangeID); err != nil { - return iterutil.Map(err) - } - rangeID++ - } -} - -// IterateRangeDescriptorsFromDisk discovers the initialized replicas and calls -// the provided function with each such descriptor from the provided Engine. The -// return values of this method and fn have semantics similar to -// storage.MVCCIterate. -func IterateRangeDescriptorsFromDisk( - ctx context.Context, reader storage.Reader, fn func(desc roachpb.RangeDescriptor) error, -) error { - log.Event(ctx, "beginning range descriptor iteration") - // MVCCIterator over all range-local key-based data. - start := keys.RangeDescriptorKey(roachpb.RKeyMin) - end := keys.RangeDescriptorKey(roachpb.RKeyMax) - - allCount := 0 - matchCount := 0 - bySuffix := make(map[string]int) - kvToDesc := func(kv roachpb.KeyValue) error { - allCount++ - // Only consider range metadata entries; ignore others. - startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) - if err != nil { - return err - } - bySuffix[string(suffix)]++ - if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) { - return nil - } - var desc roachpb.RangeDescriptor - if err := kv.Value.GetProto(&desc); err != nil { - return err - } - // Descriptor for range `[a,z)` must be found at `/rdsc/a`. - if !startKey.Equal(desc.StartKey.AsRawKey()) { - return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s", - kv.Key, desc.StartKey) - } - if !desc.IsInitialized() { - return errors.AssertionFailedf("uninitialized descriptor: %s", desc) - } - matchCount++ - err = fn(desc) - if err == nil { - return nil - } - if iterutil.Map(err) == nil { - return iterutil.StopIteration() - } - return err - } - - _, err := storage.MVCCIterate(ctx, reader, start, end, hlc.MaxTimestamp, - storage.MVCCScanOptions{Inconsistent: true}, kvToDesc) - log.Eventf(ctx, "iterated over %d keys to find %d range descriptors (by suffix: %v)", - allCount, matchCount, bySuffix) - return err -} - -// ReadStoreIdent reads the StoreIdent from the store. -// It returns *NotBootstrappedError if the ident is missing (meaning that the -// store needs to be bootstrapped). -func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent, error) { - var ident roachpb.StoreIdent - ok, err := storage.MVCCGetProto( - ctx, eng, keys.StoreIdentKey(), hlc.Timestamp{}, &ident, storage.MVCCGetOptions{}) - if err != nil { - return roachpb.StoreIdent{}, err - } else if !ok { - return roachpb.StoreIdent{}, &NotBootstrappedError{} - } - return ident, err -} - -type engineReplicas struct { - uninitialized map[storage.FullReplicaID]struct{} - initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor -} - -// loadFullReplicaIDsFromDisk discovers all Replicas on this Store. -// There will only be one entry in the map for a given RangeID. -// -// TODO(sep-raft-log): the reader here is for the log engine. -func loadFullReplicaIDsFromDisk( - ctx context.Context, reader storage.Reader, -) (map[storage.FullReplicaID]struct{}, error) { - m := map[storage.FullReplicaID]struct{}{} - var msg roachpb.RaftReplicaID - if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key { - return keys.RaftReplicaIDKey(rangeID) - }, &msg, func(rangeID roachpb.RangeID) error { - m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{} - return nil - }); err != nil { - return nil, err - } - - // TODO(sep-raft-log): if there is any other data that we mandate is present here - // (like a HardState), validate that here. - - return m, nil -} - -// loadAndReconcileReplicas loads the Replicas present on this -// store. It reconciles inconsistent state and runs validation checks. -// -// TODO(sep-raft-log): also load *uninitialized* Replicas. -// TOOD(sep-raft-log): consider a callback-visitor pattern here. -func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) { - ident, err := ReadStoreIdent(ctx, eng) - if err != nil { - return nil, err - } - - initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{} - // INVARIANT: the latest visible committed version of the RangeDescriptor - // (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting - // the state of the Replica. - // INVARIANT: the descriptor for range [a,z) is located at RangeDescriptorKey(a). - // This is checked in IterateRangeDescriptorsFromDisk. - if err := IterateRangeDescriptorsFromDisk( - ctx, eng, func(desc roachpb.RangeDescriptor) error { - // INVARIANT: a Replica's RangeDescriptor always contains the local Store, - // i.e. a Store is a member of all of its local Replicas. - repDesc, found := desc.GetReplicaDescriptor(ident.StoreID) - if !found { - return errors.AssertionFailedf( - "RangeDescriptor does not contain local s%d: %s", - ident.StoreID, desc) - } - - initM[storage.FullReplicaID{ - RangeID: desc.RangeID, - ReplicaID: repDesc.ReplicaID, - }] = &desc - return nil - }); err != nil { - return nil, err - } - - allM, err := loadFullReplicaIDsFromDisk(ctx, eng) - if err != nil { - return nil, err - } - - for id := range initM { - if _, ok := allM[id]; !ok { - // INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk"). - // - // This invariant is true for replicas created in 22.2, but no migration - // was ever written. So we backfill the replicaID here (as of 23.1) and - // remove this code in the future (the follow-up release, assuming it is - // forced to migrate through 23.1, otherwise later). - if buildutil.CrdbTestBuild { - return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id]) - } - if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil { - return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID) - } - log.Eventf(ctx, "backfilled replicaID for %s", id) - } - // `allM` will be our map of uninitialized replicas. - // - // A replica is "uninitialized" if it's not in initM (i.e. is at log position - // zero and has no visible RangeDescriptor). - delete(allM, id) - } - - return &engineReplicas{ - initialized: initM, - uninitialized: allM, // NB: init'ed ones were deleted earlier - }, nil -} - // Start the engine, set the GC and read the StoreIdent. func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.stopper = stopper // Populate the store ident. If not bootstrapped, ReadStoreIntent will // return an error. - ident, err := ReadStoreIdent(ctx, s.engine) + ident, err := kvstorage.ReadStoreIdent(ctx, s.engine) if err != nil { return err } @@ -2135,11 +1876,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // concurrently. Note that while we can perform this initialization // concurrently, all of the initialization must be performed before we start // listening for Raft messages and starting the process Raft loop. - engRepls, err := loadAndReconcileReplicas(ctx, s.engine) + engRepls, err := kvstorage.LoadAndReconcileReplicas(ctx, s.engine) if err != nil { return err } - for fullID, desc := range engRepls.initialized { + for fullID, desc := range engRepls.Initialized { rep, err := newReplica(ctx, desc, s, fullID.ReplicaID) if err != nil { return err @@ -2953,10 +2694,10 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64, // cluster version, which must be present. func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { // See if this is an already-bootstrapped store. - ident, err := ReadStoreIdent(ctx, eng) + ident, err := kvstorage.ReadStoreIdent(ctx, eng) if err == nil { return errors.Errorf("engine already initialized as %s", ident.String()) - } else if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + } else if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { return errors.Wrap(err, "unable to read store ident") } // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 8bf9f3e9519e..b1665c4d198e 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -39,11 +40,11 @@ const FirstStoreID = roachpb.StoreID(1) // already have been persisted to it. Returns an error if this is not // the case. func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { - exIdent, err := ReadStoreIdent(ctx, eng) + exIdent, err := kvstorage.ReadStoreIdent(ctx, eng) if err == nil { return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String()) } - if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { return err } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 55a74f41e6fa..31b034ace867 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -417,7 +418,7 @@ func TestIterateIDPrefixKeys(t *testing.T) { return nil } - if err := IterateIDPrefixKeys(ctx, eng, keys.RangeTombstoneKey, &tombstone, handleTombstone); err != nil { + if err := kvstorage.IterateIDPrefixKeys(ctx, eng, keys.RangeTombstoneKey, &tombstone, handleTombstone); err != nil { t.Fatal(err) } placeholder := seenT{ @@ -452,7 +453,7 @@ func TestStoreInitAndBootstrap(t *testing.T) { store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{}, &cfg) defer stopper.Stop(ctx) - if _, err := ReadStoreIdent(ctx, store.Engine()); err != nil { + if _, err := kvstorage.ReadStoreIdent(ctx, store.Engine()); err != nil { t.Fatalf("unable to read store ident: %+v", err) } @@ -499,7 +500,7 @@ func TestInitializeEngineErrors(t *testing.T) { // Can't init as haven't bootstrapped. err = store.Start(ctx, stopper) - require.ErrorIs(t, err, &NotBootstrappedError{}) + require.ErrorIs(t, err, &kvstorage.NotBootstrappedError{}) // Bootstrap should fail on non-empty engine. err = InitEngine(ctx, eng, testIdent) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 17984d9d6860..303194b34704 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -106,6 +106,7 @@ go_library( "//pkg/kv/kvserver/kvadmission", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/loqrecovery", diff --git a/pkg/server/debug/BUILD.bazel b/pkg/server/debug/BUILD.bazel index b3cedee5afef..023bbfb7100b 100644 --- a/pkg/server/debug/BUILD.bazel +++ b/pkg/server/debug/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/base", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts/sidetransport", + "//pkg/kv/kvserver/kvstorage", "//pkg/roachpb", "//pkg/server/debug/goroutineui", "//pkg/server/debug/pprofui", diff --git a/pkg/server/debug/server.go b/pkg/server/debug/server.go index 4eb3825e3949..35368285661f 100644 --- a/pkg/server/debug/server.go +++ b/pkg/server/debug/server.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/debug/goroutineui" "github.com/cockroachdb/cockroach/pkg/server/debug/pprofui" @@ -198,7 +199,7 @@ func (ds *Server) RegisterEngines(specs []base.StoreSpec, engines []storage.Engi storeIDs := make([]roachpb.StoreIdent, len(engines)) for i := range engines { - id, err := kvserver.ReadStoreIdent(context.Background(), engines[i]) + id, err := kvstorage.ReadStoreIdent(context.Background(), engines[i]) if err != nil { return err } diff --git a/pkg/server/init.go b/pkg/server/init.go index 579366e9c71d..2b345c22b92a 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -677,8 +678,8 @@ func inspectEngines( } } - storeIdent, err := kvserver.ReadStoreIdent(ctx, eng) - if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) { + storeIdent, err := kvstorage.ReadStoreIdent(ctx, eng) + if errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { uninitializedEngines = append(uninitializedEngines, eng) continue } else if err != nil { diff --git a/pkg/server/node.go b/pkg/server/node.go index 216aef3240e6..69b0defe0c9a 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -845,7 +846,7 @@ func (dsm *diskStatsMap) initDiskStatsMap(specs []base.StoreSpec, engines []stor diskNameToStoreID: make(map[string]roachpb.StoreID), } for i := range engines { - id, err := kvserver.ReadStoreIdent(context.Background(), engines[i]) + id, err := kvstorage.ReadStoreIdent(context.Background(), engines[i]) if err != nil { return err } diff --git a/pkg/server/node_tombstone_storage.go b/pkg/server/node_tombstone_storage.go index e009366f31a6..edc966ad2dee 100644 --- a/pkg/server/node_tombstone_storage.go +++ b/pkg/server/node_tombstone_storage.go @@ -15,7 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -124,8 +124,8 @@ func (s *nodeTombstoneStorage) SetDecommissioned( // // One initialized engine is always available when this method // is called, so we're still persisting on at least one engine. - if _, err := kvserver.ReadStoreIdent(ctx, eng); err != nil { - if errors.Is(err, &kvserver.NotBootstrappedError{}) { + if _, err := kvstorage.ReadStoreIdent(ctx, eng); err != nil { + if errors.Is(err, &kvstorage.NotBootstrappedError{}) { continue } return err