Skip to content

Commit

Permalink
*: introduce GCReplicas and FlushAllEngines RPCs
Browse files Browse the repository at this point in the history
We introduce two new RPCs for the migration infrastructure to use.
`GCReplicas` will be used to instruct the target node to process all
GC-able replicas. `FlushAllEngines` will be used to instruct the target
node to persist all in-memory state to disk. Both of these are necessary
primitives for the migration infastructure.

Specifically this comes up in the context of wanting the ensure that
ranges where we've executed a ranged `Migrate` command over have no way
of ever surfacing pre-migrated state. This can happen with older
replicas in the replica GC queue and with applied state that is not yet
persisted. We elaborate on both of these below:

Motivation for `GCReplicas`: Currently we wait for the `Migrate` to have
applied on all replicas of a range before returning to the caller. This
does not include earlier incarnations of the range, possibly sitting
idle in the replica GC queue. These replicas can still request leases,
and go through the request evaluation paths, possibly tripping up
assertions that check to see no pre-migrated state is found. For this
reason we introduce the `GCReplicas` RPC that the migration manager can
use to ensure all GC-able replicas are processed before declaring the
specific cluster version bump complete.

Motivation for `FlushAllEngines`: Like we mentioned above, KV currently
waits for the `Migrate` command to have applied on all replicas before
returning. With the applied state, there's no necessity to durably
persist it (the representative version is already stored in the raft
log). Out of an abundance of caution, and to really really ensure that
no pre-migrated state is ever seen in the system, we provide the
migration manager a mechanism to flush out all in-memory state to disk.
This way the manager can guarantee that by the time a specific cluster
version is bumped, all pre-migrated state from prior to that cluster
version will have been fully purged from the system.

---

The ideas here follow from our original prototype in #57445. Neither of
these RPCs are currently wired up to anything. That'll happen in a
future commit introducing the raft truncated state migration.

Release note: None
  • Loading branch information
irfansharif committed Dec 8, 2020
1 parent b8b7722 commit 00f1a41
Show file tree
Hide file tree
Showing 7 changed files with 657 additions and 37 deletions.
18 changes: 0 additions & 18 deletions pkg/kv/kvserver/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Code in this file is for testing usage only. It is exported only because it
Expand All @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd(
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if cfg := s.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip")
}

newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
return true
})

q.DrainQueue(s.stopper)
return nil
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
log.Fatalf(ctx, "%v", err)
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2763,6 +2763,32 @@ func (s *Store) ManuallyEnqueue(
return collect(), processErr, nil
}

// GCReplicas iterates over all ranges and processes any that may need to be
// GC'd.
func (s *Store) GCReplicas() error {
if interceptor := s.TestingKnobs().GCReplicasInterceptor; interceptor != nil {
interceptor()
}
return forceScanAndProcess(s, s.replicaGCQueue.baseQueue)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if cfg := s.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip")
}

newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
return true
})

q.DrainQueue(s.stopper)
return nil
}

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key.
func WriteClusterVersion(
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ type StoreTestingKnobs struct {
// in execChangeReplicasTxn that prevent moving
// to a configuration that cannot make progress.
AllowDangerousReplicationChanges bool
// GCReplicasInterceptor intercepts attempts to GC all replicas in the
// store.
GCReplicasInterceptor func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
28 changes: 28 additions & 0 deletions pkg/server/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,31 @@ func (m *migrationServer) BumpClusterVersion(
resp := &serverpb.BumpClusterVersionResponse{}
return resp, nil
}

// FlushAllEngines implements the MigrationServer interface.
func (m *migrationServer) FlushAllEngines(
_ context.Context, _ *serverpb.FlushAllEnginesRequest,
) (*serverpb.FlushAllEnginesResponse, error) {
for _, eng := range m.server.engines {
if err := eng.Flush(); err != nil {
return nil, err
}
}

resp := &serverpb.FlushAllEnginesResponse{}
return resp, nil
}

// GCReplicas implements the MigrationServer interface.
func (m *migrationServer) GCReplicas(
_ context.Context, _ *serverpb.GCReplicasRequest,
) (*serverpb.GCReplicasResponse, error) {
if err := m.server.node.stores.VisitStores(func(s *kvserver.Store) error {
return s.GCReplicas()
}); err != nil {
return nil, err
}

resp := &serverpb.GCReplicasResponse{}
return resp, nil
}
34 changes: 34 additions & 0 deletions pkg/server/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -96,3 +97,36 @@ func TestValidateTargetClusterVersion(t *testing.T) {
s.Stopper().Stop(context.Background())
}
}

func TestMigrationGCReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

const numStores = 3
var storeSpecs []base.StoreSpec
for i := 0; i < numStores; i++ {
storeSpecs = append(storeSpecs, base.StoreSpec{InMemory: true})
}

intercepted := 0
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
StoreSpecs: storeSpecs,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
GCReplicasInterceptor: func() {
intercepted++
},
},
},
})

migrationServer := s.MigrationServer().(*migrationServer)
if _, err := migrationServer.GCReplicas(context.Background(), &serverpb.GCReplicasRequest{}); err != nil {
t.Fatal(err)
}

if intercepted != numStores {
t.Fatalf("expected to have GC-ed replicas on %d stores, found %d", numStores, intercepted)
}

s.Stopper().Stop(context.Background())
}
Loading

0 comments on commit 00f1a41

Please sign in to comment.