Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
74555: spanconfig: support rangefeeds for dynamic system tables r=irfansharif a=irfansharif

Fixes #73045.

We're running out of system table IDs (see #57531), and as a result
we're now introducing the notion of dynamic system IDs throughout the
system. Previously KV baked-in the assumption of static system IDs at
two points:
- When deciding to allow rangefeeds on a given range;
- When enforcing strict GC TTL;

It did so by decoding the range's key span and comparing against the
hard-coded maximum system ID, all to determine whether the range in
question contained system tables. If so, we allowed rangefeeds to be
declared over it, and also did not enforce strict GC TTL (only really
applies to user tables). This way of doing things does not compose with
dynamically allocated system table IDs. With arbitrary, possibly
non-contiguous IDs, we don't have the convenient key-comparison
properties to rely on.

To that end, we use the span configs infrastructure to to delegate
control of whether rangefeeds are enabled over a given range and whether
strict GC is enforced. This scheme allows SQL code to declare "system
table configs" over arbitrary schemas, and have KV still respect it.
This PR does not expose these span config settings as part of zone
configs -- there's no need to (though we could in the future). To
account for the asynchronous nature of the span configs infra, we need
to ensure that ranges without an available config default to enabling
rangefeeds.

Release note: None

75698: sql/catalog/descs: move direct KV access methods under an interface r=ajwerner a=ajwerner

These things are both generally frowned upon and have different contracts
than the collection itself. Separate them with an interface layer of
indirection to make it clearer that there's a trap lurking around these
methods.

Release note: None

75724: server.go: misc refactors r=tbg a=knz

This moves code out of server.go into distinct files, to make the code easier to navigate/explore.



75743: roachprod: add cockroach-roachstress GCE project to GC cronjob r=srosenberg a=srosenberg

Roachtest Stress runs nightly in TC; to reduce the chance of
resource exhaustion, these tests are executed in cockroach-roachstress
project whereas roachtests are normally executed in cockroach-ephemeral.
Thus, to ensure all resources are eventually GCed, we add
cockroach-roachstress to the list of currently GCed projects.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Stan Rosenberg <[email protected]>
  • Loading branch information
