Skip to content

Commit

Permalink
kvserver: add Replica.WaitForLeaseAppliedIndex()
Browse files Browse the repository at this point in the history
This allows a caller to wait for a replica to reach a certain lease
applied index. Similar functionality elsewhere is not migrated yet, out
of caution.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 20, 2024
1 parent c0cdacd commit eb0be13
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,32 @@ func waitForReplicasInit(
})
}

// WaitForLeaseAppliedIndex waits for the replica to reach the given lease
// applied index, or until the context is cancelled or the replica is destroyed.
// Note that the lease applied index may regress across restarts, since we don't
// sync state machine application to disk.
//
// TODO(erikgrinaker): it would be nice if we could be notified about LAI
// updates instead, but polling will do for now.
func (r *Replica) WaitForLeaseAppliedIndex(
ctx context.Context, lai kvpb.LeaseAppliedIndex,
) (kvpb.LeaseAppliedIndex, error) {
retryOpts := retry.Options{
InitialBackoff: 10 * time.Millisecond,
Multiplier: 2,
MaxBackoff: time.Second,
}
for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai {
return currentLAI, nil
}
if _, err := r.IsDestroyed(); err != nil {
return 0, err
}
}
return 0, ctx.Err()
}

// ChangeReplicas atomically changes the replicas that are members of a range.
// The change is performed in a distributed transaction and takes effect when
// that transaction is committed. This transaction confirms that the supplied
Expand Down
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/replica_command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ package kvserver
import (
"context"
"encoding/binary"
math "math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -23,6 +25,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -608,3 +613,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) {
})
}
}

func TestWaitForLeaseAppliedIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const maxLAI = math.MaxUint64

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

tc := testContext{}
tc.Start(ctx, t, stopper)
db := tc.store.DB()

// Submit a write and read it back to bump the initial LAI.
write := func(key, value string) {
require.NoError(t, db.Put(ctx, key, value))
_, err := db.Get(ctx, key)
require.NoError(t, err)
}
write("foo", "bar")

// Should return immediately when already at or past the LAI.
currentLAI := tc.repl.GetLeaseAppliedIndex()
require.NotZero(t, currentLAI)
resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI)
require.NoError(t, err)
require.GreaterOrEqual(t, resultLAI, currentLAI)

// Should wait for a future LAI to be reached.
const numWrites = 10
waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites
laiC := make(chan kvpb.LeaseAppliedIndex, 1)
go func() {
lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI)
assert.NoError(t, err) // can't use require in goroutine
laiC <- lai
}()

select {
case lai := <-laiC:
t.Fatalf("unexpected early LAI %d", lai)
case <-time.After(time.Second):
}

for i := 0; i < numWrites; i++ {
write("foo", "bar")
}

select {
case lai := <-laiC:
require.GreaterOrEqual(t, lai, waitLAI)
require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for LAI %d", waitLAI)
}

// Should error on context cancellation.
cancelCtx, cancel := context.WithCancel(ctx)
cancel()
_, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI)
require.Error(t, err)
require.Equal(t, cancelCtx.Err(), err)

// Should error on destroyed replicas.
stopper.Stop(ctx)

destroyErr := errors.New("destroyed")
tc.repl.mu.Lock()
tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved)
tc.repl.mu.Unlock()

_, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI)
require.Error(t, err)
require.Equal(t, destroyErr, err)
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/stores_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (is Server) CollectChecksum(
//
// It is the caller's responsibility to cancel or set a timeout on the context.
// If the context is never canceled, WaitForApplication will retry forever.
//
// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex().
func (is Server) WaitForApplication(
ctx context.Context, req *WaitForApplicationRequest,
) (*WaitForApplicationResponse, error) {
Expand Down

0 comments on commit eb0be13

Please sign in to comment.