Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: upgrade: checkpoint fence version for multi-tenant clusters #104921

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/upgrade/upgrademanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,29 @@ go_test(
args = ["-test.timeout=295s"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/liveness",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/settingswatcher",
"//pkg/settings/cluster",
"//pkg/sql/execinfra",
"//pkg/sql/isql",
"//pkg/sql/sem/eval",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade",
"//pkg/upgrade/upgradebase",
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
2 changes: 1 addition & 1 deletion pkg/upgrade/upgrademanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (m *Manager) Migrate(
if mustPersistFenceVersion {
var err error
for {
err = updateSystemVersionSetting(ctx, cv, validate)
err = updateSystemVersionSetting(ctx, fenceVersion, validate)
if errors.Is(err, upgradecluster.InconsistentSQLServersError) {
if err := bumpClusterVersion(ctx, m.deps.Cluster, fenceVersion); err != nil {
return err
Expand Down
131 changes: 131 additions & 0 deletions pkg/upgrade/upgrademanager/manager_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,34 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"sort"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -618,6 +625,7 @@ func TestPrecondition(t *testing.T) {
require.Equalf(t, exp, got, "server %d", i)
}
}

defer tc.Stopper().Stop(ctx)
sqlDB := tc.ServerConn(0)
{
Expand Down Expand Up @@ -662,3 +670,126 @@ func TestPrecondition(t *testing.T) {
checkActiveVersion(t, v2)
}
}

func TestMigrationFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// Configure the range of versions used by the test
startVersionKey := clusterversion.BinaryMinSupportedVersionKey
startVersion := clusterversion.ByKey(startVersionKey)
endVersionKey := clusterversion.BinaryVersionKey
endVersion := clusterversion.ByKey(endVersionKey)

// Pick a random version in to fail at
versions := clusterversion.ListBetween(startVersion, endVersion)
failVersion := versions[rand.Intn(len(versions))]
fenceVersion := upgrade.FenceVersionFor(ctx, clusterversion.ClusterVersion{Version: failVersion}).Version
t.Logf("test will fail at version: %s", failVersion.String())

// Create a storage cluster for the tenant
testCluster := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
SQLEvalContext: &eval.TestingKnobs{
TenantLogicalVersionKeyOverride: startVersionKey,
},
},
},
})
defer testCluster.Stopper().Stop(ctx)

// Set the version override so that the tenant is able to upgrade. If this is
// not set, the tenant treats the storage cluster as if it had the oldest
// supported binary version.
s := testCluster.Server(0)
goDB := serverutils.OpenDBConn(t, s.ServingSQLAddr(), "system", false, s.Stopper())
_, err := goDB.Exec(`ALTER TENANT ALL SET CLUSTER SETTING version = $1`, endVersion.String())
require.NoError(t, err)

// setting failUpgrade to false disables the upgrade error logic.
var failUpgrade atomic.Bool
failUpgrade.Store(true)

// Create a tenant cluster at the oldest supported binary version. Configure
// upgrade manager to fail the upgrade at a random version.
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
endVersion,
startVersion,
false,
)
require.NoError(t, clusterversion.Initialize(ctx, startVersion, &tenantSettings.SV))
tenant, db := serverutils.StartTenant(t, testCluster.Server(0), base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(10),
Settings: tenantSettings,
TestingKnobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BootstrapVersionKeyOverride: startVersionKey,
BinaryVersionOverride: startVersion,
},
UpgradeManager: &upgradebase.TestingKnobs{
DontUseJobs: true,
RegistryOverride: func(cv roachpb.Version) (upgradebase.Upgrade, bool) {
if failUpgrade.Load() && cv == failVersion {
errorUpgrade := func(ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps) error {
return errors.New("the upgrade failed with some error!")
}
return upgrade.NewTenantUpgrade("test", cv, nil, errorUpgrade), true
}
return upgrades.GetUpgrade(cv)
},
},
},
})

// Wait for the storage cluster version to propogate
watcher := tenant.SettingsWatcher().(*settingswatcher.SettingsWatcher)
testutils.SucceedsSoon(t, func() error {
storageVersion := watcher.GetStorageClusterActiveVersion()
if !storageVersion.IsActive(endVersionKey) {
return errors.Newf("expected storage version of at least %s found %s", endVersion.PrettyPrint(), storageVersion.PrettyPrint())
}
return nil
})

checkActiveVersion := func(t *testing.T, exp roachpb.Version) {
got := tenant.ClusterSettings().Version.ActiveVersion(ctx).Version
require.Equal(t, exp, got)
}
checkSettingVersion := func(t *testing.T, exp roachpb.Version) {
var settingVersion clusterversion.ClusterVersion
require.NoError(t, tenant.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
settingVersion, err = watcher.GetClusterVersionFromStorage(ctx, txn)
return err
}))
require.Equal(t, exp, settingVersion.Version)
}

checkActiveVersion(t, startVersion)
checkSettingVersion(t, startVersion)

// Try to finalize
_, err = db.Exec(`SET CLUSTER SETTING version = $1`, endVersion.String())
require.Error(t, err)
checkActiveVersion(t, fenceVersion)
// Note: we don't check the setting version here because the fence setting
// is only materialized on the first attempt.

// Try to finalize again.
_, err = db.Exec(`SET CLUSTER SETTING version = $1`, endVersion.String())
require.Error(t, err)
checkActiveVersion(t, fenceVersion)
checkSettingVersion(t, fenceVersion)

// Try to finalize a final time, allowing the upgrade to succeed.
failUpgrade.Store(false)
_, err = db.Exec(`SET CLUSTER SETTING version = $1`, endVersion.String())
require.NoError(t, err)
checkActiveVersion(t, endVersion)
checkSettingVersion(t, endVersion)
}