storage: flow control throttling replica operations #15802

Merged · 2 commits · May 27, 2017
117 changes: 117 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -1678,6 +1678,123 @@ func TestReplicateRemoveAndAdd(t *testing.T) {
testReplicaAddRemove(t, false)
}

// TestQuotaPool verifies that writes get throttled when two fast-moving
// replicas with sufficiently fast-growing raft logs outpace a slower replica
// that is catching up. By throttling write throughput we avoid having to
// constantly catch up the slower node via snapshots. See #8659.
func TestQuotaPool(t *testing.T) {
defer leaktest.AfterTest(t)()

const quota = 10000
const numReplicas = 3
const rangeID = 1
sc := storage.TestStoreConfig(nil)
// Suppress timeout-based elections to avoid leadership changes in ways
// this test doesn't expect.
sc.RaftElectionTimeoutTicks = 100000
mtc := &multiTestContext{storeConfig: &sc}
mtc.Start(t, numReplicas)
defer mtc.Stop()

mtc.replicateRange(rangeID, 1, 2)

assertEqualLastIndex := func() error {
var expectedIndex uint64

for i, s := range mtc.stores {
repl, err := s.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}

index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
if i == 0 {
expectedIndex = index
} else if expectedIndex != index {
return fmt.Errorf("%s: expected lastIndex %d, but found %d", repl, expectedIndex, index)
}
}
return nil
}
testutils.SucceedsSoon(t, assertEqualLastIndex)

leaderRepl := mtc.getRaftLeader(rangeID)
leaderRepl.InitQuotaPool(quota)
followerRepl := func() *storage.Replica {
for _, store := range mtc.stores {
repl, err := store.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
if repl == leaderRepl {
continue
}
return repl
}
return nil
}()
if followerRepl == nil {
t.Fatal("could not get a handle on a follower replica")
}

// We block the third replica, effectively causing quota to be acquired
// without being subsequently released.
//
// NB: See TestRaftBlockedReplica/#9914 for why we use a separate
// goroutine.
var wg sync.WaitGroup
wg.Add(1)
go func() {
followerRepl.RaftLock()
wg.Done()
}()
wg.Wait()

// To verify write throttling we insert a value 3/4ths the size of the
// total quota available in the system. This write should go through and
// cause a subsequent insert of the same size to block. We confirm that the
// first write has gone through by verifying that the total available quota
// has decreased as expected.
//
// Following this we unblock the 'slow' replica, allowing it to catch up to
// the first write. This in turn releases quota back to the pool and the
// second write, previously blocked for want of quota, is free to proceed.
// We expect the final quota in the system to be the same as what we
// started with.
key := roachpb.Key("k")
value := bytes.Repeat([]byte("v"), (3*quota)/4)
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
if pErr != nil {
t.Fatal(pErr)
}

if curQuota := leaderRepl.QuotaAvailable(); curQuota > quota/4 {
t.Fatalf("didn't observe the expected quota acquisition, available: %d", curQuota)
}

ch := make(chan *roachpb.Error, 1)
go func() {
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
ch <- pErr
}()

followerRepl.RaftUnlock()

testutils.SucceedsSoonDepth(1, t, func() error {
if curQuota := leaderRepl.QuotaAvailable(); curQuota != quota {
return errors.Errorf("expected available quota %d, got %d", quota, curQuota)
}
return nil
})

if pErr := <-ch; pErr != nil {
t.Fatal(pErr)
}
}
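
The quota pool implementation itself is not part of this diff. As a minimal, hypothetical sketch of the semantics the test above relies on (acquire blocks until enough quota is free; release returns quota and wakes blocked writers), using invented names quotaPoolSketch, acquire, and release that do not correspond to the real pkg/storage API:

package quotapoolsketch

import "sync"

// quotaPoolSketch is an illustrative quota pool: acquire blocks until the
// requested amount is available; release returns quota to the pool and wakes
// any blocked acquirers.
type quotaPoolSketch struct {
	mu    sync.Mutex
	cond  *sync.Cond
	quota int64 // currently available quota
	max   int64 // total capacity of the pool
}

func newQuotaPoolSketch(q int64) *quotaPoolSketch {
	qp := &quotaPoolSketch{quota: q, max: q}
	qp.cond = sync.NewCond(&qp.mu)
	return qp
}

// acquire blocks until at least n units of quota are available and then
// deducts them from the pool.
func (qp *quotaPoolSketch) acquire(n int64) {
	qp.mu.Lock()
	defer qp.mu.Unlock()
	for qp.quota < n {
		qp.cond.Wait()
	}
	qp.quota -= n
}

// release returns n units of quota to the pool, capped at the pool's
// capacity, and wakes any goroutines blocked in acquire.
func (qp *quotaPoolSketch) release(n int64) {
	qp.mu.Lock()
	defer qp.mu.Unlock()
	qp.quota += n
	if qp.quota > qp.max {
		qp.quota = qp.max
	}
	qp.cond.Broadcast()
}

In the test, the blocked follower prevents releases from happening, so the second 3/4-quota write blocks in acquire until the follower is unlocked and catches up, at which point the available quota returns to its initial value.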

// TestRaftHeartbeats verifies that coalesced heartbeats are correctly
// suppressing elections in an idle cluster.
func TestRaftHeartbeats(t *testing.T) {
32 changes: 18 additions & 14 deletions pkg/storage/client_test.go
@@ -191,7 +191,10 @@ type multiTestContext struct {
clock *hlc.Clock
rpcContext *rpc.Context

nodeIDtoAddr map[roachpb.NodeID]net.Addr
nodeIDtoAddrMu struct {
*syncutil.RWMutex
nodeIDtoAddr map[roachpb.NodeID]net.Addr
}

transport *storage.RaftTransport

@@ -223,9 +226,9 @@ }
}

func (m *multiTestContext) getNodeIDAddress(nodeID roachpb.NodeID) (net.Addr, error) {
m.mu.RLock()
addr, ok := m.nodeIDtoAddr[nodeID]
m.mu.RUnlock()
m.nodeIDtoAddrMu.RLock()
addr, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
m.nodeIDtoAddrMu.RUnlock()
if ok {
return addr, nil
}
@@ -249,6 +252,7 @@ func (m *multiTestContext) Start(t *testing.T, numStores int) {
}
m.t = t

m.nodeIDtoAddrMu.RWMutex = &syncutil.RWMutex{}
m.mu = &syncutil.RWMutex{}
m.stores = make([]*storage.Store, numStores)
m.storePools = make([]*storage.StorePool, numStores)
@@ -705,9 +709,9 @@ func (m *multiTestContext) addStore(idx int) {
// previous stores as resolvers as doing so can cause delays in bringing the
// gossip network up.
resolvers := func() []resolver.Resolver {
m.mu.Lock()
defer m.mu.Unlock()
addr := m.nodeIDtoAddr[1]
m.nodeIDtoAddrMu.Lock()
defer m.nodeIDtoAddrMu.Unlock()
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[1]
if addr == nil {
return nil
}
@@ -762,15 +766,15 @@ func (m *multiTestContext) addStore(idx int) {
if err != nil {
m.t.Fatal(err)
}
m.mu.Lock()
if m.nodeIDtoAddr == nil {
m.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
m.nodeIDtoAddrMu.Lock()
if m.nodeIDtoAddrMu.nodeIDtoAddr == nil {
m.nodeIDtoAddrMu.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
}
_, ok := m.nodeIDtoAddr[nodeID]
_, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
if !ok {
m.nodeIDtoAddr[nodeID] = ln.Addr()
m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] = ln.Addr()
}
m.mu.Unlock()
m.nodeIDtoAddrMu.Unlock()
if ok {
m.t.Fatalf("node %d already listening", nodeID)
}
@@ -827,7 +831,7 @@ }
}

func (m *multiTestContext) nodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor {
addr := m.nodeIDtoAddr[nodeID]
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
return &roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
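
The client_test.go change above replaces the bare nodeIDtoAddr map, previously guarded by the general-purpose m.mu, with a nodeIDtoAddrMu struct that groups the map with the mutex protecting it. A rough sketch of that idiom follows, using invented names (addrRegistry, NodeID) and an embedded value mutex where the PR itself uses a *syncutil.RWMutex pointer:

package addrsketch

import (
	"net"
	"sync"
)

// NodeID stands in for roachpb.NodeID in this illustration.
type NodeID int32

// addrRegistry groups the address map with the mutex that guards it, so a
// reader of the struct can tell at a glance which lock protects which field.
type addrRegistry struct {
	mu struct {
		sync.RWMutex
		addrs map[NodeID]net.Addr
	}
}

// getAddr returns the address registered for id, if any.
func (r *addrRegistry) getAddr(id NodeID) (net.Addr, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addr, ok := r.mu.addrs[id]
	return addr, ok
}

// setAddr registers addr for id, lazily allocating the map.
func (r *addrRegistry) setAddr(id NodeID, addr net.Addr) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.addrs == nil {
		r.mu.addrs = make(map[NodeID]net.Addr)
	}
	r.mu.addrs[id] = addr
}

Keeping the data next to the lock that guards it is the design point; after this change callers no longer need to remember that m.mu also covered the address map.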
26 changes: 26 additions & 0 deletions pkg/storage/helpers_test.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

@@ -254,6 +255,31 @@ func (r *Replica) GetLease() (roachpb.Lease, *roachpb.Lease) {
return r.getLease()
}

// InitQuotaPool initializes (or re-initializes) the replica's quota pool with
// the given quota. Additionally it resets the replica's quota release queue
// and its command sizes map. Only safe to call on the replica that is both
// lease holder and raft leader.
func (r *Replica) InitQuotaPool(quota int64) {
r.mu.Lock()
defer r.mu.Unlock()

r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
r.mu.proposalQuota = newQuotaPool(quota)
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
}

// QuotaAvailable returns the quota available in the replica's quota pool. Only
// safe to call on the replica that is both lease holder and raft leader.
func (r *Replica) QuotaAvailable() int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.proposalQuota.approximateQuota()
}

// GetTimestampCacheLowWater returns the timestamp cache low water mark.
func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
r.store.tsCacheMu.Lock()