diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 6e7b4a840b5b..9888128b4b21 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -289,6 +289,7 @@ go_test( "//pkg/server/telemetry", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigjob", "//pkg/spanconfig/spanconfigptsreader", "//pkg/sql", "//pkg/sql/catalog", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index d2db50babc54..ee3c2f092483 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -149,7 +149,7 @@ func fetchTableDescriptors( ) error { targetDescs = make([]catalog.TableDescriptor, 0, targets.NumUniqueTables()) if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil { - return err + return errors.Wrapf(err, "setting timestamp for table descriptor fetch") } // Note that all targets are currently guaranteed to have a Table ID // and lie within the primary index span. Deduplication is important @@ -157,7 +157,7 @@ func fetchTableDescriptors( return targets.EachTableID(func(id catid.DescID) error { tableDesc, err := descriptors.ByIDWithoutLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, id) if err != nil { - return err + return errors.Wrapf(err, "fetching table descriptor %d", id) } targetDescs = append(targetDescs, tableDesc) return nil diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 9ac09309cac2..2c4e8152c032 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -25,7 +25,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" @@ -451,16 +453,41 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, stopServer := startTestFullServer(t, feedTestOptions{}) + ctx := context.Background() + + // Useful for debugging. + require.NoError(t, log.SetVModule("spanconfigstore=2,store=2,reconciler=3,mvcc_gc_queue=2,kvaccessor=2")) + + settings := cluster.MakeTestingClusterSettings() + spanconfigjob.ReconciliationJobCheckpointInterval.Override(ctx, &settings.SV, 1*time.Second) + + // Keep track of where the spanconfig reconciler is up to. + lastReconcilerCheckpoint := atomic.Value{} + lastReconcilerCheckpoint.Store(hlc.Timestamp{}) + s, db, stopServer := startTestFullServer(t, feedTestOptions{ + knobsFn: func(knobs *base.TestingKnobs) { + if knobs.SpanConfig == nil { + knobs.SpanConfig = &spanconfig.TestingKnobs{} + } + scKnobs := knobs.SpanConfig.(*spanconfig.TestingKnobs) + scKnobs.JobOnCheckpointInterceptor = func(lastCheckpoint hlc.Timestamp) error { + now := hlc.Timestamp{WallTime: time.Now().UnixNano()} + t.Logf("reconciler checkpoint %s (%s)", lastCheckpoint, now.GoTime().Sub(lastCheckpoint.GoTime())) + lastReconcilerCheckpoint.Store(lastCheckpoint) + return nil + } + scKnobs.SQLWatcherCheckpointNoopsEveryDurationOverride = 1 * time.Second + }, + settings: settings, + }) + defer stopServer() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`) sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)") sqlDB.Exec(t, `CREATE USER test`) sqlDB.Exec(t, `GRANT admin TO test`) ts := s.Clock().Now() - ctx := context.Background() fooDescr := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "d", "foo") var targets changefeedbase.Targets @@ -468,12 +495,30 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { TableID: fooDescr.GetID(), }) + // We need to give our PTS record a legit job ID so the protected ts + // reconciler doesn't delete it, so start up a dummy changefeed job and use its id. + registry := s.JobRegistry().(*jobs.Registry) + dummyJobDone := make(chan struct{}) + defer close(dummyJobDone) + registry.TestingWrapResumerConstructor(jobspb.TypeChangefeed, + func(raw jobs.Resumer) jobs.Resumer { + return &fakeResumer{done: dummyJobDone} + }) + var jobID jobspb.JobID + sqlDB.QueryRow(t, `CREATE CHANGEFEED FOR TABLE foo INTO 'null://'`).Scan(&jobID) + waitForJobStatus(sqlDB, t, jobID, `running`) + // Lay protected timestamp record. - ptr := createProtectedTimestampRecord(ctx, s.Codec(), 42, targets, ts) + ptr := createProtectedTimestampRecord(ctx, s.Codec(), jobID, targets, ts) require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr) })) + // Set GC TTL to a small value to make the tables GC'd. We need to set this + // *after* we set the PTS record so that we dont GC the tables before + // the PTS is applied/picked up. + sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`) + // The following code was shameless stolen from // TestShowTenantFingerprintsProtectsTimestamp which almost // surely copied it from the 2-3 other tests that have @@ -509,7 +554,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { var rangeID int64 row.Scan(&rangeID) refreshPTSReaderCache(s.Clock().Now(), tableName, databaseName) - t.Logf("enqueuing range %d for mvccGC", rangeID) + t.Logf("enqueuing range %d (table %s.%s) for mvccGC", rangeID, tableName, databaseName) sqlDB.Exec(t, `SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID) } @@ -523,7 +568,21 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { // Change the user's password to update the users table. sqlDB.Exec(t, `ALTER USER test WITH PASSWORD 'testpass'`) + // Sleep for enough time to pass the configured GC threshold (1 second). time.Sleep(2 * time.Second) + + // Wait for the spanconfigs to be reconciled. + now := hlc.Timestamp{WallTime: time.Now().UnixNano()} + t.Logf("waiting for spanconfigs to be reconciled") + testutils.SucceedsWithin(t, func() error { + lastCheckpoint := lastReconcilerCheckpoint.Load().(hlc.Timestamp) + if lastCheckpoint.Less(now) { + return errors.Errorf("last checkpoint %s is not less than now %s", lastCheckpoint, now) + } + t.Logf("last reconciler checkpoint ok at %s", lastCheckpoint) + return nil + }, 1*time.Minute) + // If you want to GC all system tables: // // tabs := systemschema.MakeSystemTables() @@ -532,6 +591,7 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { // gcTestTableRange("system", t.GetName()) // } // } + t.Logf("GC'ing system tables") gcTestTableRange("system", "descriptor") gcTestTableRange("system", "zones") gcTestTableRange("system", "comments") diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index 7d48cbff1f20..90d642f97b31 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -30,7 +30,7 @@ type resumer struct { var _ jobs.Resumer = (*resumer)(nil) -var reconciliationJobCheckpointInterval = settings.RegisterDurationSetting( +var ReconciliationJobCheckpointInterval = settings.RegisterDurationSetting( settings.ApplicationLevel, "spanconfig.reconciliation_job.checkpoint_interval", "the frequency at which the span config reconciliation job checkpoints itself", @@ -104,17 +104,17 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro syncutil.Mutex util.EveryN }{} - persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) + persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues)) - reconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) { + ReconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) { persistCheckpointsMu.Lock() defer persistCheckpointsMu.Unlock() - persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) + persistCheckpointsMu.EveryN = util.Every(ReconciliationJobCheckpointInterval.Get(settingValues)) }) checkpointingDisabled := false shouldSkipRetry := false - var onCheckpointInterceptor func() error + var onCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error retryOpts := retry.Options{ InitialBackoff: 5 * time.Second, @@ -140,7 +140,7 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro started := timeutil.Now() if err := rc.Reconcile(ctx, lastCheckpoint, r.job.Session(), func() error { if onCheckpointInterceptor != nil { - if err := onCheckpointInterceptor(); err != nil { + if err := onCheckpointInterceptor(lastCheckpoint); err != nil { return err } } diff --git a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go index ca0b5a22a7c8..fc9e87f1f7ed 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go +++ b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go @@ -163,6 +163,8 @@ func (k *KVAccessor) UpdateSpanConfigRecords( toUpsert []spanconfig.Record, minCommitTS, maxCommitTS hlc.Timestamp, ) error { + log.VInfof(ctx, 2, "kv accessor updating span configs: toDelete=%+v, toUpsert=%+v, minCommitTS=%s, maxCommitTS=%s", toDelete, toUpsert, minCommitTS, maxCommitTS) + if k.optionalTxn != nil { return k.updateSpanConfigRecordsWithTxn(ctx, toDelete, toUpsert, k.optionalTxn, minCommitTS, maxCommitTS) } diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index d9ee96394557..69986b235511 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -303,7 +303,7 @@ func TestReconciliationJobErrorAndRecovery(t *testing.T) { ManagerDisableJobCreation: true, // disable the automatic job creation JobDisableInternalRetry: true, SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond, - JobOnCheckpointInterceptor: func() error { + JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error { mu.Lock() defer mu.Unlock() @@ -388,7 +388,7 @@ func TestReconciliationUsesRightCheckpoint(t *testing.T) { }, ManagerDisableJobCreation: true, // disable the automatic job creation SQLWatcherCheckpointNoopsEveryDurationOverride: 10 * time.Millisecond, - JobOnCheckpointInterceptor: func() error { + JobOnCheckpointInterceptor: func(_ hlc.Timestamp) error { select { case err := <-errCh: return err diff --git a/pkg/spanconfig/spanconfigreconciler/reconciler.go b/pkg/spanconfig/spanconfigreconciler/reconciler.go index 9e2c9d43ee98..122cd92fcaba 100644 --- a/pkg/spanconfig/spanconfigreconciler/reconciler.go +++ b/pkg/spanconfig/spanconfigreconciler/reconciler.go @@ -461,6 +461,10 @@ func updateSpanConfigRecords( } return err // not a retryable error, bubble up } + + if log.V(3) { + log.Infof(ctx, "successfully updated span config records: deleted = %+#v; upserted = %+#v", toDelete, toUpsert) + } return nil // we performed the update; we're done here } return nil diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go index 20b78331473d..922b702d0c99 100644 --- a/pkg/spanconfig/spanconfigstore/store.go +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -360,7 +360,7 @@ func (s *Store) maybeLogUpdate(ctx context.Context, update *spanconfig.Update) e // Log if there is a SpanConfig change in any field other than // ProtectedTimestamps to avoid logging PTS updates. - if found && curSpanConfig.HasConfigurationChange(nextSC) { + if log.V(2) || (found && curSpanConfig.HasConfigurationChange(nextSC)) { log.KvDistribution.Infof(ctx, "changing the spanconfig for span:%+v from:%+v to:%+v", target, curSpanConfig, nextSC) diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index 2226c9ecb20d..ad968d3a6bed 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -49,7 +49,7 @@ type TestingKnobs struct { // JobPersistCheckpointInterceptor, if set, is invoked before the // reconciliation job persists checkpoints. - JobOnCheckpointInterceptor func() error + JobOnCheckpointInterceptor func(lastCheckpoint hlc.Timestamp) error // KVSubscriberRangeFeedKnobs control lifecycle events for the rangefeed // underlying the KVSubscriber.