Skip to content

Commit

Permalink
changefeedccl: fix PTS test
Browse files Browse the repository at this point in the history
Fix failing TestPTSRecordProtectsTargetsAndSystemTables test

Fixes: #135639
Fixes: #138066
Fixes: #137885
Fixes: #137505
Fixes: #136396
Fixes: #135805
Fixes: #135639

Release note: None
  • Loading branch information
asg0451 committed Jan 6, 2025
1 parent fb01a96 commit e0b128e
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql",
"//pkg/sql/catalog",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func fetchTableDescriptors(
) error {
targetDescs = make([]catalog.TableDescriptor, 0, targets.NumUniqueTables())
if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil {
return err
return errors.Wrapf(err, "setting timestamp for table descriptor fetch")
}
// Note that all targets are currently guaranteed to have a Table ID
// and lie within the primary index span. Deduplication is important
// here as requesting the same span twice will deadlock.
return targets.EachTableID(func(id catid.DescID) error {
tableDesc, err := descriptors.ByIDWithoutLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, id)
if err != nil {
return err
return errors.Wrapf(err, "fetching table descriptor %d", id)
}
targetDescs = append(targetDescs, tableDesc)
return nil
Expand Down
54 changes: 49 additions & 5 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
Expand Down Expand Up @@ -451,16 +453,41 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, stopServer := startTestFullServer(t, feedTestOptions{})
// skip.UnderStress(t, "under stress it can take forever for the span config reconciler to update")

ctx := context.Background()

// Useful for debugging.
require.NoError(t, log.SetVModule("spanconfigstore=2,store=2,reconciler=2,mvcc_gc_queue=2,kvaccessor=2"))

settings := cluster.MakeTestingClusterSettings()
spanconfigjob.ReconciliationJobCheckpointInterval.Override(ctx, &settings.SV, 1*time.Second)

// Keep track of where the spanconfig reconciler is up to.
lastReconcilerCheckpoint := atomic.Value{}
lastReconcilerCheckpoint.Store(hlc.Timestamp{})
s, db, stopServer := startTestFullServer(t, feedTestOptions{
knobsFn: func(knobs *base.TestingKnobs) {
if knobs.SpanConfig == nil {
knobs.SpanConfig = &spanconfig.TestingKnobs{}
}
knobs.SpanConfig.(*spanconfig.TestingKnobs).JobOnCheckpointInterceptor = func(lastCheckpoint hlc.Timestamp) error {
now := hlc.Timestamp{WallTime: time.Now().UnixNano()}
t.Logf("reconciler checkpoint %s (%s)", lastCheckpoint, now.GoTime().Sub(lastCheckpoint.GoTime()))
lastReconcilerCheckpoint.Store(lastCheckpoint)
return nil
}
},
settings: settings,
})

defer stopServer()
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)
sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)")
sqlDB.Exec(t, `CREATE USER test`)
sqlDB.Exec(t, `GRANT admin TO test`)
ts := s.Clock().Now()
ctx := context.Background()

fooDescr := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "d", "foo")
var targets changefeedbase.Targets
Expand All @@ -474,6 +501,11 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
}))

// Set GC TTL to a small value to make the tables GC'd. We need to set this
// *after* we set the PTS record so that we don't GC the tables before
// the PTS is applied/picked up.
sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)

