Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
68818: changefeedccl: mark avro format as no longer experimental r=[miretskiy,spiffyeng] a=HonoreDB

The avro format for changefeeds now supports all column types
and has been in production use for several releases.
We'll now allow format=avro rather than format=experimental_avro
The old string will remain supported because job payloads can
persist across upgrades and downgrades.

Release note (enterprise change): changefeed avro format no longer marked experimental

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Aug 14, 2021
2 parents 5dc07da + 69e70e7 commit ee3efd6
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 32 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_library(
"//pkg/sql/flowinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/row",
Expand Down Expand Up @@ -216,6 +217,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_jackc_pgx//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_shopify_sarama//:sarama",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -94,6 +95,7 @@ func changefeedPlanHook(
var header colinfo.ResultColumns
unspecifiedSink := changefeedStmt.SinkURI == nil
avoidBuffering := false

if unspecifiedSink {
// An unspecified sink triggers a fairly radical change in behavior.
// Instead of setting up a system.job to emit to a sink in the
Expand Down Expand Up @@ -152,6 +154,14 @@ func changefeedPlanHook(
return err
}

if opts[changefeedbase.OptFormat] == changefeedbase.DeprecatedOptFormatAvro {
p.BufferClientNotice(ctx, pgnotice.Newf(
`%[1]s is no longer experimental, use %[2]s=%[1]s`,
changefeedbase.OptFormatAvro, changefeedbase.OptFormat),
)
// Still serialize the experimental_ form for backwards compatibility
}

jobDescription, err := changefeedJobDescription(p, changefeedStmt, sinkURI, opts)
if err != nil {
return err
Expand Down Expand Up @@ -534,7 +544,7 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
switch v := changefeedbase.FormatType(details.Opts[opt]); v {
case ``, changefeedbase.OptFormatJSON:
details.Opts[opt] = string(changefeedbase.OptFormatJSON)
case changefeedbase.OptFormatAvro:
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
// No-op.
default:
return jobspb.ChangefeedDetails{}, errors.Errorf(
Expand Down
35 changes: 26 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,23 @@ func TestChangefeedAuthorization(t *testing.T) {
}
}

func TestChangefeedAvroNotice(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, stop := startTestServer(t, feedTestOptions{})
defer stop()
schemaReg := cdctest.StartTestSchemaRegistry()
defer schemaReg.Close()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "CREATE table foo (i int)")
sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)

sql := fmt.Sprintf("CREATE CHANGEFEED FOR d.foo INTO 'dummysink' WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL())
expectNotice(t, s, sql, `avro is no longer experimental, use format=avro`)
}

func requireErrorSoon(
ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp,
) {
Expand Down Expand Up @@ -1547,7 +1564,7 @@ func TestChangefeedWorksOnRBRChange(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE rbr`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`)
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL()))
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=avro, confluent_schema_registry='%s'", schemaReg.URL()))
defer closeFeed(t, rbr)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (1, 2)`)
assertPayloads(t, rbr, []string{
Expand All @@ -1564,7 +1581,7 @@ func TestChangefeedWorksOnRBRChange(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT, region crdb_internal_region NOT NULL DEFAULT 'us-east-1')`)
defer sqlDB.Exec(t, `DROP TABLE rbr`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`)
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL()))
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=avro, confluent_schema_registry='%s'", schemaReg.URL()))
defer closeFeed(t, rbr)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (1, 2)`)
assertPayloads(t, rbr, []string{
Expand Down Expand Up @@ -1616,7 +1633,7 @@ func TestChangefeedRBRAvroAddRegion(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY)`)
waitForSchemaChange(t, sqlDB, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0)`)
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=experimental_avro, confluent_schema_registry='%s'", schemaReg.URL()))
rbr := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR rbr WITH format=avro, confluent_schema_registry='%s'", schemaReg.URL()))
defer closeFeed(t, rbr)
assertPayloads(t, rbr, []string{
`rbr: {"a":{"long":0},"crdb_region":{"string":"us-east1"}}->{"after":{"rbr":{"a":{"long":0},"crdb_region":{"string":"us-east1"}}}}`,
Expand Down Expand Up @@ -2720,20 +2737,20 @@ func TestChangefeedErrors(t *testing.T) {
)
// The avro format doesn't support key_in_value or topic_in_value yet.
sqlDB.ExpectErr(
t, `key_in_value is not supported with format=experimental_avro`,
t, `key_in_value is not supported with format=avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH key_in_value, format='experimental_avro'`,
`kafka://nope`,
)
sqlDB.ExpectErr(
t, `topic_in_value is not supported with format=experimental_avro`,
t, `topic_in_value is not supported with format=avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH topic_in_value, format='experimental_avro'`,
`kafka://nope`,
)

// The cloudStorageSink is particular about the options it will work with.
sqlDB.ExpectErr(
t, `this sink is incompatible with format=experimental_avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
t, `this sink is incompatible with format=avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='avro', confluent_schema_registry=$2`,
`experimental-nodelocal://0/bar`, schemaReg.URL(),
)
sqlDB.ExpectErr(
Expand Down Expand Up @@ -2816,8 +2833,8 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-http://fake-host`,
)
sqlDB.ExpectErr(
t, `this sink is incompatible with format=experimental_avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
t, `this sink is incompatible with format=avro`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='avro', confluent_schema_registry=$2`,
`webhook-https://fake-host`, schemaReg.URL(),
)
sqlDB.ExpectErr(
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ const (
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
OptEnvelopeWrapped EnvelopeType = `wrapped`

OptFormatJSON FormatType = `json`
OptFormatAvro FormatType = `experimental_avro`
OptFormatJSON FormatType = `json`
OptFormatAvro FormatType = `avro`

OptFormatNative FormatType = `native`

OptOnErrorFail OnErrorType = `fail`
OptOnErrorPause OnErrorType = `pause`

DeprecatedOptFormatAvro = `experimental_avro`

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func getEncoder(
switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) {
case ``, changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts, targets)
case changefeedbase.OptFormatAvro:
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
return newConfluentAvroEncoder(ctx, opts, targets)
case changefeedbase.OptFormatNative:
return &nativeEncoder{}, nil
Expand Down
38 changes: 19 additions & 19 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,46 +116,46 @@ func TestEncoders(t *testing.T) {
delete: `[1]->{"after": null, "before": {"a": 1, "b": "bar"}, "updated": "1.0000000002"}`,
resolved: `{"resolved":"1.0000000002"}`,
},
`format=experimental_avro,envelope=key_only`: {
`format=avro,envelope=key_only`: {
insert: `{"a":{"long":1}}->`,
delete: `{"a":{"long":1}}->`,
resolved: `{"resolved":{"string":"1.0000000002"}}`,
},
`format=experimental_avro,envelope=key_only,updated`: {
`format=avro,envelope=key_only,updated`: {
err: `updated is only usable with envelope=wrapped`,
},
`format=experimental_avro,envelope=key_only,diff`: {
`format=avro,envelope=key_only,diff`: {
err: `diff is only usable with envelope=wrapped`,
},
`format=experimental_avro,envelope=key_only,updated,diff`: {
`format=avro,envelope=key_only,updated,diff`: {
err: `updated is only usable with envelope=wrapped`,
},
`format=experimental_avro,envelope=row`: {
err: `envelope=row is not supported with format=experimental_avro`,
`format=avro,envelope=row`: {
err: `envelope=row is not supported with format=avro`,
},
`format=experimental_avro,envelope=row,updated`: {
err: `envelope=row is not supported with format=experimental_avro`,
`format=avro,envelope=row,updated`: {
err: `envelope=row is not supported with format=avro`,
},
`format=experimental_avro,envelope=row,diff`: {
err: `envelope=row is not supported with format=experimental_avro`,
`format=avro,envelope=row,diff`: {
err: `envelope=row is not supported with format=avro`,
},
`format=experimental_avro,envelope=row,updated,diff`: {
err: `envelope=row is not supported with format=experimental_avro`,
`format=avro,envelope=row,updated,diff`: {
err: `envelope=row is not supported with format=avro`,
},
`format=experimental_avro,envelope=wrapped`: {
`format=avro,envelope=wrapped`: {
insert: `{"a":{"long":1}}->` +
`{"after":{"foo":{"a":{"long":1},"b":{"string":"bar"}}}}`,
delete: `{"a":{"long":1}}->{"after":null}`,
resolved: `{"resolved":{"string":"1.0000000002"}}`,
},
`format=experimental_avro,envelope=wrapped,updated`: {
`format=avro,envelope=wrapped,updated`: {
insert: `{"a":{"long":1}}->` +
`{"after":{"foo":{"a":{"long":1},"b":{"string":"bar"}}},` +
`"updated":{"string":"1.0000000002"}}`,
delete: `{"a":{"long":1}}->{"after":null,"updated":{"string":"1.0000000002"}}`,
resolved: `{"resolved":{"string":"1.0000000002"}}`,
},
`format=experimental_avro,envelope=wrapped,diff`: {
`format=avro,envelope=wrapped,diff`: {
insert: `{"a":{"long":1}}->` +
`{"after":{"foo":{"a":{"long":1},"b":{"string":"bar"}}},` +
`"before":null}`,
Expand All @@ -164,7 +164,7 @@ func TestEncoders(t *testing.T) {
`"before":{"foo_before":{"a":{"long":1},"b":{"string":"bar"}}}}`,
resolved: `{"resolved":{"string":"1.0000000002"}}`,
},
`format=experimental_avro,envelope=wrapped,updated,diff`: {
`format=avro,envelope=wrapped,updated,diff`: {
insert: `{"a":{"long":1}}->` +
`{"after":{"foo":{"a":{"long":1},"b":{"string":"bar"}}},` +
`"before":null,` +
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestEncoders(t *testing.T) {
case string(changefeedbase.OptFormatJSON):
rowStringFn = func(k, v []byte) string { return fmt.Sprintf(`%s->%s`, k, v) }
resolvedStringFn = func(r []byte) string { return string(r) }
case string(changefeedbase.OptFormatAvro):
case string(changefeedbase.OptFormatAvro), string(changefeedbase.DeprecatedOptFormatAvro):
reg := cdctest.StartTestSchemaRegistry()
defer reg.Close()
o[changefeedbase.OptConfluentSchemaRegistry] = reg.URL()
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
ts := hlc.Timestamp{WallTime: 1, Logical: 2}

opts := map[string]string{
changefeedbase.OptFormat: "experimental_avro",
changefeedbase.OptFormat: "avro",
changefeedbase.OptEnvelope: "key_only",
}
expected := struct {
Expand All @@ -340,7 +340,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
resolved: `{"resolved":{"string":"1.0000000002"}}`,
}

t.Run("format=experimental_avro,envelope=key_only", func(t *testing.T) {
t.Run("format=avro,envelope=key_only", func(t *testing.T) {
cert, certBase64, err := cdctest.NewCACertBase64Encoded()
require.NoError(t, err)

Expand Down
25 changes: 25 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -647,6 +648,30 @@ func serverArgsRegion(args base.TestServerArgs) string {
return ""
}

// expectNotice creates a pretty crude database connection that doesn't involve
// a lot of cdc test framework, use with caution. Driver-agnostic tools don't
// have clean ways of inspecting incoming notices.
func expectNotice(t *testing.T, s serverutils.TestServerInterface, sql string, expected string) {
url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()
base, err := pq.NewConnector(url.String())
if err != nil {
t.Fatal(err)
}
actual := "(no notice)"
connector := pq.ConnectorWithNoticeHandler(base, func(n *pq.Error) {
actual = n.Message
})

dbWithHandler := gosql.OpenDB(connector)
defer dbWithHandler.Close()
sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)

sqlDB.Exec(t, sql)

require.Equal(t, expected, actual)
}

func feed(
t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{},
) cdctest.TestFeed {
Expand Down

0 comments on commit ee3efd6

Please sign in to comment.