Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#68903 cockroachdb#68978

68567: colrpc: enhance warnings from the outbox r=yuzefovich a=yuzefovich

This commit marks several string constants as "safe" from the
redactability perspective so that the warnings logged by the outboxes
are more helpful. Additionally, several minor nits around error
formatting are addressed.

Release note: None

68675: sql: implement IterateStatementStats() for PersistedSQLStats r=maryliag a=Azhng

Depends on: cockroachdb#68555, cockroachdb#68620
Related to: cockroachdb#64743

Previously, IterateStatementStats() for PersistedSQLStats was
left unimplemented and it defaults to the implementation of
SQLStats.IterateStatementStats(). This means calls to
IterateStatementStats() on PersistedSQLStats cannot read
the statement statistitcs stored in system table.

This commit implements the IterateStatementStats() through
the new CombinedStmtStatsIterator which enables this method
to read both in-memory and persited statement statistics.

Release note: None

68738: sql/catalog/dbdesc: repair old descriptors corrupted due to DROPPED SCHEMA name r=ajwerner a=sajjadrizvi

cockroachdb#63119 fixes a bug that corrupts a database descriptor when a child schema was 
dropped, adding an entry in schema-info structure erroneously with database name
instead of schema name . Although the bug was fixed , there can be database backups 
with corrupted descriptors.

This commit adds a post-deserialization function to repair a corrupted descriptor. Moreover,
it adds a test function to ensure that descriptors with such corruption are fixed during
migration.
 
Release note: None
 
Fixes: cockroachdb#63148


68903: github: redirect KV code reviews to @cockroachdb/kv-prs r=irfansharif a=irfansharif

We try to use the @cockroachdb/kv alias for notifying the entire team on
issues. There's no way to disable notifications for github's automatic
codeowners driven review requests, and that tends to be a firehose, so
lets use this sub-team alias instead.

Release note: None

68978: changefeedccl: detect sink URLs with no scheme r=HonoreDB a=stevendanna

Previously, if a user provided a sink URL with no scheme (such as
` kafka%3A%2F%2Fnope%0A`), a changefeed job would be started. However,
this changefeed job would be writing into a bufferSink.  The
bufferSink is used by core changefeeds.

The user may have provided such a URL because of confusion over how to
URL encode their sink URL.

Now, they will receive an error such as

```
pq: no scheme found for sink URL "kafka%3A%2F%2Fnope%0A"
```

Release note (enterprise change): CHANGEFEED statements now error
if the provided sink URL does not contain a scheme. Such URLs are
typically a mistake and will result in non-functional changefeeds.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Azhng <[email protected]>
Co-authored-by: Sajjad Rizvi <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
6 people committed Aug 16, 2021
6 parents dfe97af + 9ac3f70 + 08a8eec + 88bf4ba + d8d71d4 + e8dfc48 commit f2aded5
Show file tree
Hide file tree
Showing 38 changed files with 1,106 additions and 173 deletions.
18 changes: 9 additions & 9 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
# last-rule-wins so bulk i/o takes userfile.go even though cli-prs takes pkg/cli
/pkg/cli/userfile.go @cockroachdb/bulk-prs
/pkg/cli/demo*.go @cockroachdb/cli-prs @cockroachdb/sql-experience @cockroachdb/server-prs
/pkg/cli/debug*.go @cockroachdb/cli-prs @cockroachdb/kv
/pkg/cli/debug*.go @cockroachdb/cli-prs @cockroachdb/kv-prs
/pkg/cli/debug_job_trace*.go @cockroachdb/bulk-prs
/pkg/cli/doctor*.go @cockroachdb/cli-prs @cockroachdb/sql-schema
/pkg/cli/import_test.go @cockroachdb/cli-prs @cockroachdb/bulk-prs
Expand Down Expand Up @@ -98,7 +98,7 @@

/pkg/geo/ @cockroachdb/geospatial

