storage: address review comments

irfansharif committed May 26, 2017
1 parent 40e2753 commit 4aa1893
Showing 13 changed files with 797 additions and 285 deletions.
99 changes: 60 additions & 39 deletions pkg/storage/client_raft_test.go
@@ -1698,11 +1698,31 @@ func TestQuotaPool(t *testing.T) {

mtc.replicateRange(rangeID, 1, 2)

leaderRepl := mtc.getRaftLeader(rangeID)
leaderRepl.SetQuotaPool(quota)
leaderRepl.InitQuotaReleaseQueue()
leaderRepl.InitCommandSizes()
assertEqualLastIndex := func() error {
var expectedIndex uint64

for i, s := range mtc.stores {
repl, err := s.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}

index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
if i == 0 {
expectedIndex = index
} else if expectedIndex != index {
return fmt.Errorf("%s: expected lastIndex %d, but found %d", repl, expectedIndex, index)
}
}
return nil
}
testutils.SucceedsSoon(t, assertEqualLastIndex)

leaderRepl := mtc.getRaftLeader(rangeID)
leaderRepl.InitQuotaPool(quota)
followerRepl := func() *storage.Replica {
for _, store := range mtc.stores {
repl, err := store.GetReplica(rangeID)
@@ -1720,15 +1740,11 @@ func TestQuotaPool(t *testing.T) {
t.Fatal("could not get a handle on a follower replica")
}

followerDesc, err := followerRepl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}

// We block the third replica, effectively causing acquisition of quota
// without subsequent release.
//
// NB: See TestRaftBlockedReplica/#9914 for why we use a separate goroutine.
// NB: See TestRaftBlockedReplica/#9914 for why we use a separate
// goroutine.
var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -1737,41 +1753,46 @@ func TestQuotaPool(t *testing.T) {
}()
wg.Wait()

// We keep writing to the same key, generating raft log entries, until we
// detect that we're being throttled.
// To verify write throttling we insert a value 3/4ths the size of the
// total quota available in the system. This first write should go through
// while blocking the subsequent insert of the same size. We verify that
// the first write has gone through by checking that the total quota
// available has decreased as expected.
//
// NB: Quota acquisition is based on the size (in bytes) of the
// log entry which may very well change in the future, which is why we wait
// until quota is depleted as opposed to writing a fixed number of entries.
// Additionally there are other moving parts of the system that can get proposals in (e.g.
// node liveness heartbeats and raft log truncations).
var count int64
incArgs := incrementArgs([]byte("k"), 1)
ch := make(chan *roachpb.Error, 1)
for {
go func() {
_, pErr := client.SendWrapped(context.Background(), leaderRepl, incArgs)
ch <- pErr
}()
select {
case pErr := <-ch:
if pErr != nil {
t.Fatal(pErr)
}
count += 1
continue
case <-time.After(50 * time.Millisecond):
}
break
// Following this we unblock the 'slow' replica allowing it to catch up to
// the first write. This in turn releases quota back to the pool and the
// second write, previously blocked by virtue of there not being enough
// quota, is now free to proceed. We expect the final quota in the system
// to be the same as what we started with.
key := roachpb.Key("k")
value := bytes.Repeat([]byte("v"), (3*quota)/4)
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
if pErr != nil {
t.Fatal(pErr)
}

expected := []int64{count, count, count}
expected[followerDesc.ReplicaID-1] = 0
mtc.waitForValues(roachpb.Key("k"), expected)
if curQuota := leaderRepl.QuotaAvailable(); curQuota > quota/4 {
t.Fatalf("didn't observe the expected quota acquisition, available: %d", curQuota)
}

ch := make(chan *roachpb.Error, 1)
go func() {
_, pErr := client.SendWrapped(context.Background(), leaderRepl, putArgs(key, value))
ch <- pErr
}()

followerRepl.RaftUnlock()

mtc.waitForValues(roachpb.Key("k"), []int64{count + 1, count + 1, count + 1})
testutils.SucceedsSoonDepth(1, t, func() error {
if curQuota := leaderRepl.QuotaAvailable(); curQuota != quota {
return errors.Errorf("expected available quota %d, got %d", quota, curQuota)
}
return nil
})

if pErr := <-ch; pErr != nil {
t.Fatal(pErr)
}
}

// TestRaftHeartbeats verifies that coalesced heartbeats are correctly
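The test above exercises the proposal quota pool: a write on the leader acquires quota proportional to its size, a blocked follower prevents that quota from being released, and unblocking the follower returns the quota and lets the pending write proceed. The following is a minimal, self-contained sketch of that acquire/release behavior, written against the standard library only; it is not the pkg/storage implementation, and all names below are illustrative.

package main

import (
	"fmt"
	"sync"
)

// quotaPool is a deliberately simple stand-in for the proposal quota pool:
// acquire blocks while the pool is depleted, release returns quota and wakes
// any waiters.
type quotaPool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	quota int64
}

func newQuotaPool(q int64) *quotaPool {
	qp := &quotaPool{quota: q}
	qp.cond = sync.NewCond(&qp.mu)
	return qp
}

// acquire blocks until v units of quota are available, then takes them.
func (qp *quotaPool) acquire(v int64) {
	qp.mu.Lock()
	defer qp.mu.Unlock()
	for qp.quota < v {
		qp.cond.Wait()
	}
	qp.quota -= v
}

// release returns v units of quota to the pool and wakes any waiters.
func (qp *quotaPool) release(v int64) {
	qp.mu.Lock()
	defer qp.mu.Unlock()
	qp.quota += v
	qp.cond.Broadcast()
}

// approximateQuota reports the quota currently available.
func (qp *quotaPool) approximateQuota() int64 {
	qp.mu.Lock()
	defer qp.mu.Unlock()
	return qp.quota
}

func main() {
	const quota = 100
	qp := newQuotaPool(quota)

	// First "write": 3/4 of the quota, goes through immediately.
	qp.acquire(3 * quota / 4)
	fmt.Println("available after first write:", qp.approximateQuota()) // 25

	// Second "write" of the same size blocks until quota is released.
	done := make(chan struct{})
	go func() {
		qp.acquire(3 * quota / 4)
		close(done)
	}()

	// The "slow follower" catches up: the first write's quota is released,
	// unblocking the second write.
	qp.release(3 * quota / 4)
	<-done
	qp.release(3 * quota / 4)
	fmt.Println("available at the end:", qp.approximateQuota()) // 100
}

Run as-is, the sketch reports 25 after the first acquisition and the full 100 once the blocked acquisition completes and both writes' quota is returned, mirroring the quota/4 and full-quota assertions in TestQuotaPool.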
22 changes: 12 additions & 10 deletions pkg/storage/client_test.go
@@ -191,8 +191,10 @@ type multiTestContext struct {
clock *hlc.Clock
rpcContext *rpc.Context

nodeIDtoAddrMu *syncutil.RWMutex
nodeIDtoAddr map[roachpb.NodeID]net.Addr
nodeIDtoAddrMu struct {
*syncutil.RWMutex
nodeIDtoAddr map[roachpb.NodeID]net.Addr
}

transport *storage.RaftTransport

@@ -225,7 +227,7 @@ type multiTestContext struct {

func (m *multiTestContext) getNodeIDAddress(nodeID roachpb.NodeID) (net.Addr, error) {
m.nodeIDtoAddrMu.RLock()
addr, ok := m.nodeIDtoAddr[nodeID]
addr, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
m.nodeIDtoAddrMu.RUnlock()
if ok {
return addr, nil
@@ -250,7 +252,7 @@ func (m *multiTestContext) Start(t *testing.T, numStores int) {
}
m.t = t

m.nodeIDtoAddrMu = &syncutil.RWMutex{}
m.nodeIDtoAddrMu.RWMutex = &syncutil.RWMutex{}
m.mu = &syncutil.RWMutex{}
m.stores = make([]*storage.Store, numStores)
m.storePools = make([]*storage.StorePool, numStores)
@@ -709,7 +711,7 @@ func (m *multiTestContext) addStore(idx int) {
resolvers := func() []resolver.Resolver {
m.nodeIDtoAddrMu.Lock()
defer m.nodeIDtoAddrMu.Unlock()
addr := m.nodeIDtoAddr[1]
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[1]
if addr == nil {
return nil
}
@@ -765,12 +767,12 @@ func (m *multiTestContext) addStore(idx int) {
m.t.Fatal(err)
}
m.nodeIDtoAddrMu.Lock()
if m.nodeIDtoAddr == nil {
m.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
if m.nodeIDtoAddrMu.nodeIDtoAddr == nil {
m.nodeIDtoAddrMu.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr)
}
_, ok := m.nodeIDtoAddr[nodeID]
_, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
if !ok {
m.nodeIDtoAddr[nodeID] = ln.Addr()
m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] = ln.Addr()
}
m.nodeIDtoAddrMu.Unlock()
if ok {
@@ -829,7 +831,7 @@ func (m *multiTestContext) addStore(idx int) {
}

func (m *multiTestContext) nodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor {
addr := m.nodeIDtoAddr[nodeID]
addr := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID]
return &roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
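The client_test.go hunks above fold the nodeIDtoAddr map into a nodeIDtoAddrMu struct that embeds the mutex guarding it, keeping the lock next to the data it protects. Below is a standalone sketch of the same pattern; it uses the standard library's sync.RWMutex in place of syncutil's, and the type and field names are illustrative, not CockroachDB's.

package main

import (
	"fmt"
	"net"
	"sync"
)

// addrRegistry mirrors the nodeIDtoAddrMu change above: the mutex is embedded
// in a struct together with the map it guards, so the map cannot be reached
// without going through the lock's field.
type addrRegistry struct {
	mu struct {
		sync.RWMutex
		addrs map[int]net.Addr
	}
}

func (r *addrRegistry) set(nodeID int, addr net.Addr) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.addrs == nil {
		r.mu.addrs = make(map[int]net.Addr)
	}
	r.mu.addrs[nodeID] = addr
}

func (r *addrRegistry) get(nodeID int) (net.Addr, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addr, ok := r.mu.addrs[nodeID]
	return addr, ok
}

func main() {
	var r addrRegistry
	r.set(1, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 26257})
	if addr, ok := r.get(1); ok {
		fmt.Println("node 1 at", addr)
	}
}

Embedding the mutex promotes Lock, RLock and friends onto the mu field, so every map access naturally routes through r.mu, which is presumably the intent of the refactor.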
26 changes: 10 additions & 16 deletions pkg/storage/helpers_test.go
@@ -256,8 +256,10 @@ func (r *Replica) GetLease() (roachpb.Lease, *roachpb.Lease) {
}

// SetQuotaPool allows the caller to set a replica's quota pool initialized to
// a given quota. Only safe to call on the leader replica.
func (r *Replica) SetQuotaPool(quota int64) {
// a given quota. Additionally it initializes the replica's quota release queue
// and its command sizes map. Only safe to call on the replica that is both
// lease holder and raft leader.
func (r *Replica) InitQuotaPool(quota int64) {
r.mu.Lock()
defer r.mu.Unlock()

@@ -266,24 +268,16 @@ func (r *Replica) SetQuotaPool(quota int64) {
r.mu.proposalQuota.close()
}
r.mu.proposalQuota = newQuotaPool(quota)
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
}

// InitQuotaReleaseQueue initializes the replica's quota release queue. Only
// intended to be called on the leader replica.
func (r *Replica) InitQuotaReleaseQueue() {
r.mu.Lock()
defer r.mu.Unlock()

r.mu.quotaReleaseQueue = make([]int, 0)
}

// InitCommandSizes initializes the replica's command sizes map. Only intended
// to be called on the leader replica.
func (r *Replica) InitCommandSizes() {
// QuotaAvailable returns the quota available in the replica's quota pool. Only
// safe to call on the replica that is both leaseholder and raft leader.
func (r *Replica) QuotaAvailable() int64 {
r.mu.Lock()
defer r.mu.Unlock()

r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
return r.mu.proposalQuota.approximateQuota()
}

// GetTimestampCacheLowWater returns the timestamp cache low water mark.
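The helpers above collapse SetQuotaPool, InitQuotaReleaseQueue, and InitCommandSizes into a single InitQuotaPool and add QuotaAvailable. As a rough mental model of the state those fields (proposalQuota, quotaReleaseQueue, commandSizes) track, here is a simplified sketch inferred from the names in this diff; the actual release policy in pkg/storage is driven by follower progress and command application, and the type, function, and field names below are hypothetical.

package main

import "fmt"

type cmdIDKey string

// leaderQuota is a rough, inferred model of the fields InitQuotaPool resets
// above. It is not the real bookkeeping; in pkg/storage, releasing quota is
// tied to Raft progress.
type leaderQuota struct {
	available         int64            // stand-in for the proposal quota pool
	commandSizes      map[cmdIDKey]int // size of each in-flight proposal
	quotaReleaseQueue []int            // sizes waiting to be returned to the pool
}

func newLeaderQuota(q int64) *leaderQuota {
	return &leaderQuota{
		available:    q,
		commandSizes: make(map[cmdIDKey]int),
	}
}

// propose acquires quota for a command and tracks its size until it commits.
func (l *leaderQuota) propose(id cmdIDKey, size int) {
	l.available -= int64(size)
	l.commandSizes[id] = size
}

// committed moves a command's size from the in-flight map onto the release queue.
func (l *leaderQuota) committed(id cmdIDKey) {
	if size, ok := l.commandSizes[id]; ok {
		delete(l.commandSizes, id)
		l.quotaReleaseQueue = append(l.quotaReleaseQueue, size)
	}
}

// followersCaughtUp returns queued quota to the pool, standing in for the
// point at which the slowest follower has advanced past the entries whose
// sizes sit in the queue.
func (l *leaderQuota) followersCaughtUp() {
	for _, size := range l.quotaReleaseQueue {
		l.available += int64(size)
	}
	l.quotaReleaseQueue = nil
}

func main() {
	l := newLeaderQuota(1000)
	l.propose("cmd-1", 750)
	fmt.Println("available after proposing:", l.available) // 250
	l.committed("cmd-1")
	l.followersCaughtUp()
	fmt.Println("available after followers catch up:", l.available) // 1000
}

In the test above, blocking a follower is what keeps quota from being released, and unblocking it is what eventually restores the pool to its full size.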