Skip to content

Commit

Permalink
storage: propagate closed timestamp information to RHS after split
Browse files Browse the repository at this point in the history
This commit prevents discontinuities in ability to read a value from a follower
after a split. The invariant we want to uphold is that values which were safe
to read from a follower will remain safe to read following the split. In the
face of merges the system provides the claim that timestamps which were readable
on both the left and right sides of the merge will remain readable after the
merge.

Release note: None
  • Loading branch information
ajwerner committed Mar 12, 2019
1 parent 7037b54 commit 4c0cbf8
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 38 deletions.
18 changes: 18 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,24 @@ func (ts *TestServer) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, err
return rs[0], nil
}

// MergeRanges merges the range containing leftKey with the range to its right.
func (ts *TestServer) MergeRanges(leftKey roachpb.Key) (roachpb.RangeDescriptor, error) {

ctx := context.Background()
mergeReq := roachpb.AdminMergeRequest{
RequestHeader: roachpb.RequestHeader{
Key: leftKey,
},
}
_, pErr := client.SendWrapped(ctx, ts.DB().NonTransactionalSender(), &mergeReq)
if pErr != nil {
return roachpb.RangeDescriptor{},
errors.Errorf(
"%q: merge unexpected error: %s", leftKey, pErr)
}
return ts.LookupRange(leftKey)
}

// SplitRange splits the range containing splitKey.
// The right range created by the split starts at the split key and extends to the
// original range's end key.
Expand Down
204 changes: 169 additions & 35 deletions pkg/storage/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand All @@ -57,7 +60,7 @@ func TestClosedTimestampCanServe(t *testing.T) {

ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
})

// We just served a follower read. As a sanity check, make sure that we can't write at
Expand Down Expand Up @@ -94,6 +97,9 @@ func TestClosedTimestampCanServe(t *testing.T) {
}
}

// TestClosedTimestampCanServerThroughoutLeaseTransfer verifies that lease
// transfers does not prevent reading a value from a follower that was
// previously readable.
func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -112,13 +118,15 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
})

// Once we know that we can read safely at this timestamp, want to ensure
// Once we know that we can read safely at this timestamp, we want to ensure
// that we can always read from this timestamp from all replicas even while
// lease transfers are ongoing. The test launches a goroutine to randomly
// trigger transfers every
// trigger transfers at random intervals up to 50ms and ensures that there
// are no errors reading the same value from any replica throughout the
// duration of the test (testTime).
const testTime = 500 * time.Millisecond
const maxTransferWait = 50 * time.Millisecond
deadline := timeutil.Now().Add(testTime)
Expand Down Expand Up @@ -151,7 +159,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
g.Go(transferLeasesRandomlyUntilDeadline)

// Attempt to send read requests to a replica in a tight loop until deadline
// Attempt to send read requests to a replica in a tight loop until deadline
// is reached. If an error is seen on any replica then it is returned to the
// errgroup.
baRead := makeReadBatchRequestForDesc(desc, ts)
Expand Down Expand Up @@ -179,6 +187,84 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
}

// TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that
// if a timestamp is safe for reading on both the left side and right side of a
// a merge then it will be safe after the merge and that if a timestamp is safe
// for reading before the beginning of a split it will be safe on both sides of
// of the split.
func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
defer leaktest.AfterTest(t)()

if util.RaceEnabled {
// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
t.Skip("skipping under race")
}
ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
// Disable the automatic merging.
if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
t.Fatal(err)
}

defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil {
t.Fatal(err)
}
// Start by ensuring that the values can be read from all replicas at ts.
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(2))
})
// Manually split the table to have easier access to descriptors.
tableID, err := getTableID(db0, "cttest", "kv")
if err != nil {
t.Fatalf("failed to lookup ids: %v", err)
}
// Split the table at key 2.
k, err := sqlbase.EncodeTableKey(sqlbase.EncodeTableIDIndexID(nil, tableID, 1),
tree.NewDInt(2), encoding.Ascending)
if err != nil {
t.Fatalf("failed to encode key: %v", err)
}
lr, rr, err := tc.Server(0).SplitRange(k)
if err != nil {
t.Fatalf("failed to split range at key %v: %v", roachpb.Key(k), err)
}

