diff --git a/.bazelrc b/.bazelrc index 46e9b0725c1c..f3c28c80312f 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,12 +1,17 @@ try-import %workspace%/.bazelrc.user -build --symlink_prefix=_bazel/ --ui_event_filters=-DEBUG --define gotags=bazel,gss --experimental_proto_descriptor_sets_include_source_info --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution +build --define gotags=bazel,gss +build --experimental_proto_descriptor_sets_include_source_info +build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution +build --symlink_prefix=_bazel/ --experimental_no_product_name_out_symlink test --config=test --experimental_ui_max_stdouterr_bytes=10485760 build:with_ui --define cockroach_with_ui=y build:test --define crdb_test=y build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=1 --test_sharding_strategy=disabled test:test --test_env=TZ= test:race --test_timeout=1200,6000,18000,72000 + +build --ui_event_filters=-DEBUG query --ui_event_filters=-DEBUG # CI should always run with `--config=ci`. @@ -22,7 +27,7 @@ test:ci --test_tmpdir=/artifacts/tmp build:cross --stamp build:cross --define cockroach_cross=y -# cross-compilation configurations. Add e.g. --config=crosslinux to turn these on. +# Cross-compilation configurations. Add e.g. --config=crosslinux to turn these on. build:crosslinux --platforms=//build/toolchains:cross_linux build:crosslinux '--workspace_status_command=./build/bazelutil/stamp.sh x86_64-pc-linux-gnu' build:crosslinux --config=cross @@ -36,10 +41,9 @@ build:crosslinuxarm --platforms=//build/toolchains:cross_linux_arm build:crosslinuxarm '--workspace_status_command=./build/bazelutil/stamp.sh aarch64-unknown-linux-gnu' build:crosslinuxarm --config=cross -# developer configurations. Add e.g. --config=devdarwinx86_64 to turn these on. +# Developer configurations. Add e.g. --config=devdarwinx86_64 to turn these on. +# NB: This is consumed in `BUILD` files (see build/toolchains/BUILD.bazel). build:devdarwinx86_64 --platforms=//build/toolchains:darwin_x86_64 -# NOTE(ricky): This is consumed in `BUILD` files (see -# `build/toolchains/BUILD.bazel`). build:devdarwinx86_64 --config=dev build:dev --define cockroach_bazel_dev=y build:dev --stamp --workspace_status_command=./build/bazelutil/stamp.sh diff --git a/bazel-out b/bazel-out deleted file mode 100644 index 6fd24fbd9518..000000000000 --- a/bazel-out +++ /dev/null @@ -1,3 +0,0 @@ -This placeholder file prevents bazel from writing a symlink here which points to its output dir. -That symlink often causes tools that traverse the repo to get confused when they find lots of extra go files. -The _bazel/out symlink points to the same thing, but go doesn't traverse into paths that start with _. diff --git a/build/bazelutil/stamp.sh b/build/bazelutil/stamp.sh index 1c1267982280..9d663d257f19 100755 --- a/build/bazelutil/stamp.sh +++ b/build/bazelutil/stamp.sh @@ -38,9 +38,9 @@ fi # * https://docs.bazel.build/versions/main/user-manual.html#workspace_status # * https://github.com/bazelbuild/rules_go/blob/master/go/core.rst#defines-and-stamping cat <{"after": {"a": 1, "b": 1}, "before": null}`}, + payloadAfterUpdate: []string{`cc: [1]->{"after": {"a": 1, "b": 10}, "before": {"a": 1, "b": 1}}`}, + }, + `format="json",virtual_columns="null"`: { + formatOpt: changefeedbase.OptFormatJSON, + virtualColumnVisibility: changefeedbase.OptVirtualColumnsNull, + payloadAfterInsert: []string{`cc: [1]->{"after": {"a": 1, "b": 1, "c": null}, "before": null}`}, + payloadAfterUpdate: []string{`cc: [1]->{"after": {"a": 1, "b": 10, "c": null}, "before": {"a": 1, "b": 1, "c": null}}`}, + }, + `format="avro",virtual_columns="omitted"`: { + formatOpt: changefeedbase.OptFormatAvro, + virtualColumnVisibility: changefeedbase.OptVirtualColumnsOmitted, + payloadAfterInsert: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":1}}},"before":null}`}, + payloadAfterUpdate: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":10}}},"before":{"cc_before":{"a":{"long":1},"b":{"long":1}}}}`}, + }, + `format="avro",virtual_columns="null"`: { + formatOpt: changefeedbase.OptFormatAvro, + virtualColumnVisibility: changefeedbase.OptVirtualColumnsNull, + payloadAfterInsert: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":1},"c":null}},"before":null}`}, + payloadAfterUpdate: []string{`cc: {"a":{"long":1}}->{"after":{"cc":{"a":{"long":1},"b":{"long":10},"c":null}},"before":{"cc_before":{"a":{"long":1},"b":{"long":1},"c":null}}}`}, + }, + } - cc := feed(t, f, `CREATE CHANGEFEED FOR cc with diff`) - defer closeFeed(t, cc) + for _, test := range tests { + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) - assertPayloads(t, cc, []string{ - `cc: [1]->{"after": {"a": 1, "b": 1, "c": null}, "before": null}`, - }) + sqlDB.Exec(t, `CREATE TABLE cc ( + a INT primary key, b INT, c INT AS (b + 1) VIRTUAL NOT NULL + )`) + defer sqlDB.Exec(t, `DROP TABLE cc`) - sqlDB.Exec(t, `UPDATE cc SET b=10 WHERE a=1`) - assertPayloads(t, cc, []string{ - `cc: [1]->{"after": {"a": 1, "b": 10, "c": null}, "before": {"a": 1, "b": 1, "c": null}}`, - }) - } + sqlDB.Exec(t, `INSERT INTO cc VALUES (1, 1)`) - t.Run(`sinkless`, sinklessTest(testFn)) - t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`kafka`, kafkaTest(testFn)) - t.Run(`webhook`, webhookTest(testFn)) + changeFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR cc WITH diff, format="%s", virtual_columns="%s"`, + test.formatOpt, test.virtualColumnVisibility)) + defer closeFeed(t, changeFeed) + + assertPayloads(t, changeFeed, test.payloadAfterInsert) + + sqlDB.Exec(t, `UPDATE cc SET b=10 WHERE a=1`) + + assertPayloads(t, changeFeed, test.payloadAfterUpdate) + } + + if test.formatOpt != changefeedbase.OptFormatAvro { + t.Run(`sinkless`, sinklessTest(testFn)) + t.Run(`enterprise`, enterpriseTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) + } + + t.Run(`kafka`, kafkaTest(testFn)) + } } func TestChangefeedUpdatePrimaryKey(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index fb3a1460c15e..083db065c617 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -27,6 +27,10 @@ type SchemaChangeEventClass string // change event which is a member of the changefeed's schema change events. type SchemaChangePolicy string +// VirtualColumnVisibility defines the behaviour of how the changefeed will +// include virtual columns in an event +type VirtualColumnVisibility string + // Constants for the options. const ( OptAvroSchemaPrefix = `avro_schema_prefix` @@ -50,6 +54,10 @@ const ( OptWebhookClientTimeout = `webhook_client_timeout` OptOnError = `on_error` OptMetricsScope = `metrics_label` + OptVirtualColumns = `virtual_columns` + + OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` + OptVirtualColumnsNull VirtualColumnVisibility = `null` // OptSchemaChangeEventClassColumnChange corresponds to all schema change // events which add or remove any column. @@ -170,6 +178,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ OptWebhookClientTimeout: sql.KVStringOptRequireValue, OptOnError: sql.KVStringOptRequireValue, OptMetricsScope: sql.KVStringOptRequireValue, + OptVirtualColumns: sql.KVStringOptRequireValue, } func makeStringSet(opts ...string) map[string]struct{} { @@ -189,7 +198,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, OptInitialScan, OptNoInitialScan, - OptMinCheckpointFrequency, OptMetricsScope) + OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns) // SQLValidOptions is options exclusive to SQL sink var SQLValidOptions map[string]struct{} = nil diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index 9ade9779ff76..3027919bf8a0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -56,13 +56,17 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc } // WarningsForTable returns any known nonfatal issues with running a changefeed on this kind of table. -func WarningsForTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) []error { +func WarningsForTable( + targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor, opts map[string]string, +) []error { warnings := []error{} - for _, col := range tableDesc.AccessibleColumns() { - if col.IsVirtual() { - warnings = append(warnings, - errors.Errorf("Changefeeds will emit null values for virtual column %s in table %s", col.ColName(), tableDesc.GetName()), - ) + if _, ok := opts[OptVirtualColumns]; !ok { + for _, col := range tableDesc.AccessibleColumns() { + if col.IsVirtual() { + warnings = append(warnings, + errors.Errorf("Changefeeds will filter out values for virtual column %s in table %s", col.ColName(), tableDesc.GetName()), + ) + } } } return warnings diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index c9a148c959e8..e619f98f575a 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -102,9 +102,10 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod type jsonEncoder struct { updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool - targets jobspb.ChangefeedTargets - alloc tree.DatumAlloc - buf bytes.Buffer + targets jobspb.ChangefeedTargets + alloc tree.DatumAlloc + buf bytes.Buffer + virtualColumnVisibility string } var _ Encoder = &jsonEncoder{} @@ -113,9 +114,10 @@ func makeJSONEncoder( opts map[string]string, targets jobspb.ChangefeedTargets, ) (*jsonEncoder, error) { e := &jsonEncoder{ - targets: targets, - keyOnly: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeKeyOnly, - wrapped: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeWrapped, + targets: targets, + keyOnly: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeKeyOnly, + wrapped: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeWrapped, + virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], } _, e.updatedField = opts[changefeedbase.OptUpdatedTimestamps] _, e.mvccTimestampField = opts[changefeedbase.OptMVCCTimestamps] @@ -199,8 +201,11 @@ func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, err var after map[string]interface{} if !row.deleted { columns := row.tableDesc.PublicColumns() - after = make(map[string]interface{}, len(columns)) + after = make(map[string]interface{}) for i, col := range columns { + if col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsOmitted) { + continue + } datum := row.datums[i] if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { return nil, err @@ -220,8 +225,11 @@ func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, err var before map[string]interface{} if row.prevDatums != nil && !row.prevDeleted { columns := row.prevTableDesc.PublicColumns() - before = make(map[string]interface{}, len(columns)) + before = make(map[string]interface{}) for i, col := range columns { + if col.IsVirtual() && e.virtualColumnVisibility == string(changefeedbase.OptVirtualColumnsOmitted) { + continue + } datum := row.prevDatums[i] if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil { return nil, err @@ -321,6 +329,7 @@ type confluentAvroEncoder struct { schemaPrefix string updatedField, beforeField, keyOnly bool targets jobspb.ChangefeedTargets + virtualColumnVisibility string keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema @@ -361,8 +370,9 @@ func newConfluentAvroEncoder( opts map[string]string, targets jobspb.ChangefeedTargets, ) (*confluentAvroEncoder, error) { e := &confluentAvroEncoder{ - schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], - targets: targets, + schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], + targets: targets, + virtualColumnVisibility: opts[changefeedbase.OptVirtualColumns], } switch opts[changefeedbase.OptEnvelope] { @@ -475,13 +485,13 @@ func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) ( var beforeDataSchema *avroDataRecord if e.beforeField && row.prevTableDesc != nil { var err error - beforeDataSchema, err = tableToAvroSchema(row.prevTableDesc, `before`, e.schemaPrefix) + beforeDataSchema, err = tableToAvroSchema(row.prevTableDesc, `before`, e.schemaPrefix, e.virtualColumnVisibility) if err != nil { return nil, err } } - afterDataSchema, err := tableToAvroSchema(row.tableDesc, avroSchemaNoSuffix, e.schemaPrefix) + afterDataSchema, err := tableToAvroSchema(row.tableDesc, avroSchemaNoSuffix, e.schemaPrefix, e.virtualColumnVisibility) if err != nil { return nil, err } diff --git a/pkg/cmd/dev/doctor.go b/pkg/cmd/dev/doctor.go index 63713806ed8b..642922a295cf 100644 --- a/pkg/cmd/dev/doctor.go +++ b/pkg/cmd/dev/doctor.go @@ -78,14 +78,18 @@ Please perform the following steps: if err != nil { return err } - if !strings.Contains(fileContents, "STABLE_BUILD_GIT_COMMIT") { + if !strings.Contains(fileContents, "STABLE_BUILD_GIT_BUILD_TYPE") { passedStampTest = false } } + workspace, err := d.getWorkspace(ctx) + if err != nil { + return err + } if !passedStampTest { success = false log.Printf(`Your machine is not configured to "stamp" your built executables. -Please add one of the following to your ~/.bazelrc:`) +Please add one of the following to your %s/.bazelrc.user:`, workspace) if runtime.GOOS == "darwin" && runtime.GOARCH == "amd64" { log.Printf(" build --config=devdarwinx86_64") } else if runtime.GOOS == "linux" && runtime.GOARCH == "amd64" {