Skip to content

Commit

Permalink
kvserver,kvstorage: move read/write methods for cluster versions
Browse files Browse the repository at this point in the history
Same rationale as cockroachdb#95432 - they belong in `kvstorage` and I need
them there now as I'm working on a datadriven test.

Touches cockroachdb#93310.

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Feb 2, 2023
1 parent 61d7296 commit 78abe43
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 257 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/gc:gc_test",
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/logstore:logstore_test",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test",
Expand Down Expand Up @@ -1219,6 +1220,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/kvstorage:kvstorage",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
"//pkg/kv/kvserver/liveness:liveness",
"//pkg/kv/kvserver/liveness:liveness_test",
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kvstorage",
srcs = [
"cluster_version.go",
"doc.go",
"init.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvserver/logstore",
"//pkg/roachpb",
Expand All @@ -23,4 +25,19 @@ go_library(
],
)

# Go test target for the kvstorage package. It builds cluster_version_test.go
# and embeds the :kvstorage library target declared in this file.
go_test(
name = "kvstorage_test",
srcs = ["cluster_version_test.go"],
# Passed to the test binary as its -test.timeout flag.
args = ["-test.timeout=295s"],
embed = [":kvstorage"],
# Packages imported by the test source in addition to the embedded library.
deps = [
"//pkg/clusterversion",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/log",
],
)

get_x_data(name = "get_x_data")
188 changes: 188 additions & 0 deletions pkg/kv/kvserver/kvstorage/cluster_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstorage

