Skip to content

Commit

Permalink
pkg/migration: support waiting for minimum engine version
Browse files Browse the repository at this point in the history
As part of the MVCC Bulk Operations project, [Pebble range keys][1] will
need to be enabled on _all_ engines of a cluster before nodes can start
using the feature to read and write SSTables that contain the range key
features (a backward-incompatible change).

Adding a cluster version is necessary, but not sufficient in
guaranteeing that nodes are ready to generate, but importantly _receive_
SSTabbles with range key support. Specifically, there exists a race
condition where nodes are told to update their engines as part of the
existing Pebble major format update process, but there is no
coordination between the nodes. One node (a sender) may complete its
engine upgrade and enable the new SSTable features _before_ another node
(the receiver). The latter will panic on receipt of an SSTable with the
newer features written by the former.

Add an server RPC endpoint that provides a means of waiting on a node to
update its store to a version that is compatible with a cluster version.
This endpoint is used as part of a system migration to ensure that all
nodes in a cluster are running with an engine version that is at least
compatible with a given cluster version.

Expose the table format major version on `storage.Engine`. This will be
used elsewhere in Cockroach (for example, SSTable generation for ingest
and backup).

Add a `WaitForCompatibleEngineVersion` function on the `storage.Engine`
interface that provides a mechanism to block until an engine is running
at a format major version that compatible with a given cluster version.

Expose the engine format major version as a `storage.TestingKnob` to
allow tests to alter the Pebble format major version.

Add a new cluster version to coordinate the upgrade of all engines in a
cluster to `pebble.FormatBlockPropertyCollector` (Pebble,v1), and the
system migration required for coordinating the engine upgrade to the
latest Pebble table format version.

This patch also fixes an existing issue where a node may write SSTables
with block properties as part of a backup that are then ingested by an
older node. This patch provides the infrastructure necessary for making
these "cluster-external" SSTable operations engine-aware. Nodes should
only use a table format version that other nodes in the cluster
understand.

Informs cockroachdb/pebble#1339.

[1]: cockroachdb/pebble#1339

Release note: None
  • Loading branch information
nicktrav committed Feb 4, 2022
1 parent b51c9b4 commit 66db578
Show file tree
Hide file tree
Showing 16 changed files with 407 additions and 33 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-54 set the active cluster version in the format '<major>.<minor>'
version version 21.2-56 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-54</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-56</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
12 changes: 11 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ const (
// RemoveIncompatibleDatabasePrivileges adds the migration which guarantees that
// databases do not have incompatible privileges
RemoveIncompatibleDatabasePrivileges
// PebbleFormatVersionBlockProperties enables a new Pebble SSTable format
// version for block property collectors.
// NB: a cluster version (PebbleFormatBlockPropertyCollector) was previously
// introduced for this change, however, it enabled the feature on an
// incompatible SSTable format version. This newer cluster version supersedes
// it.
PebbleFormatVersionBlockProperties
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -400,7 +407,10 @@ var versionsSingleton = keyedVersions{
Key: RemoveIncompatibleDatabasePrivileges,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
},

{
Key: PebbleFormatVersionBlockProperties,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 56},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"alter_table_protected_timestamp_records.go",
"alter_table_statistics_avg_size.go",
"comment_on_index_migration.go",
"ensure_engine_version.go",
"ensure_no_draining_names.go",
"grant_option_migration.go",
"insert_missing_public_schema_namespace_entry.go",
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(
"alter_table_statistics_avg_size_test.go",
"builtins_test.go",
"comment_on_index_migration_external_test.go",
"ensure_engine_version_test.go",
"ensure_no_draining_names_external_test.go",
"grant_option_migration_external_test.go",
"helpers_test.go",
Expand Down Expand Up @@ -98,6 +100,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand All @@ -108,6 +111,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
Expand Down
38 changes: 38 additions & 0 deletions pkg/migration/migrations/ensure_engine_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// ensureEngineVersionAtLeast waits for the engine version to be at least
// compatible with the given clusterversion.ClusterVersion, on all nodes in the
// cluster.
func ensureEngineVersionAtLeast(
ctx context.Context, v clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job,
) error {
return deps.Cluster.UntilClusterStable(ctx, func() error {
return deps.Cluster.ForEveryNode(ctx, "ensure-engine-version",
func(ctx context.Context, client serverpb.MigrationClient) error {
req := &serverpb.WaitForEngineVersionRequest{
Version: &v.Version,
}
_, err := client.WaitForEngineVersion(ctx, req)
return err
})
})
}
164 changes: 164 additions & 0 deletions pkg/migration/migrations/ensure_engine_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"
)

// TestEnsureEngineVersion_SingleNode verifies that the migration of a single
// node waits for the node's engine version to be at least at a minimum
// version, blocking until it occurs.
func TestEnsureEngineVersion_SingleNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(
// Start at early version of the binary.
clusterversion.Start22_1,
),
},
Store: &kvserver.StoreTestingKnobs{
StorageKnobs: storage.TestingKnobs{
// Start at early engine version.
FormatMajorVersion: pebble.FormatMostCompatible,
},
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)

