Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix PTS test #138243

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql",
"//pkg/sql/catalog",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func fetchTableDescriptors(
) error {
targetDescs = make([]catalog.TableDescriptor, 0, targets.NumUniqueTables())
if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil {
return err
return errors.Wrapf(err, "setting timestamp for table descriptor fetch")
}
// Note that all targets are currently guaranteed to have a Table ID
// and lie within the primary index span. Deduplication is important
// here as requesting the same span twice will deadlock.
return targets.EachTableID(func(id catid.DescID) error {
tableDesc, err := descriptors.ByIDWithoutLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, id)
if err != nil {
return err
return errors.Wrapf(err, "fetching table descriptor %d", id)
}
targetDescs = append(targetDescs, tableDesc)
return nil
Expand Down
56 changes: 51 additions & 5 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
Expand Down Expand Up @@ -451,16 +453,43 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, stopServer := startTestFullServer(t, feedTestOptions{})
// skip.UnderStress(t, "under stress it can take forever for the span config reconciler to update")
Copy link
Contributor Author

@asg0451 asg0451 Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is still a little flaky... still working on tightening it up

(sometimes waiting for the reconciler takes >1min)


ctx := context.Background()

// Useful for debugging.
require.NoError(t, log.SetVModule("spanconfigstore=2,store=2,reconciler=2,mvcc_gc_queue=2,kvaccessor=2"))

settings := cluster.MakeTestingClusterSettings()
spanconfigjob.ReconciliationJobCheckpointInterval.Override(ctx, &settings.SV, 1*time.Second)

// Keep track of where the spanconfig reconciler is up to.
lastReconcilerCheckpoint := atomic.Value{}
lastReconcilerCheckpoint.Store(hlc.Timestamp{})
s, db, stopServer := startTestFullServer(t, feedTestOptions{
knobsFn: func(knobs *base.TestingKnobs) {
if knobs.SpanConfig == nil {
knobs.SpanConfig = &spanconfig.TestingKnobs{}
}
scKnobs := knobs.SpanConfig.(*spanconfig.TestingKnobs)
scKnobs.JobOnCheckpointInterceptor = func(lastCheckpoint hlc.Timestamp) error {
now := hlc.Timestamp{WallTime: time.Now().UnixNano()}
t.Logf("reconciler checkpoint %s (%s)", lastCheckpoint, now.GoTime().Sub(lastCheckpoint.GoTime()))
lastReconcilerCheckpoint.Store(lastCheckpoint)
return nil
}
scKnobs.SQLWatcherCheckpointNoopsEveryDurationOverride = 1 * time.Second
},
settings: settings,
})

defer stopServer()
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)
sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)")
sqlDB.Exec(t, `CREATE USER test`)
sqlDB.Exec(t, `GRANT admin TO test`)
ts := s.Clock().Now()
ctx := context.Background()

fooDescr := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "d", "foo")
var targets changefeedbase.Targets
Expand All @@ -474,6 +503,11 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
}))

// Set GC TTL to a small value to make the tables GC'd. We need to set this
// *after* we set the PTS record so that we dont GC the tables before
// the PTS is applied/picked up.
sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)