// Ensure that we can perform follower reads from all replicas.
lRepls := replsForRange(ctx, t, tc, lr)
rRepls := replsForRange(ctx, t, tc, rr)
// Now immediately query both the ranges and there's 1 value per range.
// We need to tollerate RangeNotFound as the split range may not have been
// created yet.
require.Nil(t, verifyCanReadFromAllRepls(ctx, t, lr, ts, lRepls,
respFuncs(retryOnRangeNotFound, expectRows(1))))
require.Nil(t, verifyCanReadFromAllRepls(ctx, t, rr, ts, rRepls,
respFuncs(retryOnRangeNotFound, expectRows(1))))
// Now merge the ranges back together and ensure that there's two values in
// the merged range.
merged, err := tc.Server(0).MergeRanges(lr.StartKey.AsRawKey())
require.Nil(t, err)
mergedRepls := replsForRange(ctx, t, tc, merged)
// The hazard here is that a follower is not yet aware of the merge and will
// return an error. We'll accept that because a client wouldn't see that error
// from distsender.
require.Nil(t, verifyCanReadFromAllRepls(ctx, t, merged, ts, mergedRepls,
respFuncs(retryOnRangeKeyMismatch, expectRows(2))))
}

func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err error) {
err = db.QueryRow(`SELECT table_id FROM crdb_internal.tables WHERE database_name = $1 AND name = $2`,
dbName, tableName).Scan(&tableID)
return
}

// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
Expand Down Expand Up @@ -208,13 +294,13 @@ func replsForRange(
return repls
}

// This gnarly helper function creates a test cluster that is prepared to
// exercise follower reads. The returned test cluster has follower reads enabled
// using the above targetDuration and closeFraction. In addition to the newly
// minted test cluster, this function returns a db handle to node 0, a range
// descriptor for the range used by the table `cttest.kv` and the replica
// objects corresponding to the replicas for the range. It is the caller's
// responsibility to Stop the Stopper on the returned test cluster when done.
// This function creates a test cluster that is prepared to exercise follower
// reads. The returned test cluster has follower reads enabled using the above
// targetDuration and closeFraction. In addition to the newly minted test
// cluster, this function returns a db handle to node 0, a range descriptor for
// the range used by the table `cttest.kv` and the replica objects corresponding
// to the replicas for the range. It is the caller's responsibility to Stop the
// Stopper on the returned test cluster when done.
func setupTestClusterForClosedTimestampTesting(
ctx context.Context, t *testing.T,
) (
Expand Down Expand Up @@ -296,40 +382,88 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
return tc, db0, desc, repls
}

type respFunc func(*roachpb.BatchResponse, *roachpb.Error) (err error, shouldRetry bool)

// respFuncs returns a respFunc which is passes its arguments to each passed
// func until one returns shouldRetry or a non-nil error.
func respFuncs(funcs ...respFunc) respFunc {
return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) {
for _, f := range funcs {
err, shouldRetry = f(resp, pErr)
if err != nil || shouldRetry {
break
}
}
return err, shouldRetry
}
}

func retryOnError(f func(*roachpb.Error) bool) respFunc {
return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) {
if pErr != nil && f(pErr) {
return nil, true
}
return pErr.GoError(), false
}
}

var retryOnRangeKeyMismatch = retryOnError(func(pErr *roachpb.Error) bool {
_, isRangeKeyMismatch := pErr.Detail.Value.(*roachpb.ErrorDetail_RangeKeyMismatch)
return isRangeKeyMismatch
})

var retryOnRangeNotFound = retryOnError(func(pErr *roachpb.Error) bool {
_, isRangeNotFound := pErr.Detail.Value.(*roachpb.ErrorDetail_RangeNotFound)
return isRangeNotFound
})

func expectRows(expectedRows int) respFunc {
return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) {
if pErr != nil {
return pErr.GoError(), false
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != expectedRows {
return fmt.Errorf("expected %d rows, but got %d", expectedRows, len(rows)), false
}
return nil, false
}
}