getFormatVersion := func() (pebble.FormatMajorVersion, error) {
e := ts.Engines()[0]
// Wait for the current version to stabilize.
currentVers := ts.ClusterSettings().Version.ActiveVersion(ctx)
if err := e.WaitForCompatibleEngineVersion(ctx, currentVers.Version); err != nil {
return 0, err
}
return e.FormatMajorVersion(), nil
}

// We start at Pebble major format version 4 (SetWithDelete). Note that the
// server was started with an earlier engine version (v0, "most compatible"),
// on start it will ratchet up to a compatible version.
v, err := getFormatVersion()
require.NoError(t, err)
require.Equal(t, v, pebble.FormatSetWithDelete)

// Bump the cluster version to include block properties.
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.ExecSucceedsSoon(t,
"SET CLUSTER SETTING version = $1",
clusterversion.ByKey(clusterversion.PebbleFormatVersionBlockProperties).String(),
)

// The store version has been ratcheted to version 5 (Block property
// collectors).
v, err = getFormatVersion()
require.NoError(t, err)
require.Equal(t, v, pebble.FormatBlockPropertyCollector)
}

// TestEnsureEngineVersion_MultiNode is the same as the above, except that it
// runs on a cluster of three nodes.
func TestEnsureEngineVersion_MultiNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

const nNodes = 3
tc := testcluster.StartTestCluster(t, nNodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.Start22_1,
),
},
Store: &kvserver.StoreTestingKnobs{
StorageKnobs: storage.TestingKnobs{
FormatMajorVersion: pebble.FormatMostCompatible,
},
},
},
},
})

defer tc.Stopper().Stop(ctx)

getFormatVersions := func(tc *testcluster.TestCluster) (
[]pebble.FormatMajorVersion, error,
) {
var versions [nNodes]pebble.FormatMajorVersion
for i := 0; i < nNodes; i++ {
ts := tc.Server(i)
e := ts.Engines()[0]
// Wait for the current version to stabilize.
currentVers := ts.ClusterSettings().Version.ActiveVersion(ctx)
if err := e.WaitForCompatibleEngineVersion(ctx, currentVers.Version); err != nil {
return nil, err
}
versions[i] = e.FormatMajorVersion()
}
return versions[:], nil
}

all := func(v pebble.FormatMajorVersion) []pebble.FormatMajorVersion {
var vs [nNodes]pebble.FormatMajorVersion
for i := 0; i < nNodes; i++ {
vs[i] = v
}
return vs[:]
}

// All nodes start at Pebble major format version 4 (SetWithDelete).
vs, err := getFormatVersions(tc)
require.NoError(t, err)
require.Equal(t, all(pebble.FormatSetWithDelete), vs)

// Bump the cluster version to include block properties.
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.ExecSucceedsSoon(t,
"SET CLUSTER SETTING version = $1",
clusterversion.ByKey(clusterversion.PebbleFormatVersionBlockProperties).String(),
)