// The following code was shameless stolen from
// TestShowTenantFingerprintsProtectsTimestamp which almost
// surely copied it from the 2-3 other tests that have
Expand Down Expand Up @@ -509,7 +543,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
var rangeID int64
row.Scan(&rangeID)
refreshPTSReaderCache(s.Clock().Now(), tableName, databaseName)
t.Logf("enqueuing range %d for mvccGC", rangeID)
t.Logf("enqueuing range %d (table %s.%s) for mvccGC", rangeID, tableName, databaseName)
sqlDB.Exec(t, `SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID)
}

Expand All @@ -523,7 +557,18 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
// Change the user's password to update the users table.
sqlDB.Exec(t, `ALTER USER test WITH PASSWORD 'testpass'`)

time.Sleep(2 * time.Second)
// Wait for the spanconfigs to be reconciled.
now := hlc.Timestamp{WallTime: time.Now().UnixNano()}
t.Logf("waiting for spanconfigs to be reconciled")
testutils.SucceedsWithin(t, func() error {
lastCheckpoint := lastReconcilerCheckpoint.Load().(hlc.Timestamp)
if lastCheckpoint.Less(now) {
return errors.Errorf("last checkpoint %s is not less than now %s", lastCheckpoint, now)
}
t.Logf("last reconciler checkpoint ok at %s", lastCheckpoint)
return nil
}, 1*time.Minute)

// If you want to GC all system tables:
//
// tabs := systemschema.MakeSystemTables()
Expand All @@ -532,6 +577,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
// gcTestTableRange("system", t.GetName())
// }
// }
t.Logf("GC'ing system tables")
gcTestTableRange("system", "descriptor")
gcTestTableRange("system", "zones")
gcTestTableRange("system", "comments")
Expand Down
12 changes: 6 additions & 6 deletions pkg/spanconfig/spanconfigjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type resumer struct {

var _ jobs.Resumer = (*resumer)(nil)

var reconciliationJobCheckpointInterval = settings.RegisterDurationSetting(
var ReconciliationJobCheckpointInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"spanconfig.reconciliation_job.checkpoint_interval",
"the frequency at which the span config reconciliation job checkpoints itself",
Expand Down Expand Up @@ -104,17 +104,17 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
syncutil.Mutex
util.EveryN
}{}
persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))

reconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) {
ReconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) {
persistCheckpointsMu.Lock()
defer persistCheckpointsMu.Unlock()
persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues))
persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues))
})

checkpointingDisabled := false
shouldSkipRetry := false
var onCheckpointInterceptor func() error
var onCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error

retryOpts := retry.Options{
InitialBackoff: 5 * time.Second,
Expand All @@ -140,7 +140,7 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro
started := timeutil.Now()
if err := rc.Reconcile(ctx, lastCheckpoint, r.job.Session(), func() error {
if onCheckpointInterceptor != nil {
if err := onCheckpointInterceptor(); err != nil {
if err := onCheckpointInterceptor(lastCheckpoint); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (k *KVAccessor) UpdateSpanConfigRecords(
toUpsert []spanconfig.Record,
minCommitTS, maxCommitTS hlc.Timestamp,
) error {
log.VInfof(ctx, 2, "kv accessor updating span configs: toDelete=%+v, toUpsert=%+v, minCommitTS=%s, maxCommitTS=%s", toDelete, toUpsert, minCommitTS, maxCommitTS)

if k.optionalTxn != nil {
return k.updateSpanConfigRecordsWithTxn(ctx, toDelete, toUpsert, k.optionalTxn, minCommitTS, maxCommitTS)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestReconciliationJobErrorAndRecovery(t *testing.T) {
ManagerDisableJobCreation: true, // disable the automatic job creation
JobDisableInternalRetry: true,
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
JobOnCheckpointInterceptor: func() error {
JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error {
mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -388,7 +388,7 @@ func TestReconciliationUsesRightCheckpoint(t *testing.T) {
},
ManagerDisableJobCreation: true, // disable the automatic job creation
SQLWatcherCheckpointNoopsEveryDurationOverride: 10 * time.Millisecond,
JobOnCheckpointInterceptor: func() error {
JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error {
select {
case err := <-errCh:
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/spanconfig/spanconfigreconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ func updateSpanConfigRecords(
}
return err // not a retryable error, bubble up
}

if log.V(2) {
log.Infof(ctx, "successfully updated span config records: deleted = %+#v; upserted = %+#v", toDelete, toUpsert)
}
return nil // we performed the update; we're done here
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Store) maybeLogUpdate(ctx context.Context, update *spanconfig.Update) e

// Log if there is a SpanConfig change in any field other than
// ProtectedTimestamps to avoid logging PTS updates.
if found && curSpanConfig.HasConfigurationChange(nextSC) {
if log.V(2) || (found && curSpanConfig.HasConfigurationChange(nextSC)) {
log.KvDistribution.Infof(ctx,
"changing the spanconfig for span:%+v from:%+v to:%+v",
target, curSpanConfig, nextSC)
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type TestingKnobs struct {

// JobPersistCheckpointInterceptor, if set, is invoked before the
// reconciliation job persists checkpoints.
JobOnCheckpointInterceptor func() error
JobOnCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error

// KVSubscriberRangeFeedKnobs control lifecycle events for the rangefeed
// underlying the KVSubscriber.
Expand Down
Loading