Skip to content

Commit

Permalink
changefeedccl: detect sink URLs with no scheme
Browse files Browse the repository at this point in the history
Previously, if a user provided a sink URL with no scheme (such as
` kafka%3A%2F%2Fnope%0A`), a changefeed job would be started. However,
this changefeed job would be writing into a bufferSink.  The
bufferSink is used by core changefeeds.

The user may have provided such a URL because of confusion over how to
URL encode their sink URL.

Now, they will receive an error such as

```
pq: no scheme found for sink URL "kafka%3A%2F%2Fnope%0A"
```

Release note (enterprise change): CHANGEFEED statements now error
if the provided sink URL does not contain a scheme. Such URLs are
typically a mistake and will result in non-functional changefeeds.
  • Loading branch information
stevendanna committed Aug 16, 2021
1 parent 04a41e7 commit e8dfc48
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ func TestChangefeedAvroNotice(t *testing.T) {
sqlDB.Exec(t, "CREATE table foo (i int)")
sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)

sql := fmt.Sprintf("CREATE CHANGEFEED FOR d.foo INTO 'dummysink' WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL())
sql := fmt.Sprintf("CREATE CHANGEFEED FOR d.foo INTO 'null://' WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL())
expectNotice(t, s, sql, `avro is no longer experimental, use format=avro`)
}

Expand Down Expand Up @@ -2614,6 +2614,12 @@ func TestChangefeedErrors(t *testing.T) {
return fmt.Sprintf(`unknown %s sink query parameters: [%s]`, sink, strings.Join(params, ", "))
}

// Check that sink URLs have valid scheme
sqlDB.ExpectErr(
t, `no scheme found for sink URL`,
`CREATE CHANGEFEED FOR foo INTO 'kafka%3A%2F%2Fnope%0A'`,
)

// Check that confluent_schema_registry is only accepted if format is avro.
// TODO: This should be testing it as a WITH option and check avro_schema_prefix too
sqlDB.ExpectErr(
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ const (
SinkParamSkipTLSVerify = `insecure_tls_skip_verify`
SinkParamTopicPrefix = `topic_prefix`
SinkParamTopicName = `topic_name`
SinkSchemeBuffer = ``
SinkSchemeCloudStorageAzure = `experimental-azure`
SinkSchemeCloudStorageGCS = `experimental-gs`
SinkSchemeCloudStorageHTTP = `experimental-http`
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ func getSink(
}

newSink := func() (Sink, error) {
switch {
case u.Scheme == changefeedbase.SinkSchemeBuffer:
if feedCfg.SinkURI == "" {
return &bufferSink{}, nil
}

switch {
case u.Scheme == changefeedbase.SinkSchemeNull:
return makeNullSink(sinkURL{URL: u})
case u.Scheme == changefeedbase.SinkSchemeKafka:
Expand All @@ -98,6 +100,8 @@ func getSink(
case u.Scheme == changefeedbase.SinkSchemeHTTP || u.Scheme == changefeedbase.SinkSchemeHTTPS:
return nil, errors.Errorf(`unsupported sink: %s. HTTP endpoints can be used with %s and %s`,
u.Scheme, changefeedbase.SinkSchemeWebhookHTTPS, changefeedbase.SinkSchemeCloudStorageHTTPS)
case u.Scheme == "":
return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI)
default:
return nil, errors.Errorf(`unsupported sink: %s`, u.Scheme)
}
Expand Down

0 comments on commit e8dfc48

Please sign in to comment.