Skip to content

Commit

Permalink
storage: deprecate cluster version key
Browse files Browse the repository at this point in the history
This change deprecates the cluster version key. We have the version
outside the store (in the min version file) so we no longer need to
store it inside a KV.

To keep backward compatibility with 22.2, we still write the key but
we never read it. Once we stop supporting 22.2, we can stop writing it
altogether.

This required updating the `TestClusterVersionWriteSynthesize` to use
versions that work with the binary. We also clean up the test and move
it to `kvstorage` where it belongs.

Release note: None
Epic: none
  • Loading branch information
RaduBerinde committed Feb 16, 2023
1 parent 99bcda7 commit 5b3ebf0
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 231 deletions.
16 changes: 8 additions & 8 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ var _ = [...]interface{}{
// 4. Store local keys: These contain metadata about an individual store.
// They are unreplicated and unaddressable. The typical example is the
// store 'ident' record. They all share `localStorePrefix`.
StoreClusterVersionKey, // "cver"
StoreGossipKey, // "goss"
StoreHLCUpperBoundKey, // "hlcu"
StoreIdentKey, // "iden"
StoreUnsafeReplicaRecoveryKey, // "loqr"
StoreNodeTombstoneKey, // "ntmb"
StoreCachedSettingsKey, // "stng"
StoreLastUpKey, // "uptm"
DeprecatedStoreClusterVersionKey, // "cver"
StoreGossipKey, // "goss"
StoreHLCUpperBoundKey, // "hlcu"
StoreIdentKey, // "iden"
StoreUnsafeReplicaRecoveryKey, // "loqr"
StoreNodeTombstoneKey, // "ntmb"
StoreCachedSettingsKey, // "stng"
StoreLastUpKey, // "uptm"

// 5. Range lock keys for all replicated locks. All range locks share
// LocalRangeLockTablePrefix. Locks can be acquired on global keys and on
Expand Down
7 changes: 5 additions & 2 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ func StoreGossipKey() roachpb.Key {
return MakeStoreKey(localStoreGossipSuffix, nil)
}

// StoreClusterVersionKey returns a store-local key for the cluster version.
func StoreClusterVersionKey() roachpb.Key {
// DeprecatedStoreClusterVersionKey returns a store-local key for the cluster version.
//
// We no longer use this key, but still write it out for interoperability with
// older versions.
func DeprecatedStoreClusterVersionKey() roachpb.Key {
return MakeStoreKey(localStoreClusterVersionSuffix, nil)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestStoreKeyEncodeDecode(t *testing.T) {
}{
{key: StoreIdentKey(), expSuffix: localStoreIdentSuffix, expDetail: nil},
{key: StoreGossipKey(), expSuffix: localStoreGossipSuffix, expDetail: nil},
{key: StoreClusterVersionKey(), expSuffix: localStoreClusterVersionSuffix, expDetail: nil},
{key: DeprecatedStoreClusterVersionKey(), expSuffix: localStoreClusterVersionSuffix, expDetail: nil},
{key: StoreLastUpKey(), expSuffix: localStoreLastUpSuffix, expDetail: nil},
{key: StoreHLCUpperBoundKey(), expSuffix: localStoreHLCUpperBoundSuffix, expDetail: nil},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestPrettyPrint(t *testing.T) {
// local
{keys.StoreIdentKey(), "/Local/Store/storeIdent", revertSupportUnknown},
{keys.StoreGossipKey(), "/Local/Store/gossipBootstrap", revertSupportUnknown},
{keys.StoreClusterVersionKey(), "/Local/Store/clusterVersion", revertSupportUnknown},
{keys.DeprecatedStoreClusterVersionKey(), "/Local/Store/clusterVersion", revertSupportUnknown},
{keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/n123", revertSupportUnknown},
{keys.StoreCachedSettingsKey(roachpb.Key("a")), `/Local/Store/cachedSettings/"a"`, revertSupportUnknown},
{keys.StoreUnsafeReplicaRecoveryKey(loqRecoveryID), fmt.Sprintf(`/Local/Store/lossOfQuorumRecovery/applied/%s`, loqRecoveryID), revertSupportUnknown},
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ go_test(
"//pkg/keys",
"//pkg/kv/kvserver/logstore",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
63 changes: 15 additions & 48 deletions pkg/kv/kvserver/kvstorage/cluster_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ import (
"github.com/cockroachdb/errors"
)

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key. We only accept a raw engine to ensure we're persisting
// the write durably.
// WriteClusterVersion writes the given cluster version to the min version file
// and to the store-local cluster version key. We only accept a raw engine to
// ensure we're persisting the write durably.
func WriteClusterVersion(
ctx context.Context, eng storage.Engine, cv clusterversion.ClusterVersion,
) error {
// We no longer read this key, but v22.2 still does. We continue writing the key
// for interoperability.
// TODO(radu): Remove this when we are no longer compatible with 22.2 (keeping
// just the SetMinVersion call).
err := storage.MVCCPutProto(
ctx,
eng,
nil,
keys.StoreClusterVersionKey(),
keys.DeprecatedStoreClusterVersionKey(),
hlc.Timestamp{},
hlc.ClockTimestamp{},
nil,
Expand All @@ -43,38 +47,9 @@ func WriteClusterVersion(
if err != nil {
return err
}

// The storage engine sometimes must make backwards incompatible
// changes. However, the store cluster version key is a key stored
// within the storage engine, so it's unavailable when the store is
// opened.
//
// The storage engine maintains its own minimum version on disk that
// it may consult it before opening the Engine. This version is
// stored in a separate file on the filesystem. For now, write to
// this file in combination with the store cluster version key.
//
// This parallel version state is a bit of a wart and an eventual
// goal is to replace the store cluster version key with the storage
// engine's flat file. This requires that there are no writes to the
// engine until either bootstrapping or joining an existing cluster.
// Writing the version to this file would happen before opening the
// engine for completing the rest of bootstrapping/joining the
// cluster.
return eng.SetMinVersion(cv.Version)
}

// ReadClusterVersion reads the cluster version from the store-local version
// key. Returns an empty version if the key is not found.
func ReadClusterVersion(
ctx context.Context, reader storage.Reader,
) (clusterversion.ClusterVersion, error) {
var cv clusterversion.ClusterVersion
_, err := storage.MVCCGetProto(ctx, reader, keys.StoreClusterVersionKey(), hlc.Timestamp{},
&cv, storage.MVCCGetOptions{})
return cv, err
}

// WriteClusterVersionToEngines writes the given version to the given engines,
// Returns nil on success; otherwise returns first error encountered writing to
// the stores. It makes no attempt to validate the supplied version.
Expand Down Expand Up @@ -128,30 +103,22 @@ func SynthesizeClusterVersionFromEngines(
// constraints, which at the latest the second loop will achieve (because
// then minStoreVersion don't change any more).
for _, eng := range engines {
eng := eng.(storage.Reader) // we're read only
var cv clusterversion.ClusterVersion
cv, err := ReadClusterVersion(ctx, eng)
if err != nil {
return clusterversion.ClusterVersion{}, err
}
if cv.Version == (roachpb.Version{}) {
// This is needed when a node first joins an existing cluster, in
// which case it won't know what version to use until the first
// Gossip update comes in.
cv.Version = binaryMinSupportedVersion
engVer := eng.MinVersion()
if engVer == (roachpb.Version{}) {
return clusterversion.ClusterVersion{}, errors.AssertionFailedf("store %s has no version", eng)
}

// Avoid running a binary with a store that is too new. For example,
// restarting into 1.1 after having upgraded to 1.2 doesn't work.
if binaryVersion.Less(cv.Version) {
if binaryVersion.Less(engVer) {
return clusterversion.ClusterVersion{}, errors.Errorf(
"cockroach version v%s is incompatible with data in store %s; use version v%s or later",
binaryVersion, eng, cv.Version)
binaryVersion, eng, engVer)
}

// Track smallest use version encountered.
if cv.Version.Less(minStoreVersion.Version) {
minStoreVersion.Version = cv.Version
if engVer.Less(minStoreVersion.Version) {
minStoreVersion.Version = engVer
minStoreVersion.origin = fmt.Sprint(eng)
}
}
Expand Down
135 changes: 135 additions & 0 deletions pkg/kv/kvserver/kvstorage/cluster_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ package kvstorage

import (
"context"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// TestStoresClusterVersionIncompatible verifies an error occurs when
Expand Down Expand Up @@ -80,3 +83,135 @@ func TestStoresClusterVersionIncompatible(t *testing.T) {
})
}
}

// TestStoresClusterVersionWriteSynthesize verifies that the cluster version is
// written to all stores and that missing versions are filled in appropriately.
func TestClusterVersionWriteSynthesize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// We can't hardcode versions here because the can become too old to create
// stores with. create a store.
// For example's sake, let's assume that minV is 1.0. Then binV is 1.1 and
// development versions are 1.0-1 and 1.0-2.
minV := clusterversion.TestingBinaryMinSupportedVersion
binV := minV
binV.Minor += 1

// Development versions.
versionA := minV
versionA.Internal = 1

versionB := minV
versionB.Internal = 2

engines := make([]storage.Engine, 3)
for i := range engines {
// Create the stores without an initialized cluster version.
st := cluster.MakeTestingClusterSettingsWithVersions(binV, minV, false /* initializeVersion */)
eng, err := storage.Open(
ctx, storage.InMemory(), st,
storage.ForTesting, storage.MaxSize(1<<20),
)
if err != nil {
t.Fatal(err)
}
stopper.AddCloser(eng)
engines[i] = eng
}
e0 := engines[:1]
e01 := engines[:2]
e2 := engines[2:3]
e012 := engines[:3]

// If there are no stores, default to minV.
if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, nil, binV, minV); err != nil {
t.Fatal(err)
} else {
expCV := clusterversion.ClusterVersion{
Version: minV,
}
if !reflect.DeepEqual(initialCV, expCV) {
t.Fatalf("expected %+v; got %+v", expCV, initialCV)
}
}

// Verify that the initial read of an empty store synthesizes minV. This
// is the code path that runs after starting the binV binary for the first
// time after the rolling upgrade from minV.
if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, e0, binV, minV); err != nil {
t.Fatal(err)
} else {
expCV := clusterversion.ClusterVersion{
Version: minV,
}
if !reflect.DeepEqual(initialCV, expCV) {
t.Fatalf("expected %+v; got %+v", expCV, initialCV)
}
}

// Bump a version to something more modern (but supported by this binary).
// Note that there's still only one store.
{
cv := clusterversion.ClusterVersion{
Version: versionB,
}
if err := WriteClusterVersionToEngines(ctx, e0, cv); err != nil {
t.Fatal(err)
}

// Verify the same thing comes back on read.
if newCV, err := SynthesizeClusterVersionFromEngines(ctx, e0, binV, minV); err != nil {
t.Fatal(err)
} else {
expCV := cv
if !reflect.DeepEqual(newCV, cv) {
t.Fatalf("expected %+v; got %+v", expCV, newCV)
}
}
}

// Use stores 0 and 1. It reads as minV because store 1 has no entry, lowering
// the use version to minV.
{
expCV := clusterversion.ClusterVersion{
Version: minV,
}
if cv, err := SynthesizeClusterVersionFromEngines(ctx, e01, binV, minV); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(cv, expCV) {
t.Fatalf("expected %+v, got %+v", expCV, cv)
}

// Write an updated Version to both stores.
cv := clusterversion.ClusterVersion{
Version: versionB,
}
if err := WriteClusterVersionToEngines(ctx, e01, cv); err != nil {
t.Fatal(err)
}
}

// Third node comes along, for now it's alone. It has a lower use version.
cv := clusterversion.ClusterVersion{
Version: versionA,
}

if err := WriteClusterVersionToEngines(ctx, e2, cv); err != nil {
t.Fatal(err)
}

// Reading across all stores, we expect to pick up the lowest useVersion both
// from the third store.
expCV := clusterversion.ClusterVersion{
Version: versionA,
}
if cv, err := SynthesizeClusterVersionFromEngines(ctx, e012, binV, minV); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(cv, expCV) {
t.Fatalf("expected %+v, got %+v", expCV, cv)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
if k, err = getMVCCKey(); err != nil {
return err
}
if !k.Key.Equal(keys.StoreClusterVersionKey()) {
if !k.Key.Equal(keys.DeprecatedStoreClusterVersionKey()) {
return errors.New("no cluster version found on uninitialized engine")
}
valid, err = iter.NextEngineKey()
Expand Down
11 changes: 0 additions & 11 deletions pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,3 @@ func (ls *Stores) updateBootstrapInfoLocked(bi *gossip.BootstrapInfo) error {
})
return err
}

func (ls *Stores) engines() []storage.Engine {
var engines []storage.Engine
ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
// TODO(sep-raft-log): at time of writing the only caller to this is
// TestClusterVersionWriteSynthesize.
engines = append(engines, (*Store)(v).TODOEngine())
return true // want more
})
return engines
}
Loading

0 comments on commit 5b3ebf0

Please sign in to comment.