/pkg/kv/ @cockroachdb/kv
/pkg/kv/ @cockroachdb/kv-prs

/pkg/storage/ @cockroachdb/storage

Expand Down Expand Up @@ -138,8 +138,8 @@
/pkg/ccl/utilccl/ @cockroachdb/server-prs
/pkg/ccl/workloadccl/ @cockroachdb/sql-experience
/pkg/ccl/benchccl/rttanalysisccl/ @cockroachdb/sql-experience
/pkg/clusterversion/ @cockroachdb/kv
/pkg/cmd/allocsim/ @cockroachdb/kv
/pkg/clusterversion/ @cockroachdb/kv-prs
/pkg/cmd/allocsim/ @cockroachdb/kv-prs
/pkg/cmd/bazci/ @cockroachdb/dev-inf
/pkg/cmd/cmdutil/ @cockroachdb/dev-inf
/pkg/cmd/cmp-protocol/ @cockroachdb/sql-experience
Expand All @@ -160,7 +160,7 @@
/pkg/cmd/geoviz/ @cockroachdb/geospatial
/pkg/cmd/github-post/ @cockroachdb/test-eng
/pkg/cmd/github-pull-request-make/ @cockroachdb/dev-inf
/pkg/cmd/gossipsim/ @cockroachdb/kv
/pkg/cmd/gossipsim/ @cockroachdb/kv-prs
/pkg/cmd/import-tools/ @cockroachdb/dev-inf
/pkg/cmd/internal/issues/ @cockroachdb/test-eng
/pkg/cmd/prereqs/ @cockroachdb/dev-inf
Expand Down Expand Up @@ -192,26 +192,26 @@
/pkg/docs/ @cockroachdb/docs
/pkg/featureflag/ @cockroachdb/cli-prs-noreview
/pkg/gossip/ @cockroachdb/kv-noreview
/pkg/internal/client/requestbatcher/ @cockroachdb/kv
/pkg/internal/client/requestbatcher/ @cockroachdb/kv-prs
/pkg/internal/codeowners/ @cockroachdb/test-eng
/pkg/internal/reporoot @cockroachdb/dev-inf
/pkg/internal/rsg/ @cockroachdb/sql-queries
/pkg/internal/sqlsmith/ @cockroachdb/sql-queries
/pkg/internal/team/ @cockroachdb/test-eng
/pkg/jobs/ @cockroachdb/sql-schema
/pkg/keys/ @cockroachdb/kv
/pkg/keys/ @cockroachdb/kv-prs
/pkg/migration/ @cockroachdb/kv @cockroachdb/sql-schema
/pkg/multitenant @cockroachdb/unowned
/pkg/release/ @cockroachdb/dev-inf
/pkg/roachpb/ @cockroachdb/kv
/pkg/roachpb/ @cockroachdb/kv-prs
/pkg/rpc/ @cockroachdb/server-prs
/pkg/scheduledjobs/ @cockroachdb/bulk-prs
/pkg/security/ @cockroachdb/server-prs
/pkg/settings/ @cockroachdb/server-prs
/pkg/startupmigrations/ @cockroachdb/server-prs @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/bulk-prs
/pkg/testutils/ @cockroachdb/test-eng
/pkg/ts/ @cockroachdb/kv
/pkg/ts/ @cockroachdb/kv-prs
/pkg/ts/catalog/ @cockroachdb/obs-inf-prs
/pkg/util/ @cockroachdb/unowned
/pkg/util/log @cockroachdb/server-prs
Expand Down
1 change: 1 addition & 0 deletions TEAMS.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cockroachdb/sql-observability:
cockroachdb/kv:
aliases:
cockroachdb/kv-triage: roachtest
cockroachdb/kv-prs: other
triage_column_id: 14242655
cockroachdb/geospatial:
triage_column_id: 9487269
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ go_test(
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
Expand Down
61 changes: 61 additions & 0 deletions pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -503,3 +510,57 @@ func TestRestoreOldBackupMissingOfflineIndexes(t *testing.T) {
}
}
}