// The store versions have been ratcheted to version 5 (Block property
// collectors).
vs, err = getFormatVersions(tc)
require.NoError(t, err)
require.Equal(t, all(pebble.FormatBlockPropertyCollector), vs)
}
5 changes: 5 additions & 0 deletions pkg/migration/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ var migrations = []migration.Migration{
NoPrecondition,
runRemoveInvalidDatabasePrivileges,
),
migration.NewSystemMigration(
"update engine table format to support block property collectors and filters",
toCV(clusterversion.PebbleFormatVersionBlockProperties),
ensureEngineVersionAtLeast,
),
}

func init() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,11 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
pebbleConfig.Opts.Cache = pebbleCache
pebbleConfig.Opts.TableCache = tableCache
pebbleConfig.Opts.MaxOpenFiles = int(openFileLimitPerStore)
if cfg.TestingKnobs.Store != nil {
v := cfg.TestingKnobs.Store.(*kvserver.StoreTestingKnobs).StorageKnobs.FormatMajorVersion
pebbleConfig.Opts.FormatMajorVersion = v
}

// If the spec contains Pebble options, set those too.
if len(spec.PebbleOptions) > 0 {
err := pebbleConfig.Opts.Parse(spec.PebbleOptions, &pebble.ParseHooks{})
Expand Down
31 changes: 31 additions & 0 deletions pkg/server/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,34 @@ func (m *migrationServer) WaitForSpanConfigSubscription(
resp := &serverpb.WaitForSpanConfigSubscriptionResponse{}
return resp, nil
}

func (m *migrationServer) WaitForEngineVersion(
ctx context.Context, req *serverpb.WaitForEngineVersionRequest,
) (*serverpb.WaitForEngineVersionResponse, error) {
const opName = "wait-for-engine-version"
ctx, span := m.server.AnnotateCtxWithSpan(ctx, opName)
defer span.Finish()
ctx = logtags.AddTag(ctx, opName, nil)

if err := m.server.stopper.RunTaskWithErr(ctx, opName, func(
ctx context.Context,
) error {
// Same as in SyncAllEngines, because stores can be added asynchronously, we
// need to ensure that the bootstrap process has happened.
m.server.node.waitForAdditionalStoreInit()

// Wait for each engine on this node to be ratcheted to at least the
// requested version.
for _, eng := range m.server.engines {
if err := eng.WaitForCompatibleEngineVersion(ctx, *req.Version); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}

resp := &serverpb.WaitForEngineVersionResponse{}
return resp, nil
}
14 changes: 14 additions & 0 deletions pkg/server/serverpb/migration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ message WaitForSpanConfigSubscriptionRequest{}
// WaitForSpanConfigSubscriptionRequest.
message WaitForSpanConfigSubscriptionResponse{}

// WaitForEngineVersionRequest waits until the target node has an engine format
// major version compatible with the given cluster version.
message WaitForEngineVersionRequest{
roachpb.Version version = 1;
}

// WaitForEngineVersionResponse is the response to a
// WaitForEngineVersionRequest.
message WaitForEngineVersionResponse{}

service Migration {
// ValidateTargetClusterVersion is used to verify that the target node is
// running a binary that's able to support the specified cluster version.
Expand Down Expand Up @@ -102,4 +112,8 @@ service Migration {
// WaitForSpanConfigSubscription waits until the target node is wholly
// subscribed to the global span configurations state.
rpc WaitForSpanConfigSubscription (WaitForSpanConfigSubscriptionRequest) returns (WaitForSpanConfigSubscriptionResponse) { }

// WaitForEngineVersionRequest waits until the target node has an engine
// format major version compatible with a given cluster version.
rpc WaitForEngineVersion (WaitForEngineVersionRequest) returns (WaitForEngineVersionResponse) { }
}
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/sysutil",
"//pkg/util/timeutil",
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,13 @@ type Engine interface {
// MinVersionIsAtLeastTargetVersion returns whether the engine's recorded
// storage min version is at least the target version.
MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error)

// FormatMajorVersion returns the current format major version of the engine.
FormatMajorVersion() pebble.FormatMajorVersion

// WaitForCompatibleEngineVersion waits for the engine's format major version
// to be at least at a version that is compatible with the target version.
WaitForCompatibleEngineVersion(ctx context.Context, target roachpb.Version) error
}

// Batch is the interface for batch specific operations.
Expand Down
Loading

0 comments on commit 66db578

Please sign in to comment.