Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
97127: changefeedccl: set external connection test rate to 50% r=HonoreDB a=HonoreDB

We briefly had the metamorphic tests for external connections set to 100% external connections to catch any failing tests. It's now more valuable to make them actually metamorphic so we can prove that external connections don't change the behavior.

Epic: none

Release note: None

97140: changefeedccl: allow external connection as confluent schema registry r=HonoreDB a=HonoreDB

WITH confluent_schema_registry is another place a user can specify an arbitrary external connection string, and therefore should have the same controls and features as sinks do. This PR adds the ability to use an "external://" URI for this option, and makes it mandatory when it's mandatory for sinks.

Addresses #97139.

Release note (enterprise change): External connections can now be used as the URI value for a confluent schema registry. For example,
CREATE EXTERNAL CONNECTION reg AS "https://example.cloud?opt=val"; CREATE CHANGEFEED FOR foo WITH format='avro',confluent_schema_registry='external://reg'

98142: sem/tree: fix out-out-bounds exception in Tuple.formatHideConstants r=mgartner a=mgartner

This commit fixes an out-of-bounds exception that occurred when
formatting a tuple with more than two elements and only a single label.

Fixes #95621

Release note (bug fix): A bug has been fixed that could crash the
process when a query contained a literal tuple expression with more than
two elements and only a single label, e.g., `((1, 2, 3) AS foo)`.


Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Mar 9, 2023
4 parents 2d87998 + d6349af + 1daa473 + 318e352 commit aa4047f
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 62 deletions.
42 changes: 24 additions & 18 deletions pkg/ccl/changefeedccl/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func authorizeUserToCreateChangefeed(
sinkURI string,
hasSelectPrivOnAllTables bool,
hasChangefeedPrivOnAllTables bool,
otherExternalURIs ...string,
) error {
isAdmin, err := p.HasAdminRole(ctx)
if err != nil {
Expand Down Expand Up @@ -109,27 +110,32 @@ func authorizeUserToCreateChangefeed(

enforceExternalConnections := changefeedbase.RequireExternalConnectionSink.Get(&p.ExecCfg().Settings.SV)
if enforceExternalConnections {
uri, err := url.Parse(sinkURI)
if err != nil {
return errors.Newf("failed to parse url %s", sinkURI)
}
if uri.Scheme == changefeedbase.SinkSchemeExternalConnection {
ec, err := externalconn.LoadExternalConnection(ctx, uri.Host, p.InternalSQLTxn())
if err != nil {
return errors.Wrap(err, "failed to load external connection object")
for _, uriString := range append(otherExternalURIs, sinkURI) {
if uriString == "" {
continue
}
ecPriv := &syntheticprivilege.ExternalConnectionPrivilege{
ConnectionName: ec.ConnectionName(),
uri, err := url.Parse(uriString)
if err != nil {
return errors.Newf("failed to parse url %s", uriString)
}
if err := p.CheckPrivilege(ctx, ecPriv, privilege.USAGE); err != nil {
return err
if uri.Scheme == changefeedbase.SinkSchemeExternalConnection {
ec, err := externalconn.LoadExternalConnection(ctx, uri.Host, p.InternalSQLTxn())
if err != nil {
return errors.Wrap(err, "failed to load external connection object")
}
ecPriv := &syntheticprivilege.ExternalConnectionPrivilege{
ConnectionName: ec.ConnectionName(),
}
if err := p.CheckPrivilege(ctx, ecPriv, privilege.USAGE); err != nil {
return err
}
} else {
return pgerror.Newf(
pgcode.InsufficientPrivilege,
`the %s privilege on all tables can only be used with external connection sinks. see cluster setting %s`,
privilege.CHANGEFEED, changefeedbase.RequireExternalConnectionSink.Key(),
)
}
} else {
return pgerror.Newf(
pgcode.InsufficientPrivilege,
`the %s privilege on all tables can only be used with external connection sinks. see cluster setting %s`,
privilege.CHANGEFEED, changefeedbase.RequireExternalConnectionSink.Key(),
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func newChangeFrontierProcessor(
if err != nil {
return nil, err
}
if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed)); err != nil {
if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed), makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB)); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func createChangefeedJobRecord(
}
}
if checkPrivs {
if err := authorizeUserToCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil {
if err := authorizeUserToCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables, opts.GetConfluentSchemaRegistry()); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func createChangefeedJobRecord(
if err != nil {
return nil, err
}
if _, err := getEncoder(encodingOpts, AllTargets(details)); err != nil {
if _, err := getEncoder(encodingOpts, AllTargets(details), makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB)); err != nil {
return nil, err
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -2937,6 +2938,33 @@ func TestChangefeedBareJSON(t *testing.T) {
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedExternalConnectionSchemaRegistry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog')`)

schemaReg := cdctest.StartTestSchemaRegistry()
defer schemaReg.Close()

name := fmt.Sprintf("schemareg%d", rand.Uint64())

sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS '%s'`, name, schemaReg.URL()))

sql := fmt.Sprintf("CREATE CHANGEFEED WITH format=avro, confluent_schema_registry='external://%s' AS SELECT * FROM foo", name)

foo := feed(t, f, sql)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{`foo: {"a":{"long":0}}->{"record":{"foo":{"a":{"long":0},"b":{"string":"dog"}}}}`})
}
// Test helpers for avro assume Kafka
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

func TestChangefeedAvroNotice(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,10 @@ func (s StatementOptions) GetMinCheckpointFrequency() (*time.Duration, error) {
return s.getDurationValue(OptMinCheckpointFrequency)
}

func (s StatementOptions) GetConfluentSchemaRegistry() string {
return s.m[OptConfluentSchemaRegistry]
}

// GetPTSExpiration returns the maximum age of the protected timestamp record.
// Changefeeds that fail to update their records in time will be canceled.
func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ type Encoder interface {
}

func getEncoder(
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets,
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets, p externalConnectionProvider,
) (Encoder, error) {
switch opts.Format {
case changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts)
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
return newConfluentAvroEncoder(opts, targets)
return newConfluentAvroEncoder(opts, targets, p)
case changefeedbase.OptFormatCSV:
return newCSVEncoder(opts), nil
case changefeedbase.OptFormatParquet:
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var encoderCacheConfig = cache.Config{
}

func newConfluentAvroEncoder(
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets,
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets, p externalConnectionProvider,
) (*confluentAvroEncoder, error) {
e := &confluentAvroEncoder{
schemaPrefix: opts.AvroSchemaPrefix,
Expand Down Expand Up @@ -102,7 +102,7 @@ func newConfluentAvroEncoder(
changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro)
}

reg, err := newConfluentSchemaRegistry(opts.SchemaRegistryURI)
reg, err := newConfluentSchemaRegistry(opts.SchemaRegistryURI, p)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestEncoders(t *testing.T) {
return
}
require.NoError(t, o.Validate())
e, err := getEncoder(o, targets)
e, err := getEncoder(o, targets, nil)
require.NoError(t, err)

rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
StatementTimeName: changefeedbase.StatementTimeName(tableDesc.GetName()),
})

e, err := getEncoder(opts, targets)
e, err := getEncoder(opts, targets, nil)
require.NoError(t, err)

rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
defer noCertReg.Close()
opts.SchemaRegistryURI = noCertReg.URL()

enc, err := getEncoder(opts, targets)
enc, err := getEncoder(opts, targets, nil)
require.NoError(t, err)
_, err = enc.EncodeKey(context.Background(), rowInsert)
require.Regexp(t, "x509", err)
Expand All @@ -427,7 +427,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
defer wrongCertReg.Close()
opts.SchemaRegistryURI = wrongCertReg.URL()

enc, err = getEncoder(opts, targets)
enc, err = getEncoder(opts, targets, nil)
require.NoError(t, err)
_, err = enc.EncodeKey(context.Background(), rowInsert)
require.Regexp(t, `contacting confluent schema registry.*: x509`, err)
Expand Down Expand Up @@ -916,7 +916,7 @@ func BenchmarkEncoders(b *testing.B) {
bench := func(b *testing.B, fn encodeFn, opts changefeedbase.EncodingOptions, updatedRows, prevRows []cdcevent.Row) {
b.ReportAllocs()
b.StopTimer()
encoder, err := getEncoder(opts, targets)
encoder, err := getEncoder(opts, targets, nil)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newEventConsumer(

makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) {
var err error
encoder, err := getEncoder(encodingOpts, feed.Targets)
encoder, err := getEncoder(encodingOpts, feed.Targets, makeExternalConnectionProvider(ctx, cfg.DB))
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,7 @@ func maybeUseExternalConnection(
) cdctest.TestFeedFactory {
// percentExternal is the chance of randomly running a test using an `external://` uri.
// Set to 1 to always do this.
// TODO (zinger): Set this to 0.5 before merging.
const percentExternal = 1
const percentExternal = 0.5
if sinkType == `sinkless` || sinkType == `enterprise` || strings.Contains(flakyWhenExternalConnection, sinkType) ||
options.forceNoExternalConnectionURI || rand.Float32() > percentExternal {
return factory
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,22 @@ func getAndDeleteParams(u *url.URL) (schemaRegistryParams, error) {
return s, nil
}

func newConfluentSchemaRegistry(baseURL string) (*confluentSchemaRegistry, error) {
func newConfluentSchemaRegistry(
baseURL string, p externalConnectionProvider,
) (*confluentSchemaRegistry, error) {
u, err := url.Parse(baseURL)
if err != nil {
return nil, errors.Wrap(err, "malformed schema registry url")
}

if u.Scheme == changefeedbase.SinkSchemeExternalConnection {
actual, err := p.lookup(u.Host)
if err != nil {
return nil, err
}
return newConfluentSchemaRegistry(actual, p)
}

if u.Scheme != "http" && u.Scheme != "https" {
return nil, errors.Errorf("unsupported scheme: %q", u.Scheme)
}
Expand Down
47 changes: 42 additions & 5 deletions pkg/ccl/changefeedccl/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"context"
"errors"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
Expand All @@ -23,16 +24,52 @@ func TestConfluentSchemaRegistry(t *testing.T) {
defer log.Scope(t).Close(t)

t.Run("errors with no scheme", func(t *testing.T) {
_, err := newConfluentSchemaRegistry("justsomestring")
_, err := newConfluentSchemaRegistry("justsomestring", nil)
require.Error(t, err)
})
t.Run("errors with unsupported scheme", func(t *testing.T) {
url := "gopher://myhost"
_, err := newConfluentSchemaRegistry(url)
_, err := newConfluentSchemaRegistry(url, nil)
require.Error(t, err)
})
}

type mockExternalConnectionProvider map[string]string

func (m mockExternalConnectionProvider) lookup(name string) (string, error) {
v, ok := m[name]
if !ok {
return v, errors.New("not found")
}
return v, nil
}

func TestConfluentSchemaRegistryExternalConnection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

regServer := cdctest.StartTestSchemaRegistry()
defer regServer.Close()

m := mockExternalConnectionProvider{
"good_endpoint": regServer.URL(),
"bad_endpoint": "http://bad",
}

reg, err := newConfluentSchemaRegistry("external://good_endpoint", m)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()))

// We can load a bad endpoint, but ping should fail.
reg, err = newConfluentSchemaRegistry("external://bad_endpoint", m)
require.NoError(t, err)
require.Error(t, reg.Ping(context.Background()))

_, err = newConfluentSchemaRegistry("external://no_endpoint", m)
require.Error(t, err)

}

func TestConfluentSchemaRegistryPing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -41,17 +78,17 @@ func TestConfluentSchemaRegistryPing(t *testing.T) {
defer regServer.Close()

t.Run("ping works when all is well", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry(regServer.URL())
reg, err := newConfluentSchemaRegistry(regServer.URL(), nil)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()))
})
t.Run("ping does not error from HTTP 404", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry(regServer.URL() + "/path-does-not-exist-but-we-do-not-care")
reg, err := newConfluentSchemaRegistry(regServer.URL()+"/path-does-not-exist-but-we-do-not-care", nil)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()), "Ping")
})
t.Run("Ping errors with bad host", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry("http://host-does-exist-and-we-care")
reg, err := newConfluentSchemaRegistry("http://host-does-exist-and-we-care", nil)
require.NoError(t, err)
require.Error(t, reg.Ping(context.Background()))
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func getSink(
case u.Scheme == changefeedbase.SinkSchemeExternalConnection:
return validateOptionsAndMakeSink(changefeedbase.ExternalConnectionValidOptions, func() (Sink, error) {
return makeExternalConnectionSink(
ctx, sinkURL{URL: u}, user, serverCfg.DB,
ctx, sinkURL{URL: u}, user, makeExternalConnectionProvider(ctx, serverCfg.DB),
serverCfg, feedCfg, timestampOracle, jobID, m,
)
})
Expand Down
Loading

0 comments on commit aa4047f

Please sign in to comment.