func TestRestoreWithDroppedSchemaCorruption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

const (
dbName = "foo"
backupDir = "testdata/restore_with_dropped_schema/exports/v20.2.7"
fromDir = "nodelocal://0/"
)

args := base.TestServerArgs{ExternalIODir: backupDir}
s, sqlDB, _ := serverutils.StartServer(t, args)
tdb := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)

tdb.Exec(t, fmt.Sprintf("RESTORE DATABASE %s FROM '%s'", dbName, fromDir))
query := fmt.Sprintf("SELECT database_name FROM [SHOW DATABASES] WHERE database_name = '%s'", dbName)
tdb.CheckQueryResults(t, query, [][]string{{dbName}})

// Read descriptor without validation.
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
hasSameNameSchema := func(dbName string) bool {
exists := false
var desc catalog.DatabaseDescriptor
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, collection *descs.Collection,
) error {
// Using this method to avoid validation.
allDescs, err := catalogkv.GetAllDescriptors(ctx, txn, execCfg.Codec, false)
if err != nil {
return err
}
for _, d := range allDescs {
if d.GetName() == dbName {
desc, err = catalog.AsDatabaseDescriptor(d)
require.NoError(t, err, "unable to cast to database descriptor")
return nil
}
}
return nil
}))
require.NoError(t, desc.ForEachSchemaInfo(
func(id descpb.ID, name string, isDropped bool) error {
if name == dbName {
exists = true
}
return nil
}))
return exists
}
require.Falsef(t, hasSameNameSchema(dbName), "corrupted descriptor exists")
}
14 changes: 14 additions & 0 deletions pkg/ccl/backupccl/testdata/restore_with_dropped_schema/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- The below SQL is used to create a backup of a database that
-- contains a corrupted database descriptor. Data is produced
-- using version 20.2.7. This backup is used in
-- TestRestoreWithDroppedSchemaCorruption test.

CREATE DATABASE foo;

SET DATABASE = foo;

CREATE SCHEMA bar;

DROP SCHEMA bar;

BACKUP DATABASE foo to 'nodelocal://0/foo_backup';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
���T
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
���T
Empty file.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
=�f
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ func TestChangefeedAvroNotice(t *testing.T) {
sqlDB.Exec(t, "CREATE table foo (i int)")
sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)