5 people committed Jan 31, 2022
5 parents baeba80 + 43a96b3 + d61df4c + 760f681 + ae9e7a4 commit 6a0bdec
Show file tree
Hide file tree
Showing 103 changed files with 3,084 additions and 2,339 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ func fullyQualifyScheduledBackupTargetTables(
if err != nil {
return err
}
schemaID, err = col.ResolveSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
schemaID, err = col.Direct().ResolveSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
return err
}); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2484,7 +2484,7 @@ func getRestorePrivilegesForTableOrSchema(
}
}
} else if descCoverage == tree.RequestedDescriptors {
parentDB, err := descsCol.MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
parentDB, err := descsCol.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return nil, errors.Wrapf(err, "failed to lookup parent DB %d", errors.Safe(desc.GetParentID()))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func TestRestoreWithDroppedSchemaCorruption(t *testing.T) {
hasSameNameSchema := func(dbName string) (exists bool) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
// Using this method to avoid validation.
id, err := col.LookupDatabaseID(ctx, txn, dbName)
id, err := col.Direct().LookupDatabaseID(ctx, txn, dbName)
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func synthesizePGTempSchema(
var synthesizedSchemaID descpb.ID
err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
var err error
schemaID, err := col.LookupSchemaID(ctx, txn, dbID, schemaName)
schemaID, err := col.Direct().LookupSchemaID(ctx, txn, dbID, schemaName)
if err != nil {
return err
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func allocateDescriptorRewrites(
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
dbID, err := col.LookupDatabaseID(ctx, txn, name)
dbID, err := col.Direct().LookupDatabaseID(ctx, txn, name)
if err != nil {
return err
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func allocateDescriptorRewrites(
}

// See if there is an existing schema with the same name.
id, err := col.LookupSchemaID(ctx, txn, parentID, sc.Name)
id, err := col.Direct().LookupSchemaID(ctx, txn, parentID, sc.Name)
if err != nil {
return err
}
Expand All @@ -572,7 +572,7 @@ func allocateDescriptorRewrites(
} else {
// If we found an existing schema, then we need to remap all references
// to this schema to the existing one.
desc, err := col.MustGetSchemaDescByID(ctx, txn, id)
desc, err := col.Direct().MustGetSchemaDescByID(ctx, txn, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func allocateDescriptorRewrites(
} else {
var parentID descpb.ID
{
newParentID, err := col.LookupDatabaseID(ctx, txn, targetDB)
newParentID, err := col.Direct().LookupDatabaseID(ctx, txn, targetDB)
if err != nil {
return err
}
Expand All @@ -621,13 +621,13 @@ func allocateDescriptorRewrites(
// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
tableName := tree.NewUnqualifiedTableName(tree.Name(table.GetName()))
err := col.CheckObjectCollision(ctx, txn, parentID, table.GetParentSchemaID(), tableName)
err := col.Direct().CheckObjectCollision(ctx, txn, parentID, table.GetParentSchemaID(), tableName)
if err != nil {
return err
}

// Check privileges.
parentDB, err := col.MustGetDatabaseDescByID(ctx, txn, parentID)
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -680,7 +680,7 @@ func allocateDescriptorRewrites(
}

// Look up the parent database's ID.
parentID, err := col.LookupDatabaseID(ctx, txn, targetDB)
parentID, err := col.Direct().LookupDatabaseID(ctx, txn, targetDB)
if err != nil {
return err
}
Expand All @@ -689,7 +689,7 @@ func allocateDescriptorRewrites(
targetDB, typ.Name)
}
// Check privileges on the parent DB.
parentDB, err := col.MustGetDatabaseDescByID(ctx, txn, parentID)
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand All @@ -704,7 +704,7 @@ func allocateDescriptorRewrites(
}
return
}
desc, err := col.GetDescriptorCollidingWithObject(
desc, err := col.Direct().GetDescriptorCollidingWithObject(
ctx,
txn,
parentID,
Expand All @@ -729,7 +729,7 @@ func allocateDescriptorRewrites(
// Ensure that there isn't a collision with the array type name.
arrTyp := typesByID[typ.ArrayTypeID]
typeName := tree.NewUnqualifiedTypeName(arrTyp.GetName())
err = col.CheckObjectCollision(ctx, txn, parentID, getParentSchemaID(typ), typeName)
err = col.Direct().CheckObjectCollision(ctx, txn, parentID, getParentSchemaID(typ), typeName)
if err != nil {
return errors.Wrapf(err, "name collision for %q's array type", typ.Name)
}
Expand Down Expand Up @@ -911,15 +911,15 @@ func allocateDescriptorRewrites(
func getDatabaseIDAndDesc(
ctx context.Context, txn *kv.Txn, col *descs.Collection, targetDB string,
) (dbID descpb.ID, dbDesc catalog.DatabaseDescriptor, err error) {
dbID, err = col.LookupDatabaseID(ctx, txn, targetDB)
dbID, err = col.Direct().LookupDatabaseID(ctx, txn, targetDB)
if err != nil {
return 0, nil, err
}
if dbID == descpb.InvalidID {
return dbID, nil, errors.Errorf("a database named %q needs to exist", targetDB)
}
// Check privileges on the parent DB.
dbDesc, err = col.MustGetDatabaseDescByID(ctx, txn, dbID)
dbDesc, err = col.Direct().MustGetDatabaseDescByID(ctx, txn, dbID)
if err != nil {
return 0, nil, errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(dbID))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func checkMultiRegionCompatible(
// For REGION BY TABLE IN <region> tables, allow the restore if the
// database has the region.
regionEnumID := database.GetRegionConfig().RegionEnumID
regionEnum, err := col.MustGetTypeDescByID(ctx, txn, regionEnumID)
regionEnum, err := col.Direct().MustGetTypeDescByID(ctx, txn, regionEnumID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func getQualifiedTableName(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (string, error) {
col := execCfg.CollectionFactory.MakeCollection(nil /* temporarySchemaProvider */)
dbDesc, err := col.MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return "", err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/apd/v3"
apd "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
Expand Down Expand Up @@ -566,10 +566,10 @@ func cloudStorageTestWithOptions(testFn cdcTestFn, options feedTestOptions) func
}
blobClientFactory := blobs.NewLocalOnlyBlobClientFactory(options.externalIODir)
if serverKnobs, ok := knobs.Server.(*server.TestingKnobs); ok {
serverKnobs.TenantBlobClientFactory = blobClientFactory
serverKnobs.BlobClientFactory = blobClientFactory
} else {
knobs.Server = &server.TestingKnobs{
TenantBlobClientFactory: blobClientFactory,
BlobClientFactory: blobClientFactory,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func prepareNewTablesForIngestion(
// collisions with any importing tables.
for i := range newMutableTableDescriptors {
tbl := newMutableTableDescriptors[i]
err := descsCol.CheckObjectCollision(
err := descsCol.Direct().CheckObjectCollision(
ctx,
txn,
tbl.GetParentID(),
Expand Down Expand Up @@ -672,7 +672,7 @@ func createSchemaDescriptorWithID(
log.VEventf(ctx, 2, "CPut %s -> %d", idKey, descID)
}
b.CPut(idKey, descID, nil)
if err := descsCol.WriteNewDescToBatch(
if err := descsCol.Direct().WriteNewDescToBatch(
ctx,
p.ExtendedEvalContext().Tracing.KVTracingEnabled(),
b,
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func (r *importResumer) checkForUDTModification(
ctx context.Context, txn *kv.Txn, col *descs.Collection,
savedTypeDesc *descpb.TypeDescriptor,
) error {
typeDesc, err := col.MustGetTypeDescByID(ctx, txn, savedTypeDesc.GetID())
typeDesc, err := col.Direct().MustGetTypeDescByID(ctx, txn, savedTypeDesc.GetID())
if err != nil {
return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ func TestFailedImportGC(t *testing.T) {
tableID := descpb.ID(dbID + 2)
var td catalog.TableDescriptor
if err := sql.TestingDescsTxn(ctx, tc.Server(0), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
td, err = col.MustGetTableDescByID(ctx, txn, tableID)
td, err = col.Direct().MustGetTableDescByID(ctx, txn, tableID)
return err
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -6131,7 +6131,7 @@ func TestImportPgDumpSchemas(t *testing.T) {
for _, schemaID := range schemaIDs {
// Expect that the schema descriptor is deleted.
if err := sql.TestingDescsTxn(ctx, tc.Server(0), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
_, err = col.MustGetSchemaDescByID(ctx, txn, schemaID)
_, err = col.Direct().MustGetSchemaDescByID(ctx, txn, schemaID)
if !testutils.IsError(err, "descriptor not found") {
return err
}
Expand All @@ -6144,7 +6144,7 @@ func TestImportPgDumpSchemas(t *testing.T) {
for _, tableID := range tableIDs {
// Expect that the table descriptor is deleted.
if err := sql.TestingDescsTxn(ctx, tc.Server(0), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
_, err = col.MustGetTableDescByID(ctx, txn, tableID)
_, err = col.Direct().MustGetTableDescByID(ctx, txn, tableID)
if !testutils.IsError(err, "descriptor not found") {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,11 +874,11 @@ func readPostgresStmt(
for _, name := range names {
tableName := name.ToUnresolvedObjectName().String()
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
dbDesc, err := col.MustGetDatabaseDescByID(ctx, txn, parentID)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
if err != nil {
return err
}
err = col.CheckObjectCollision(
err = col.Direct().CheckObjectCollision(
ctx,
txn,
parentID,
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/oidcccl/authentication_oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/coreos/go-oidc"
oidc "github.com/coreos/go-oidc"
"golang.org/x/oauth2"
)

Expand Down Expand Up @@ -258,7 +258,7 @@ var ConfigureOIDC = func(
serverCtx context.Context,
st *cluster.Settings,
locality roachpb.Locality,
mux *http.ServeMux,
handleHTTP func(pattern string, handler http.Handler),
userLoginFromSSO func(ctx context.Context, username string) (*http.Cookie, error),
ambientCtx log.AmbientContext,
cluster uuid.UUID,
Expand All @@ -268,7 +268,7 @@ var ConfigureOIDC = func(
// Don't want to use GRPC here since these endpoints require HTTP-Redirect behaviors and the
// callback endpoint will be receiving specialized parameters that grpc-gateway will only get
// in the way of processing.
mux.HandleFunc(oidcCallbackPath, func(w http.ResponseWriter, r *http.Request) {
handleHTTP(oidcCallbackPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Verify state and errors.
Expand Down Expand Up @@ -376,9 +376,9 @@ var ConfigureOIDC = func(
http.Redirect(w, r, "/", http.StatusTemporaryRedirect)

telemetry.Inc(loginSuccessUseCounter)
})
}))

mux.HandleFunc(oidcLoginPath, func(w http.ResponseWriter, r *http.Request) {
handleHTTP(oidcLoginPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

oidcAuthentication.mutex.Lock()
Expand Down Expand Up @@ -406,7 +406,7 @@ var ConfigureOIDC = func(
http.Redirect(
w, r, oidcAuthentication.oauth2Config.AuthCodeURL(kast.signedTokenEncoded), http.StatusFound,
)
})
}))

reloadConfig(serverCtx, oidcAuthentication, locality, st)

Expand Down
Loading

0 comments on commit 6a0bdec

Please sign in to comment.