Skip to content

Commit

Permalink
Merge pull request #15802 from irfansharif/proposal-quota
Browse files Browse the repository at this point in the history
storage: flow control throttling replica operations
  • Loading branch information
irfansharif authored May 27, 2017
2 parents 53ac9de + 1b4ebeb commit db8da7c
Show file tree
Hide file tree
Showing 14 changed files with 1,160 additions and 92 deletions.
117 changes: 117 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,123 @@ func TestReplicateRemoveAndAdd(t *testing.T) {
testReplicaAddRemove(t, false)
}

// TestQuotaPool verifies that writes get throttled in the case where we have
// two fast moving replicas with sufficiently fast growing raft logs and a
// slower replica catching up. By throttling write throughput we avoid having
// to constantly catch up the slower node via snapshots. See #8659.
func TestQuotaPool(t *testing.T) {
defer leaktest.AfterTest(t)()

const quota = 10000
const numReplicas = 3
const rangeID = 1
sc := storage.TestStoreConfig(nil)
// Suppress timeout-based elections to avoid leadership changes in ways
// this test doesn't expect.
sc.RaftElectionTimeoutTicks = 100000
mtc := &multiTestContext{storeConfig: &sc}
mtc.Start(t, numReplicas)
defer mtc.Stop()

mtc.replicateRange(rangeID, 1, 2)

assertEqualLastIndex := func() error {
var expectedIndex uint64

for i, s := range mtc.stores {
repl, err := s.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}

index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
if i == 0 {
expectedIndex = index
} else if expectedIndex != index {
return fmt.Errorf("%s: expected lastIndex %d, but found %d", repl, expectedIndex, index)
}
}
return nil
}
testutils.SucceedsSoon(t, assertEqualLastIndex)

leaderRepl := mtc.getRaftLeader(rangeID)
leaderRepl.InitQuotaPool(quota)
followerRepl := func() *storage.Replica {
for _, store := range mtc.stores {
repl, err := store.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
if repl == leaderRepl {
continue
}
return repl
}
return nil
}()
if followerRepl == nil {
t.Fatal("could not get a handle on a follower replica")
}

// We block the third replica effectively causing acquisition of quota
// without subsequent release.
//
// NB: See TestRaftBlockedReplica/#9914 for why we use a separate
// goroutine.
var wg sync.WaitGroup
wg.Add(1)
go func() {
followerRepl.RaftLock()
wg.Done()
}()
wg.Wait()

// In order to verify write throttling we insert a value 3/4th the size of
// total quota available in the system. This should effectively go through
// and block the subsequent insert of the same size. We check to see whether
// or not after this write has gone through by verifying that the total
// quota available has decreased as expected.
//
// Following this we unblock the 'slow' replica allowing it to catch up to
// the first write. This in turn releases quota back to the pool and the
// second write, previously blocked by virtue of there not being enough
// quota, is now free to proceed. We expect the final quota in the system
// to be the same as what we started with.
key := roachpb.Key("k")
value := bytes.Repeat([]byte("v"), (3*quota)/4)
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
if pErr != nil {
t.Fatal(pErr)
}

if curQuota := leaderRepl.QuotaAvailable(); curQuota > quota/4 {
t.Fatalf("didn't observe the expected quota acquisition, available: %d", curQuota)
}

ch := make(chan *roachpb.Error, 1)
go func() {
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
ch <- pErr
}()

followerRepl.RaftUnlock()

testutils.SucceedsSoonDepth(1, t, func() error {
if curQuota := leaderRepl.QuotaAvailable(); curQuota != quota {
return errors.Errorf("expected available quota %d, got %d", quota, curQuota)
}
return nil
})

if pErr := <-ch; pErr != nil {
t.Fatal(pErr)
}
}

// TestRaftHeartbeats verifies that coalesced heartbeats are correctly
// suppressing elections in an idle cluster.
func TestRaftHeartbeats(t *testing.T) {
Expand Down
32 changes: 18 additions & 14 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ type multiTestContext struct {
clock *hlc.Clock
rpcContext *rpc.Context

nodeIDtoAddr map[roachpb.NodeID]net.Addr
nodeIDtoAddrMu struct {
*syncutil.RWMutex
nodeIDtoAddr map[roachpb.NodeID]net.Addr
}

transport *storage.RaftTransport

Expand Down Expand Up @@ -223,9 +226,9 @@ type multiTestContext struct {
}

func (m *multiTestContext) getNodeIDAddress(nodeID roachpb.NodeID) (net.Addr, error) {
m.mu.RLock()
addr, ok := m.nodeIDtoAddr[nodeID]
m.mu.RUnlock()
m.nodeIDtoAddrMu.RLock()
addr, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
m.nodeIDtoAddrMu.RUnlock()
if ok {
return addr, nil
}
Expand All @@ -249,6 +252,7 @@ func (m *multiTestContext) Start(t *testing.T, numStores int) {
}
m.t = t

m.nodeIDtoAddrMu.RWMutex = &syncutil.RWMutex{}
m.mu = &syncutil.RWMutex{}
m.stores = make([]*storage.Store, numStores)
m.storePools = make([]*storage.StorePool, numStores)
Expand Down Expand Up @@ -705,9 +709,9 @@ func (m *multiTestContext) addStore(idx int) {
// previous stores as resolvers as doing so can cause delays in bringing the
// gossip network up.
resolvers := func() []resolver.Resolver {
m.mu.Lock()
defer m.mu.Unlock()
addr := m.nodeIDtoAddr[1]
m.nodeIDtoAddrMu.Lock()
defer m.nodeIDtoAddrMu.Unlock()
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[1]
if addr == nil {
return nil
}
Expand Down Expand Up @@ -762,15 +766,15 @@ func (m *multiTestContext) addStore(idx int) {
if err != nil {
m.t.Fatal(err)
}
m.mu.Lock()
if m.nodeIDtoAddr == nil {
m.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
m.nodeIDtoAddrMu.Lock()
if m.nodeIDtoAddrMu.nodeIDtoAddr == nil {
m.nodeIDtoAddrMu.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
}
_, ok := m.nodeIDtoAddr[nodeID]
_, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
if !ok {
m.nodeIDtoAddr[nodeID] = ln.Addr()
m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] = ln.Addr()
}
m.mu.Unlock()
m.nodeIDtoAddrMu.Unlock()
if ok {
m.t.Fatalf("node %d already listening", nodeID)
}
Expand Down Expand Up @@ -827,7 +831,7 @@ func (m *multiTestContext) addStore(idx int) {
}

func (m *multiTestContext) nodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor {
addr := m.nodeIDtoAddr[nodeID]
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
return &roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

Expand Down Expand Up @@ -254,6 +255,31 @@ func (r *Replica) GetLease() (roachpb.Lease, *roachpb.Lease) {
return r.getLease()
}

// SetQuotaPool allows the caller to set a replica's quota pool initialized to
// a given quota. Additionally it initializes the replica's quota release queue
// and its command sizes map. Only safe to call on the replica that is both
// lease holder and raft leader.
func (r *Replica) InitQuotaPool(quota int64) {
r.mu.Lock()
defer r.mu.Unlock()

r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
r.mu.proposalQuota = newQuotaPool(quota)
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
}

// QuotaAvailable returns the quota available in the replica's quota pool. Only
// safe to call on the replica that is both lease holder and raft leader.
func (r *Replica) QuotaAvailable() int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.proposalQuota.approximateQuota()
}

// GetTimestampCacheLowWater returns the timestamp cache low water mark.
func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
r.store.tsCacheMu.Lock()
Expand Down
Loading

0 comments on commit db8da7c

Please sign in to comment.