sql := fmt.Sprintf("CREATE CHANGEFEED FOR d.foo INTO 'dummysink' WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL())
sql := fmt.Sprintf("CREATE CHANGEFEED FOR d.foo INTO 'null://' WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL())
expectNotice(t, s, sql, `avro is no longer experimental, use format=avro`)
}

Expand Down Expand Up @@ -2614,6 +2614,12 @@ func TestChangefeedErrors(t *testing.T) {
return fmt.Sprintf(`unknown %s sink query parameters: [%s]`, sink, strings.Join(params, ", "))
}

// Check that sink URLs have valid scheme
sqlDB.ExpectErr(
t, `no scheme found for sink URL`,
`CREATE CHANGEFEED FOR foo INTO 'kafka%3A%2F%2Fnope%0A'`,
)

// Check that confluent_schema_registry is only accepted if format is avro.
// TODO: This should be testing it as a WITH option and check avro_schema_prefix too
sqlDB.ExpectErr(
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ const (
SinkParamSkipTLSVerify = `insecure_tls_skip_verify`
SinkParamTopicPrefix = `topic_prefix`
SinkParamTopicName = `topic_name`
SinkSchemeBuffer = ``
SinkSchemeCloudStorageAzure = `experimental-azure`
SinkSchemeCloudStorageGCS = `experimental-gs`
SinkSchemeCloudStorageHTTP = `experimental-http`
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ func getSink(
}

newSink := func() (Sink, error) {
switch {
case u.Scheme == changefeedbase.SinkSchemeBuffer:
if feedCfg.SinkURI == "" {
return &bufferSink{}, nil
}

switch {
case u.Scheme == changefeedbase.SinkSchemeNull:
return makeNullSink(sinkURL{URL: u})
case u.Scheme == changefeedbase.SinkSchemeKafka:
Expand All @@ -98,6 +100,8 @@ func getSink(
case u.Scheme == changefeedbase.SinkSchemeHTTP || u.Scheme == changefeedbase.SinkSchemeHTTPS:
return nil, errors.Errorf(`unsupported sink: %s. HTTP endpoints can be used with %s and %s`,
u.Scheme, changefeedbase.SinkSchemeWebhookHTTPS, changefeedbase.SinkSchemeCloudStorageHTTPS)
case u.Scheme == "":
return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI)
default:
return nil, errors.Errorf(`unsupported sink: %s`, u.Scheme)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
Expand Down
107 changes: 107 additions & 0 deletions pkg/migration/migrations/fix_descriptor_migration_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -191,3 +198,103 @@ func TestFixPrivilegesMigration(t *testing.T) {
tc.Stopper().Stop(ctx)
}
}

func TestFixDBDescriptorDroppedSchemaName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.FixDescriptors - 1),
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
},
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, clusterArgs)
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
sqlDB := tc.ServerConn(0)
tdb := sqlutils.MakeSQLRunner(sqlDB)

// - Create database.
// - Read its descriptor, add the bad entry, write the descriptor
// without validation.
// - Read the descriptor without validation and ensure that the bad
// entry exists, while ensures that the bad entry is not removed
// while writing the descriptor.
// - Run migration.
// - Read the descriptor without validation and ensure that the bad
// entry does not exist.

const dbName = "t"
tdb.Exec(t, "CREATE DATABASE "+dbName)

// Write a corrupted descriptor.
var descID descpb.ID
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, collection *descs.Collection,
) error {
flags := tree.DatabaseLookupFlags{Required: true}
desc, err := collection.GetMutableDatabaseByName(ctx, txn, dbName, flags)
if err != nil {
return err
}
descID = desc.GetID()
desc.Schemas = map[string]descpb.DatabaseDescriptor_SchemaInfo{dbName: {ID: descID, Dropped: true}}
builder := dbdesc.NewBuilder(desc.DatabaseDesc())
badDesc := builder.BuildCreatedMutable()
badDesc.MaybeIncrementVersion()
collection.SkipValidationOnWrite()
return collection.WriteDesc(ctx, false, badDesc, txn)
}))

// Checks whether the erroneous entry exists or not.
hasSameNameSchema := func(dbName string) bool {
exists := false
var desc catalog.DatabaseDescriptor
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, collection *descs.Collection,
) error {
// Using this method to avoid calling RunPostDeserializationChanges().
allDescs, err := catalogkv.GetAllDescriptors(ctx, txn, execCfg.Codec, false)
if err != nil {
return err
}
for _, d := range allDescs {
if d.GetID() == descID {
desc, err = catalog.AsDatabaseDescriptor(d)
require.NoError(t, err, "unable to cast to database descriptor")
return nil
}
}
return nil
}))
require.NoError(t, desc.ForEachSchemaInfo(
func(id descpb.ID, name string, isDropped bool) error {
if name == dbName {
exists = true
}
return nil
}))
return exists
}

// Validate that the bad entry does exist after writing the descriptor.
require.True(t, hasSameNameSchema(dbName), "bad entry does not exist")

// Migrate to the new version.
_, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`,
clusterversion.ByKey(clusterversion.FixDescriptors).String())
require.NoError(t, err)

// Validate that the bad entry is removed.
require.False(t, hasSameNameSchema(dbName), "bad entry exists")
}
Loading

0 comments on commit f2aded5

Please sign in to comment.