Skip to content

Commit

Permalink
Merge pull request #10391 from petermattis/pmattis/fill-reservation
Browse files Browse the repository at this point in the history
storage: fill snapshot reservations in Store.HandleSnapshot
  • Loading branch information
petermattis authored Nov 2, 2016
2 parents b0bb16e + d4826a6 commit 99e14f7
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 106 deletions.
3 changes: 3 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,9 @@ func TestStoreRangeRebalance(t *testing.T) {
generated += m.RangeSnapshotsGenerated.Count()
normalApplied += m.RangeSnapshotsNormalApplied.Count()
preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
}
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ func (s *Store) ManualReplicaGC(repl *Replica) error {
return s.gcQueue.process(ctx, s.Clock().Now(), repl, cfg)
}

func (s *Store) ReservationCount() int {
s.bookie.mu.Lock()
defer s.bookie.mu.Unlock()
return len(s.bookie.mu.reservationsByRangeID)
}

func (r *Replica) RaftLock() {
r.raftMu.Lock()
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,6 @@ func (r *Replica) applySnapshot(
) error {
// Extract the updated range descriptor.
desc := inSnap.RangeDescriptor
// Fill the reservation if there was one for this range, regardless of
// whether the application succeeded.
defer r.store.bookie.Fill(desc.RangeID)

r.mu.Lock()
replicaID := r.mu.replicaID
Expand Down
76 changes: 0 additions & 76 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -6221,81 +6220,6 @@ func TestCommandTimeThreshold(t *testing.T) {
}
}

// TestReserveAndApplySnapshot checks to see if a snapshot is correctly applied
// and that its reservation is removed.
func TestReserveAndApplySnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

tsc := TestStoreConfig()
tc := testContext{}
tc.StartWithStoreConfig(t, tsc)
defer tc.Stop()

checkReservations := func(t *testing.T, expected int) {
tc.store.bookie.mu.Lock()
defer tc.store.bookie.mu.Unlock()
if e, a := expected, len(tc.store.bookie.mu.reservationsByRangeID); e != a {
t.Fatalf("wrong number of reservations - expected:%d, actual:%d", e, a)
}
}

key := roachpb.RKey("a")
firstRng := tc.store.LookupReplica(key, nil)
snap, err := firstRng.GetSnapshot(context.Background())
if err != nil {
t.Fatal(err)
}

tc.store.metrics.Available.Update(tc.store.bookie.maxReservedBytes)

// Note that this is an artificial scenario in which we're adding a
// reservation for a replica that is already on the range. This test is
// designed to test the filling of the reservation specifically and in
// normal operation there should not be a reservation for an existing
// replica.
req := ReservationRequest{
StoreRequestHeader: StoreRequestHeader{
StoreID: tc.store.StoreID(),
NodeID: tc.store.nodeDesc.NodeID,
},
RangeID: firstRng.RangeID,
RangeSize: 10,
}

if !tc.store.Reserve(context.Background(), req).Reserved {
t.Fatalf("Can't reserve the replica")
}
checkReservations(t, 1)

b := firstRng.store.Engine().NewBatch()
var alloc bufalloc.ByteAllocator
for ; snap.Iter.Valid(); snap.Iter.Next() {
var key engine.MVCCKey
var value []byte
alloc, key, value = snap.Iter.allocIterKeyValue(alloc)
mvccKey := engine.MVCCKey{
Key: key.Key,
Timestamp: key.Timestamp,
}
if err := b.Put(mvccKey, value); err != nil {
t.Fatal(err)
}
}

// Apply a snapshot and check the reservation was filled. Note that this
// out-of-band application could be a root cause if this test ever crashes.
if err := firstRng.applySnapshot(context.Background(), IncomingSnapshot{
SnapUUID: snap.SnapUUID,
RangeDescriptor: *firstRng.Desc(),
Batches: [][]byte{b.Repr()},
},
snap.RaftSnap, raftpb.HardState{}); err != nil {
t.Fatal(err)
}
firstRng.CloseOutSnap()
checkReservations(t, 0)
}