import (
"context"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// WriteClusterVersion persists cv under the store-local cluster version key
// and then hands cv.Version to the engine via SetMinVersion. A raw
// storage.Engine is required (rather than a narrower interface) to ensure the
// write is persisted durably.
//
// The first error encountered, from either the put or SetMinVersion, is
// returned as-is.
func WriteClusterVersion(
	ctx context.Context, eng storage.Engine, cv clusterversion.ClusterVersion,
) error {
	if putErr := storage.MVCCPutProto(
		ctx,
		eng,
		nil,
		keys.StoreClusterVersionKey(),
		hlc.Timestamp{},
		hlc.ClockTimestamp{},
		nil,
		&cv,
	); putErr != nil {
		return putErr
	}

	// Why the version is written a second time:
	//
	// The storage engine occasionally has to make backwards-incompatible
	// changes, but the store cluster version key lives inside the engine
	// and therefore cannot be read while the store is being opened.
	//
	// To cope with that, the engine keeps its own minimum version in a
	// separate file on the filesystem, which it may consult before opening
	// the Engine. For now that file is written together with the store
	// cluster version key.
	//
	// Keeping two copies of the version is a wart. The eventual goal is to
	// drop the store cluster version key in favor of the engine's flat
	// file, which requires that nothing writes to the engine until the node
	// has either bootstrapped or joined an existing cluster. The file would
	// then be written before the engine is opened to finish
	// bootstrapping/joining.
	return eng.SetMinVersion(cv.Version)
}

// ReadClusterVersion loads the cluster version stored under the store-local
// cluster version key. When the key is not found, the returned version is
// empty. Any error from the underlying read is returned alongside whatever
// was decoded.
func ReadClusterVersion(
	ctx context.Context, reader storage.Reader,
) (clusterversion.ClusterVersion, error) {
	var persisted clusterversion.ClusterVersion
	key := keys.StoreClusterVersionKey()
	// The first result of MVCCGetProto is deliberately ignored: a missing
	// key simply leaves persisted at its zero value.
	_, err := storage.MVCCGetProto(
		ctx, reader, key, hlc.Timestamp{}, &persisted, storage.MVCCGetOptions{},
	)
	return persisted, err
}

// WriteClusterVersionToEngines writes cv to each of the given engines in
// order. It returns nil once every engine has been written, and otherwise
// stops at the first failure and returns that error wrapped with the engine
// that produced it. The supplied version is not validated in any way.
//
// At the time of writing this is used during bootstrap, initial server start
// (to perhaps fill into additional stores), and during cluster version bumps.
func WriteClusterVersionToEngines(
	ctx context.Context, engines []storage.Engine, cv clusterversion.ClusterVersion,
) error {
	for i := range engines {
		err := WriteClusterVersion(ctx, engines[i], cv)
		if err != nil {
			return errors.Wrapf(err, "error writing version to engine %s", engines[i])
		}
	}
	return nil
}

// SynthesizeClusterVersionFromEngines derives a cluster version from the
// versions persisted on the given engines.
//
// Each engine's version is read via ReadClusterVersion; an engine with no
// persisted version is treated as being at binaryMinSupportedVersion.
// Typically all initialized engines carry the same version, though ill-timed
// crashes can leave them disagreeing. In that case the smallest version seen
// is returned. With no engines at all, binaryMinSupportedVersion is returned.
//
// binaryVersion is the version of this binary. An error is returned as soon
// as an engine with a higher version is encountered, as this would indicate
// that this node has previously acked the higher cluster version but is now
// running an old binary, which is unsafe.
//
// binaryMinSupportedVersion is the minimum version supported by this binary.
// An error is returned if the smallest engine version is lower than this.
func SynthesizeClusterVersionFromEngines(
	ctx context.Context,
	engines []storage.Engine,
	binaryVersion, binaryMinSupportedVersion roachpb.Version,
) (clusterversion.ClusterVersion, error) {
	// originVersion pairs a version with a printable description of the
	// engine it was read from, so error messages can name the engine.
	type originVersion struct {
		roachpb.Version
		origin string
	}

	// sentinel sorts above any real version; it marks "nothing seen yet".
	sentinel := roachpb.Version{Major: math.MaxInt32}
	lowest := originVersion{Version: sentinel, origin: "(no store)"}

	// Single pass over the engines: reject stores that are too new right
	// away, and remember the smallest version for the too-old check that
	// can only be made once every engine has been inspected.
	for _, e := range engines {
		reader := e.(storage.Reader) // we're read only
		stored, err := ReadClusterVersion(ctx, reader)
		if err != nil {
			return clusterversion.ClusterVersion{}, err
		}

		engV := stored.Version
		if engV == (roachpb.Version{}) {
			// This is needed when a node first joins an existing cluster,
			// in which case it won't know what version to use until the
			// first Gossip update comes in.
			engV = binaryMinSupportedVersion
		}

		// Avoid running a binary with a store that is too new. For example,
		// restarting into 1.1 after having upgraded to 1.2 doesn't work.
		if binaryVersion.Less(engV) {
			return clusterversion.ClusterVersion{}, errors.Errorf(
				"cockroach version v%s is incompatible with data in store %s; use version v%s or later",
				binaryVersion, reader, engV)
		}

		// Track the smallest version encountered and where it came from.
		if engV.Less(lowest.Version) {
			lowest.Version = engV
			lowest.origin = fmt.Sprint(reader)
		}
	}

	// Nothing was recorded, so fall back to binaryMinSupportedVersion. This
	// is the case when a brand new node is joining an existing cluster
	// (which may be on any older version this binary supports).
	if lowest.Version == sentinel {
		lowest.Version = binaryMinSupportedVersion
	}

	result := clusterversion.ClusterVersion{Version: lowest.Version}
	log.Eventf(ctx, "read clusterVersion %+v", result)

	// Avoid running a binary too new for this store. This is what you'd
	// catch if, say, you restarted directly from 1.0 into 1.2 (bumping the
	// min version) without going through 1.1 first. It would also be what
	// you catch if you are starting 1.1 for the first time (after 1.0), but
	// it crashes half-way through the startup sequence (so now some stores
	// have 1.1, but some 1.0), in which case you are expected to run 1.1
	// again (hopefully without the crash this time) which would then
	// rewrite all the stores.
	//
	// The check is deferred until here because the smallest version is only
	// known after every engine has been visited.
	if lowest.Version.Less(binaryMinSupportedVersion) {
		return clusterversion.ClusterVersion{}, errors.Errorf("store %s, last used with cockroach version v%s, "+
			"is too old for running version v%s (which requires data from v%s or later)",
			lowest.origin, lowest.Version, binaryVersion, binaryMinSupportedVersion)
	}
	return result, nil
}
84 changes: 84 additions & 0 deletions pkg/kv/kvserver/kvstorage/cluster_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstorage

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestStoresClusterVersionIncompatible checks that synthesizing the cluster
// version fails with the expected error when the version persisted on an
// engine is incompatible with the running binary, either because the store is
// newer than the binary or older than the binary's minimum supported version.
func TestStoresClusterVersionIncompatible(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	type testCase struct {
		binV, minV roachpb.Version // binary version and min supported version
		engV       roachpb.Version // version found on engine in test
		expErr     string          // regexp the returned error must match
	}

	cases := map[string]testCase{
		"StoreTooNew": {
			// The node runs v1.0-1 and requires at least v1.0 from its
			// stores, but the engine is already at v9.0: far too new.
			binV:   roachpb.Version{Major: 1, Internal: 1},
			minV:   roachpb.Version{Major: 1},
			engV:   roachpb.Version{Major: 9},
			expErr: `cockroach version v1\.0-1 is incompatible with data in store <no-attributes>=<in-mem>; use version v9\.0 or later`,
		},
		"StoreTooOldVersion": {
			// The node runs v9.0 and requires at least v5.0 from its
			// stores, but the engine is still at v4.0: too old.
			binV:   roachpb.Version{Major: 9},
			minV:   roachpb.Version{Major: 5},
			engV:   roachpb.Version{Major: 4},
			expErr: `store <no-attributes>=<in-mem>, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`,
		},
		"StoreTooOldMinVersion": {
			// Same inputs and expectation as StoreTooOldVersion.
			binV:   roachpb.Version{Major: 9},
			minV:   roachpb.Version{Major: 5},
			engV:   roachpb.Version{Major: 4},
			expErr: `store <no-attributes>=<in-mem>, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			engines := []storage.Engine{storage.NewDefaultInMemForTesting()}
			defer engines[0].Close()

			// Persist the engine version under test.
			written := clusterversion.ClusterVersion{Version: tc.engV}
			if err := WriteClusterVersionToEngines(ctx, engines, written); err != nil {
				t.Fatal(err)
			}

			// Reading it back with the configured binary versions must
			// produce the expected error.
			got, err := SynthesizeClusterVersionFromEngines(ctx, engines, tc.binV, tc.minV)
			if !testutils.IsError(err, tc.expErr) {
				t.Fatalf("unexpected error: %+v, got version %v", err, got)
			}
		})
	}
}
51 changes: 0 additions & 51 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3440,57 +3440,6 @@ func (s *storeForTruncatorImpl) getEngine() storage.Engine {
return (*Store)(s).engine
}

// WriteClusterVersion stores cv at the store-local cluster version key and
// afterwards passes cv.Version to the engine's SetMinVersion. Only a raw
// engine is accepted, to ensure the write is persisted durably. Whichever of
// the two steps fails first determines the returned error.
func WriteClusterVersion(
	ctx context.Context, eng storage.Engine, cv clusterversion.ClusterVersion,
) error {
	versionKey := keys.StoreClusterVersionKey()
	if err := storage.MVCCPutProto(
		ctx, eng, nil, versionKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &cv,
	); err != nil {
		return err
	}

	// The storage engine sometimes has to make backwards-incompatible
	// changes. The store cluster version key, however, is stored within the
	// engine itself and so is unavailable while the store is being opened.
	//
	// For that reason the engine maintains its own minimum version on disk,
	// in a separate file on the filesystem, which it may consult before
	// opening the Engine. For now, that file is written in combination with
	// the store cluster version key.
	//
	// This parallel version state is a bit of a wart. An eventual goal is
	// to replace the store cluster version key with the engine's flat file,
	// which requires that there are no writes to the engine until either
	// bootstrapping or joining an existing cluster. Writing the version to
	// the file would then happen before opening the engine to complete the
	// rest of bootstrapping/joining the cluster.
	return eng.SetMinVersion(cv.Version)
}

// ReadClusterVersion fetches the cluster version from the store-local version
// key using the given reader. An empty version is returned if the key is not
// found; a read error is passed through to the caller.
func ReadClusterVersion(
	ctx context.Context, reader storage.Reader,
) (clusterversion.ClusterVersion, error) {
	var (
		version clusterversion.ClusterVersion
		opts    = storage.MVCCGetOptions{}
	)
	_, err := storage.MVCCGetProto(
		ctx, reader, keys.StoreClusterVersionKey(), hlc.Timestamp{}, &version, opts,
	)
	return version, err
}

// init registers a tracing tag remapping between "s" and "store" when the
// package is loaded.
// NOTE(review): presumably this makes the short "s" log tag appear as "store"
// in traces; confirm against tracing.RegisterTagRemapping.
func init() {
tracing.RegisterTagRemapping("s", "store")
}
Expand Down
Loading

0 comments on commit 78abe43

Please sign in to comment.