diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 465918ce7842..6993f88aa777 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -22,7 +22,9 @@ go_library( "scram_client.go", "sink.go", "sink_cloudstorage.go", + "sink_external_connection.go", "sink_kafka.go", + "sink_kafka_connection.go", "sink_pubsub.go", "sink_sql.go", "sink_webhook.go", @@ -45,6 +47,8 @@ go_library( "//pkg/ccl/changefeedccl/schemafeed", "//pkg/ccl/utilccl", "//pkg/cloud", + "//pkg/cloud/externalconn", + "//pkg/cloud/externalconn/connectionpb", "//pkg/clusterversion", "//pkg/docs", "//pkg/featureflag", @@ -89,6 +93,7 @@ go_library( "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sessiondatapb", + "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/util/bitarray", "//pkg/util/bufalloc", @@ -151,6 +156,7 @@ go_test( "schema_registry_test.go", "show_changefeed_jobs_test.go", "sink_cloudstorage_test.go", + "sink_kafka_connection_test.go", "sink_test.go", "sink_webhook_test.go", "testfeed_test.go", diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index ad5d8155b39a..1d03185d8b05 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -187,6 +187,7 @@ const ( SinkSchemeNull = `null` SinkSchemeWebhookHTTP = `webhook-http` SinkSchemeWebhookHTTPS = `webhook-https` + SinkSchemeExternalConnection = `external` SinkParamSASLEnabled = `sasl_enabled` SinkParamSASLHandshake = `sasl_handshake` SinkParamSASLUser = `sasl_user` @@ -340,6 +341,19 @@ var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTi // PubsubValidOptions is options exclusive to pubsub sink var PubsubValidOptions = makeStringSet() +// ExternalConnectionValidOptions is options exclusive to the external +// connection sink. +// +// TODO(adityamaru): Some of these options should be supported when creating the +// external connection rather than when setting up the changefeed. Move them once +// we support `CREATE EXTERNAL CONNECTION ... WITH `. +var ExternalConnectionValidOptions = makeStringSet( + // Options valid for a kafka sink. + OptAvroSchemaPrefix, + OptConfluentSchemaRegistry, + OptKafkaSinkConfig, +) + // CaseInsensitiveOpts options which supports case Insensitive value var CaseInsensitiveOpts = makeStringSet(OptFormat, OptEnvelope, OptCompression, OptSchemaChangeEvents, OptSchemaChangePolicy, OptOnError, OptInitialScan) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index aa1b8b4b60d1..95f794173e6f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -190,6 +190,11 @@ func getSink( return validateOptionsAndMakeSink(changefeedbase.SQLValidOptions, func() (Sink, error) { return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), metricsBuilder) }) + case u.Scheme == changefeedbase.SinkSchemeExternalConnection: + return validateOptionsAndMakeSink(changefeedbase.ExternalConnectionValidOptions, func() (Sink, error) { + return makeExternalConnectionSink(ctx, sinkURL{URL: u}, user, serverCfg.DB, + serverCfg.Executor, serverCfg, feedCfg, timestampOracle, jobID, m) + }) case u.Scheme == "": return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI) default: diff --git a/pkg/ccl/changefeedccl/sink_external_connection.go b/pkg/ccl/changefeedccl/sink_external_connection.go new file mode 100644 index 000000000000..cfd3a4610dae --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_external_connection.go @@ -0,0 +1,65 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/errors" +) + +func makeExternalConnectionSink( + ctx context.Context, + u sinkURL, + user username.SQLUsername, + db *kv.DB, + ie sqlutil.InternalExecutor, + serverCfg *execinfra.ServerConfig, + // TODO(cdc): Replace jobspb.ChangefeedDetails with ChangefeedConfig. + feedCfg jobspb.ChangefeedDetails, + timestampOracle timestampLowerBoundOracle, + jobID jobspb.JobID, + m metricsRecorder, +) (Sink, error) { + if u.Host == "" { + return nil, errors.Newf("host component of an external URI must refer to an "+ + "existing External Connection object: %s", u.String()) + } + + externalConnectionName := u.Host + + // Retrieve the external connection object from the system table. + var ec externalconn.ExternalConnection + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error + ec, err = externalconn.LoadExternalConnection(ctx, externalConnectionName, ie, txn) + return err + }); err != nil { + return nil, errors.Wrap(err, "failed to load external connection object") + } + + // Construct a Sink handle for the underlying resource represented by the + // external connection object. + switch d := ec.ConnectionProto().Details.(type) { + case *connectionpb.ConnectionDetails_SimpleURI: + // Replace the external connection URI in the `feedCfg` with the URI of the + // underlying resource. + feedCfg.SinkURI = d.SimpleURI.URI + return getSink(ctx, serverCfg, feedCfg, timestampOracle, user, jobID, m) + default: + return nil, errors.Newf("cannot connect to %T; unsupported resource for a Sink connection", d) + } +} diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection.go b/pkg/ccl/changefeedccl/sink_kafka_connection.go new file mode 100644 index 000000000000..8327d74b9f31 --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_kafka_connection.go @@ -0,0 +1,48 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "net/url" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb" + "github.com/cockroachdb/errors" +) + +func parseAndValidateKafkaSinkURI( + ctx context.Context, uri *url.URL, +) (externalconn.ExternalConnection, error) { + // Validate the kafka URI by creating a kafka sink and throwing it away. + // + // TODO(adityamaru): When we add `CREATE EXTERNAL CONNECTION ... WITH` support + // to accept JSONConfig we should validate that here too. + _, err := makeKafkaSink(ctx, sinkURL{URL: uri}, changefeedbase.Targets{}, "", + nilMetricsRecorderBuilder) + if err != nil { + return nil, errors.Wrap(err, "invalid Kafka URI") + } + + connDetails := connectionpb.ConnectionDetails{ + Provider: connectionpb.ConnectionProvider_TypeKafka, + Details: &connectionpb.ConnectionDetails_SimpleURI{ + SimpleURI: &connectionpb.SimpleURI{ + URI: uri.String(), + }, + }, + } + return externalconn.NewExternalConnection(connDetails), nil +} + +func init() { + externalconn.RegisterConnectionDetailsFromURIFactory(changefeedbase.SinkSchemeKafka, + parseAndValidateKafkaSinkURI) +} diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection_test.go b/pkg/ccl/changefeedccl/sink_kafka_connection_test.go new file mode 100644 index 000000000000..a679a2b815b4 --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_kafka_connection_test.go @@ -0,0 +1,240 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// externalConnectionKafkaSink is a wrapper sink that asserts the underlying +// resource it is Dial()ing is a kafka sink. This is used to test that External +// Connections route to the correct sink. +type externalConnectionKafkaSink struct { + sink Sink +} + +// Dial implements the Sink interface. +func (e *externalConnectionKafkaSink) Dial() error { + if _, ok := e.sink.(*kafkaSink); !ok { + return errors.Newf("unexpected sink type %T; expected a kafka sink", e.sink) + } + return nil +} + +// Close implements the Sink interface. +func (e *externalConnectionKafkaSink) Close() error { + return nil +} + +// EmitRow implements the Sink interface. +func (e *externalConnectionKafkaSink) EmitRow( + _ context.Context, _ TopicDescriptor, _, _ []byte, _, _ hlc.Timestamp, _ kvevent.Alloc, +) error { + return nil +} + +// Flush implements the Sink interface. +func (e *externalConnectionKafkaSink) Flush(_ context.Context) error { + return nil +} + +// EmitResolvedTimestamp implements the Sink interface. +func (e *externalConnectionKafkaSink) EmitResolvedTimestamp( + _ context.Context, _ Encoder, _ hlc.Timestamp, +) error { + return nil +} + +var _ Sink = (*externalConnectionKafkaSink)(nil) + +func TestChangefeedExternalConnections(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, stopServer := makeServer(t) + defer stopServer() + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + knobs.WrapSink = func(s Sink, _ jobspb.JobID) Sink { + // External Connections recursively invokes `getSink` for the underlying + // resource. We want to prevent double wrapping the sink since we assert on + // the underlying Sink type in `Dial`. + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + } + + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + unknownParams := func(sink string, params ...string) string { + return fmt.Sprintf(`unknown %s sink query parameters: [%s]`, sink, strings.Join(params, ", ")) + } + + for _, tc := range []struct { + name string + uri string + expectedError string + }{ + { + // kafka_topic_prefix was referenced by an old version of the RFC, it's + // "topic_prefix" now. + name: "kafka_topic_prefix", + uri: "kafka://nope/?kafka_topic_prefix=foo", + expectedError: unknownParams(`kafka`, `kafka_topic_prefix`), + }, + { + // schema_topic will be implemented but isn't yet. + name: "schema_topic is not yet supported", + uri: "kafka://nope/?schema_topic=foo", + expectedError: "schema_topic is not yet supported", + }, + // Sanity check kafka tls parameters. + { + name: "param tls_enabled must be a bool", + uri: "kafka://nope/?tls_enabled=foo", + expectedError: "param tls_enabled must be a bool", + }, + { + name: "param insecure_tls_skip_verify must be a bool", + uri: "kafka://nope/?tls_enabled=true&insecure_tls_skip_verify=foo", + expectedError: "param insecure_tls_skip_verify must be a bool", + }, + { + name: "param ca_cert must be base 64 encoded", + uri: "kafka://nope/?ca_cert=!", + expectedError: "param ca_cert must be base 64 encoded", + }, + { + name: "ca_cert requires tls_enabled=true", + uri: "kafka://nope/?&ca_cert=Zm9v", + expectedError: "ca_cert requires tls_enabled=true", + }, + { + name: "param client_cert must be base 64 encoded", + uri: "kafka://nope/?client_cert=!", + expectedError: "param client_cert must be base 64 encoded", + }, + { + name: "param client_key must be base 64 encoded", + uri: "kafka://nope/?client_key=!", + expectedError: "param client_key must be base 64 encoded", + }, + { + name: "client_cert requires tls_enabled=true", + uri: "kafka://nope/?client_cert=Zm9v", + expectedError: "client_cert requires tls_enabled=true", + }, + { + name: "client_cert requires client_key to be set", + uri: "kafka://nope/?tls_enabled=true&client_cert=Zm9v", + expectedError: "client_cert requires client_key to be set", + }, + { + name: "client_key requires client_cert to be set", + uri: "kafka://nope/?tls_enabled=true&client_key=Zm9v", + expectedError: "client_key requires client_cert to be set", + }, + { + name: "invalid client certificate", + uri: "kafka://nope/?tls_enabled=true&client_cert=Zm9v&client_key=Zm9v", + expectedError: "invalid client certificate", + }, + // Sanity check kafka sasl parameters. + { + name: "param sasl_enabled must be a bool", + uri: "kafka://nope/?sasl_enabled=maybe", + expectedError: "param sasl_enabled must be a bool", + }, + { + name: "param sasl_handshake must be a bool", + uri: "kafka://nope/?sasl_enabled=true&sasl_handshake=maybe", + expectedError: "param sasl_handshake must be a bool", + }, + { + name: "sasl_enabled must be enabled to configure SASL handshake behavior", + uri: "kafka://nope/?sasl_handshake=false", + expectedError: "sasl_enabled must be enabled to configure SASL handshake behavior", + }, + { + name: "sasl_user must be provided when SASL is enabled", + uri: "kafka://nope/?sasl_enabled=true", + expectedError: "sasl_user must be provided when SASL is enabled", + }, + { + name: "sasl_password must be provided when SASL is enabled", + uri: "kafka://nope/?sasl_enabled=true&sasl_user=a", + expectedError: "sasl_password must be provided when SASL is enabled", + }, + { + name: "sasl_enabled must be enabled if a SASL user is provided", + uri: "kafka://nope/?sasl_user=a", + expectedError: "sasl_enabled must be enabled if a SASL user is provided", + }, + { + name: "sasl_enabled must be enabled if a SASL password is provided", + uri: "kafka://nope/?sasl_password=a", + expectedError: "sasl_enabled must be enabled if a SASL password is provided", + }, + { + name: "sasl_enabled must be enabled to configure SASL mechanism", + uri: "kafka://nope/?sasl_mechanism=SCRAM-SHA-256", + expectedError: "sasl_enabled must be enabled to configure SASL mechanism", + }, + { + name: "param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN", + uri: "kafka://nope/?sasl_enabled=true&sasl_mechanism=unsuppported", + expectedError: "param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN", + }, + } { + t.Run(tc.name, func(t *testing.T) { + sqlDB.ExpectErr( + t, tc.expectedError, + fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, tc.name, tc.uri), + ) + }) + } + + // We wrap the changefeed Sink with `externalConnectionKafkaSink` that asserts + // the underlying Sink is a kafka sink. + t.Run("changefeed-with-well-formed-uri", func(t *testing.T) { + sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION nope AS 'kafka://nope'`) + sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope'`) + + sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION "nope-with-params" AS 'kafka://nope/?tls_enabled=true&insecure_tls_skip_verify=true&topic_name=foo'`) + sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope-with-params'`) + + sqlDB.Exec( + t, `CREATE CHANGEFEED FOR foo INTO 'external://nope/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`, + ) + + sqlDB.ExpectErr( + t, `this sink is incompatible with option webhook_client_timeout`, + `CREATE CHANGEFEED FOR foo INTO 'external://nope/' WITH webhook_client_timeout='1s'`, + ) + }) +} diff --git a/pkg/ccl/cloudccl/externalconn/BUILD.bazel b/pkg/ccl/cloudccl/externalconn/BUILD.bazel index 80c7b4042209..e9c5d68e93ce 100644 --- a/pkg/ccl/cloudccl/externalconn/BUILD.bazel +++ b/pkg/ccl/cloudccl/externalconn/BUILD.bazel @@ -10,6 +10,7 @@ go_test( data = glob(["testdata/**"]), deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/cloud/externalconn/providers", "//pkg/cloud/externalconn/utils", diff --git a/pkg/ccl/cloudccl/externalconn/datadriven_test.go b/pkg/ccl/cloudccl/externalconn/datadriven_test.go index 2de372a1e79c..5ea9113c010c 100644 --- a/pkg/ccl/cloudccl/externalconn/datadriven_test.go +++ b/pkg/ccl/cloudccl/externalconn/datadriven_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // register the sink External Connection implementations _ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers" // register all the concrete External Connection implementations "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/utils" "github.com/cockroachdb/cockroach/pkg/jobs" diff --git a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection index c03464ae42f3..7d0e8249540a 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection @@ -124,3 +124,26 @@ inspect-system-table ---- subtest end + +subtest basic-kafka-sink + +exec-sql +CREATE EXTERNAL CONNECTION "foo-kafka" AS 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg==&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256' +---- + +# Reject invalid kafka external connections. +exec-sql +CREATE EXTERNAL CONNECTION "invalid-param-kafka" AS 'kafka://broker.address.com:9092?INVALIDPARAM=baz' +---- +pq: failed to construct External Connection details: invalid Kafka URI: unknown kafka sink query parameters: INVALIDPARAM + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-cert" AS 'kafka://broker.address.com?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg=11' +---- +pq: failed to construct External Connection details: invalid Kafka URI: param ca_cert must be base 64 encoded: illegal base64 data at input byte 6 + +inspect-system-table +---- +foo-kafka STORAGE {"provider": "kafka", "simpleUri": {"uri": "kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg==&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256"}} + +subtest end diff --git a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection index a5f0eefdab77..ed8a9f7dc815 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection @@ -116,3 +116,26 @@ inspect-system-table ---- subtest end + +subtest basic-kafka-sink + +exec-sql +CREATE EXTERNAL CONNECTION "foo-kafka" AS 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg==&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256' +---- + +# Reject invalid kafka external connections. +exec-sql +CREATE EXTERNAL CONNECTION "invalid-param-kafka" AS 'kafka://broker.address.com:9092?INVALIDPARAM=baz' +---- +pq: failed to construct External Connection details: invalid Kafka URI: unknown kafka sink query parameters: INVALIDPARAM + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-cert" AS 'kafka://broker.address.com?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg=11' +---- +pq: failed to construct External Connection details: invalid Kafka URI: param ca_cert must be base 64 encoded: illegal base64 data at input byte 6 + +inspect-system-table +---- +foo-kafka STORAGE {"provider": "kafka", "simpleUri": {"uri": "kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=Zm9vCg==&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256"}} + +subtest end diff --git a/pkg/cloud/externalconn/connectionpb/connection.go b/pkg/cloud/externalconn/connectionpb/connection.go index f1da9ec4bb2d..27904c7d735a 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.go +++ b/pkg/cloud/externalconn/connectionpb/connection.go @@ -19,6 +19,8 @@ func (d *ConnectionDetails) Type() ConnectionType { return TypeStorage case ConnectionProvider_TypeGSKMS: return TypeKMS + case ConnectionProvider_TypeKafka: + return TypeStorage default: panic(errors.AssertionFailedf("ConnectionDetails.Type called on a details with an unknown type: %T", d.Provider.String())) } diff --git a/pkg/cloud/externalconn/connectionpb/connection.proto b/pkg/cloud/externalconn/connectionpb/connection.proto index dc2f40ad8757..7f9098e17ddf 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.proto +++ b/pkg/cloud/externalconn/connectionpb/connection.proto @@ -15,13 +15,16 @@ option go_package = "connectionpb"; import "gogoproto/gogo.proto"; enum ConnectionProvider { - Unknown = 0 [(gogoproto.enumvalue_customname) = "TypeUnspecified"];; + Unknown = 0 [(gogoproto.enumvalue_customname) = "TypeUnspecified"]; // External Storage providers. - nodelocal = 1 [(gogoproto.enumvalue_customname) = "TypeNodelocal"];; + nodelocal = 1 [(gogoproto.enumvalue_customname) = "TypeNodelocal"]; // KMS providers. - gs_kms = 2 [(gogoproto.enumvalue_customname) = "TypeGSKMS"];; + gs_kms = 2 [(gogoproto.enumvalue_customname) = "TypeGSKMS"]; + + // Sink providers. + kafka = 3 [(gogoproto.enumvalue_customname) = "TypeKafka"]; } // ConnectionType is the type of the External Connection object.