func TestDeprecatedRequests(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
42 changes: 21 additions & 21 deletions pkg/storage/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,12 @@ func (b *bookie) Reserve(
// To update the reservation, fill the original one and add the
// new one.
if log.V(2) {
log.Infof(ctx, "updating existing reservation for rangeID:%d, %+v", req.RangeID,
olderReservation)
log.Infof(ctx, "[r%d], updating existing reservation", req.RangeID)
}
b.fillReservationLocked(olderReservation)
b.fillReservationLocked(ctx, olderReservation)
} else {
if log.V(2) {
log.Infof(ctx, "there is pre-existing reservation %+v, can't update with %+v",
olderReservation, req)
log.Infof(ctx, "[r%d] unable to update due to pre-existing reservation", req.RangeID)
}
return resp
}
Expand All @@ -152,8 +150,8 @@ func (b *bookie) Reserve(
// Do we have too many current reservations?
if len(b.mu.reservationsByRangeID) >= b.maxReservations {
if log.V(1) {
log.Infof(ctx, "could not book reservation %+v, too many reservations already (current:%d, max:%d)",
req, len(b.mu.reservationsByRangeID), b.maxReservations)
log.Infof(ctx, "[r%d] unable to book reservation, too many reservations (current:%d, max:%d)",
req.RangeID, len(b.mu.reservationsByRangeID), b.maxReservations)
}
return resp
}
Expand All @@ -165,17 +163,17 @@ func (b *bookie) Reserve(
available := b.metrics.Available.Value()
if b.mu.size+(req.RangeSize*2) > available {
if log.V(1) {
log.Infof(ctx, "could not book reservation %+v, not enough available disk space (requested:%d*2, reserved:%d, available:%d)",
req, req.RangeSize, b.mu.size, available)
log.Infof(ctx, "[r%d] unable to book reservation, not enough available disk space (requested:%d*2, reserved:%d, available:%d)",
req.RangeID, req.RangeSize, b.mu.size, available)
}
return resp
}

// Do we have enough reserved space free for the reservation?
if b.mu.size+req.RangeSize > b.maxReservedBytes {
if log.V(1) {
log.Infof(ctx, "could not book reservation %+v, not enough available reservation space (requested:%d, reserved:%d, maxReserved:%d)",
req, req.RangeSize, b.mu.size, b.maxReservedBytes)
log.Infof(ctx, "[r%d] unable to book reservation, not enough available reservation space (requested:%d, reserved:%d, maxReserved:%d)",
req.RangeID, req.RangeSize, b.mu.size, b.maxReservedBytes)
}
return resp
}
Expand All @@ -184,8 +182,8 @@ func (b *bookie) Reserve(
for _, rep := range deadReplicas {
if req.RangeID == rep.RangeID {
if log.V(1) {
log.Infof(ctx, "could not book reservation %+v, the replica has been destroyed",
req)
log.Infof(ctx, "[r%d] unable to book reservation, the replica has been destroyed",
req.RangeID)
}
return ReservationResponse{Reserved: false}
}
Expand All @@ -205,7 +203,8 @@ func (b *bookie) Reserve(
b.metrics.Reserved.Inc(req.RangeSize)

if log.V(1) {
log.Infof(ctx, "new reservation added: %+v", newReservation)
log.Infof(ctx, "[r%s] new reservation, size=%d",
newReservation.RangeID, newReservation.RangeSize)
}

resp.Reserved = true
Expand All @@ -214,28 +213,28 @@ func (b *bookie) Reserve(

// Fill removes a reservation. Returns true when the reservation has been
// successfully removed.
func (b *bookie) Fill(rangeID roachpb.RangeID) bool {
func (b *bookie) Fill(ctx context.Context, rangeID roachpb.RangeID) bool {
b.mu.Lock()
defer b.mu.Unlock()

// Lookup the reservation.
res, ok := b.mu.reservationsByRangeID[rangeID]
if !ok {
if log.V(2) {
log.Infof(context.TODO(), "there is no reservation for rangeID:%d", rangeID)
log.Infof(ctx, "[r%d] reservation not found", rangeID)
}
return false
}

b.fillReservationLocked(res)
b.fillReservationLocked(ctx, res)
return true
}

// fillReservationLocked fills a reservation. It requires that the bookie's
// lock is held. This should only be called internally.
func (b *bookie) fillReservationLocked(res *reservation) {
func (b *bookie) fillReservationLocked(ctx context.Context, res *reservation) {
if log.V(2) {
log.Infof(context.TODO(), "filling reservation: %+v", res)
log.Infof(ctx, "[r%d] filling reservation", res.RangeID)
}

// Remove it from reservationsByRangeID. Note that we don't remove it from the
Expand All @@ -255,6 +254,7 @@ func (b *bookie) start(stopper *stop.Stopper) {
stopper.RunWorker(func() {
var timeoutTimer timeutil.Timer
defer timeoutTimer.Stop()
ctx := context.TODO()
for {
var timeout time.Duration
b.mu.Lock()
Expand All @@ -269,9 +269,9 @@ func (b *bookie) start(stopper *stop.Stopper) {
expiredReservation := b.mu.queue.dequeue()
// Is it an active reservation?
if b.mu.reservationsByRangeID[expiredReservation.RangeID] == expiredReservation {
b.fillReservationLocked(expiredReservation)
b.fillReservationLocked(ctx, expiredReservation)
} else if log.V(2) {
log.Infof(context.TODO(), "the reservation for rangeID %d has already been filled.",
log.Infof(ctx, "[r%d] expired reservation has already been filled",
expiredReservation.RangeID)
}
// Set the timeout to 0 to force another peek.
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestBookieReserve(t *testing.T) {
{rangeID: 0, reserve: false, expSuc: true, expOut: 0, expBytes: 0, expReservations: 5},
}

ctx := context.Background()
for i, testCase := range testCases {
if testCase.reserve {
// Try to reserve the range.
Expand All @@ -116,7 +117,7 @@ func TestBookieReserve(t *testing.T) {
RangeID: roachpb.RangeID(testCase.rangeID),
RangeSize: int64(testCase.rangeID),
}
if resp := b.Reserve(context.Background(), req, testCase.deadReplicas); resp.Reserved != testCase.expSuc {
if resp := b.Reserve(ctx, req, testCase.deadReplicas); resp.Reserved != testCase.expSuc {
if testCase.expSuc {
t.Errorf("%d: expected a successful reservation, was rejected", i)
} else {
Expand All @@ -125,7 +126,7 @@ func TestBookieReserve(t *testing.T) {
}
} else {
// Fill the reservation.
if filled := b.Fill(roachpb.RangeID(testCase.rangeID)); filled != testCase.expSuc {
if filled := b.Fill(ctx, roachpb.RangeID(testCase.rangeID)); filled != testCase.expSuc {
if testCase.expSuc {
t.Errorf("%d: expected a successful filled reservation, was rejected", i)
} else {
Expand Down Expand Up @@ -313,7 +314,7 @@ func TestReservationQueue(t *testing.T) {
verifyBookie(t, b, 10 /*reservations*/, 10 /*queue*/, 10*bytesPerReservation /*bytes*/)

// Fill reservation 2.
if !b.Fill(2) {
if !b.Fill(context.Background(), 2) {
t.Fatalf("Could not fill reservation 2")
}
// After filling a reservation, wait a full cycle so that it can be timed
Expand All @@ -325,10 +326,10 @@ func TestReservationQueue(t *testing.T) {
verifyBookie(t, b, 8 /*reservations*/, 9 /*queue*/, 8*bytesPerReservation /*bytes*/)

// Fill reservations 4 and 6.
if !b.Fill(4) {
if !b.Fill(context.Background(), 4) {
t.Fatalf("Could not fill reservation 4")
}
if !b.Fill(6) {
if !b.Fill(context.Background(), 6) {
t.Fatalf("Could not fill reservation 6")
}
verifyBookie(t, b, 6 /*reservations*/, 9 /*queue*/, 6*bytesPerReservation /*bytes*/)
Expand Down Expand Up @@ -383,7 +384,7 @@ func TestReservationQueue(t *testing.T) {
verifyBookie(t, b, 8 /*reservations*/, 9 /*queue*/, 8*bytesPerReservation /*bytes*/)

// Fill 1 a second time.
if !b.Fill(1) {
if !b.Fill(context.Background(), 1) {
t.Fatalf("Could not fill reservation 1 (second pass)")
}
verifyBookie(t, b, 7 /*reservations*/, 9 /*queue*/, 7*bytesPerReservation /*bytes*/)
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,7 @@ func (s *Store) HandleSnapshot(
StoreCapacity: capacity,
})
}
defer s.bookie.Fill(ctx, header.RangeDescriptor.RangeID)
}

// Check to see if the snapshot can be applied but don't attempt to add
Expand Down

0 comments on commit 99e14f7

Please sign in to comment.