39640: storage: unify replica addition and removal paths r=nvanbenschoten a=tbg

This continues the reworking of the various replication change APIs with
the goal of allowing a) testing of general atomic replication changes and
b) issuing replica swaps from the replicate queue (in 19.2).

For previous steps, see:

#39485
#39611

This change is not a pure plumbing PR. Instead, it unifies
`(*Replica).addReplica` and `(*Replica).removeReplica` into a method that
can do both, `(*Replica).addAndRemoveReplicas`.

Given a slice of ReplicationChanges, this method first adds learner
replicas corresponding to the desired new voters. After having sent
snapshots to all of them, the method issues a configuration change that
atomically
- upgrades all learners to voters
- removes any undesired replicas.

Note that no atomic membership changes are *actually* carried out yet. This
is because the callers of `addAndRemoveReplicas` pass in only a single
change (i.e. an addition or removal), which the method also verifies.
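
As a rough illustration of the flow described above, the following sketch models the ordering of steps and the single-change check. The types `ChangeType`, `Target`, `Change`, and `Changes` and the function `planSteps` are simplified, hypothetical stand-ins, not the real `roachpb` or `storage` definitions, and the steps are only printed rather than executed.

```go
package main

import (
	"errors"
	"fmt"
)

// ChangeType, Target, Change and Changes are simplified stand-ins for the
// roachpb types; they are assumptions made for this sketch only.
type ChangeType int

const (
	AddReplica ChangeType = iota
	RemoveReplica
)

type Target struct{ NodeID, StoreID int }

type Change struct {
	Type   ChangeType
	Target Target
}

type Changes []Change

// byType returns the targets of all changes of the given type.
func (cs Changes) byType(typ ChangeType) []Target {
	var out []Target
	for _, c := range cs {
		if c.Type == typ {
			out = append(out, c.Target)
		}
	}
	return out
}

// planSteps (hypothetical) lists the steps in the order the commit message
// describes: learners first, then snapshots, then one atomic change.
func planSteps(cs Changes) ([]string, error) {
	// For now only a single addition or removal is accepted.
	if len(cs) != 1 {
		return nil, errors.New("only a single addition or removal is supported")
	}
	adds, rems := cs.byType(AddReplica), cs.byType(RemoveReplica)
	var steps []string
	for _, t := range adds {
		steps = append(steps, fmt.Sprintf("add learner on n%d/s%d", t.NodeID, t.StoreID))
	}
	for _, t := range adds {
		steps = append(steps, fmt.Sprintf("send snapshot to n%d/s%d", t.NodeID, t.StoreID))
	}
	steps = append(steps, fmt.Sprintf(
		"atomic change: promote %d learner(s), remove %d replica(s)", len(adds), len(rems)))
	return steps, nil
}

func main() {
	steps, err := planSteps(Changes{{Type: AddReplica, Target: Target{NodeID: 4, StoreID: 4}}})
	if err != nil {
		panic(err)
	}
	for _, s := range steps {
		fmt.Println(s)
	}
}
```

Lifting the length check in `planSteps` would let one call mix additions and removals, which is the shape a replica swap would presumably take.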

Three pieces are missing after this PR: First, we need to be able to
instruct raft to carry out atomic configuration changes:

https://github.com/cockroachdb/cockroach/blob/2e8db6ca53c59d3d281e64939f79d937195403d4/pkg/storage/replica_proposal_buf.go#L448-L451

which in particular requires being able to store the ConfState
corresponding to a joint configuration in the unreplicated local state
(under a new key).

Second, we must pass the slice of changes handed to
`AdminChangeReplicas` through to `addAndRemoveReplicas` without unrolling
it first, see:

https://github.com/cockroachdb/cockroach/blob/3b316bac6ef342590ddc68d2989714d6e126371a/pkg/storage/replica_command.go#L870-L891

and

https://github.com/cockroachdb/cockroach/blob/3b316bac6ef342590ddc68d2989714d6e126371a/pkg/storage/replica.go#L1314-L1325

Third, we must teach the replicate queue to issue the "atomic swaps";
this is the reason we're introducing atomic membership changes in the first
place.

Release note: None

39656: kv: init heartbeat txn log tag later r=nvanbenschoten a=tbg

At init() time, the txn proto has not been populated yet.
Found while investigating #39652.

This change strikes me as clunky, but I don't have the bandwidth to dig deeper
right now.

Release note: None

39666: testutils/lint/passes: disable under nightly stress r=mjibson a=mjibson

Under stress, these lint passes fail with "go build a: failed to cache compiled Go files".

Fixes #39616
Fixes #39541
Fixes #39479

Release note: None

39669: rpc: use gRPC enforced minimum keepalive timeout r=knz a=ajwerner

Before this commit, gRPC logged the following annoying message every time
we created a new connection, telling us that our setting was being
ignored:

```
Adjusting keepalive ping interval to minimum period of 10s
```

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Matt Jibson <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
4 people committed Aug 14, 2019
5 parents e73d88d + 329d825 + dc3686f + 05710c0 + b73abf6 commit 7fb25c6
Showing 12 changed files with 247 additions and 135 deletions.
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_heartbeater.go
@@ -115,7 +115,6 @@ func (h *txnHeartbeater) init(
asyncAbortCallbackLocked func(context.Context),
) {
h.AmbientContext = ac
- h.AmbientContext.AddLogTag("txn-hb", txn.Short())
h.stopper = stopper
h.clock = clock
h.heartbeatInterval = heartbeatInterval
@@ -208,6 +207,9 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
log.VEventf(ctx, 2, "coordinator spawns heartbeat loop")
h.mu.loopStarted = true
h.mu.txnEnd = make(chan struct{})
+ // NB: we can't do this in init() because the txn isn't populated yet then
+ // (it's zero).
+ h.AmbientContext.AddLogTag("txn-hb", h.mu.txn.Short())

// Create a new context so that the heartbeat loop doesn't inherit the
// caller's cancelation.
23 changes: 23 additions & 0 deletions pkg/roachpb/api.go
@@ -1212,6 +1212,29 @@ func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) {
}
}

+ // ReplicationChanges is a slice of ReplicationChange.
+ type ReplicationChanges []ReplicationChange
+
+ func (rc ReplicationChanges) byType(typ ReplicaChangeType) []ReplicationTarget {
+ 	var sl []ReplicationTarget
+ 	for _, chg := range rc {
+ 		if chg.ChangeType == typ {
+ 			sl = append(sl, chg.Target)
+ 		}
+ 	}
+ 	return sl
+ }
+
+ // Additions returns a slice of all contained replication changes that add replicas.
+ func (rc ReplicationChanges) Additions() []ReplicationTarget {
+ 	return rc.byType(ADD_REPLICA)
+ }
+
+ // Removals returns a slice of all contained replication changes that remove replicas.
+ func (rc ReplicationChanges) Removals() []ReplicationTarget {
+ 	return rc.byType(REMOVE_REPLICA)
+ }

// Changes returns the changes requested by this AdminChangeReplicasRequest, taking
// the deprecated method of doing so into account.
func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
2 changes: 1 addition & 1 deletion pkg/rpc/context_test.go
@@ -980,7 +980,7 @@ type grpcKeepaliveTestCase struct {
func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) error {
var cKeepalive keepalive.ClientParameters
if c.cKeepalive {
- cKeepalive = clientTestingKeepalive
+ cKeepalive = clientKeepalive
}
var sKeepalive keepalive.ServerParameters
if c.sKeepalive {
17 changes: 8 additions & 9 deletions pkg/rpc/keepalive.go
@@ -17,16 +17,22 @@ import (
"google.golang.org/grpc/keepalive"
)

+ // 10 seconds is the minimum keepalive interval permitted by gRPC.
+ // Setting it to a value lower than this will lead to gRPC adjusting to this
+ // value and annoyingly logging "Adjusting keepalive ping interval to minimum
+ // period of 10s". See grpc/grpc-go#2642.
+ const minimumClientKeepaliveInterval = 10 * time.Second

// To prevent unidirectional network partitions from keeping an unhealthy
// connection alive, we use both client-side and server-side keepalive pings.
var clientKeepalive = keepalive.ClientParameters{
// Send periodic pings on the connection.
- Time: base.NetworkTimeout,
+ Time: minimumClientKeepaliveInterval,
// If the pings don't get a response within the timeout, we might be
// experiencing a network partition. gRPC will close the transport-level
// connection and all the pending RPCs (which may not have timeouts) will
// fail eagerly. gRPC will then reconnect the transport transparently.
- Timeout: base.NetworkTimeout,
+ Timeout: minimumClientKeepaliveInterval,
// Do the pings even when there are no ongoing RPCs.
PermitWithoutStream: true,
}
@@ -48,13 +54,6 @@ var serverEnforcement = keepalive.EnforcementPolicy{
PermitWithoutStream: true,
}

- // These aggressively low keepalive timeouts ensure that tests which use
- // them don't take too long.
- var clientTestingKeepalive = keepalive.ClientParameters{
- 	Time: 200 * time.Millisecond,
- 	Timeout: 300 * time.Millisecond,
- 	PermitWithoutStream: true,
- }
var serverTestingKeepalive = keepalive.ServerParameters{
Time: 200 * time.Millisecond,
Timeout: 300 * time.Millisecond,
10 changes: 6 additions & 4 deletions pkg/storage/client_replica_test.go
@@ -1673,20 +1673,22 @@ func TestChangeReplicasGeneration(t *testing.T) {
assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+2)

oldGeneration = repl.Desc().GetGeneration()
- if _, err := repl.ChangeReplicas(
+ oldDesc := repl.Desc()
+ newDesc, err := repl.ChangeReplicas(
context.Background(),
roachpb.REMOVE_REPLICA,
roachpb.ReplicationTarget{
NodeID: mtc.idents[1].NodeID,
StoreID: mtc.idents[1].StoreID,
},
- repl.Desc(),
+ oldDesc,
storagepb.ReasonRangeOverReplicated,
"",
- ); err != nil {
+ )
+ if err != nil {
t.Fatalf("unexpected error: %v", err)
}
- assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1)
+ assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1, "\nold: %+v\nnew: %+v", oldDesc, newDesc)
}

func TestSystemZoneConfigs(t *testing.T) {