// The following code was shamelessly stolen from
// TestShowTenantFingerprintsProtectsTimestamp which almost
// surely copied it from the 2-3 other tests that have
Expand Down Expand Up @@ -509,7 +541,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
var rangeID int64
row.Scan(&rangeID)
refreshPTSReaderCache(s.Clock().Now(), tableName, databaseName)
t.Logf("enqueuing range %d for mvccGC", rangeID)
t.Logf("enqueuing range %d (table %s.%s) for mvccGC", rangeID, tableName, databaseName)
sqlDB.Exec(t, `SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID)
}

Expand All @@ -523,7 +555,18 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
// Change the user's password to update the users table.
sqlDB.Exec(t, `ALTER USER test WITH PASSWORD 'testpass'`)

time.Sleep(2 * time.Second)
// Wait for the spanconfigs to be reconciled.
now := hlc.Timestamp{WallTime: time.Now().UnixNano()}
t.Logf("waiting for spanconfigs to be reconciled")
testutils.SucceedsWithin(t, func() error {
lastCheckpoint := lastReconcilerCheckpoint.Load().(hlc.Timestamp)
if lastCheckpoint.Less(now) {
return errors.Errorf("last checkpoint %s is not less than now %s", lastCheckpoint, now)
}
t.Logf("last reconciler checkpoint ok at %s", lastCheckpoint)
return nil
}, 1*time.Minute)

// If you want to GC all system tables:
//
// tabs := systemschema.MakeSystemTables()
Expand All @@ -532,6 +575,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
// gcTestTableRange("system", t.GetName())
// }
// }
t.Logf("GC'ing system tables")
gcTestTableRange("system", "descriptor")
gcTestTableRange("system", "zones")
gcTestTableRange("system", "comments")
Expand Down
12 changes: 6 additions & 6 deletions pkg/spanconfig/spanconfigjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type resumer struct {

var _ jobs.Resumer = (*resumer)(nil)

var reconciliationJobCheckpointInterval = settings.RegisterDurationSetting(
var ReconciliationJobCheckpointInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"spanconfig.reconciliation_job.checkpoint_interval",
"the frequency at which the span config reconciliation job checkpoints itself",
Expand Down Expand Up @@ -104,17 +104,17 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
syncutil.Mutex
util.EveryN
}{}
persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))

reconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) {
ReconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) {
persistCheckpointsMu.Lock()
defer persistCheckpointsMu.Unlock()
persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))
})

checkpointingDisabled := false
shouldSkipRetry := false
var onCheckpointInterceptor func() error
var onCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error

retryOpts := retry.Options{
InitialBackoff: 5 * time.Second,
Expand All @@ -140,7 +140,7 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
started := timeutil.Now()
if err := rc.Reconcile(ctx, lastCheckpoint, r.job.Session(), func() error {
if onCheckpointInterceptor != nil {
if err := onCheckpointInterceptor(); err != nil {
if err := onCheckpointInterceptor(lastCheckpoint); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (k *KVAccessor) UpdateSpanConfigRecords(
toUpsert []spanconfig.Record,
minCommitTS, maxCommitTS hlc.Timestamp,
) error {
log.VInfof(ctx, 2, "kv accessor updating span configs: toDelete=%+v, toUpsert=%+v, minCommitTS=%s, maxCommitTS=%s", toDelete, toUpsert, minCommitTS, maxCommitTS)

if k.optionalTxn != nil {
return k.updateSpanConfigRecordsWithTxn(ctx, toDelete, toUpsert, k.optionalTxn, minCommitTS, maxCommitTS)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestReconciliationJobErrorAndRecovery(t *testing.T) {
ManagerDisableJobCreation: true, // disable the automatic job creation
JobDisableInternalRetry: true,
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
JobOnCheckpointInterceptor: func() error {
JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error {
mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -388,7 +388,7 @@ func TestReconciliationUsesRightCheckpoint(t *testing.T) {
},
ManagerDisableJobCreation: true, // disable the automatic job creation
SQLWatcherCheckpointNoopsEveryDurationOverride: 10 * time.Millisecond,
JobOnCheckpointInterceptor: func() error {
JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error {
select {
case err := <-errCh:
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/spanconfig/spanconfigreconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ func updateSpanConfigRecords(
}
return err // not a retryable error, bubble up
}

if log.V(2) {
log.Infof(ctx, "successfully updated span config records: deleted = %+#v; upserted = %+#v", toDelete, toUpsert)
}
return nil // we performed the update; we're done here
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Store) maybeLogUpdate(ctx context.Context, update *spanconfig.Update) e

// Log if there is a SpanConfig change in any field other than
// ProtectedTimestamps to avoid logging PTS updates.
if found && curSpanConfig.HasConfigurationChange(nextSC) {
if log.V(2) || (found && curSpanConfig.HasConfigurationChange(nextSC)) {
log.KvDistribution.Infof(ctx,
"changing the spanconfig for span:%+v from:%+v to:%+v",
target, curSpanConfig, nextSC)
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type TestingKnobs struct {

// JobOnCheckpointInterceptor, if set, is invoked before the
// reconciliation job persists checkpoints. It is passed the last
// checkpoint timestamp.
JobOnCheckpointInterceptor func() error
JobOnCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error

// KVSubscriberRangeFeedKnobs control lifecycle events for the rangefeed
// underlying the KVSubscriber.
Expand Down

0 comments on commit e0b128e

Please sign in to comment.