diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 4b842036e856..d336ed06e666 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -87,7 +87,9 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets changefeedbase.Targets) { } const ( - jsonMetaSentinel = `__crdb__` + // metaSentinel is a key or prefix used to mark metadata fields or columns + // into rows returned by an encoder. + metaSentinel = `__crdb__` ) // emitResolvedTimestamp emits a changefeed-level resolved timestamp to the diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c7698c94bfcd..e20522641e10 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6282,8 +6282,10 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { DefaultTestTenant: base.TestTenantDisabled, Knobs: base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ - DrainFast: true, - Changefeed: &TestingKnobs{}, + DrainFast: true, + Changefeed: &TestingKnobs{ + EnableParquetMetadata: true, + }, }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), }, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index fa4eded0daca..6be98c32925a 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -443,7 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // columns, so there is no reason to emit duplicate key datums. // // TODO(#103129): add support for some of these -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff, +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go index 8d181cb86766..4e84cb1c6b9a 100644 --- a/pkg/ccl/changefeedccl/encoder_json.go +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -186,7 +186,7 @@ func (e *versionEncoder) rowAsGoNative( if !row.HasValues() || (emitDeletedRowAsNull && row.IsDeleted()) { if meta != nil { b := json.NewObjectBuilder(1) - b.Add(jsonMetaSentinel, meta) + b.Add(metaSentinel, meta) return b.Build(), nil } return json.NullJSONValue, nil @@ -199,7 +199,7 @@ func (e *versionEncoder) rowAsGoNative( return nil }) if meta != nil { - keys = append(keys, jsonMetaSentinel) + keys = append(keys, metaSentinel) } b, err := json.NewFixedKeysObjectBuilder(keys) if err != nil { @@ -219,7 +219,7 @@ func (e *versionEncoder) rowAsGoNative( } if meta != nil { - if err := e.valueBuilder.Set(jsonMetaSentinel, meta); err != nil { + if err := e.valueBuilder.Set(metaSentinel, meta); err != nil { return nil, err } } @@ -404,7 +404,7 @@ func (e *jsonEncoder) EncodeResolvedTimestamp( jsonEntries = meta } else { jsonEntries = map[string]interface{}{ - jsonMetaSentinel: meta, + metaSentinel: meta, } } return gojson.Marshal(jsonEntries) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 43a38bec37e6..dfba5ca8dfb0 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -325,6 +325,12 @@ func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, s return extractResolvedTimestamp(t, m), m.Partition } +// resolvedRaw represents a JSON object containing the single key "resolved" +// with a resolved timestamp value. +type resolvedRaw struct { + Resolved string `json:"resolved"` +} + func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp { t.Helper() if m.Key != nil { @@ -334,14 +340,12 @@ func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Time t.Fatal(`expected a resolved timestamp notification`) } - var resolvedRaw struct { - Resolved string `json:"resolved"` - } - if err := gojson.Unmarshal(m.Resolved, &resolvedRaw); err != nil { + var resolved resolvedRaw + if err := gojson.Unmarshal(m.Resolved, &resolved); err != nil { t.Fatal(err) } - return parseTimeToHLC(t, resolvedRaw.Resolved) + return parseTimeToHLC(t, resolved.Resolved) } func expectResolvedTimestampAvro(t testing.TB, f cdctest.TestFeed) hlc.Timestamp { diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 632f78227e3a..d3a8b1726321 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -66,19 +66,12 @@ func newParquetWriterFromRow( return nil, err } - writerConstructor := parquet.NewWriter - - if knobs.EnableParquetMetadata { + if knobs != nil && knobs.EnableParquetMetadata { if opts, err = addParquetTestMetadata(row, opts); err != nil { return nil, err } - - // To use parquet test utils for reading datums, the writer needs to be - // configured with additional metadata. - writerConstructor = parquet.NewWriterWithReaderMeta } - - writer, err := writerConstructor(schemaDef, sink, opts...) + writer, err := newParquetWriter(schemaDef, sink, knobs, opts...) if err != nil { return nil, err } @@ -216,3 +209,20 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error } return orderedKeys, m, nil } + +// newParquetWriter allocates a new parquet writer using the provided +// schema definition. +func newParquetWriter( + sch *parquet.SchemaDefinition, + sink io.Writer, + knobs *TestingKnobs, /* may be nil */ + opts ...parquet.Option, +) (*parquet.Writer, error) { + if knobs != nil && knobs.EnableParquetMetadata { + // To use parquet test utils for reading datums, the writer needs to be + // configured with additional metadata. + return parquet.NewWriterWithReaderMeta(sch, sink, opts...) + } + + return parquet.NewWriter(sch, sink, opts...) +} diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 78401efaff29..1ca96cccab84 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -9,12 +9,19 @@ package changefeedccl import ( + "bytes" "context" + "fmt" + "path/filepath" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) @@ -22,7 +29,7 @@ import ( // This is an extra column that will be added to every parquet file which tells // us about the type of event that generated a particular row. The types are // defined below. -const parquetCrdbEventTypeColName string = "__crdb_event_type__" +const parquetCrdbEventTypeColName string = "__crdb__event_type__" type parquetEventType int @@ -117,9 +124,48 @@ func (parquetSink *parquetCloudStorageSink) Dial() error { // EmitResolvedTimestamp does not do anything as of now. It is there to // implement Sink interface. func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp( - ctx context.Context, encoder Encoder, resolved hlc.Timestamp, -) error { - return errors.AssertionFailedf("Parquet format does not support emitting resolved timestamp") + ctx context.Context, _ Encoder, resolved hlc.Timestamp, +) (err error) { + // TODO: There should be a better way to check if the sink is closed. + // This is copied from the wrapped sink's EmitResolvedTimestamp() + // method. + if parquetSink.wrapped.files == nil { + return errors.New(`cannot EmitRow on a closed sink`) + } + + defer parquetSink.wrapped.metrics.recordResolvedCallback()() + + if err := parquetSink.wrapped.waitAsyncFlush(ctx); err != nil { + return errors.Wrapf(err, "while emitting resolved timestamp") + } + + var buf bytes.Buffer + sch, err := parquet.NewSchema([]string{metaSentinel + "resolved"}, []*types.T{types.Decimal}) + if err != nil { + return err + } + + // TODO: Ideally, we do not create a new schema and writer every time + // we emit a resolved timestamp. Currently, util/parquet does not support it. + writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs) + if err != nil { + return err + } + + if err := writer.AddRow([]tree.Datum{eval.TimestampToDecimalDatum(resolved)}); err != nil { + return err + } + + if err := writer.Close(); err != nil { + return err + } + + part := resolved.GoTime().Format(parquetSink.wrapped.partitionFormat) + filename := fmt.Sprintf(`%s.RESOLVED`, cloudStorageFormatTime(resolved)) + if log.V(1) { + log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime()) + } + return cloud.WriteFile(ctx, parquetSink.wrapped.es, filepath.Join(part, filename), &buf) } // Flush implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 886352c17dc8..0f1772ea4a5d 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -22,12 +22,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/parquet" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -162,3 +164,29 @@ func makeRangefeedReaderAndDecoder( require.NoError(t, err) return popRow, cleanup, decoder } + +func TestParquetResolvedTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format=parquet, resolved='10ms'`) + defer closeFeed(t, foo) + + firstResolved, _ := expectResolvedTimestamp(t, foo) + testutils.SucceedsSoon(t, func() error { + nextResolved, _ := expectResolvedTimestamp(t, foo) + if !firstResolved.Less(nextResolved) { + return errors.AssertionFailedf( + "expected resolved timestamp %s to eventually exceed timestamp %s", + nextResolved, firstResolved) + } + return nil + }) + } + + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) +} diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index a3ca5bb532a2..b26c632ecae1 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -557,6 +557,7 @@ func (s *cloudStorageSink) EmitRow( func (s *cloudStorageSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { + // TODO: There should be a better way to check if the sink is closed. if s.files == nil { return errors.New(`cannot EmitRow on a closed sink`) } diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index b53060e79ced..de6662574316 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -1052,7 +1053,11 @@ func (f *cloudFeedFactory) Feed( // Determine if we can enable the parquet format if the changefeed is not // being created with incompatible options. If it can be enabled, we will use // parquet format with a probability of 0.4. - parquetPossible := true + // + // TODO: Consider making this knob a global flag so tests that don't + // initialize testing knobs can use parquet metamorphically. + knobs := f.s.TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed + parquetPossible := knobs != nil && knobs.(*TestingKnobs).EnableParquetMetadata explicitEnvelope := false for _, opt := range createStmt.Options { if string(opt.Key) == changefeedbase.OptEnvelope { @@ -1175,19 +1180,19 @@ func extractFieldFromJSONValue( if isBare { meta := make(map[string]gojson.RawMessage) - if metaVal, haveMeta := parsed[jsonMetaSentinel]; haveMeta { + if metaVal, haveMeta := parsed[metaSentinel]; haveMeta { if err := gojson.Unmarshal(metaVal, &meta); err != nil { return nil, nil, errors.Wrapf(err, "unmarshalling json %v", metaVal) } field = meta[fieldName] delete(meta, fieldName) if len(meta) == 0 { - delete(parsed, jsonMetaSentinel) + delete(parsed, metaSentinel) } else { if metaVal, err = reformatJSON(meta); err != nil { return nil, nil, err } - parsed[jsonMetaSentinel] = metaVal + parsed[metaSentinel] = metaVal } } } else { @@ -1340,6 +1345,29 @@ func (c *cloudFeed) appendParquetTestFeedMessages( return nil } +// readParquetResolvedPayload reads a resolved timestamp value from the +// specified parquet file and returns it encoded as JSON. +func (c *cloudFeed) readParquetResolvedPayload(path string) ([]byte, error) { + meta, datums, err := parquet.ReadFile(path) + if err != nil { + return nil, err + } + + if meta.NumRows != 1 || meta.NumCols != 1 { + return nil, errors.AssertionFailedf("expected one row with one col containing the resolved timestamp") + } + + resolvedDatum := datums[0][0] + + resolved := resolvedRaw{Resolved: resolvedDatum.String()} + resolvedBytes, err := gojson.Marshal(resolved) + if err != nil { + return nil, err + } + + return resolvedBytes, nil +} + // Next implements the TestFeed interface. func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) { for { @@ -1413,11 +1441,27 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error { return nil } + details, err := c.Details() + if err != nil { + return err + } + format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]) + if strings.HasSuffix(path, `RESOLVED`) { - resolvedPayload, err := os.ReadFile(path) - if err != nil { - return err + var resolvedPayload []byte + var err error + if format == changefeedbase.OptFormatParquet { + resolvedPayload, err = c.readParquetResolvedPayload(path) + if err != nil { + return err + } + } else { + resolvedPayload, err = os.ReadFile(path) + if err != nil { + return err + } } + resolvedEntry := &cdctest.TestFeedMessage{Resolved: resolvedPayload} c.rows = append(c.rows, resolvedEntry) c.resolved = path @@ -1439,12 +1483,6 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error { return err } defer f.Close() - details, err := c.Details() - if err != nil { - return err - } - - format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]) if format == changefeedbase.OptFormatParquet { envelopeType := changefeedbase.EnvelopeType(details.Opts[changefeedbase.OptEnvelope])