Merge #38149
38149: storage: use raft learners in replica addition, defaulted off r=tbg a=danhhz



Co-authored-by: Daniel Harrison <[email protected]>
craig[bot] and danhhz committed Jul 19, 2019
2 parents e115fa9 + 844eac4 commit 1db1dd6
Showing 45 changed files with 1,401 additions and 169 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
@@ -38,6 +38,7 @@
<tr><td><code>kv.closed_timestamp.target_duration</code></td><td>duration</td><td><code>30s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td></tr>
<tr><td><code>kv.follower_read.target_multiple</code></td><td>float</td><td><code>3</code></td><td>if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>kv.import.batch_size</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>kv.learner_replicas.enabled</code></td><td>boolean</td><td><code>false</code></td><td>use learner replicas for replica addition</td></tr>
<tr><td><code>kv.raft.command.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of a raft command</td></tr>
<tr><td><code>kv.raft_log.disable_synchronization_unsafe</code></td><td>boolean</td><td><code>false</code></td><td>set to true to disable synchronization on Raft log writes to persistent storage. Setting to true risks data loss or data corruption on server crashes. The setting is meant for internal testing only and SHOULD NOT be used in production.</td></tr>
<tr><td><code>kv.range.backpressure_range_size_multiplier</code></td><td>float</td><td><code>2</code></td><td>multiple of range_max_bytes that a range is allowed to grow to without splitting before writes to that range are blocked, or 0 to disable</td></tr>
@@ -120,6 +121,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-5</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-6</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
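The new kv.learner_replicas.enabled setting ships defaulted to false. A minimal sketch of opting in from a Go test, assuming the standard testcluster/sqlutils helpers; the test itself is illustrative and not part of this change:

package storage_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestEnableLearnerReplicasSketch(t *testing.T) {
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// The learner path is off by default; flip the cluster setting to use it.
	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
}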
5 changes: 3 additions & 2 deletions pkg/cli/debug.go
@@ -1163,8 +1163,9 @@ func removeDeadReplicas(
err = storage.IterateRangeDescriptors(ctx, db, func(desc roachpb.RangeDescriptor) (bool, error) {
hasSelf := false
numDeadPeers := 0
numReplicas := len(desc.Replicas().Unwrap())
for _, rep := range desc.Replicas().Unwrap() {
allReplicas := desc.Replicas().All()
numReplicas := len(allReplicas)
for _, rep := range allReplicas {
if rep.StoreID == storeIdent.StoreID {
hasSelf = true
}
6 changes: 4 additions & 2 deletions pkg/kv/dist_sender.go
@@ -487,8 +487,10 @@ func (ds *DistSender) getDescriptor(
func (ds *DistSender) sendSingleRange(
ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, withCommit bool,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Try to send the call.
replicas := NewReplicaSlice(ds.gossip, desc)
// Try to send the call. Learner replicas won't serve reads/writes, so send
// only to the `Voters` replicas. This is just an optimization to save a
// network hop; everything would still work if we had `All` here.
replicas := NewReplicaSlice(ds.gossip, desc.Replicas().Voters())

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
5 changes: 4 additions & 1 deletion pkg/kv/dist_sender_rangefeed.go
@@ -222,7 +222,10 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
replicas := NewReplicaSlice(ds.gossip, desc)
// Learner replicas won't serve reads/writes, so send only to the `Voters`
// replicas. This is just an optimization to save a network hop; everything
// would still work if we had `All` here.
replicas := NewReplicaSlice(ds.gossip, desc.Replicas().Voters())
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)

transport, err := ds.transportFactory(SendOptions{}, ds.nodeDialer, replicas)
10 changes: 5 additions & 5 deletions pkg/kv/replica_slice.go
@@ -42,25 +42,25 @@ type ReplicaSlice []ReplicaInfo
// NewReplicaSlice creates a ReplicaSlice from the replicas listed in the range
// descriptor and using gossip to lookup node descriptors. Replicas on nodes
// that are not gossiped are omitted from the result.
func NewReplicaSlice(gossip *gossip.Gossip, desc *roachpb.RangeDescriptor) ReplicaSlice {
func NewReplicaSlice(gossip *gossip.Gossip, replicas []roachpb.ReplicaDescriptor) ReplicaSlice {
if gossip == nil {
return nil
}
replicas := make(ReplicaSlice, 0, len(desc.Replicas().Unwrap()))
for _, r := range desc.Replicas().Unwrap() {
rs := make(ReplicaSlice, 0, len(replicas))
for _, r := range replicas {
nd, err := gossip.GetNodeDescriptor(r.NodeID)
if err != nil {
if log.V(1) {
log.Infof(context.TODO(), "node %d is not gossiped: %v", r.NodeID, err)
}
continue
}
replicas = append(replicas, ReplicaInfo{
rs = append(rs, ReplicaInfo{
ReplicaDescriptor: r,
NodeDesc: nd,
})
}
return replicas
return rs
}

// ReplicaSlice implements shuffle.Interface.
20 changes: 17 additions & 3 deletions pkg/roachpb/metadata.go
@@ -220,7 +220,7 @@ func (r *RangeDescriptor) Validate() error {
return errors.Errorf("NextReplicaID must be non-zero")
}
seen := map[ReplicaID]struct{}{}
for i, rep := range r.Replicas().Unwrap() {
for i, rep := range r.Replicas().All() {
if err := rep.Validate(); err != nil {
return errors.Errorf("replica %d is invalid: %s", i, err)
}
@@ -247,8 +247,8 @@ func (r *RangeDescriptor) String() string {
}
buf.WriteString(" [")

if len(r.Replicas().Unwrap()) > 0 {
for i, rep := range r.Replicas().Unwrap() {
if allReplicas := r.Replicas().All(); len(allReplicas) > 0 {
for i, rep := range allReplicas {
if i > 0 {
buf.WriteString(", ")
}
@@ -270,6 +270,20 @@ func (r ReplicationTarget) String() string {
return fmt.Sprintf("n%d,s%d", r.NodeID, r.StoreID)
}

// ReplicaTypeLearner returns a ReplicaType_LEARNER pointer suitable for use in
// a nullable proto field.
func ReplicaTypeLearner() *ReplicaType {
t := ReplicaType_LEARNER
return &t
}

// ReplicaTypeVoter returns a ReplicaType_VOTER pointer suitable for use in a
// nullable proto field.
func ReplicaTypeVoter() *ReplicaType {
t := ReplicaType_VOTER
return &t
}

func (r ReplicaDescriptor) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "(n%d,s%d):", r.NodeID, r.StoreID)
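A hedged illustration of the ReplicaTypeLearner/ReplicaTypeVoter helpers added above: they exist because the replica type is a nullable proto field, so call sites need a *ReplicaType rather than the enum value. The descriptor values below are made up, and the assumption that the field is named Type comes from how these helpers are documented, not from this hunk.

package metadatasketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// replicaTypeExample is a sketch only: it fills the nullable replica-type
// field on hand-built descriptors; all IDs are invented for illustration.
func replicaTypeExample() (roachpb.ReplicaDescriptor, roachpb.ReplicaDescriptor) {
	learner := roachpb.ReplicaDescriptor{
		NodeID:    2,
		StoreID:   2,
		ReplicaID: 5,
		Type:      roachpb.ReplicaTypeLearner(),
	}
	voter := roachpb.ReplicaDescriptor{
		NodeID:    1,
		StoreID:   1,
		ReplicaID: 4,
		Type:      roachpb.ReplicaTypeVoter(),
	}
	return learner, voter
}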
75 changes: 72 additions & 3 deletions pkg/roachpb/metadata_replicas.go
@@ -59,7 +59,7 @@ func (d ReplicaDescriptors) Unwrap() []ReplicaDescriptor {
}

// All returns every replica in the set, including both voter replicas and
// learner replicas.
// learner replicas. Voter replicas are ordered first in the returned slice.
func (d ReplicaDescriptors) All() []ReplicaDescriptor {
return d.wrapped
}
@@ -76,6 +76,77 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
}

// Learners returns the learner replicas in the set.
//
// A learner is a participant in a raft group that accepts messages but doesn't
// vote. This means it doesn't affect raft quorum and thus doesn't affect the
// fragility of the range, even if it's very far behind or many learners are
// down.
//
// At the time of writing, learners are used in CockroachDB as an interim state
// while adding a replica. A learner replica is added to the range via raft
// ConfChange, a raft snapshot (of type LEARNER) is sent to catch it up, and
// then a second ConfChange promotes it to a full replica.
//
// This means that learners are currently always expected to have a short
// lifetime, approximately the time it takes to send a snapshot. Ideas have been
// kicked around to use learners with follower reads, which could be a cheap way
// to allow many geographies to have local reads without affecting write
// latencies. If implemented, these learners would have long lifetimes.
//
// For simplicity, CockroachDB treats learner replicas the same as voter
// replicas as much as possible, but there are a few exceptions:
//
// - Learner replicas are not considered when calculating quorum size, and thus
// do not affect the computation of which ranges are under-replicated for
// upreplication/alerting/debug/etc purposes. Ditto for over-replicated.
// - Learner replicas cannot become raft leaders, so we also don't allow them to
// become leaseholders. As a result, DistSender and the various oracles don't
// try to send them traffic.
// - The raft snapshot queue does not send snapshots to learners for reasons
// described below.
// - Merges won't run while a learner replica is present.
//
// Replicas are now added in two ConfChange transactions. The first creates the
// learner and the second promotes it to a voter. If the node that is
// coordinating this dies in the middle, we're left with an orphaned learner.
// For this reason, the replicate queue always first removes any learners it
// sees before doing anything else. We could instead try to finish off the
// learner snapshot and promotion, but this is more complicated and it's not yet
// clear the efficiency win is worth it.
//
// This introduces some rare races between the replicate queue and
// AdminChangeReplicas or if a range's lease is moved to a new owner while the
// old leaseholder is still processing it in the replicate queue. These races
// are handled by retrying if a learner disappears during the
// snapshot/promotion.
//
// If the coordinator otherwise encounters an error while sending the learner
// snapshot or promoting it (which can happen for a number of reasons, including
// the node receiving the learner going away), it tries to clean up after itself
// by rolling back the addition of the learner.
//
// There is another race between the learner snapshot being sent and the raft
// snapshot queue happening to check the replica at the same time, also sending
// it a snapshot. This is safe but wasteful, so the raft snapshot queue won't
// try to send snapshots to learners.
//
// Merges are blocked if either side has a learner (to avoid working out the
// edge cases) but it's historically turned out to be a bad idea to get in the
// way of splits, so we allow them even when some of the replicas are learners.
// This orphans a learner on each side of the split (the original coordinator
// will not be able to finish either of them), but the replication queue will
// eventually clean them up.
//
// Learner replicas don't affect quorum but they do affect the system in other
// ways. The most obvious way is that the leader sends them the raft traffic it
// would send to any follower, consuming resources. More surprising is that once
// the learner has received a snapshot, it's considered by the quota pool that
// prevents the raft leader from getting too far ahead of the followers. This is
// because a learner (especially one that already has a snapshot) is expected to
// very soon be a voter, so we treat it like one. However, it means a slow
// learner can slow down regular traffic, which is possibly counterintuitive.
//
// For some related mega-comments, see Replica.sendSnapshot.
func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
// Note that the wrapped replicas are sorted first by type.
for i := range d.wrapped {
@@ -86,8 +157,6 @@ func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
return nil
}

var _, _ = ReplicaDescriptors.All, ReplicaDescriptors.Learners

// AsProto returns the protobuf representation of these replicas, suitable for
// setting the InternalReplicas field of a RangeDescriptor. When possible the
// SetReplicas method of RangeDescriptor should be used instead, this is only
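The Learners comment above describes replica addition as two ConfChange transactions with a learner snapshot in between, plus rollback on failure. The following is a loose Go sketch of that coordinator flow; every identifier is hypothetical and stubbed out so the shape compiles, and it is not the PR's actual implementation (which lives in the replica change / replicate queue code).

package learnersketch

import "context"

// Hypothetical stand-ins for the real ConfChange/snapshot machinery, stubbed
// so this sketch compiles.
type replicationTarget struct{ NodeID, StoreID int32 }

func addLearner(ctx context.Context, t replicationTarget) error          { return nil } // first ConfChange
func sendLearnerSnapshot(ctx context.Context, t replicationTarget) error { return nil } // LEARNER snapshot
func promoteToVoter(ctx context.Context, t replicationTarget) error      { return nil } // second ConfChange
func rollbackLearner(ctx context.Context, t replicationTarget)           {}             // best-effort cleanup

// addReplicaViaLearner mirrors the prose above: add a learner, catch it up
// with a snapshot, then promote it. On error, roll the learner back rather
// than leaving an orphan; the replicate queue removes any orphans regardless.
func addReplicaViaLearner(ctx context.Context, t replicationTarget) error {
	if err := addLearner(ctx, t); err != nil {
		return err
	}
	if err := sendLearnerSnapshot(ctx, t); err != nil {
		rollbackLearner(ctx, t)
		return err
	}
	return promoteToVoter(ctx, t)
}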
2 changes: 1 addition & 1 deletion pkg/server/admin.go
@@ -683,7 +683,7 @@ func (s *adminServer) statsForSpan(
if err := kv.Value.GetProto(&rng); err != nil {
return nil, s.serverError(err)
}
for _, repl := range rng.Replicas().Unwrap() {
for _, repl := range rng.Replicas().All() {
nodeIDs[repl.NodeID] = struct{}{}
}
}
2 changes: 1 addition & 1 deletion pkg/server/status.go
@@ -1125,7 +1125,7 @@ func (s *statusServer) RaftDebug(
desc := node.Range.State.Desc
// Check for whether replica should be GCed.
containsNode := false
for _, replica := range desc.Replicas().Unwrap() {
for _, replica := range desc.Replicas().All() {
if replica.NodeID == node.NodeID {
containsNode = true
}
6 changes: 6 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
@@ -40,6 +40,7 @@ const (
VersionStickyBit
VersionParallelCommits
VersionGenerationComparable
VersionLearnerReplicas

// Add new versions here (step one of two).

@@ -483,6 +484,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionGenerationComparable,
Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 5},
},
{
// VersionLearnerReplicas is https://github.com/cockroachdb/cockroach/pull/38149.
Key: VersionLearnerReplicas,
Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 6},
},

// Add new versions here (step two of two).

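A hedged sketch of how the new version key and the new cluster setting might be consulted together at a call site. Only the setting key and VersionLearnerReplicas come from this diff; the variable and function names, and the st.Version.IsActive call shape, are assumptions about the 19.1-era settings API rather than code from this PR.

package learnersketch

import (
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// Hypothetical registration mirroring the setting added in this change; the
// real variable lives elsewhere in the PR and may be named differently. The
// key is suffixed so the sketch doesn't collide with the real setting.
var learnersEnabled = settings.RegisterBoolSetting(
	"kv.learner_replicas.enabled.sketch",
	"use learner replicas for replica addition",
	false,
)

// useLearners gates the learner path on both the cluster version (so a
// mixed-version cluster never sends learner ConfChanges to nodes that don't
// understand them yet) and the operator-controlled setting.
func useLearners(st *cluster.Settings) bool {
	return st.Version.IsActive(cluster.VersionLearnerReplicas) &&
		learnersEnabled.Get(&st.SV)
}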
5 changes: 3 additions & 2 deletions pkg/settings/cluster/versionkey_string.go

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion pkg/sql/ambiguous_commit_test.go
@@ -158,7 +158,7 @@ func TestAmbiguousCommit(t *testing.T) {
tableStartKey.Store(keys.MakeTablePrefix(tableID))

// Wait for new table to split & replication.
if err := tc.WaitForSplitAndReplication(tableStartKey.Load().([]byte)); err != nil {
if err := tc.WaitForSplitAndInitialization(tableStartKey.Load().([]byte)); err != nil {
t.Fatal(err)
}

32 changes: 23 additions & 9 deletions pkg/sql/crdb_internal.go
@@ -1679,6 +1679,7 @@ CREATE VIEW crdb_internal.ranges AS SELECT
table_name,
index_name,
replicas,
learner_replicas,
split_enforced_until,
crdb_internal.lease_holder(start_key) AS lease_holder
FROM crdb_internal.ranges_no_leases
@@ -1693,6 +1694,7 @@ FROM crdb_internal.ranges_no_leases
{Name: "table_name", Typ: types.String},
{Name: "index_name", Typ: types.String},
{Name: "replicas", Typ: types.Int2Vector},
{Name: "learner_replicas", Typ: types.Int2Vector},
{Name: "split_enforced_until", Typ: types.Timestamp},
{Name: "lease_holder", Typ: types.Int},
},
@@ -1714,7 +1716,8 @@ CREATE TABLE crdb_internal.ranges_no_leases (
database_name STRING NOT NULL,
table_name STRING NOT NULL,
index_name STRING NOT NULL,
replicas INT[] NOT NULL,
replicas INT[] NOT NULL,
learner_replicas INT[] NOT NULL,
split_enforced_until TIMESTAMP
)
`,
@@ -1768,14 +1771,24 @@ CREATE TABLE crdb_internal.ranges_no_leases (
return nil, err
}

var replicas []int
for _, rd := range desc.Replicas().Unwrap() {
replicas = append(replicas, int(rd.StoreID))
var voterReplicas, learnerReplicas []int
for _, rd := range desc.Replicas().Voters() {
voterReplicas = append(voterReplicas, int(rd.StoreID))
}
sort.Ints(replicas)
arr := tree.NewDArray(types.Int)
for _, replica := range replicas {
if err := arr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
for _, rd := range desc.Replicas().Learners() {
learnerReplicas = append(learnerReplicas, int(rd.StoreID))
}
sort.Ints(voterReplicas)
sort.Ints(learnerReplicas)
votersArr := tree.NewDArray(types.Int)
for _, replica := range voterReplicas {
if err := votersArr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
return nil, err
}
}
learnersArr := tree.NewDArray(types.Int)
for _, replica := range learnerReplicas {
if err := learnersArr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
return nil, err
}
}
@@ -1808,7 +1821,8 @@ CREATE TABLE crdb_internal.ranges_no_leases (
tree.NewDString(dbName),
tree.NewDString(tableName),
tree.NewDString(indexName),
arr,
votersArr,
learnersArr,
splitEnforcedUntil,
}, nil
}, nil
8 changes: 6 additions & 2 deletions pkg/sql/distsqlplan/replicaoracle/oracle.go
@@ -268,11 +268,15 @@ func (o *binPackingOracle) ChoosePreferredReplica(
// is available in gossip. If no nodes are available, a RangeUnavailableError is
// returned.
func replicaSliceOrErr(desc roachpb.RangeDescriptor, gsp *gossip.Gossip) (kv.ReplicaSlice, error) {
replicas := kv.NewReplicaSlice(gsp, &desc)
// Learner replicas won't serve reads/writes, so send only to the `Voters`
// replicas. This is just an optimization to save a network hop; everything
// would still work if we had `All` here.
voterReplicas := desc.Replicas().Voters()
replicas := kv.NewReplicaSlice(gsp, voterReplicas)
if len(replicas) == 0 {
// We couldn't get node descriptors for any replicas.
var nodeIDs []roachpb.NodeID
for _, r := range desc.Replicas().Unwrap() {
for _, r := range voterReplicas {
nodeIDs = append(nodeIDs, r.NodeID)
}
return kv.ReplicaSlice{}, sqlbase.NewRangeUnavailableError(
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
@@ -207,15 +207,15 @@ SELECT * FROM crdb_internal.zones WHERE false
----
zone_id zone_name cli_specifier config_yaml config_sql config_protobuf

query ITTTTTTTTTT colnames
query ITTTTTTTTTTT colnames
SELECT * FROM crdb_internal.ranges WHERE range_id < 0
----
range_id start_key start_pretty end_key end_pretty database_name table_name index_name replicas split_enforced_until lease_holder
range_id start_key start_pretty end_key end_pretty database_name table_name index_name replicas learner_replicas split_enforced_until lease_holder

query ITTTTTTTTT colnames
query ITTTTTTTTTT colnames
SELECT * FROM crdb_internal.ranges_no_leases WHERE range_id < 0
----
range_id start_key start_pretty end_key end_pretty database_name table_name index_name replicas split_enforced_until
range_id start_key start_pretty end_key end_pretty database_name table_name index_name replicas learner_replicas split_enforced_until

statement ok
INSERT INTO system.zones (id, config) VALUES
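The logic test above only pins the new column list. As a hedged end-to-end example, the new learner_replicas column of crdb_internal.ranges can be read from Go with the lib/pq driver against a locally running node; the connection details are illustrative and none of this is part of the diff.

package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/lib/pq"
)

func main() {
	// Illustrative connection string; adjust for your cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query(
		`SELECT range_id, replicas, learner_replicas FROM crdb_internal.ranges LIMIT 5`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var rangeID int64
		var voters, learners []int64
		// INT[] columns scan via pq.Array with this driver.
		if err := rows.Scan(&rangeID, pq.Array(&voters), pq.Array(&learners)); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("r%d: voters=%v learners=%v\n", rangeID, voters, learners)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}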
(Diffs for the remaining changed files are not shown.)
