Merge #45826
45826: storage: enforce gc ttl strictly r=ajwerner a=ajwerner

This PR creates a cluster setting to control whether the GC TTL is enforced
"strictly". Strict GC enforcement is not a strong guarantee that we will
serve no user reads whatsoever for timestamps older than the current reading
of the node's clock less the GC TTL. Rather, it enables the behavior that,
most of the time, most requests will see an error if they use a timestamp
older than the GC TTL.
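
For reference, a boolean cluster setting of this kind is typically registered
roughly as follows. This is a sketch only: the actual definition and its
description string are not shown in this excerpt, though the tests below do
reference it as `kvserver.StrictGCEnforcement`.

    // Sketch, not the actual definition from this PR: registering a boolean
    // cluster setting like the one the tests below reference as
    // kvserver.StrictGCEnforcement. The description text is assumed.
    var StrictGCEnforcement = settings.RegisterBoolSetting(
        "kv.gc_ttl.strict_enforcement.enabled",
        "if true, reads at timestamps below the GC TTL are rejected even if "+
            "the data has not yet been garbage collected (description assumed)",
        true,
    )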

The main case in which a user will see an error is if the only reason a read
at an old timestamp should be allowed is a protected timestamp. Verifying the
protected timestamp prevents this case: if users of protected timestamps
verify their records before they operate, or they use admin commands, they'll
be fine.

In the presence of a protected timestamp, all requests can read back to
that protected timestamp.
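
To make the intended client pattern concrete, here is a rough sketch adapted
from TestStrictGCEnforcement further down in this diff. The function wrapper
and the `protectedts.Provider` parameter type are assumptions, and error
handling is condensed; the individual calls mirror the test code.

    // readBelowGCTTL protects an old timestamp over a span, verifies the
    // record so the relevant replicas have seen it, and then reads at that
    // timestamp despite it being older than the GC TTL.
    func readBelowGCTTL(
        ctx context.Context,
        db *kv.DB,
        ptp protectedts.Provider,
        span roachpb.Span,
        oldTimestamp hlc.Timestamp,
    ) error {
        rec := ptpb.Record{
            ID:        uuid.MakeV4(),
            Timestamp: oldTimestamp,
            Spans:     []roachpb.Span{span},
        }
        // Write the protection record transactionally.
        if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
            return ptp.Protect(ctx, txn, &rec)
        }); err != nil {
            return err
        }
        // Verification ensures the replicas covering the span have observed
        // the record, so strict enforcement won't reject reads at rec.Timestamp.
        if err := ptp.Verify(ctx, rec.ID); err != nil {
            return err
        }
        txn := db.NewTxn(ctx, "old-read")
        txn.SetFixedTimestamp(ctx, oldTimestamp)
        _, err := txn.Scan(ctx, span.Key, span.EndKey, 0 /* maxRows */)
        return err
    }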

Lease transfers are another possible reason for data to remain readable back
to the GC threshold rather than only back to the TTL.

Strict mode is controlled via a cluster setting.

Strict mode does not apply to admin commands or to system ranges.

Interestingly, the problem that prompted this setting in the first place was
backups being run at a period longer than the GC TTL and pretty much never
failing.

Backups in 20.1 (in theory) are going to use protected timestamps. If they
successfully verify the protected timestamp then they'll still be able to
succeed an even higher percentage of the time! If we want to discourage this
behavior we could:

* Have the protected timestamp subsystem enforce that clients can't protect
  data they currently view as expired. This is probably a bad idea because,
  interestingly enough, the protected timestamp subsystem ignores zone configs
  completely.

* Have the higher levels try to do it. This seems better. The backup pulls
  the table descriptors and knows exactly the timestamp ranges it's backing
  up. In a lot of cases I can imagine wanting to back up data that is
  expired, and making that hard would probably be worse than letting users
  make this kind of mistake. The backup layer probably also has access to the
  zone configs. I might instead just warn loudly in this case.

What this will do is bring greater awareness of the GC TTL. If you don't
update or delete data frequently, the GC heuristic won't kick in for quite
some time.
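
For illustration, the knobs involved can be adjusted from SQL, as the tests
below do; a minimal sketch, assuming a `*gosql.DB` handle and an example
table name and TTL value:

    // relaxGCSettings shows the two levers: the new cluster setting and the
    // per-object zone config TTL (statements mirror those in the tests below).
    func relaxGCSettings(db *gosql.DB) error {
        // Turn off strict enforcement cluster-wide (the setting this PR adds).
        if _, err := db.Exec(
            `SET CLUSTER SETTING kv.gc_ttl.strict_enforcement.enabled = false`,
        ); err != nil {
            return err
        }
        // Or, more narrowly, give historical reads on one table more headroom
        // by raising its GC TTL via its zone config.
        _, err := db.Exec(
            `ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 3600`,
        )
        return err
    }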


Release justification: bug fixes and low-risk updates to new functionality

Release note (general change): GC TTLs will now be strictly enforced by
default. This enforcement can be controlled by disabling
`kv.gc_ttl.strict_enforcement.enabled`.

Co-authored-by: Andrew Werner <[email protected]>
craig[bot] and ajwerner committed Mar 12, 2020
2 parents f960172 + 2e0fa12 commit 793a920
Showing 25 changed files with 585 additions and 83 deletions.
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
@@ -321,14 +321,19 @@ func MakeTableFeedFactory(
}

// Feed implements the TestFeedFactory interface
func (f *tableFeedFactory) Feed(create string, args ...interface{}) (TestFeed, error) {
func (f *tableFeedFactory) Feed(create string, args ...interface{}) (_ TestFeed, err error) {
sink := f.sink
sink.Path = fmt.Sprintf(`table_%d`, timeutil.Now().UnixNano())

db, err := gosql.Open("postgres", sink.String())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = db.Close()
}
}()

sink.Scheme = `experimental-sql`
c := &TableFeed{
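
As a generic aside, the hunk above relies on Go's named error return so that
the deferred cleanup fires only when the function ultimately fails. A minimal
standalone sketch of the same pattern (not CockroachDB code; the config step
is a placeholder):

    package main

    import "os"

    // openConfigured returns an open file, closing it again if any later
    // setup step fails. The named return err is what the deferred func reads.
    func openConfigured(path string) (_ *os.File, err error) {
        f, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        defer func() {
            if err != nil {
                _ = f.Close() // release the handle only on failure
            }
        }()
        if err = configure(f); err != nil { // hypothetical further setup
            return nil, err
        }
        return f, nil
    }

    func configure(*os.File) error { return nil } // placeholder

    func main() {}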
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -321,6 +321,19 @@ func changefeedPlanHook(
}
return err
}
// If we created a protected timestamp for an initial scan, verify it.
// Doing this synchronously here rather than asynchronously later provides
// a nice UX win in the case that the data isn't actually available.
if protectedTimestampID != uuid.Nil {
if err := p.ExecCfg().ProtectedTimestampProvider.Verify(ctx, protectedTimestampID); err != nil {
if cancelErr := sj.Cancel(ctx); cancelErr != nil {
if ctx.Err() == nil {
log.Warningf(ctx, "failed to cancel job: %v", cancelErr)
}
}
return err
}
}
}

// Start the job and wait for it to signal on startedCh.
58 changes: 58 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
@@ -55,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
crdberrors "github.com/cockroachdb/errors"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -2306,6 +2308,62 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}))
}

// This test ensures that the changefeed attempts to verify its initial protected
// timestamp record and that when that verification fails, the job is canceled
// and the record removed.
func TestChangefeedProtectedTimestampsVerificationFails(t *testing.T) {
defer leaktest.AfterTest(t)()

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond

verifyRequestCh := make(chan *roachpb.AdminVerifyProtectedTimestampRequest, 1)
requestFilter := storagebase.ReplicaRequestFilter(func(
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
if r, ok := ba.GetArg(roachpb.AdminVerifyProtectedTimestamp); ok {
req := r.(*roachpb.AdminVerifyProtectedTimestampRequest)
verifyRequestCh <- req
return roachpb.NewError(errors.Errorf("failed to verify protection %v on %v", req.RecordID, ba.RangeID))
}
return nil
})
t.Run(`enterprise`, enterpriseTestWithServerArgs(
func(args *base.TestServerArgs) {
storeKnobs := &kvserver.StoreTestingKnobs{}
storeKnobs.TestingRequestFilter = requestFilter
args.Knobs.Store = storeKnobs
},
func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
ctx := context.TODO()
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
_, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH resolved`)
// Make sure we got the injected error.
require.Regexp(t, "failed to verify", err)
// Make sure we tried to verify the request.
r := <-verifyRequestCh
cfg := f.Server().ExecutorConfig().(sql.ExecutorConfig)
kvDB := cfg.DB
pts := cfg.ProtectedTimestampProvider
// Make sure that the canceled job gets moved through its OnFailOrCancel
// phase and removes its protected timestamp.
testutils.SucceedsSoon(t, func() error {
err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
_, err := pts.GetRecord(ctx, txn, r.RecordID)
return err
})
if err == nil {
return errors.Errorf("expected record to be removed")
}
if crdberrors.Is(err, protectedts.ErrNotExists) {
return nil
}
return err
})
}))
}

func TestManyChangefeedsOneTable(t *testing.T) {
defer leaktest.AfterTest(t)()

7 changes: 7 additions & 0 deletions pkg/jobs/jobs.go
@@ -811,3 +811,10 @@ func (sj *StartableJob) CleanupOnRollback(ctx context.Context) error {
sj.registry.unregister(*sj.ID())
return nil
}

// Cancel will mark the job as canceled and release its resources in the
// Registry.
func (sj *StartableJob) Cancel(ctx context.Context) error {
defer sj.registry.unregister(*sj.ID())
return sj.registry.CancelRequested(ctx, nil, *sj.ID())
}
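
For context, a rough sketch of the caller-side flow this method enables,
mirroring the changefeed hunk above; the `verify` step, `record`, and
`resultsCh` variables are assumptions standing in for the caller's setup:

    // Create the job inside the planning txn, commit, and then either start
    // the job or, if a post-commit check such as protected timestamp
    // verification fails, cancel it so the registry releases its resources.
    sj, err := registry.CreateStartableJobWithTxn(ctx, record, txn, resultsCh)
    if err != nil {
        return err
    }
    if err := txn.Commit(ctx); err != nil {
        return err
    }
    if err := verify(ctx); err != nil { // e.g. ProtectedTimestampProvider.Verify
        if cancelErr := sj.Cancel(ctx); cancelErr != nil {
            log.Warningf(ctx, "failed to cancel job: %v", cancelErr)
        }
        return err
    }
    if _, err := sj.Start(ctx); err != nil {
        return err
    }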
15 changes: 15 additions & 0 deletions pkg/jobs/jobs_test.go
@@ -2049,4 +2049,19 @@ func TestStartableJob(t *testing.T) {
require.NotEqual(t, id, *sj.ID())
}
})
t.Run("Cancel", func(t *testing.T) {
txn := db.NewTxn(ctx, "test")
sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn, nil)
require.NoError(t, err)
require.NoError(t, txn.Commit(ctx))
require.NoError(t, sj.Cancel(ctx))
status, err := sj.CurrentStatus(ctx)
require.NoError(t, err)
require.Equal(t, jobs.StatusCancelRequested, status)
for _, id := range jr.CurrentlyRunningJobs() {
require.NotEqual(t, id, *sj.ID())
}
_, err = sj.Start(ctx)
require.Regexp(t, "job with status cancel-requested cannot be marked started", err)
})
}
194 changes: 194 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -17,6 +17,7 @@ import (
"math"
"math/rand"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
@@ -28,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storagebase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storagepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -2939,6 +2942,197 @@ func TestTransferLeaseBlocksWrites(t *testing.T) {
require.NoError(t, <-transferErr)
}

// TestStrictGCEnforcement ensures that strict GC enforcement is respected and
// furthermore is responsive to changes in protected timestamps and in changes
// to the zone configs.
func TestStrictGCEnforcement(t *testing.T) {
defer leaktest.AfterTest(t)()

// The unfortunate thing about this test is that the gcttl is in seconds and
// we need to wait for the replica's lease start time to be sufficiently old.
// It takes about two seconds. All of that time is in setup.
if testing.Short() {
return
}
ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)

var (
db = tc.Server(0).DB()
getTableID = func() (tableID uint32) {
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables`+
` WHERE name = 'foo' AND database_name = current_database()`).Scan(&tableID)
return tableID
}
tableID = getTableID()
tenSecondsAgo hlc.Timestamp // written in setup
tableKey = roachpb.Key(keys.MakeTablePrefix(tableID))
tableSpan = roachpb.Span{Key: tableKey, EndKey: tableKey.PrefixEnd()}
mkRecord = func() ptpb.Record {
return ptpb.Record{
ID: uuid.MakeV4(),
Timestamp: tenSecondsAgo.Add(-10*time.Second.Nanoseconds(), 0),
Spans: []roachpb.Span{tableSpan},
}
}
mkStaleTxn = func() *kv.Txn {
txn := db.NewTxn(ctx, "foo")
txn.SetFixedTimestamp(ctx, tenSecondsAgo)
return txn
}
getRejectedMsg = func() string {
return tenSecondsAgo.String() + " must be after replica GC threshold "
}
performScan = func() error {
txn := mkStaleTxn()
_, err := txn.Scan(ctx, tableKey, tableKey.PrefixEnd(), 1)
return err
}
assertScanRejected = func(t *testing.T) {
t.Helper()
require.Regexp(t, getRejectedMsg(), performScan())
}

assertScanOk = func(t *testing.T) {
t.Helper()
require.NoError(t, performScan())
}
// Make sure the cache has been updated. Once it has then we know it won't
// be for minutes. It should read on startup.
waitForCacheAfter = func(t *testing.T, min hlc.Timestamp) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
if ptp.Iterate(ctx, tableKey, tableKey, func(record *ptpb.Record) (wantMore bool) {
return false
}).Less(min) {
return errors.Errorf("not yet read")
}
}
return nil
})
}
setGCTTL = func(t *testing.T, object string, exp int) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
sqlDB.Exec(t, `ALTER `+object+` CONFIGURE ZONE USING gc.ttlseconds = `+strconv.Itoa(exp))
for i := 0; i < tc.NumServers(); i++ {
s := tc.Server(i)
_, r := getFirstStoreReplica(t, s, tableKey)
if _, z := r.DescAndZone(); z.GC.TTLSeconds != int32(exp) {
_, sysCfg := getFirstStoreReplica(t, tc.Server(i), keys.SystemConfigSpan.Key)
require.NoError(t, sysCfg.MaybeGossipSystemConfig(ctx))
return errors.Errorf("expected %d, got %d", exp, z.GC.TTLSeconds)
}
}
return nil
})
}
setStrictGC = func(t *testing.T, val bool) {
t.Helper()
sqlDB.Exec(t, `SET CLUSTER SETTING kv.gc_ttl.strict_enforcement.enabled = `+fmt.Sprint(val))
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
s, r := getFirstStoreReplica(t, tc.Server(i), keys.SystemConfigSpan.Key)
if kvserver.StrictGCEnforcement.Get(&s.ClusterSettings().SV) != val {
require.NoError(t, r.MaybeGossipSystemConfig(ctx))
return errors.Errorf("expected %v, got %v", val, !val)
}
}
return nil
})
}
setTableGCTTL = func(t *testing.T, exp int) {
t.Helper()
setGCTTL(t, "TABLE foo", exp)
}
setSystemGCTTL = func(t *testing.T, exp int) {
// TODO(ajwerner): adopt this to test the system ranges are unaffected.
t.Helper()
setGCTTL(t, "RANGE system", exp)
}
refreshPastLeaseStart = func(t *testing.T) {
for i := 0; i < tc.NumServers(); i++ {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
_, r := getFirstStoreReplica(t, tc.Server(i), tableKey)
l, _ := r.GetLease()
require.NoError(t, ptp.Refresh(ctx, l.Start.Next()))
r.ReadProtectedTimestamps(ctx)
}
}
)

{
// Setup the initial state to be sure that we'll actually strictly enforce
// gc ttls.
tc.SplitRangeOrFatal(t, tableKey)
_, err := tc.AddReplicas(tableKey, tc.Target(1), tc.Target(2))
require.NoError(t, err)
_, err = tc.AddReplicas(keys.SystemConfigSpan.Key, tc.Target(1), tc.Target(2))
require.NoError(t, err)

setTableGCTTL(t, 1)
waitForCacheAfter(t, hlc.Timestamp{})

defer sqlDB.Exec(t, `SET CLUSTER SETTING kv.gc_ttl.strict_enforcement.enabled = DEFAULT`)
setStrictGC(t, true)
tenSecondsAgo = tc.Server(0).Clock().Now().Add(-10*time.Second.Nanoseconds(), 0)
}

t.Run("strict enforcement", func(t *testing.T) {
refreshPastLeaseStart(t)
assertScanRejected(t)
})
t.Run("disable strict enforcement", func(t *testing.T) {
setStrictGC(t, false)
defer setStrictGC(t, true)
assertScanOk(t)
})
t.Run("zone config changes are respected", func(t *testing.T) {
setTableGCTTL(t, 60)
assertScanOk(t)
setTableGCTTL(t, 1)
assertScanRejected(t)
})
t.Run("system ranges are unaffected", func(t *testing.T) {
setSystemGCTTL(t, 1)
txn := mkStaleTxn()
descriptorTable := roachpb.Key(keys.MakeTablePrefix(keys.DescriptorTableID))
_, err := txn.Scan(ctx, descriptorTable, descriptorTable.PrefixEnd(), 1)
require.NoError(t, err)
})
t.Run("protected timestamps are respected", func(t *testing.T) {
waitForCacheAfter(t, hlc.Timestamp{})
ptp := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
assertScanRejected(t)
// Create a protected timestamp, don't verify it, make sure it's not
// respected.
rec := mkRecord()
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return ptp.Protect(ctx, txn, &rec)
}))
assertScanRejected(t)

require.NoError(t, ptp.Verify(ctx, rec.ID))
assertScanOk(t)

// Transfer the lease and demonstrate that the query succeeds because we're
// cautious in the face of lease transfers.
desc, err := tc.LookupRange(tableKey)
require.NoError(t, err)
require.NoError(t, tc.TransferRangeLease(desc, tc.Target(1)))
assertScanOk(t)
})
}

// getRangeInfo retrieves range info by performing a get against the provided
// key and setting the ReturnRangeInfo flag to true.
func getRangeInfo(
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -470,6 +470,14 @@ func (r *Replica) UnquiesceAndWakeLeader() {
r.unquiesceAndWakeLeaderLocked()
}

func (r *Replica) ReadProtectedTimestamps(ctx context.Context) {
var ts cachedProtectedTimestampState
defer r.maybeUpdateCachedProtectedTS(&ts)
r.mu.RLock()
defer r.mu.RUnlock()
ts = r.readProtectedTimestampsRLocked(ctx, nil /* f */)
}

func (nl *NodeLiveness) SetDrainingInternal(
ctx context.Context, liveness storagepb.Liveness, drain bool,
) error {