diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index fb1d9f09e665..68c60fa53e87 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -1678,6 +1678,115 @@ func TestReplicateRemoveAndAdd(t *testing.T) {
 	testReplicaAddRemove(t, false)
 }
 
+// TestQuotaPool verifies that writes get throttled in the case where we have
+// two fast moving replicas with sufficiently fast growing raft logs and a
+// slower replica catching up. By throttling write throughput we avoid having
+// to constantly catch up the slower node via snapshots. See #8659.
+func TestQuotaPool(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig(nil)
+	// Suppress timeout-based elections to avoid leadership changes in ways
+	// this test doesn't expect.
+	sc.RaftElectionTimeoutTicks = 100000
+	mtc := &multiTestContext{storeConfig: &sc}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+
+	const rangeID = 1
+	const quota = 100
+	mtc.replicateRange(rangeID, 1, 2)
+
+	// Log truncation requests generate raft log entries and consequently
+	// acquire quota. To deterministically simulate a fixed number of quota
+	// acquisitions we deactivate the raft log queue on each replica.
+	for _, store := range mtc.stores {
+		store.SetRaftLogQueueActive(false)
+	}
+
+	// Heartbeats (for node liveness) generate raft log entries and
+	// consequently acquire quota. To deterministically simulate a fixed number
+	// of quota acquisitions we pause heartbeats on each replica.
+	for _, nl := range mtc.nodeLivenesses {
+		nl.PauseHeartbeat(true)
+	}
+
+	leaderRepl, err := mtc.stores[0].GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	leaderDesc, err := leaderRepl.GetReplicaDescriptor()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if status := leaderRepl.RaftStatus(); status == nil || status.Lead != uint64(leaderDesc.ReplicaID) {
+		t.Fatalf("raft leader should be %d, but got status %+v", leaderDesc.ReplicaID, status)
+	}
+
+	leaderRepl.SetQuotaPool(quota)
+
+	followerRepl, err := mtc.stores[2].GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Block the third replica. NB: See TestRaftBlockedReplica/#9914 for why
+	// we use a separate goroutine.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		followerRepl.RaftLock()
+		wg.Done()
+	}()
+	wg.Wait()
+
+	// We can write up to 'quota' number of keys before writes get throttled.
+	// We verify this by writing this many keys and ensuring the next write is
+	// blocked.
+	// NB: This can block if some other moving part of the system gets a
+	// proposal in. At the time of writing the only moving parts are the node
+	// liveness heartbeats and raft log truncations, both of which are disabled
+	// for the purposes of this test.
+	// TODO(irfansharif): Once we move to quota acquisitions based on the size
+	// (in bytes) of the generated raft log entry this will have to be
+	// revisited.
+	incArgs := incrementArgs([]byte("k"), 1)
+	for i := 0; i < quota; i++ {
+		if _, err := client.SendWrapped(context.Background(), leaderRepl, incArgs); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	ch := make(chan error, 1)
+	go func() {
+		defer close(ch)
+		// This write should block behind the quota pool until the follower is
+		// unblocked below; completing early (successfully or not) means it
+		// was not throttled.
+		if _, err := client.SendWrapped(context.Background(), leaderRepl, incArgs); err != nil {
+			ch <- err.GoError()
+			return
+		}
+		ch <- errors.New("write not throttled by the quota pool")
+	}()
+
+	select {
+	case err := <-ch:
+		t.Fatal(err)
+	case <-time.After(15 * time.Millisecond):
+	}
+
+	mtc.waitForValues(roachpb.Key("k"), []int64{quota, quota, 0})
+
+	followerRepl.RaftUnlock()
+
+	mtc.waitForValues(roachpb.Key("k"), []int64{quota + 1, quota + 1, quota + 1})
+
+	select {
+	case <-ch:
+	default:
+		t.Fatal(errors.New("throttled write not unblocked"))
+	}
+}
+
 // TestRaftHeartbeats verifies that coalesced heartbeats are correctly
 // suppressing elections in an idle cluster.
 func TestRaftHeartbeats(t *testing.T) {
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go
index 6a0dcdefb266..e4188cef9d1e 100644
--- a/pkg/storage/helpers_test.go
+++ b/pkg/storage/helpers_test.go
@@ -254,6 +254,16 @@ func (r *Replica) GetLease() (*roachpb.Lease, *roachpb.Lease) {
 	return r.getLease()
 }
 
+// SetQuotaPool initializes the replica's quota pool with the given quota.
+// Only safe to call on the leader replica.
+func (r *Replica) SetQuotaPool(quota int64) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
+	r.mu.proposalQuota = newQuotaPool(quota)
+}
+
 // GetTimestampCacheLowWater returns the timestamp cache low water mark.
 func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
 	r.store.tsCacheMu.Lock()
diff --git a/pkg/storage/quota_pool.go b/pkg/storage/quota_pool.go
new file mode 100644
index 000000000000..46dc7fdad353
--- /dev/null
+++ b/pkg/storage/quota_pool.go
@@ -0,0 +1,131 @@
+// Copyright 2017 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Irfan Sharif (irfansharif@cockroachlabs.com)
+//
+// The code below is a simplified version of a similar structure found in
+// grpc-go (github.com/grpc/grpc-go/blob/b2fae0c/transport/control.go).
+
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package storage
+
+import (
+	"errors"
+
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"golang.org/x/net/context"
+)
+
+const (
+	// TODO(peter): This setting needs additional thought. Should it be
+	// adjusted dynamically?
+	defaultProposalQuota = 1000
+)
+
+type quotaPool struct {
+	syncutil.Mutex
+
+	// We use a channel to 'park' our quota value for easier composition with
+	// context cancellation and leadership changes (see quotaPool.acquire).
+	// NB: A value of '0' is never allowed to be parked in the channel; the
+	// lack of quota is represented by an empty channel. Quota additions push
+	// a value into the channel whereas acquisitions wait on the channel
+	// itself.
+	quota chan int64
+	// done is closed when the pool is closed (see quotaPool.close); pending
+	// acquisitions select on it.
+	done chan struct{}
+}
+
+// newQuotaPool returns a new quota pool initialized with the given quota.
+// Calling newQuotaPool(0) is disallowed.
+func newQuotaPool(q int64) *quotaPool {
+	qp := &quotaPool{
+		quota: make(chan int64, 1),
+		done:  make(chan struct{}),
+	}
+	qp.quota <- q
+	return qp
+}
+
+// add adds the specified quota back to the pool. Safe for concurrent use.
+func (qp *quotaPool) add(v int64) {
+	if v == 0 {
+		return
+	}
+
+	qp.Lock()
+	select {
+	case q := <-qp.quota:
+		v += q
+	default:
+	}
+	qp.quota <- v
+	qp.Unlock()
+}
+
+// acquire acquires a single unit of quota from the pool. On success, nil is
+// returned and the caller must call add(1) or otherwise arrange for the quota
+// to be returned to the pool. Safe for concurrent use.
+func (qp *quotaPool) acquire(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case q := <-qp.quota:
+		if q > 1 {
+			qp.add(q - 1)
+		}
+		return nil
+	case <-qp.done:
+		return errors.New("raft leadership changed, quota pool no longer in use")
+	}
+}
+
+// close signals to all ongoing and subsequent acquisitions that the pool is
+// no longer in use, e.g. because raft leadership has changed hands. Safe for
+// concurrent use and safe to call more than once.
+func (qp *quotaPool) close() {
+	qp.Lock()
+	select {
+	case <-qp.done:
+		// Already closed.
+	default:
+		close(qp.done)
+	}
+	qp.Unlock()
+}
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 6e80e2163962..38836306ea48 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -375,6 +375,9 @@ type Replica struct {
 		// Computed checksum at a snapshot UUID.
 		checksums map[uuid.UUID]replicaChecksum
 
+		proposalQuota          *quotaPool
+		proposalQuotaBaseIndex uint64
+
 		// Counts calls to Replica.tick()
 		ticks int
 
@@ -784,6 +787,91 @@ func (r *Replica) setEstimatedCommitIndexLocked(commit uint64) {
 	}
 }
 
+func (r *Replica) maybeAcquireProposalQuota(ctx context.Context) error {
+	r.mu.RLock()
+	quota := r.mu.proposalQuota
+	r.mu.RUnlock()
+	// Quota acquisition only takes place on the leader replica;
+	// r.mu.proposalQuota is nil on followers (see
+	// updateProposalQuotaLocked).
+	if quota == nil {
+		return nil
+	}
+	return quota.acquire(ctx)
+}
+
+func (r *Replica) updateProposalQuotaLocked(ctx context.Context, newLeaderID roachpb.ReplicaID) {
+	if r.mu.leaderID != newLeaderID {
+		if r.mu.replicaID == newLeaderID {
+			// We're becoming the leader.
+			r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
+
+			// Raft may propose commands itself (specifically the empty
+			// commands when leadership changes), and these commands don't go
+			// through the code paths where we acquire quota from the pool. To
+			// offset this we reset the quota pool whenever leadership changes
+			// hands.
+			r.mu.proposalQuota = newQuotaPool(defaultProposalQuota)
+		} else {
+			// We're either becoming a follower or simply observing a
+			// leadership change.
+			if r.mu.proposalQuota != nil {
+				r.mu.proposalQuota.close()
+			}
+			r.mu.proposalQuota = nil
+		}
+		return
+	} else if r.mu.proposalQuota == nil {
+		// We're a follower.
+		return
+	}
+	// We're still the leader.
+
+	// TODO(peter): Can we avoid retrieving the Raft status on every
+	// invocation in order to avoid the associated allocation? Tracking the
+	// progress ourselves via looking at MsgAppResp messages would be
+	// overkill. Perhaps another accessor on RawNode.
+	status := r.raftStatusRLocked()
+	// Find the minimum index that active followers have acknowledged.
+	minIndex := status.Commit
+
+	for _, rep := range r.mu.state.Desc.Replicas {
+		// Only consider followers that have "healthy" RPC connections. We
+		// don't use node liveness here as doing so could lead to deadlock
+		// unless we avoided enforcing proposal quota for node liveness
+		// ranges.
+		if r.store.cfg.Transport.resolver != nil {
+			addr, err := r.store.cfg.Transport.resolver(rep.NodeID)
+			if err != nil {
+				continue
+			}
+			if err := r.store.cfg.Transport.rpcContext.ConnHealth(addr.String()); err != nil {
+				continue
+			}
+		}
+		if progress, ok := status.Progress[uint64(rep.ReplicaID)]; ok {
+			// Only consider followers that are in advance of the quota base
+			// index. This prevents a follower that comes back online from
+			// stalling throughput to the range until it has caught up.
+			if progress.Match < r.mu.proposalQuotaBaseIndex {
+				continue
+			}
+			if progress.Match > 0 && progress.Match < minIndex {
+				minIndex = progress.Match
+			}
+		}
+	}
+
+	if r.mu.proposalQuotaBaseIndex < minIndex {
+		// We've persisted minIndex - r.mu.proposalQuotaBaseIndex entries to
+		// the raft log since we last checked; that delta can be released back
+		// to the quota pool. Entries with an index less than minIndex have
+		// been persisted on all followers with "healthy" RPC connections.
+		delta := int64(minIndex - r.mu.proposalQuotaBaseIndex)
+		r.mu.proposalQuotaBaseIndex = minIndex
+		r.mu.proposalQuota.add(delta)
+	}
+}
+
 // getEstimatedBehindCountRLocked returns an estimate of how far this replica is
 // behind. A return value of 0 indicates that the replica is up to date.
 func (r *Replica) getEstimatedBehindCountRLocked(raftStatus *raft.Status) int64 {
@@ -2225,6 +2313,10 @@ func (r *Replica) tryExecuteWriteBatch(
 
 	log.Event(ctx, "raft")
 
+	if err := r.maybeAcquireProposalQuota(ctx); err != nil {
+		return nil, roachpb.NewError(err), proposalNoRetry
+	}
+
 	ch, tryAbandon, err := r.propose(ctx, lease, ba, endCmds, spans)
 	if err != nil {
 		return nil, roachpb.NewError(err), proposalNoRetry
@@ -2842,6 +2934,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries)
 	r.mu.lastIndex = lastIndex
 	r.mu.raftLogSize = raftLogSize
+	r.updateProposalQuotaLocked(ctx, leaderID)
 	r.mu.leaderID = leaderID
 	r.mu.Unlock()
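
Usage sketch (editor's addition, not part of the patch): the snippet below exercises the channel-parking idea behind quotaPool in isolation. Because the real type is unexported inside pkg/storage, it re-declares a trimmed copy under the assumed name miniQuotaPool; everything else (single-slot channel, add/acquire semantics, context cancellation) mirrors the code in the diff above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// miniQuotaPool is a trimmed, self-contained copy of the quotaPool added in
// pkg/storage/quota_pool.go above. A single-slot channel parks the available
// quota; an empty channel means no quota is available.
type miniQuotaPool struct {
	sync.Mutex
	quota chan int64
}

func newMiniQuotaPool(q int64) *miniQuotaPool {
	qp := &miniQuotaPool{quota: make(chan int64, 1)}
	qp.quota <- q
	return qp
}

// add returns v units of quota to the pool, merging with whatever is parked.
func (qp *miniQuotaPool) add(v int64) {
	if v == 0 {
		return
	}
	qp.Lock()
	select {
	case q := <-qp.quota:
		v += q
	default:
	}
	qp.quota <- v
	qp.Unlock()
}

// acquire takes one unit of quota, blocking until quota is available or the
// context is canceled.
func (qp *miniQuotaPool) acquire(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case q := <-qp.quota:
		if q > 1 {
			qp.add(q - 1)
		}
		return nil
	}
}

func main() {
	qp := newMiniQuotaPool(2)

	ctx := context.Background()
	_ = qp.acquire(ctx) // 1 unit left
	_ = qp.acquire(ctx) // 0 units left; the pool's channel is now empty

	// A third acquisition blocks until quota is returned.
	done := make(chan struct{})
	go func() {
		if err := qp.acquire(ctx); err == nil {
			fmt.Println("third acquisition unblocked")
		}
		close(done)
	}()

	time.Sleep(10 * time.Millisecond) // demo only: let the goroutine block
	qp.add(1)                         // release one unit, unblocking it
	<-done

	// With a canceled context, acquire returns instead of blocking forever.
	canceled, cancel := context.WithCancel(ctx)
	cancel()
	fmt.Println("acquire on empty pool with canceled context:", qp.acquire(canceled))
}

The single-slot channel is what lets acquire compose waiting for quota, context cancellation, and (in the real type) the pool's done channel in one select; they are all just channel receives.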
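
A second sketch, also an editor's addition: the quota-release step in updateProposalQuotaLocked reduces to "take the smallest commit/Match index among healthy followers that is at or past the base index, and release the difference". The helper below restates that arithmetic as a pure function so the edge cases are easy to see; the name releasableQuota and its signature are illustrative only and do not exist in the patch.

package main

import "fmt"

// releasableQuota mirrors the arithmetic in updateProposalQuotaLocked: given
// the leader's commit index, the current proposal quota base index, and the
// Match index reported for each follower considered healthy, it returns how
// much quota can be returned to the pool and the new base index.
func releasableQuota(commit, baseIndex uint64, matches []uint64) (delta int64, newBase uint64) {
	minIndex := commit
	for _, m := range matches {
		// Skip followers still behind the base index so that a replica coming
		// back online does not stall the range until it has caught up.
		if m < baseIndex {
			continue
		}
		if m > 0 && m < minIndex {
			minIndex = m
		}
	}
	if baseIndex < minIndex {
		// Entries in (baseIndex, minIndex] are now durable on every healthy
		// follower; their quota can be released.
		return int64(minIndex - baseIndex), minIndex
	}
	return 0, baseIndex
}

func main() {
	// Two followers fully caught up to the leader's commit index 110.
	fmt.Println(releasableQuota(110, 100, []uint64{110, 110})) // 10 110
	// One follower lagging at 105: only 5 entries' worth of quota frees up.
	fmt.Println(releasableQuota(110, 100, []uint64{110, 105})) // 5 105
	// A follower behind the base index is ignored entirely.
	fmt.Println(releasableQuota(110, 100, []uint64{110, 90})) // 10 110
}

The third case is the one the patch calls out explicitly: a follower that is behind the base index is excluded from the minimum so its recovery does not throttle the range, at the cost of having to catch it up via snapshot or log replay later.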