Skip to content

Commit

Permalink
[wip] *: add a bunch of tests for the migration infrastructure
Browse files Browse the repository at this point in the history
And address open comments in #57445.

Release note: None
  • Loading branch information
irfansharif committed Dec 7, 2020
1 parent c93d65a commit 12bf0c2
Show file tree
Hide file tree
Showing 18 changed files with 1,013 additions and 150 deletions.
4 changes: 1 addition & 3 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,7 @@ func (db *DB) AddSSTable(
// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
func (db *DB) Migrate(
ctx context.Context, begin, end interface{}, version roachpb.Version,
) error {
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
return getOneErr(db.Run(ctx, b), b)
Expand Down
18 changes: 0 additions & 18 deletions pkg/kv/kvserver/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Code in this file is for testing usage only. It is exported only because it
Expand All @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd(
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
}

// forceScanAndProcess offers every replica visited on the store to the given
// queue and then calls DrainQueue on it. It returns an error, without touching
// the queue, when gossip holds no system config.
func forceScanAndProcess(s *Store, q *baseQueue) error {
	// Many queues rely on the system config and silently process nothing when
	// it is missing; surface that condition as an error rather than appearing
	// to succeed.
	if s.Gossip().GetSystemConfig() == nil {
		return errors.Errorf("system config not available in gossip")
	}

	// The clock is read once per replica, at the moment the replica is offered.
	offer := func(r *Replica) bool {
		q.maybeAdd(context.Background(), r, s.cfg.Clock.Now())
		return true
	}
	newStoreReplicaVisitor(s).Visit(offer)

	q.DrainQueue(s.stopper)
	return nil
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
log.Fatalf(ctx, "%v", err)
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2765,6 +2765,32 @@ func (s *Store) ManuallyEnqueue(
return collect(), processErr, nil
}

// GCReplicas runs every replica on the store through the replica GC queue via
// forceScanAndProcess. If the GCReplicasInterceptor testing knob is set, it is
// invoked first.
func (s *Store) GCReplicas() error {
	intercept := s.TestingKnobs().GCReplicasInterceptor
	if intercept != nil {
		intercept()
	}
	return forceScanAndProcess(s, s.replicaGCQueue.baseQueue)
}

// forceScanAndProcess offers every replica visited on the store to the given
// queue and then calls DrainQueue on it. It returns an error, without touching
// the queue, when gossip holds no system config.
func forceScanAndProcess(s *Store, q *baseQueue) error {
	// Many queues rely on the system config and silently process nothing when
	// it is missing; surface that condition as an error rather than appearing
	// to succeed.
	if s.Gossip().GetSystemConfig() == nil {
		return errors.Errorf("system config not available in gossip")
	}

	// The clock is read once per replica, at the moment the replica is offered.
	offer := func(r *Replica) bool {
		q.maybeAdd(context.Background(), r, s.cfg.Clock.Now())
		return true
	}
	newStoreReplicaVisitor(s).Visit(offer)

	q.DrainQueue(s.stopper)
	return nil
}

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key.
func WriteClusterVersion(
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ type StoreTestingKnobs struct {
// heartbeats and then expect other replicas to take the lease without
// worrying about Raft).
AllowLeaseRequestProposalsWhenNotLeader bool
// GCReplicasInterceptor intercepts attempts to GC all replicas in the
// store.
GCReplicasInterceptor func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
36 changes: 35 additions & 1 deletion pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "migration",
Expand All @@ -20,11 +20,45 @@ go_library(
"//pkg/rpc/nodedialer",
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/timeutil",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/cockroachdb/logtags",
"//vendor/github.com/cockroachdb/redact",
"//vendor/google.golang.org/grpc",
],
)

# Test target for the migration package.
go_test(
name = "migration_test",
# Test sources that make up this target.
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"manager_test.go",
"util_test.go",
],
# The tests are built together with the :migration library declared above.
embed = [":migration"],
# Packages imported by the test sources listed in srcs.
deps = [
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"//vendor/github.com/cockroachdb/errors",
"//vendor/google.golang.org/grpc",
],
)
171 changes: 171 additions & 0 deletions pkg/migration/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package migration_test

import (
"context"
"database/sql"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

// TestHelperIterateRangeDescriptors starts a single test server and checks
// that Helper.IterateRangeDescriptors passes the expected number of range
// descriptors to its callback for a variety of block sizes.
func TestHelperIterateRangeDescriptors(t *testing.T) {
	// AfterTest returns the function that performs the leak check; the
	// trailing () is what makes the defer run that check at test exit.
	defer leaktest.AfterTest(t)()

	cv := clusterversion.ClusterVersion{}
	ctx := context.Background()
	const numNodes = 1

	params, _ := tests.CreateTestServerParams()
	server, _, kvDB := serverutils.StartServer(t, params)
	defer server.Stopper().Stop(ctx)

	// Record the replica count reported by the store; it is the baseline the
	// iteration results are compared against.
	var numRanges int
	if err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
		numRanges = s.ReplicaCount()
		return nil
	}); err != nil {
		t.Fatal(err)
	}

	c := migration.TestingNewCluster(numNodes, kvDB, server.InternalExecutor().(sqlutil.InternalExecutor))
	h := migration.TestingNewHelper(c, cv)

	for _, blockSize := range []int{1, 5, 10, 50} {
		var numDescs int
		// init resets the running count; it is handed to
		// IterateRangeDescriptors alongside the per-block callback.
		init := func() { numDescs = 0 }
		if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error {
			numDescs += len(descriptors)
			return nil
		}); err != nil {
			t.Fatal(err)
		}

		// TODO(irfansharif): We always seem to include a second copy of r1's
		// desc. Unsure why.
		if numDescs != numRanges+1 {
			t.Fatalf("expected to find %d ranges, found %d", numRanges+1, numDescs)
		}
	}
}

// TestHelperWithMigrationTable exercises the Helper methods that write to
// system.migrations: it inserts a record, updates its progress, and moves it
// through the failed and succeeded statuses, reading the newest row back over
// SQL after each step.
func TestHelperWithMigrationTable(t *testing.T) {
	// AfterTest returns the function that performs the leak check; the
	// trailing () is what makes the defer run that check at test exit.
	defer leaktest.AfterTest(t)()

	// Sort above any real version.
	cv := clusterversion.ClusterVersion{
		Version: roachpb.Version{Major: 420, Minor: 7, Internal: 10},
	}
	ctx := context.Background()
	const numNodes = 1

	params, _ := tests.CreateTestServerParams()
	server, sqlDB, kvDB := serverutils.StartServer(t, params)
	defer server.Stopper().Stop(ctx)

	c := migration.TestingNewCluster(numNodes, kvDB, server.InternalExecutor().(sqlutil.InternalExecutor))
	h := migration.TestingNewHelper(c, cv)

	dummyDesc := "dummy desc"
	if err := h.TestingInsertMigrationRecord(ctx, dummyDesc); err != nil {
		t.Fatal(err)
	}
	{
		// Check to see that the initial migration record is what we expect.
		row := sqlDB.QueryRow(`
SELECT version, status, description
FROM system.migrations
ORDER BY version DESC LIMIT 1`,
		)
		var version, status, desc string
		if err := row.Scan(&version, &status, &desc); err != nil {
			t.Fatal(err)
		}

		if version != cv.String() {
			t.Fatalf("expected %s, got %s", cv, version)
		}

		if status != string(migration.StatusRunning) {
			t.Fatalf("expected %s, got %s", migration.StatusRunning, status)
		}

		if desc != dummyDesc {
			t.Fatalf("expected %s, got %s", dummyDesc, desc)
		}
	}

	dummyProgress := "dummy progress"
	if err := h.UpdateProgress(ctx, dummyProgress); err != nil {
		t.Fatal(err)
	}

	{
		// The progress column should now carry the value just written.
		row := sqlDB.QueryRow(`
SELECT progress
FROM system.migrations
ORDER BY version DESC LIMIT 1`,
		)
		var progress string
		if err := row.Scan(&progress); err != nil {
			t.Fatal(err)
		}

		if progress != dummyProgress {
			t.Fatalf("expected %s, got %s", dummyProgress, progress)
		}
	}

	if err := h.TestingUpdateStatus(ctx, migration.StatusFailed); err != nil {
		t.Fatal(err)
	}

	{
		// After a failure the status flips but the completed column is
		// expected to still be NULL.
		row := sqlDB.QueryRow(`
SELECT status, completed
FROM system.migrations
ORDER BY version DESC LIMIT 1`,
		)
		var status string
		var completed sql.NullTime
		if err := row.Scan(&status, &completed); err != nil {
			t.Fatal(err)
		}

		if status != string(migration.StatusFailed) {
			t.Fatalf("expected %s, got %s", migration.StatusFailed, status)
		}
		// Valid is false exactly when the scanned column was NULL.
		if completed.Valid {
			t.Fatalf("expected empty completed timestamp, got %v", completed)
		}
	}

	if err := h.TestingUpdateStatus(ctx, migration.StatusSucceeded); err != nil {
		t.Fatal(err)
	}

	{
		// After success the completed column is expected to be populated.
		row := sqlDB.QueryRow(`
SELECT status, completed
FROM system.migrations
ORDER BY version DESC LIMIT 1`,
		)
		var status string
		var completed sql.NullTime
		if err := row.Scan(&status, &completed); err != nil {
			t.Fatal(err)
		}

		if status != string(migration.StatusSucceeded) {
			t.Fatalf("expected %s, got %s", migration.StatusSucceeded, status)
		}
		if !completed.Valid {
			t.Fatal("expected non-empty completed timestamp")
		}
	}
}
Loading

0 comments on commit 12bf0c2

Please sign in to comment.