From 66db578dddd425fba38a2de472e4511ceb496852 Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Thu, 3 Feb 2022 14:13:05 -0800 Subject: [PATCH] pkg/migration: support waiting for minimum engine version As part of the MVCC Bulk Operations project, [Pebble range keys][1] will need to be enabled on _all_ engines of a cluster before nodes can start using the feature to read and write SSTables that contain the range key features (a backward-incompatible change). Adding a cluster version is necessary, but not sufficient in guaranteeing that nodes are ready to generate and, more importantly, _receive_ SSTables with range key support. Specifically, there exists a race condition where nodes are told to update their engines as part of the existing Pebble major format update process, but there is no coordination between the nodes. One node (a sender) may complete its engine upgrade and enable the new SSTable features _before_ another node (the receiver). The latter will panic on receipt of an SSTable with the newer features written by the former. Add a server RPC endpoint that provides a means of waiting on a node to update its store to a version that is compatible with a cluster version. This endpoint is used as part of a system migration to ensure that all nodes in a cluster are running with an engine version that is at least compatible with a given cluster version. Expose the table format major version on `storage.Engine`. This will be used elsewhere in Cockroach (for example, SSTable generation for ingest and backup). Add a `WaitForCompatibleEngineVersion` function on the `storage.Engine` interface that provides a mechanism to block until an engine is running at a format major version that is compatible with a given cluster version. Expose the engine format major version as a `storage.TestingKnobs` field to allow tests to alter the Pebble format major version. 
Add a new cluster version to coordinate the upgrade of all engines in a cluster to `pebble.FormatBlockPropertyCollector` (Pebble,v1), and the system migration required for coordinating the engine upgrade to the latest Pebble table format version. This patch also fixes an existing issue where a node may write SSTables with block properties as part of a backup that are then ingested by an older node. This patch provides the infrastructure necessary for making these "cluster-external" SSTable operations engine-aware. Nodes should only use a table format version that other nodes in the cluster understand. Informs cockroachdb/pebble#1339. [1]: cockroachdb/pebble#1339 Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 12 +- pkg/clusterversion/key_string.go | 5 +- pkg/migration/migrations/BUILD.bazel | 4 + .../migrations/ensure_engine_version.go | 38 ++++ .../migrations/ensure_engine_version_test.go | 164 ++++++++++++++++++ pkg/migration/migrations/migrations.go | 5 + pkg/server/config.go | 5 + pkg/server/migration.go | 31 ++++ pkg/server/serverpb/migration.proto | 14 ++ pkg/storage/BUILD.bazel | 1 + pkg/storage/engine.go | 7 + pkg/storage/min_version_test.go | 80 ++++++++- pkg/storage/pebble.go | 64 ++++--- pkg/storage/testing_knobs.go | 6 +- 16 files changed, 407 insertions(+), 33 deletions(-) create mode 100644 pkg/migration/migrations/ensure_engine_version.go create mode 100644 pkg/migration/migrations/ensure_engine_version_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 985356d20568..c29d8f4d6064 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces 
using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-54 set the active cluster version in the format '.' +version version 21.2-56 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e8b71cb5db45..6375fd120945 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -186,6 +186,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-54set the active cluster version in the format '.' +versionversion21.2-56set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 6f6ef3acb0ea..900a924a68b5 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -257,6 +257,13 @@ const ( // RemoveIncompatibleDatabasePrivileges adds the migration which guarantees that // databases do not have incompatible privileges RemoveIncompatibleDatabasePrivileges + // PebbleFormatVersionBlockProperties enables a new Pebble SSTable format + // version for block property collectors. 
+ // NB: a cluster version (PebbleFormatBlockPropertyCollector) was previously + // introduced for this change, however, it enabled the feature on an + // incompatible SSTable format version. This newer cluster version supersedes + // it. + PebbleFormatVersionBlockProperties // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -400,7 +407,10 @@ var versionsSingleton = keyedVersions{ Key: RemoveIncompatibleDatabasePrivileges, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54}, }, - + { + Key: PebbleFormatVersionBlockProperties, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 56}, + }, // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index b0d53e03f5e8..a49b1dd7115e 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -36,11 +36,12 @@ func _() { _ = x[EnableProtectedTimestampsForTenant-25] _ = x[DeleteCommentsWithDroppedIndexes-26] _ = x[RemoveIncompatibleDatabasePrivileges-27] + _ = x[PebbleFormatVersionBlockProperties-28] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivileges" +const _Key_name = 
"V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesPebbleFormatVersionBlockProperties" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 744} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 0076dff675c3..8b26f9c71634 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "alter_table_protected_timestamp_records.go", "alter_table_statistics_avg_size.go", "comment_on_index_migration.go", + "ensure_engine_version.go", "ensure_no_draining_names.go", "grant_option_migration.go", "insert_missing_public_schema_namespace_entry.go", @@ -63,6 +64,7 @@ go_test( "alter_table_statistics_avg_size_test.go", "builtins_test.go", "comment_on_index_migration_external_test.go", + "ensure_engine_version_test.go", "ensure_no_draining_names_external_test.go", "grant_option_migration_external_test.go", "helpers_test.go", @@ -98,6 +100,7 @@ go_test( "//pkg/sql/sem/tree", 
"//pkg/sql/sqlutil", "//pkg/sql/types", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", @@ -108,6 +111,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//:pebble", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], diff --git a/pkg/migration/migrations/ensure_engine_version.go b/pkg/migration/migrations/ensure_engine_version.go new file mode 100644 index 000000000000..52421342c66f --- /dev/null +++ b/pkg/migration/migrations/ensure_engine_version.go @@ -0,0 +1,38 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +// ensureEngineVersionAtLeast waits for the engine version to be at least +// compatible with the given clusterversion.ClusterVersion, on all nodes in the +// cluster. 
+func ensureEngineVersionAtLeast( + ctx context.Context, v clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + return deps.Cluster.UntilClusterStable(ctx, func() error { + return deps.Cluster.ForEveryNode(ctx, "ensure-engine-version", + func(ctx context.Context, client serverpb.MigrationClient) error { + req := &serverpb.WaitForEngineVersionRequest{ + Version: &v.Version, + } + _, err := client.WaitForEngineVersion(ctx, req) + return err + }) + }) +} diff --git a/pkg/migration/migrations/ensure_engine_version_test.go b/pkg/migration/migrations/ensure_engine_version_test.go new file mode 100644 index 000000000000..305cff0838e7 --- /dev/null +++ b/pkg/migration/migrations/ensure_engine_version_test.go @@ -0,0 +1,164 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" +) + +// TestEnsureEngineVersion_SingleNode verifies that the migration of a single +// node waits for the node's engine version to be at least at a minimum +// version, blocking until it occurs. 
+func TestEnsureEngineVersion_SingleNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + // Start at early version of the binary. + clusterversion.Start22_1, + ), + }, + Store: &kvserver.StoreTestingKnobs{ + StorageKnobs: storage.TestingKnobs{ + // Start at early engine version. + FormatMajorVersion: pebble.FormatMostCompatible, + }, + }, + }, + }, + }) + + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + getFormatVersion := func() (pebble.FormatMajorVersion, error) { + e := ts.Engines()[0] + // Wait for the current version to stabilize. + currentVers := ts.ClusterSettings().Version.ActiveVersion(ctx) + if err := e.WaitForCompatibleEngineVersion(ctx, currentVers.Version); err != nil { + return 0, err + } + return e.FormatMajorVersion(), nil + } + + // We start at Pebble major format version 4 (SetWithDelete). Note that the + // server was started with an earlier engine version (v0, "most compatible"), + // on start it will ratchet up to a compatible version. + v, err := getFormatVersion() + require.NoError(t, err) + require.Equal(t, v, pebble.FormatSetWithDelete) + + // Bump the cluster version to include block properties. + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.ExecSucceedsSoon(t, + "SET CLUSTER SETTING version = $1", + clusterversion.ByKey(clusterversion.PebbleFormatVersionBlockProperties).String(), + ) + + // The store version has been ratcheted to version 5 (Block property + // collectors). + v, err = getFormatVersion() + require.NoError(t, err) + require.Equal(t, v, pebble.FormatBlockPropertyCollector) +} + +// TestEnsureEngineVersion_MultiNode is the same as the above, except that it +// runs on a cluster of three nodes. 
+func TestEnsureEngineVersion_MultiNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + const nNodes = 3 + tc := testcluster.StartTestCluster(t, nNodes, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.Start22_1, + ), + }, + Store: &kvserver.StoreTestingKnobs{ + StorageKnobs: storage.TestingKnobs{ + FormatMajorVersion: pebble.FormatMostCompatible, + }, + }, + }, + }, + }) + + defer tc.Stopper().Stop(ctx) + + getFormatVersions := func(tc *testcluster.TestCluster) ( + []pebble.FormatMajorVersion, error, + ) { + var versions [nNodes]pebble.FormatMajorVersion + for i := 0; i < nNodes; i++ { + ts := tc.Server(i) + e := ts.Engines()[0] + // Wait for the current version to stabilize. + currentVers := ts.ClusterSettings().Version.ActiveVersion(ctx) + if err := e.WaitForCompatibleEngineVersion(ctx, currentVers.Version); err != nil { + return nil, err + } + versions[i] = e.FormatMajorVersion() + } + return versions[:], nil + } + + all := func(v pebble.FormatMajorVersion) []pebble.FormatMajorVersion { + var vs [nNodes]pebble.FormatMajorVersion + for i := 0; i < nNodes; i++ { + vs[i] = v + } + return vs[:] + } + + // All nodes start at Pebble major format version 4 (SetWithDelete). + vs, err := getFormatVersions(tc) + require.NoError(t, err) + require.Equal(t, all(pebble.FormatSetWithDelete), vs) + + // Bump the cluster version to include block properties. + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.ExecSucceedsSoon(t, + "SET CLUSTER SETTING version = $1", + clusterversion.ByKey(clusterversion.PebbleFormatVersionBlockProperties).String(), + ) + + // The store versions have been ratcheted to version 5 (Block property + // collectors). 
+ vs, err = getFormatVersions(tc) + require.NoError(t, err) + require.Equal(t, all(pebble.FormatBlockPropertyCollector), vs) +} diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index bcfc72dc2108..08a61fb20587 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -109,6 +109,11 @@ var migrations = []migration.Migration{ NoPrecondition, runRemoveInvalidDatabasePrivileges, ), + migration.NewSystemMigration( + "update engine table format to support block property collectors and filters", + toCV(clusterversion.PebbleFormatVersionBlockProperties), + ensureEngineVersionAtLeast, + ), } func init() { diff --git a/pkg/server/config.go b/pkg/server/config.go index e67a412505bc..eeadbdd87842 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -625,6 +625,11 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { pebbleConfig.Opts.Cache = pebbleCache pebbleConfig.Opts.TableCache = tableCache pebbleConfig.Opts.MaxOpenFiles = int(openFileLimitPerStore) + if cfg.TestingKnobs.Store != nil { + v := cfg.TestingKnobs.Store.(*kvserver.StoreTestingKnobs).StorageKnobs.FormatMajorVersion + pebbleConfig.Opts.FormatMajorVersion = v + } + // If the spec contains Pebble options, set those too. 
if len(spec.PebbleOptions) > 0 { err := pebbleConfig.Opts.Parse(spec.PebbleOptions, &pebble.ParseHooks{}) diff --git a/pkg/server/migration.go b/pkg/server/migration.go index 1861f3954352..92e7d5976606 100644 --- a/pkg/server/migration.go +++ b/pkg/server/migration.go @@ -267,3 +267,34 @@ func (m *migrationServer) WaitForSpanConfigSubscription( resp := &serverpb.WaitForSpanConfigSubscriptionResponse{} return resp, nil } + +func (m *migrationServer) WaitForEngineVersion( + ctx context.Context, req *serverpb.WaitForEngineVersionRequest, +) (*serverpb.WaitForEngineVersionResponse, error) { + const opName = "wait-for-engine-version" + ctx, span := m.server.AnnotateCtxWithSpan(ctx, opName) + defer span.Finish() + ctx = logtags.AddTag(ctx, opName, nil) + + if err := m.server.stopper.RunTaskWithErr(ctx, opName, func( + ctx context.Context, + ) error { + // Same as in SyncAllEngines, because stores can be added asynchronously, we + // need to ensure that the bootstrap process has happened. + m.server.node.waitForAdditionalStoreInit() + + // Wait for each engine on this node to be ratcheted to at least the + // requested version. + for _, eng := range m.server.engines { + if err := eng.WaitForCompatibleEngineVersion(ctx, *req.Version); err != nil { + return err + } + } + return nil + }); err != nil { + return nil, err + } + + resp := &serverpb.WaitForEngineVersionResponse{} + return resp, nil +} diff --git a/pkg/server/serverpb/migration.proto b/pkg/server/serverpb/migration.proto index 57b2e69e91a9..65d22bee605d 100644 --- a/pkg/server/serverpb/migration.proto +++ b/pkg/server/serverpb/migration.proto @@ -70,6 +70,16 @@ message WaitForSpanConfigSubscriptionRequest{} // WaitForSpanConfigSubscriptionRequest. message WaitForSpanConfigSubscriptionResponse{} +// WaitForEngineVersionRequest waits until the target node has an engine format +// major version compatible with the given cluster version. 
+message WaitForEngineVersionRequest{ + roachpb.Version version = 1; +} + +// WaitForEngineVersionResponse is the response to a +// WaitForEngineVersionRequest. +message WaitForEngineVersionResponse{} + service Migration { // ValidateTargetClusterVersion is used to verify that the target node is // running a binary that's able to support the specified cluster version. @@ -102,4 +112,8 @@ service Migration { // WaitForSpanConfigSubscription waits until the target node is wholly // subscribed to the global span configurations state. rpc WaitForSpanConfigSubscription (WaitForSpanConfigSubscriptionRequest) returns (WaitForSpanConfigSubscriptionResponse) { } + + // WaitForEngineVersion waits until the target node has an engine + // format major version compatible with a given cluster version. + rpc WaitForEngineVersion (WaitForEngineVersionRequest) returns (WaitForEngineVersionResponse) { } } diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 5952a8b79c08..5c67b81906f9 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -66,6 +66,7 @@ go_library( "//pkg/util/log", "//pkg/util/mon", "//pkg/util/protoutil", + "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/sysutil", "//pkg/util/timeutil", diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 5198a553ab36..e6027857b394 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -768,6 +768,13 @@ type Engine interface { // MinVersionIsAtLeastTargetVersion returns whether the engine's recorded // storage min version is at least the target version. MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error) + + // FormatMajorVersion returns the current format major version of the engine. + FormatMajorVersion() pebble.FormatMajorVersion + + // WaitForCompatibleEngineVersion waits for the engine's format major version + // to be at least at a version that is compatible with the target version. 
+ WaitForCompatibleEngineVersion(ctx context.Context, target roachpb.Version) error } // Batch is the interface for batch specific operations. diff --git a/pkg/storage/min_version_test.go b/pkg/storage/min_version_test.go index 9697dc30feb1..8edcfcc69be3 100644 --- a/pkg/storage/min_version_test.go +++ b/pkg/storage/min_version_test.go @@ -14,6 +14,7 @@ import ( "context" "os" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -94,31 +95,30 @@ func TestSetMinVersion(t *testing.T) { p, err := Open(context.Background(), InMemory(), CacheSize(0)) require.NoError(t, err) defer p.Close() - require.Equal(t, pebble.FormatMostCompatible, p.db.FormatMajorVersion()) + require.Equal(t, pebble.FormatMostCompatible, p.FormatMajorVersion()) // The earliest supported Cockroach version advances the pebble version. err = p.SetMinVersion(clusterversion.ByKey(clusterversion.V21_2)) require.NoError(t, err) - require.Equal(t, pebble.FormatSetWithDelete, p.db.FormatMajorVersion()) + require.Equal(t, pebble.FormatSetWithDelete, p.FormatMajorVersion()) // Setting the same min version twice is okay. err = p.SetMinVersion(clusterversion.ByKey(clusterversion.V21_2)) require.NoError(t, err) - require.Equal(t, pebble.FormatSetWithDelete, p.db.FormatMajorVersion()) + require.Equal(t, pebble.FormatSetWithDelete, p.FormatMajorVersion()) // Advancing the store cluster version to another cluster version // that does not advance the Pebble format major version should // leave the format major version unchanged. err = p.SetMinVersion(clusterversion.ByKey(clusterversion.ValidateGrantOption)) require.NoError(t, err) - require.Equal(t, pebble.FormatSetWithDelete, p.db.FormatMajorVersion()) + require.Equal(t, pebble.FormatSetWithDelete, p.FormatMajorVersion()) // Advancing the store cluster version to PebbleFormatBlockPropertyCollector // should also advance the store's format major version. 
err = p.SetMinVersion(clusterversion.ByKey(clusterversion.PebbleFormatBlockPropertyCollector)) require.NoError(t, err) - require.Equal(t, pebble.FormatBlockPropertyCollector, p.db.FormatMajorVersion()) - + require.Equal(t, pebble.FormatBlockPropertyCollector, p.FormatMajorVersion()) } func TestMinVersion_IsNotEncrypted(t *testing.T) { @@ -219,3 +219,71 @@ func (f fauxEncryptedFile) ReadAt(p []byte, off int64) (int, error) { } return n, err } + +func TestWaitForCompatibleEngineVersion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + p, err := Open(context.Background(), InMemory(), CacheSize(0)) + require.NoError(t, err) + defer p.Close() + + const timeout = 5 * time.Second + setAndWait := func(ctx context.Context, vers roachpb.Version) (err error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + errSet, errWait := make(chan error), make(chan error) + go func() { + errSet <- p.SetMinVersion(vers) + }() + go func() { + errWait <- p.WaitForCompatibleEngineVersion(ctx, vers) + }() + + // First wait for the set version operation to complete ... + if err = <-errSet; err != nil { + return err + } + + // ... then wait for the version to ratchet. + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-errWait: + } + + return + } + + ctx := context.Background() + + // The DB starts in the most compatible state. + require.Equal(t, pebble.FormatMostCompatible, p.FormatMajorVersion()) + + // Waiting for a newer engine version (without telling the store to update) is + // expected to time out. + ctx1, cancel := context.WithTimeout(ctx, timeout) + err = p.WaitForCompatibleEngineVersion(ctx1, clusterversion.ByKey(clusterversion.V21_2)) + require.Error(t, err) + require.Contains(t, err.Error(), "could not set desired engine version") + cancel() + + // Upgrade the DB to a cluster version with corresponding engine version for + // v21_2 (i.e. v4, SetWithDelete). 
+ err = setAndWait(ctx, clusterversion.ByKey(clusterversion.V21_2)) + require.NoError(t, err) + require.Equal(t, pebble.FormatSetWithDelete, p.FormatMajorVersion()) + + // Upgrade the DB to a cluster version that requires a more recent Pebble + // engine version (i.e. block property collectors). + err = setAndWait(ctx, clusterversion.ByKey(clusterversion.PebbleFormatVersionBlockProperties)) + require.NoError(t, err) + require.Equal(t, pebble.FormatBlockPropertyCollector, p.FormatMajorVersion()) + + // Waiting for older store version is a no-op - the engine version is already + // at least at the corresponding minimum version. + err = setAndWait(ctx, clusterversion.ByKey(clusterversion.V21_2)) + require.NoError(t, err) + require.Equal(t, pebble.FormatBlockPropertyCollector, p.FormatMajorVersion()) +} diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 0e0ce7a68642..9b31a6428e8c 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -1546,34 +1547,33 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error { return err } - // Pebble has a concept of format major versions, similar to cluster - // versions. Backwards incompatible changes to Pebble's on-disk - // format are gated behind new format major versions. Bumping the - // storage engine's format major version is tied to a CockroachDB - // cluster version. 
+ formatVers := clusterVersionToFormatMajorVersion(version) + if p.db.FormatMajorVersion() < formatVers { + if err := p.db.RatchetFormatMajorVersion(formatVers); err != nil { + return errors.Wrap(err, "ratcheting format major version") + } + } + return nil +} + +func clusterVersionToFormatMajorVersion(version roachpb.Version) pebble.FormatMajorVersion { + // Pebble has a concept of format major versions, similar to cluster versions. + // Backwards incompatible changes to Pebble's on-disk format are gated behind + // new format major versions. Bumping the storage engine's format major + // version is tied to a CockroachDB cluster version. // - // Format major versions and cluster versions both only ratchet - // upwards. Here we map the persisted cluster version to the - // corresponding format major version, ratcheting Pebble's format - // major version if necessary. + // Format major versions and cluster versions both only ratchet upwards. Here + // we map the persisted cluster version to the corresponding format major + // version, ratcheting Pebble's format major version if necessary. formatVers := pebble.FormatMostCompatible // Cases are ordered from newer to older versions. switch { case !version.Less(clusterversion.ByKey(clusterversion.PebbleFormatBlockPropertyCollector)): - if formatVers < pebble.FormatBlockPropertyCollector { - formatVers = pebble.FormatBlockPropertyCollector - } + formatVers = pebble.FormatBlockPropertyCollector case !version.Less(clusterversion.ByKey(clusterversion.TODOPreV21_2)): - if formatVers < pebble.FormatSetWithDelete { - formatVers = pebble.FormatSetWithDelete - } - } - if p.db.FormatMajorVersion() < formatVers { - if err := p.db.RatchetFormatMajorVersion(formatVers); err != nil { - return errors.Wrap(err, "ratcheting format major version") - } + formatVers = pebble.FormatSetWithDelete } - return nil + return formatVers } // MinVersionIsAtLeastTargetVersion implements the Engine interface. 
@@ -1581,6 +1581,28 @@ func (p *Pebble) MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, return MinVersionIsAtLeastTargetVersion(p.unencryptedFS, p.path, target) } +// FormatMajorVersion implements the Engine interface. +func (p *Pebble) FormatMajorVersion() pebble.FormatMajorVersion { + return p.db.FormatMajorVersion() +} + +// WaitForCompatibleEngineVersion implements the Engine interface. +func (p *Pebble) WaitForCompatibleEngineVersion(ctx context.Context, target roachpb.Version) error { + var current pebble.FormatMajorVersion + desired := clusterVersionToFormatMajorVersion(target) + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + current = p.FormatMajorVersion() + if current >= desired { + return nil + } + log.Warningf(ctx, "waiting for desired engine version %s (current=%s)...", + desired, current, + ) + continue + } + return errors.Newf("could not set desired engine version %s (current=%s)", desired, current) +} + type pebbleReadOnly struct { parent *Pebble // The iterator reuse optimization in pebbleReadOnly is for servicing a diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 87bde3d72f58..7a5557119261 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -10,6 +10,10 @@ package storage +import "github.com/cockroachdb/pebble" + // TestingKnobs can be passed when instantiating a storage engine. Settings here // are used to change behavior in tests. -type TestingKnobs struct{} +type TestingKnobs struct { + FormatMajorVersion pebble.FormatMajorVersion +}