Skip to content

Commit

Permalink
Merge #74266 #74814
Browse files Browse the repository at this point in the history
74266: spanconfig: set stage for migration r=irfansharif a=irfansharif

**spanconfig: get rid of ReconciliationDependencies interface**

It was hollow, simply embedding the spanconfig.Reconciler interface. In
a future commit we end up relying on each pod's span config reconciler
outside of just the reconciliation job. This makes the interface even
more awkward than it was.


**spanconfig/reconciler: export the checkpoint timestamp**

We'll make use of it in a future commit.


**kvserver: plumb in a context into (*Store).GetConfReader**

We'll use it in a future commit.


**clusterversion: improve a version comment**

Gets rid of a squiggly line in Goland.

---

Set of commits to set the stage for #73876.

74814: Tag test rules that fail with TestNoLinkForbidden r=rickystewart a=ulfjack

```
--- FAIL: TestNoLinkForbidden (0.01s)
    build.go:56: cannot find package "github.com/cockroachdb/cockroach/pkg/roachpb" in any of:
        	GOROOT/src/github.com/cockroachdb/cockroach/pkg/roachpb (from $GOROOT)
        	/go/src/github.com/cockroachdb/cockroach/pkg/roachpb (from $GOPATH)
```

This is a workaround for #74176.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Ulf Adams <[email protected]>
  • Loading branch information
