Skip to content

Commit

Permalink
externalconn,changefeedccl: support kafka in external connections
Browse files Browse the repository at this point in the history
This change adds the ability to create an external connection that
represents a `kafka` sink. It also teaches changefeeds to recognize
the `external` scheme URI.

When creating an external connection that represents a kafka sink, we
run validation on the passed in URI before persisting it in the system
table. This PR does not add any `WITH` options to the create statement
but in the near future we will want to allow the user to pass in certain
sink/resource specific configurations that will apply to all users of the
external connection object. For example, a kafka JSON config.

A changefeed can now be run to an `external` scheme URI that points to
an existing external connection object. Since we can only represent kafka
sinks as external connections as of now, the statement will only accept the
options that are valid for a kafka sink.

Informs: cockroachdb#84753

Release note (sql change): Users can now `CREATE EXTERNAL CONNECTION`
to represent a `kafka` sink. Subsequently, users can run
`CREATE CHANGEFEED` with an `external://<external-connection-object-name>`
URI as the sink to use the kafka resource represented by the
external connection object.
  • Loading branch information
adityamaru committed Aug 10, 2022
1 parent 6c24f35 commit 5c026d0
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 3 deletions.
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ go_library(
"scram_client.go",
"sink.go",
"sink_cloudstorage.go",
"sink_external_connection.go",
"sink_kafka.go",
"sink_kafka_connection.go",
"sink_pubsub.go",
"sink_sql.go",
"sink_webhook.go",
Expand All @@ -45,6 +47,8 @@ go_library(
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/clusterversion",
"//pkg/docs",
"//pkg/featureflag",
Expand Down Expand Up @@ -89,6 +93,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
Expand Down Expand Up @@ -151,6 +156,7 @@ go_test(
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
"sink_cloudstorage_test.go",
"sink_kafka_connection_test.go",
"sink_test.go",
"sink_webhook_test.go",
"testfeed_test.go",
Expand Down
14 changes: 14 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ const (
SinkSchemeNull = `null`
SinkSchemeWebhookHTTP = `webhook-http`
SinkSchemeWebhookHTTPS = `webhook-https`
SinkSchemeExternalConnection = `external`
SinkParamSASLEnabled = `sasl_enabled`
SinkParamSASLHandshake = `sasl_handshake`
SinkParamSASLUser = `sasl_user`
Expand Down Expand Up @@ -340,6 +341,19 @@ var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTi
// PubsubValidOptions is the set of options exclusive to the pubsub sink. No
// options are currently listed here.
var PubsubValidOptions = makeStringSet()

// ExternalConnectionValidOptions is the set of options exclusive to the
// external connection sink. The options listed are the ones valid for a
// kafka sink.
//
// TODO(adityamaru): Some of these options should be supported when creating the
// external connection rather than when setting up the changefeed. Move them once
// we support `CREATE EXTERNAL CONNECTION ... WITH <options>`.
var ExternalConnectionValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry,
	OptKafkaSinkConfig)

// CaseInsensitiveOpts is the set of options whose values are case-insensitive.
var CaseInsensitiveOpts = makeStringSet(OptFormat, OptEnvelope, OptCompression, OptSchemaChangeEvents,
OptSchemaChangePolicy, OptOnError, OptInitialScan)
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func getSink(
return validateOptionsAndMakeSink(changefeedbase.SQLValidOptions, func() (Sink, error) {
return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), metricsBuilder)
})
case u.Scheme == changefeedbase.SinkSchemeExternalConnection:
return validateOptionsAndMakeSink(changefeedbase.ExternalConnectionValidOptions, func() (Sink, error) {
return makeExternalConnectionSink(ctx, sinkURL{URL: u}, user, serverCfg.DB,
serverCfg.Executor, serverCfg, feedCfg, timestampOracle, jobID, m)
})
case u.Scheme == "":
return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI)
default:
Expand Down
65 changes: 65 additions & 0 deletions pkg/ccl/changefeedccl/sink_external_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

// makeExternalConnectionSink builds a Sink for an `external` scheme URI. The
// host component of `u` names an External Connection object; that object is
// loaded inside a transaction on `db` using `ie`, and the sink is then
// constructed by calling getSink with `feedCfg.SinkURI` replaced by the URI
// stored in the connection object.
//
// An error is returned if the host component is empty, if the connection
// object cannot be loaded, or if the connection details are not a simple URI.
func makeExternalConnectionSink(
	ctx context.Context,
	u sinkURL,
	user username.SQLUsername,
	db *kv.DB,
	ie sqlutil.InternalExecutor,
	serverCfg *execinfra.ServerConfig,
	// TODO(cdc): Replace jobspb.ChangefeedDetails with ChangefeedConfig.
	feedCfg jobspb.ChangefeedDetails,
	timestampOracle timestampLowerBoundOracle,
	jobID jobspb.JobID,
	m metricsRecorder,
) (Sink, error) {
	// The connection object's name is carried in the host part of the URI.
	connName := u.Host
	if connName == "" {
		return nil, errors.Newf(
			"host component of an external URI must refer to an existing External Connection object: %s",
			u.String())
	}

	// Look up the external connection object in the system table.
	var conn externalconn.ExternalConnection
	loadConn := func(ctx context.Context, txn *kv.Txn) (err error) {
		conn, err = externalconn.LoadExternalConnection(ctx, connName, ie, txn)
		return err
	}
	if err := db.Txn(ctx, loadConn); err != nil {
		return nil, errors.Wrap(err, "failed to load external connection object")
	}

	// Only simple-URI connection details can back a Sink.
	details := conn.ConnectionProto().Details
	simple, ok := details.(*connectionpb.ConnectionDetails_SimpleURI)
	if !ok {
		return nil, errors.Newf("cannot connect to %T; unsupported resource for a Sink connection", details)
	}

	// feedCfg is a by-value copy, so swapping in the underlying resource's URI
	// here does not affect the caller's configuration.
	feedCfg.SinkURI = simple.SimpleURI.URI
	return getSink(ctx, serverCfg, feedCfg, timestampOracle, user, jobID, m)
}
48 changes: 48 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb"
"github.com/cockroachdb/errors"
)

// parseAndValidateKafkaSinkURI checks that `uri` is acceptable to
// makeKafkaSink and, if so, returns an external connection whose details are
// the URI string tagged with the kafka provider type. A rejected URI yields an
// error wrapped with "invalid Kafka URI".
func parseAndValidateKafkaSinkURI(
	ctx context.Context, uri *url.URL,
) (externalconn.ExternalConnection, error) {
	// Validation is done by building a kafka sink from the URI and discarding
	// the result.
	//
	// TODO(adityamaru): When we add `CREATE EXTERNAL CONNECTION ... WITH` support
	// to accept JSONConfig we should validate that here too.
	if _, err := makeKafkaSink(
		ctx, sinkURL{URL: uri}, changefeedbase.Targets{}, "", nilMetricsRecorderBuilder,
	); err != nil {
		return nil, errors.Wrap(err, "invalid Kafka URI")
	}

	simpleURI := &connectionpb.SimpleURI{URI: uri.String()}
	details := connectionpb.ConnectionDetails{
		Provider: connectionpb.ConnectionProvider_TypeKafka,
		Details:  &connectionpb.ConnectionDetails_SimpleURI{SimpleURI: simpleURI},
	}
	return externalconn.NewExternalConnection(details), nil
}

func init() {
externalconn.RegisterConnectionDetailsFromURIFactory(changefeedbase.SinkSchemeKafka,
parseAndValidateKafkaSinkURI)
}
Loading

0 comments on commit 5c026d0

Please sign in to comment.