Skip to content

Commit

Permalink
kvserver: de-flake TestProtectedTimestamps
Browse files Browse the repository at this point in the history
Fixed #75020. This test makes use of an "upsert-until-backpressure"
primitive, where it continually writes blobs to a scratch table
configured with a lower max range size (1<<18 from the default of
512<<20) until it experiences replica backpressure. Before #73876 (when
using the system config span), the splitting off of the scratch table's
ranges happened near instantly after the creation of the table itself.
This changed slightly with the span configs infrastructure, where
there's more of asynchrony built in and ranges may appear after a while
longer.

The test previously started upserting as soon as it created the table,
being able to implicitly rely on the tight synchrony of already having
the table's ranges carved out. This comes when deciding to record a
replica's largest previous max range size -- something we only do if the
replica's current size is larger than the new limit (see
(*Replica).SetSpanCnfig). When we weren't upserting until the range
applied the latest config, this was not possible. With span configs and
its additional asynchrony, this assumption no longer held. It was
possible then for the range to accept writes larger than the newest max
range size, which unintentionally (for this test at least) triggers an
escape hatch where we avoid backpressure when lowering a range's max
size below its current size (#46863).

The failure is easy to repro. This test runs in ~8s, and with the
following we were reliably seeing timeouts where the test was waiting
for backpressure to kick in (but it never did because of the escape
hatch).

    dev test pkg/kv/kvserver \
      -f TestProtectedTimestamps -v --show-logs \
      --timeout 300s --ignore-cache --count 10

De-flaking this test is simple -- we just wait for the table's replicas
to apply the latest config before issuing writes to it.

Release note: None
  • Loading branch information
irfansharif committed Jan 24, 2022
1 parent 0a40796 commit 20161f9
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ func TestProtectedTimestamps(t *testing.T) {
_, err = conn.Exec("SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
require.NoError(t, err)

_, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING " +
"gc.ttlseconds = 1, range_max_bytes = 1<<18, range_min_bytes = 1<<10;")
_, err = conn.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
require.NoError(t, err)

const tableRangeMaxBytes = 1 << 18
_, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING "+
"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
require.NoError(t, err)

rRand, _ := randutil.NewTestRand()
Expand All @@ -86,7 +90,25 @@ func TestProtectedTimestamps(t *testing.T) {
const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
processedRegexp := regexp.MustCompile(processedPattern)

getTableStartKey := func(table string) roachpb.Key {
waitForTableSplit := func() {
testutils.SucceedsSoon(t, func() error {
count := 0
if err := conn.QueryRow(
"SELECT count(*) "+
"FROM crdb_internal.ranges_no_leases "+
"WHERE table_name = $1 "+
"AND database_name = current_database()",
"foo").Scan(&count); err != nil {
return err
}
if count == 0 {
return errors.New("waiting for table split")
}
return nil
})
}

getTableStartKey := func() roachpb.Key {
row := conn.QueryRow(
"SELECT start_key "+
"FROM crdb_internal.ranges_no_leases "+
Expand All @@ -102,7 +124,7 @@ func TestProtectedTimestamps(t *testing.T) {
}

getStoreAndReplica := func() (*kvserver.Store, *kvserver.Replica) {
startKey := getTableStartKey("foo")
startKey := getTableStartKey()
// Okay great now we have a key and can go find replicas and stores and what not.
r := tc.LookupRangeOrFatal(t, startKey)
l, _, err := tc.FindRangeLease(r, nil)
Expand All @@ -112,6 +134,16 @@ func TestProtectedTimestamps(t *testing.T) {
return getFirstStoreReplica(t, lhServer, startKey)
}

waitForRangeMaxBytes := func(maxBytes int64) {
testutils.SucceedsSoon(t, func() error {
_, r := getStoreAndReplica()
if r.GetMaxBytes() != maxBytes {
return errors.New("waiting for range_max_bytes to be applied")
}
return nil
})
}

gcSoon := func() {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
Expand All @@ -134,13 +166,16 @@ func TestProtectedTimestamps(t *testing.T) {
return thresh
}

waitForTableSplit()
waitForRangeMaxBytes(tableRangeMaxBytes)

beforeWrites := s0.Clock().Now()
gcSoon()

pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor),
nil /* knobs */)
ptsWithDB := ptstorage.WithDatabase(pts, s0.DB())
startKey := getTableStartKey("foo")
startKey := getTableStartKey()
ptsRec := ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: s0.Clock().Now(),
Expand Down

0 comments on commit 20161f9

Please sign in to comment.