diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index c253c8f47fbc..c8d5151e915f 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -1990,6 +1990,9 @@ func TestStoreRangeRebalance(t *testing.T) { generated += m.RangeSnapshotsGenerated.Count() normalApplied += m.RangeSnapshotsNormalApplied.Count() preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count() + if n := s.ReservationCount(); n != 0 { + t.Fatalf("expected 0 reservations, but found %d", n) + } } if generated == 0 { t.Fatalf("expected at least 1 snapshot, but found 0") diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 2ff71bfcac03..6f32232483b1 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -169,6 +169,12 @@ func (s *Store) ManualReplicaGC(repl *Replica) error { return s.gcQueue.process(ctx, s.Clock().Now(), repl, cfg) } +func (s *Store) ReservationCount() int { + s.bookie.mu.Lock() + defer s.bookie.mu.Unlock() + return len(s.bookie.mu.reservationsByRangeID) +} + func (r *Replica) RaftLock() { r.raftMu.Lock() } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 13efc6bc5981..b3d8b5259cf6 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -553,9 +553,6 @@ func (r *Replica) applySnapshot( ) error { // Extract the updated range descriptor. desc := inSnap.RangeDescriptor - // Fill the reservation if there was one for this range, regardless of - // whether the application succeeded. 
- defer r.store.bookie.Fill(desc.RangeID) r.mu.Lock() replicaID := r.mu.replicaID diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 4a8ec539a591..30377007f269 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -50,7 +50,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -6221,81 +6220,6 @@ func TestCommandTimeThreshold(t *testing.T) { } } -// TestReserveAndApplySnapshot checks to see if a snapshot is correctly applied -// and that its reservation is removed. -func TestReserveAndApplySnapshot(t *testing.T) { - defer leaktest.AfterTest(t)() - - tsc := TestStoreConfig() - tc := testContext{} - tc.StartWithStoreConfig(t, tsc) - defer tc.Stop() - - checkReservations := func(t *testing.T, expected int) { - tc.store.bookie.mu.Lock() - defer tc.store.bookie.mu.Unlock() - if e, a := expected, len(tc.store.bookie.mu.reservationsByRangeID); e != a { - t.Fatalf("wrong number of reservations - expected:%d, actual:%d", e, a) - } - } - - key := roachpb.RKey("a") - firstRng := tc.store.LookupReplica(key, nil) - snap, err := firstRng.GetSnapshot(context.Background()) - if err != nil { - t.Fatal(err) - } - - tc.store.metrics.Available.Update(tc.store.bookie.maxReservedBytes) - - // Note that this is an artificial scenario in which we're adding a - // reservation for a replica that is already on the range. This test is - // designed to test the filling of the reservation specifically and in - // normal operation there should not be a reservation for an existing - // replica. 
- req := ReservationRequest{ - StoreRequestHeader: StoreRequestHeader{ - StoreID: tc.store.StoreID(), - NodeID: tc.store.nodeDesc.NodeID, - }, - RangeID: firstRng.RangeID, - RangeSize: 10, - } - - if !tc.store.Reserve(context.Background(), req).Reserved { - t.Fatalf("Can't reserve the replica") - } - checkReservations(t, 1) - - b := firstRng.store.Engine().NewBatch() - var alloc bufalloc.ByteAllocator - for ; snap.Iter.Valid(); snap.Iter.Next() { - var key engine.MVCCKey - var value []byte - alloc, key, value = snap.Iter.allocIterKeyValue(alloc) - mvccKey := engine.MVCCKey{ - Key: key.Key, - Timestamp: key.Timestamp, - } - if err := b.Put(mvccKey, value); err != nil { - t.Fatal(err) - } - } - - // Apply a snapshot and check the reservation was filled. Note that this - // out-of-band application could be a root cause if this test ever crashes. - if err := firstRng.applySnapshot(context.Background(), IncomingSnapshot{ - SnapUUID: snap.SnapUUID, - RangeDescriptor: *firstRng.Desc(), - Batches: [][]byte{b.Repr()}, - }, - snap.RaftSnap, raftpb.HardState{}); err != nil { - t.Fatal(err) - } - firstRng.CloseOutSnap() - checkReservations(t, 0) -} - func TestDeprecatedRequests(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/reservation.go b/pkg/storage/reservation.go index 54ccb15e3b31..185d7568f494 100644 --- a/pkg/storage/reservation.go +++ b/pkg/storage/reservation.go @@ -136,14 +136,12 @@ func (b *bookie) Reserve( // To update the reservation, fill the original one and add the // new one. 
if log.V(2) { - log.Infof(ctx, "updating existing reservation for rangeID:%d, %+v", req.RangeID, - olderReservation) + log.Infof(ctx, "[r%d] updating existing reservation", req.RangeID) } - b.fillReservationLocked(olderReservation) + b.fillReservationLocked(ctx, olderReservation) } else { if log.V(2) { - log.Infof(ctx, "there is pre-existing reservation %+v, can't update with %+v", - olderReservation, req) + log.Infof(ctx, "[r%d] unable to update due to pre-existing reservation", req.RangeID) } return resp } @@ -152,8 +150,8 @@ func (b *bookie) Reserve( // Do we have too many current reservations? if len(b.mu.reservationsByRangeID) >= b.maxReservations { if log.V(1) { - log.Infof(ctx, "could not book reservation %+v, too many reservations already (current:%d, max:%d)", - req, len(b.mu.reservationsByRangeID), b.maxReservations) + log.Infof(ctx, "[r%d] unable to book reservation, too many reservations (current:%d, max:%d)", + req.RangeID, len(b.mu.reservationsByRangeID), b.maxReservations) } return resp } @@ -165,8 +163,8 @@ func (b *bookie) Reserve( available := b.metrics.Available.Value() if b.mu.size+(req.RangeSize*2) > available { if log.V(1) { - log.Infof(ctx, "could not book reservation %+v, not enough available disk space (requested:%d*2, reserved:%d, available:%d)", - req, req.RangeSize, b.mu.size, available) + log.Infof(ctx, "[r%d] unable to book reservation, not enough available disk space (requested:%d*2, reserved:%d, available:%d)", + req.RangeID, req.RangeSize, b.mu.size, available) } return resp } @@ -174,8 +172,8 @@ func (b *bookie) Reserve( // Do we have enough reserved space free for the reservation? 
if b.mu.size+req.RangeSize > b.maxReservedBytes { if log.V(1) { - log.Infof(ctx, "could not book reservation %+v, not enough available reservation space (requested:%d, reserved:%d, maxReserved:%d)", - req, req.RangeSize, b.mu.size, b.maxReservedBytes) + log.Infof(ctx, "[r%d] unable to book reservation, not enough available reservation space (requested:%d, reserved:%d, maxReserved:%d)", + req.RangeID, req.RangeSize, b.mu.size, b.maxReservedBytes) } return resp } @@ -184,8 +182,8 @@ func (b *bookie) Reserve( for _, rep := range deadReplicas { if req.RangeID == rep.RangeID { if log.V(1) { - log.Infof(ctx, "could not book reservation %+v, the replica has been destroyed", - req) + log.Infof(ctx, "[r%d] unable to book reservation, the replica has been destroyed", + req.RangeID) } return ReservationResponse{Reserved: false} } @@ -205,7 +203,8 @@ func (b *bookie) Reserve( b.metrics.Reserved.Inc(req.RangeSize) if log.V(1) { - log.Infof(ctx, "new reservation added: %+v", newReservation) + log.Infof(ctx, "[r%d] new reservation, size=%d", + newReservation.RangeID, newReservation.RangeSize) } resp.Reserved = true @@ -214,7 +213,7 @@ func (b *bookie) Reserve( // Fill removes a reservation. Returns true when the reservation has been // successfully removed. -func (b *bookie) Fill(rangeID roachpb.RangeID) bool { +func (b *bookie) Fill(ctx context.Context, rangeID roachpb.RangeID) bool { b.mu.Lock() defer b.mu.Unlock() @@ -222,20 +221,20 @@ func (b *bookie) Fill(rangeID roachpb.RangeID) bool { res, ok := b.mu.reservationsByRangeID[rangeID] if !ok { if log.V(2) { - log.Infof(context.TODO(), "there is no reservation for rangeID:%d", rangeID) + log.Infof(ctx, "[r%d] reservation not found", rangeID) } return false } - b.fillReservationLocked(res) + b.fillReservationLocked(ctx, res) return true } // fillReservationLocked fills a reservation. It requires that the bookie's // lock is held. This should only be called internally. 
-func (b *bookie) fillReservationLocked(res *reservation) { +func (b *bookie) fillReservationLocked(ctx context.Context, res *reservation) { if log.V(2) { - log.Infof(context.TODO(), "filling reservation: %+v", res) + log.Infof(ctx, "[r%d] filling reservation", res.RangeID) } // Remove it from reservationsByRangeID. Note that we don't remove it from the @@ -255,6 +254,7 @@ func (b *bookie) start(stopper *stop.Stopper) { stopper.RunWorker(func() { var timeoutTimer timeutil.Timer defer timeoutTimer.Stop() + ctx := context.TODO() for { var timeout time.Duration b.mu.Lock() @@ -269,9 +269,9 @@ func (b *bookie) start(stopper *stop.Stopper) { expiredReservation := b.mu.queue.dequeue() // Is it an active reservation? if b.mu.reservationsByRangeID[expiredReservation.RangeID] == expiredReservation { - b.fillReservationLocked(expiredReservation) + b.fillReservationLocked(ctx, expiredReservation) } else if log.V(2) { - log.Infof(context.TODO(), "the reservation for rangeID %d has already been filled.", + log.Infof(ctx, "[r%d] expired reservation has already been filled", expiredReservation.RangeID) } // Set the timeout to 0 to force another peek. diff --git a/pkg/storage/reservation_test.go b/pkg/storage/reservation_test.go index 54c845f689a2..d8dafdc6d459 100644 --- a/pkg/storage/reservation_test.go +++ b/pkg/storage/reservation_test.go @@ -105,6 +105,7 @@ func TestBookieReserve(t *testing.T) { {rangeID: 0, reserve: false, expSuc: true, expOut: 0, expBytes: 0, expReservations: 5}, } + ctx := context.Background() for i, testCase := range testCases { if testCase.reserve { // Try to reserve the range. 
@@ -116,7 +117,7 @@ func TestBookieReserve(t *testing.T) { RangeID: roachpb.RangeID(testCase.rangeID), RangeSize: int64(testCase.rangeID), } - if resp := b.Reserve(context.Background(), req, testCase.deadReplicas); resp.Reserved != testCase.expSuc { + if resp := b.Reserve(ctx, req, testCase.deadReplicas); resp.Reserved != testCase.expSuc { if testCase.expSuc { t.Errorf("%d: expected a successful reservation, was rejected", i) } else { @@ -125,7 +126,7 @@ func TestBookieReserve(t *testing.T) { } } else { // Fill the reservation. - if filled := b.Fill(roachpb.RangeID(testCase.rangeID)); filled != testCase.expSuc { + if filled := b.Fill(ctx, roachpb.RangeID(testCase.rangeID)); filled != testCase.expSuc { if testCase.expSuc { t.Errorf("%d: expected a successful filled reservation, was rejected", i) } else { @@ -313,7 +314,7 @@ func TestReservationQueue(t *testing.T) { verifyBookie(t, b, 10 /*reservations*/, 10 /*queue*/, 10*bytesPerReservation /*bytes*/) // Fill reservation 2. - if !b.Fill(2) { + if !b.Fill(context.Background(), 2) { t.Fatalf("Could not fill reservation 2") } // After filling a reservation, wait a full cycle so that it can be timed @@ -325,10 +326,10 @@ func TestReservationQueue(t *testing.T) { verifyBookie(t, b, 8 /*reservations*/, 9 /*queue*/, 8*bytesPerReservation /*bytes*/) // Fill reservations 4 and 6. - if !b.Fill(4) { + if !b.Fill(context.Background(), 4) { t.Fatalf("Could not fill reservation 4") } - if !b.Fill(6) { + if !b.Fill(context.Background(), 6) { t.Fatalf("Could not fill reservation 6") } verifyBookie(t, b, 6 /*reservations*/, 9 /*queue*/, 6*bytesPerReservation /*bytes*/) @@ -383,7 +384,7 @@ func TestReservationQueue(t *testing.T) { verifyBookie(t, b, 8 /*reservations*/, 9 /*queue*/, 8*bytesPerReservation /*bytes*/) // Fill 1 a second time. 
- if !b.Fill(1) { + if !b.Fill(context.Background(), 1) { t.Fatalf("Could not fill reservation 1 (second pass)") } verifyBookie(t, b, 7 /*reservations*/, 9 /*queue*/, 7*bytesPerReservation /*bytes*/) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a39d6ae5f399..0cfbe4821a0b 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2505,6 +2505,7 @@ func (s *Store) HandleSnapshot( StoreCapacity: capacity, }) } + defer s.bookie.Fill(ctx, header.RangeDescriptor.RangeID) } // Check to see if the snapshot can be applied but don't attempt to add