3 people committed Jan 14, 2022
3 parents a1346da + 5d7f4d3 + d5b32d2 commit ffe26ec
Show file tree
Hide file tree
Showing 45 changed files with 111 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func TestDataDriven(t *testing.T) {
case "reconcile":
tsBeforeReconcilerStart := tenant.Clock().Now()
go func() {
err := tenant.Reconciler().Reconcile(ctx, hlc.Timestamp{}, func(checkpoint hlc.Timestamp) error {
tenant.Checkpoint(checkpoint)
err := tenant.Reconciler().Reconcile(ctx, hlc.Timestamp{} /* startTS */, func() error {
tenant.RecordCheckpoint()
return nil
})
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func TestDataDriven(t *testing.T) {
case "reconcile":
tsBeforeReconcilerStart := tenant.Clock().Now()
go func() {
err := tenant.Reconciler().Reconcile(ctx, hlc.Timestamp{}, func(checkpoint hlc.Timestamp) error {
tenant.Checkpoint(checkpoint)
err := tenant.Reconciler().Reconcile(ctx, hlc.Timestamp{} /* startTS */, func() error {
tenant.RecordCheckpoint()
return nil
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ const (
// This version comes with a migration to populate the same seed data
// for existing tenants.
SeedTenantSpanConfigs
// Public schema is backed by a descriptor.
// PublicSchemasWithDescriptors backs public schemas with descriptors.
PublicSchemasWithDescriptors
// AlterSystemProtectedTimestampAddColumn adds a target column to the
// system.protected_ts_records table that describes what is protected by the
Expand Down
1 change: 1 addition & 0 deletions pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_test(
"vec_test.go",
],
embed = [":coldata"],
tags = ["no-remote"],
deps = [
"//pkg/col/coldatatestutils",
"//pkg/sql/colconv",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ go_test(
],
data = glob(["testdata/**"]),
embed = [":with-mocks"], # keep
tags = ["no-remote"],
deps = [
"//build/bazelutil:noop",
"//pkg/base",
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/mvcc_gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ func TestMVCCGCQueueIntentResolution(t *testing.T) {
}

// Process through GC queue.
confReader, err := tc.store.GetConfReader()
confReader, err := tc.store.GetConfReader(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func TestMVCCGCQueueLastProcessedTimestamps(t *testing.T) {
}
}

confReader, err := tc.store.GetConfReader()
confReader, err := tc.store.GetConfReader(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
}

// Forward the clock past the default GC time.
confReader, err := tc.store.GetConfReader()
confReader, err := tc.store.GetConfReader(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
var err error
confReader, err = bq.store.GetConfReader()
confReader, err = bq.store.GetConfReader(ctx)
if err != nil {
if errors.Is(err, errSysCfgUnavailable) && log.V(1) {
log.Warningf(ctx, "unable to retrieve system config, skipping: %v", err)
Expand Down Expand Up @@ -901,7 +901,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
var err error
confReader, err = bq.store.GetConfReader()
confReader, err = bq.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
if log.V(1) {
log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func (bq *baseQueue) testingAdd(
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if _, err := s.GetConfReader(); err != nil {
if _, err := s.GetConfReader(ctx); err != nil {
return errors.Wrap(err, "unable to retrieve conf reader")
}

Expand All @@ -44,15 +44,15 @@ func forceScanAndProcess(s *Store, q *baseQueue) error {
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
if err := forceScanAndProcess(ctx, s, q); err != nil {
log.Fatalf(ctx, "%v", err)
}
}

// ForceReplicationScanAndProcess iterates over all ranges and
// enqueues any that need to be replicated.
func (s *Store) ForceReplicationScanAndProcess() error {
return forceScanAndProcess(s, s.replicateQueue.baseQueue)
return forceScanAndProcess(context.TODO(), s, s.replicateQueue.baseQueue)
}

// MustForceReplicaGCScanAndProcess iterates over all ranges and enqueues any that
Expand All @@ -70,7 +70,7 @@ func (s *Store) MustForceMergeScanAndProcess() {
// ForceSplitScanAndProcess iterates over all ranges and enqueues any that
// may need to be split.
func (s *Store) ForceSplitScanAndProcess() error {
return forceScanAndProcess(s, s.splitQueue.baseQueue)
return forceScanAndProcess(context.TODO(), s, s.splitQueue.baseQueue)
}

// MustForceRaftLogScanAndProcess iterates over all ranges and enqueues any that
Expand All @@ -83,20 +83,20 @@ func (s *Store) MustForceRaftLogScanAndProcess() {
// any that need time series maintenance, then processes the time series
// maintenance queue.
func (s *Store) ForceTimeSeriesMaintenanceQueueProcess() error {
return forceScanAndProcess(s, s.tsMaintenanceQueue.baseQueue)
return forceScanAndProcess(context.TODO(), s, s.tsMaintenanceQueue.baseQueue)
}

// ForceRaftSnapshotQueueProcess iterates over all ranges, enqueuing
// any that need raft snapshots, then processes the raft snapshot
// queue.
func (s *Store) ForceRaftSnapshotQueueProcess() error {
return forceScanAndProcess(s, s.raftSnapshotQueue.baseQueue)
return forceScanAndProcess(context.TODO(), s, s.raftSnapshotQueue.baseQueue)
}

// ForceConsistencyQueueProcess runs all the ranges through the consistency
// queue.
func (s *Store) ForceConsistencyQueueProcess() error {
return forceScanAndProcess(s, s.consistencyQueue.baseQueue)
return forceScanAndProcess(context.TODO(), s, s.consistencyQueue.baseQueue)
}

// The methods below can be used to control a store's queues. Stopping a queue
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func TestNeedsSystemConfig(t *testing.T) {
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

{
confReader, err := tc.store.GetConfReader()
confReader, err := tc.store.GetConfReader(ctx)
require.Nil(t, confReader)
require.True(t, errors.Is(err, errSysCfgUnavailable))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2841,7 +2841,7 @@ func (s *Store) relocateOne(
`range %s was either in a joint configuration or had learner replicas: %v`, desc, desc.Replicas())
}

confReader, err := s.GetConfReader()
confReader, err := s.GetConfReader(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "can't relocate range")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (r *Replica) updateRangeInfo(ctx context.Context, desc *roachpb.RangeDescri
// the original range won't work as the original and new ranges might belong
// to different zones.
// Load the system config.
confReader, err := r.store.GetConfReader()
confReader, err := r.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
// This could be before the system config was ever gossiped, or it
// expired. Let the gossip callback set the info.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2104,7 +2104,7 @@ func (s *Store) startGossip() {
var errSysCfgUnavailable = errors.New("system config not available in gossip")

// GetConfReader exposes access to a configuration reader.
func (s *Store) GetConfReader() (spanconfig.StoreReader, error) {
func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, error) {
if s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues {
return nil, errSysCfgUnavailable
}
Expand Down Expand Up @@ -3260,7 +3260,7 @@ func (s *Store) ManuallyEnqueue(
return nil, nil, errors.Errorf("unknown queue type %q", queueName)
}

confReader, err := s.GetConfReader()
confReader, err := s.GetConfReader(ctx)
if err != nil {
return nil, nil, errors.Wrap(err,
"unable to retrieve conf reader, cannot run queue; make sure "+
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ go_test(
"version_test.go",
],
embed = [":with-mocks"], # keep
tags = ["no-remote"],
deps = [
"//pkg/cli/exit",
"//pkg/kv/kvserver/concurrency/lock",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/diagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_test(
"update_checker_test.go",
],
embed = [":diagnostics"],
tags = ["no-remote"],
deps = [
"//pkg/base",
"//pkg/build",
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
spanConfigKnobs,
)

execCfg.SpanConfigReconciliationJobDeps = spanConfig.manager
execCfg.SpanConfigReconciler = spanConfigReconciler
}
execCfg.SpanConfigKVAccessor = cfg.sqlServerOptionalKVArgs.spanConfigKVAccessor

Expand Down
21 changes: 10 additions & 11 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,25 +160,24 @@ type Reconciler interface {
// timestamp. If it does not find MVCC history going far back enough[1], it
// falls back to a scan of all descriptors and zone configs before being
// able to do more incremental work. The provided callback is invoked
// with timestamps that can be safely checkpointed. A future Reconciliation
// attempt can make use of this timestamp to reduce the amount of necessary
// work (provided the MVCC history is still available).
// whenever incremental progress has been made and a Checkpoint() timestamp
// is available. A future Reconcile() attempt can make use of this timestamp
// to reduce the amount of necessary work (provided the MVCC history is
// still available).
//
// [1]: It's possible for system.{zones,descriptor} to have been GC-ed away;
// think suspended tenants.
Reconcile(
ctx context.Context,
startTS hlc.Timestamp,
callback func(checkpoint hlc.Timestamp) error,
onCheckpoint func() error,
) error
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
// configurations.
type ReconciliationDependencies interface {
Reconciler
// Checkpoint returns a timestamp suitable for checkpointing. A future
// Reconcile() attempt can make use of this timestamp to reduce the
// amount of necessary work (provided the MVCC history is
// still available).
Checkpoint() hlc.Timestamp
}

// Store is a data structure used to store spans and their corresponding
Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var _ jobs.Resumer = (*resumer)(nil)
// Resume implements the jobs.Resumer interface.
func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error {
execCtx := execCtxI.(sql.JobExecContext)
rc := execCtx.SpanConfigReconciliationJobDeps()
rc := execCtx.SpanConfigReconciler()

// TODO(irfansharif): #73086 bubbles up retryable errors from the
// reconciler/underlying watcher in the (very) unlikely event that it's
Expand All @@ -41,10 +41,10 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error {
// the job all over again after some time, it's just that the checks for
// failed jobs happen infrequently.

if err := rc.Reconcile(ctx, hlc.Timestamp{}, func(checkpoint hlc.Timestamp) error {
if err := rc.Reconcile(ctx, hlc.Timestamp{}, func() error {
// TODO(irfansharif): Stash this checkpoint somewhere and use it when
// starting back up.
_ = checkpoint
_ = rc.Checkpoint()
return nil
}); err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ var jobEnabledSetting = settings.RegisterBoolSetting(
"enable the use of the kv accessor", false)

// Manager is the coordinator of the span config subsystem. It ensures that
// there's only one span config reconciliation job for every tenant. It also
// there's only one span config reconciliation job[1] for every tenant. It also
// captures all relevant dependencies for the job.
//
// [1]: The reconciliation job is responsible for reconciling a tenant's zone
// configurations with the cluster's span configurations.
type Manager struct {
db *kv.DB
jr *jobs.Registry
Expand All @@ -67,8 +70,6 @@ type Manager struct {
spanconfig.Reconciler
}

var _ spanconfig.ReconciliationDependencies = &Manager{}

// New constructs a new Manager.
func New(
db *kv.DB,
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigreconciler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/sql/catalog/descs",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
33 changes: 29 additions & 4 deletions pkg/spanconfig/spanconfigreconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -38,6 +39,11 @@ type Reconciler struct {
codec keys.SQLCodec
tenID roachpb.TenantID
knobs *spanconfig.TestingKnobs

mu struct {
syncutil.RWMutex
lastCheckpoint hlc.Timestamp
}
}

var _ spanconfig.Reconciler = &Reconciler{}
Expand Down Expand Up @@ -122,7 +128,7 @@ func New(
// checkpoint. For changes to, say, RANGE DEFAULT, the RPC request proto is
// proportional to the number of schema objects.
func (r *Reconciler) Reconcile(
ctx context.Context, startTS hlc.Timestamp, callback func(checkpoint hlc.Timestamp) error,
ctx context.Context, startTS hlc.Timestamp, onCheckpoint func() error,
) error {
// TODO(irfansharif): Check system.{zones,descriptors} for last GC timestamp
// and avoid the full reconciliation pass if the startTS provided is
Expand All @@ -136,15 +142,20 @@ func (r *Reconciler) Reconcile(
codec: r.codec,
tenID: r.tenID,
}
latestStore, reconciledUpto, err := full.reconcile(ctx)
latestStore, reconciledUpUntil, err := full.reconcile(ctx)
if err != nil {
return err
}

if err := callback(reconciledUpto); err != nil {
r.mu.Lock()
r.mu.lastCheckpoint = reconciledUpUntil
r.mu.Unlock()

if err := onCheckpoint(); err != nil {
return err
}

incrementalStartTS := reconciledUpUntil
incremental := incrementalReconciler{
sqlTranslator: r.sqlTranslator,
sqlWatcher: r.sqlWatcher,
Expand All @@ -154,7 +165,21 @@ func (r *Reconciler) Reconcile(
codec: r.codec,
knobs: r.knobs,
}
return incremental.reconcile(ctx, reconciledUpto, callback)
return incremental.reconcile(ctx, incrementalStartTS, func(reconciledUpUntil hlc.Timestamp) error {
r.mu.Lock()
r.mu.lastCheckpoint = reconciledUpUntil
r.mu.Unlock()

return onCheckpoint()
})
}

// Checkpoint is part of the spanconfig.Reconciler interface.
func (r *Reconciler) Checkpoint() hlc.Timestamp {
r.mu.RLock()
defer r.mu.RUnlock()

return r.mu.lastCheckpoint
}

// fullReconciler is a single-use orchestrator for the full reconciliation
Expand Down
Loading

0 comments on commit ffe26ec

Please sign in to comment.