func verifyCanReadFromAllRepls(
ctx context.Context,
t *testing.T,
desc roachpb.RangeDescriptor,
ts hlc.Timestamp,
repls []*storage.Replica,
expectedRows int,
f respFunc,
) error {
t.Helper()
retryOptions := retry.Options{
InitialBackoff: 500 * time.Microsecond,
MaxBackoff: 5 * time.Millisecond,
MaxRetries: 100,
}
baRead := makeReadBatchRequestForDesc(desc, ts)
// The read should succeed once enough time (~300ms, but it's difficult to
// assert on that) has passed - on all replicas!

for _, repl := range repls {
resp, pErr := repl.Send(ctx, baRead)
if pErr != nil {
switch tErr := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError:
log.Infof(ctx, "got not lease holder error, here's the lease: %v %v %v %v", *tErr.Lease, tErr.Lease.Start.GoTime(), ts.GoTime(), ts.GoTime().Sub(tErr.Lease.Start.GoTime()))
return tErr
case *roachpb.RangeNotFoundError:
// Can happen during upreplication.
return tErr
default:
t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl))
}
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != expectedRows {
t.Fatalf("expected %d rows, but got %d", expectedRows, len(rows))
}
g, ctx := errgroup.WithContext(ctx)
for i := range repls {
repl := repls[i]
func(r *storage.Replica) {
g.Go(func() (err error) {
var shouldRetry bool
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
return err
}
}
return err
})
}(repls[i])
}
return nil
return g.Wait()
}

func makeReadBatchRequestForDesc(
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ type Replica struct {
// we know that the replica has caught up.
lastReplicaAdded roachpb.ReplicaID
lastReplicaAddedTime time.Time
// initialMaxClosed is the initial maxClosed timestamp for the replica as known
// from its left-hand-side upon creation.
initialMaxClosed hlc.Timestamp

// The most recently updated time for each follower of this range. This is updated
// every time a Raft message is received from a peer.
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func (r *Replica) maxClosed(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
lai := r.mu.state.LeaseAppliedIndex
lease := *r.mu.state.Lease
initialMaxClosed := r.mu.initialMaxClosed
r.mu.RUnlock()
maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai))
if maxClosed.IsEmpty() || maxClosed.Less(lease.Start) {
maxClosed = lease.Start
}
maxClosed.Forward(lease.Start)
maxClosed.Forward(initialMaxClosed)
return maxClosed
}
14 changes: 14 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,10 +2069,24 @@ func splitPostApply(
}

// Finish initialization of the RHS.

// This initialMaxClosedValue is created here to ensure that follower reads
// do not regress following the split. After the split occurs there will be no
// information in the closedts subsystem about the newly minted RHS range from
// its leaseholder's store. Furthermore, the RHS will have a lease start time
// equal to that of the LHS which might be quite old. This means that
// timestamps which follow the least StartTime for the LHS part are below the
// current closed timestamp for the LHS would no longer be readable on the RHS
// after the split. It is critical that this call to maxClosed happen during
// the splitPostApply so that it refers to a LAI that is equal to the index at
// which this lease was applied. If it were to refer to a LAI after the split
// then the value of initialMaxClosed might be unsafe.
initialMaxClosed := r.maxClosed(ctx)
r.mu.Lock()
rightRng.mu.Lock()
// Copy the minLeaseProposedTS from the LHS.
rightRng.mu.minLeaseProposedTS = r.mu.minLeaseProposedTS
rightRng.mu.initialMaxClosed = initialMaxClosed
rightLease := *rightRng.mu.state.Lease
rightRng.mu.Unlock()
r.mu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ type TestServerInterface interface {
splitKey roachpb.Key,
) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error)

// MergeRanges merges the range containing leftKey with the following adjacent
// range.
MergeRanges(leftKey roachpb.Key) (merged roachpb.RangeDescriptor, err error)

// ExpectedInitialRangeCount returns the expected number of ranges that should
// be on the server after initial (asynchronous) splits have been completed,
// assuming no additional information is added outside of the normal bootstrap
Expand Down

0 comments on commit 4c0cbf8

Please sign in to comment.