Skip to content

Commit

Permalink
changefeedccl: support the resolved option with format=parquet
Browse files Browse the repository at this point in the history
Previously, `format=parquet` and `resolved` could not be used
together when running changefeeds. This change adds support for
this.

The release note is being left intentionally blank for a future
commit.

Informs: #103129
Release note: None
  • Loading branch information
jayshrivastava committed Jun 5, 2023
1 parent 76c337a commit 07101ad
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 39 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets changefeedbase.Targets) {
}

const (
	// metaSentinel is a key or prefix used to mark metadata fields or
	// columns in rows returned by an encoder.
	metaSentinel = `__crdb__`
)

// emitResolvedTimestamp emits a changefeed-level resolved timestamp to the
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6282,8 +6282,10 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (e *versionEncoder) rowAsGoNative(
if !row.HasValues() || (emitDeletedRowAsNull && row.IsDeleted()) {
if meta != nil {
b := json.NewObjectBuilder(1)
b.Add(jsonMetaSentinel, meta)
b.Add(metaSentinel, meta)
return b.Build(), nil
}
return json.NullJSONValue, nil
Expand All @@ -199,7 +199,7 @@ func (e *versionEncoder) rowAsGoNative(
return nil
})
if meta != nil {
keys = append(keys, jsonMetaSentinel)
keys = append(keys, metaSentinel)
}
b, err := json.NewFixedKeysObjectBuilder(keys)
if err != nil {
Expand All @@ -219,7 +219,7 @@ func (e *versionEncoder) rowAsGoNative(
}

if meta != nil {
if err := e.valueBuilder.Set(jsonMetaSentinel, meta); err != nil {
if err := e.valueBuilder.Set(metaSentinel, meta); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (e *jsonEncoder) EncodeResolvedTimestamp(
jsonEntries = meta
} else {
jsonEntries = map[string]interface{}{
jsonMetaSentinel: meta,
metaSentinel: meta,
}
}
return gojson.Marshal(jsonEntries)
Expand Down
14 changes: 9 additions & 5 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, s
return extractResolvedTimestamp(t, m), m.Partition
}

// resolvedRaw models the JSON object `{"resolved": "<timestamp>"}` emitted
// for resolved timestamp notifications. It is used both to decode resolved
// messages received from test feeds and to encode resolved payloads read
// back out of parquet files.
type resolvedRaw struct {
	Resolved string `json:"resolved"` // the resolved timestamp value
}

func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp {
t.Helper()
if m.Key != nil {
Expand All @@ -334,14 +340,12 @@ func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Time
t.Fatal(`expected a resolved timestamp notification`)
}

var resolvedRaw struct {
Resolved string `json:"resolved"`
}
if err := gojson.Unmarshal(m.Resolved, &resolvedRaw); err != nil {
var resolved resolvedRaw
if err := gojson.Unmarshal(m.Resolved, &resolved); err != nil {
t.Fatal(err)
}

return parseTimeToHLC(t, resolvedRaw.Resolved)
return parseTimeToHLC(t, resolved.Resolved)
}

func expectResolvedTimestampAvro(t testing.TB, f cdctest.TestFeed) hlc.Timestamp {
Expand Down
28 changes: 19 additions & 9 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,12 @@ func newParquetWriterFromRow(
return nil, err
}

writerConstructor := parquet.NewWriter

if knobs.EnableParquetMetadata {
if knobs != nil && knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
return nil, err
}

// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
writerConstructor = parquet.NewWriterWithReaderMeta
}

writer, err := writerConstructor(schemaDef, sink, opts...)
writer, err := newParquetWriter(schemaDef, sink, knobs, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,3 +209,20 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
}
return orderedKeys, m, nil
}

// newParquetWriter constructs a parquet writer for the given schema
// definition which writes to sink. knobs may be nil; when knobs enable
// parquet metadata, the writer is configured with extra reader metadata so
// the parquet test utilities can read datums back out of the output.
func newParquetWriter(
	sch *parquet.SchemaDefinition,
	sink io.Writer,
	knobs *TestingKnobs, /* may be nil */
	opts ...parquet.Option,
) (*parquet.Writer, error) {
	constructor := parquet.NewWriter
	if knobs != nil && knobs.EnableParquetMetadata {
		// To use parquet test utils for reading datums, the writer needs to
		// be configured with additional metadata.
		constructor = parquet.NewWriterWithReaderMeta
	}
	return constructor(sch, sink, opts...)
}
54 changes: 50 additions & 4 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@
package changefeedccl

import (
"bytes"
"context"
"fmt"
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)

// parquetCrdbEventTypeColName is the name of an extra column that will be
// added to every parquet file; it tells us about the type of event that
// generated a particular row. The types are defined below.
const parquetCrdbEventTypeColName string = "__crdb__event_type__"

type parquetEventType int

Expand Down Expand Up @@ -117,9 +124,48 @@ func (parquetSink *parquetCloudStorageSink) Dial() error {
// EmitResolvedTimestamp does not do anything as of now. It is there to
// implement Sink interface.
func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
return errors.AssertionFailedf("Parquet format does not support emitting resolved timestamp")
ctx context.Context, _ Encoder, resolved hlc.Timestamp,
) (err error) {
// TODO: There should be a better way to check if the sink is closed.
// This is copied from the wrapped sink's EmitResolvedTimestamp()
// method.
if parquetSink.wrapped.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}

defer parquetSink.wrapped.metrics.recordResolvedCallback()()

if err := parquetSink.wrapped.waitAsyncFlush(ctx); err != nil {
return errors.Wrapf(err, "while emitting resolved timestamp")
}

var buf bytes.Buffer
sch, err := parquet.NewSchema([]string{metaSentinel + "resolved"}, []*types.T{types.Decimal})
if err != nil {
return err
}

// TODO: Ideally, we do not create a new schema and writer every time
// we emit a resolved timestamp. Currently, util/parquet does not support it.
writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs)
if err != nil {
return err
}

if err := writer.AddRow([]tree.Datum{eval.TimestampToDecimalDatum(resolved)}); err != nil {
return err
}

if err := writer.Close(); err != nil {
return err
}

part := resolved.GoTime().Format(parquetSink.wrapped.partitionFormat)
filename := fmt.Sprintf(`%s.RESOLVED`, cloudStorageFormatTime(resolved))
if log.V(1) {
log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime())
}
return cloud.WriteFile(ctx, parquetSink.wrapped.es, filepath.Join(part, filename), &buf)
}

// Flush implements the Sink interface.
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -162,3 +164,29 @@ func makeRangefeedReaderAndDecoder(
require.NoError(t, err)
return popRow, cleanup, decoder
}

// TestParquetResolvedTimestamps verifies that a changefeed created with
// format=parquet and the resolved option emits resolved timestamps that
// advance over time.
func TestParquetResolvedTimestamps(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format=parquet, resolved='10ms'`)
		defer closeFeed(t, foo)

		initialResolved, _ := expectResolvedTimestamp(t, foo)
		testutils.SucceedsSoon(t, func() error {
			laterResolved, _ := expectResolvedTimestamp(t, foo)
			if initialResolved.Less(laterResolved) {
				return nil
			}
			return errors.AssertionFailedf(
				"expected resolved timestamp %s to eventually exceed timestamp %s",
				laterResolved, initialResolved)
		})
	}

	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ func (s *cloudStorageSink) EmitRow(
func (s *cloudStorageSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
// TODO: There should be a better way to check if the sink is closed.
if s.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}
Expand Down
64 changes: 51 additions & 13 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -1052,7 +1053,11 @@ func (f *cloudFeedFactory) Feed(
// Determine if we can enable the parquet format if the changefeed is not
// being created with incompatible options. If it can be enabled, we will use
// parquet format with a probability of 0.4.
parquetPossible := true
//
// TODO: Consider making this knob a global flag so tests that don't
// initialize testing knobs can use parquet metamorphically.
knobs := f.s.TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed
parquetPossible := knobs != nil && knobs.(*TestingKnobs).EnableParquetMetadata
explicitEnvelope := false
for _, opt := range createStmt.Options {
if string(opt.Key) == changefeedbase.OptEnvelope {
Expand Down Expand Up @@ -1175,19 +1180,19 @@ func extractFieldFromJSONValue(

if isBare {
meta := make(map[string]gojson.RawMessage)
if metaVal, haveMeta := parsed[jsonMetaSentinel]; haveMeta {
if metaVal, haveMeta := parsed[metaSentinel]; haveMeta {
if err := gojson.Unmarshal(metaVal, &meta); err != nil {
return nil, nil, errors.Wrapf(err, "unmarshalling json %v", metaVal)
}
field = meta[fieldName]
delete(meta, fieldName)
if len(meta) == 0 {
delete(parsed, jsonMetaSentinel)
delete(parsed, metaSentinel)
} else {
if metaVal, err = reformatJSON(meta); err != nil {
return nil, nil, err
}
parsed[jsonMetaSentinel] = metaVal
parsed[metaSentinel] = metaVal
}
}
} else {
Expand Down Expand Up @@ -1340,6 +1345,29 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
return nil
}

// readParquetResolvedPayload reads a resolved timestamp value from the
// specified parquet file and returns it encoded as JSON.
func (c *cloudFeed) readParquetResolvedPayload(path string) ([]byte, error) {
meta, datums, err := parquet.ReadFile(path)
if err != nil {
return nil, err
}

if meta.NumRows != 1 || meta.NumCols != 1 {
return nil, errors.AssertionFailedf("expected one row with one col containing the resolved timestamp")
}

resolvedDatum := datums[0][0]

resolved := resolvedRaw{Resolved: resolvedDatum.String()}
resolvedBytes, err := gojson.Marshal(resolved)
if err != nil {
return nil, err
}

return resolvedBytes, nil
}

// Next implements the TestFeed interface.
func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
Expand Down Expand Up @@ -1413,11 +1441,27 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return nil
}

details, err := c.Details()
if err != nil {
return err
}
format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat])

if strings.HasSuffix(path, `RESOLVED`) {
resolvedPayload, err := os.ReadFile(path)
if err != nil {
return err
var resolvedPayload []byte
var err error
if format == changefeedbase.OptFormatParquet {
resolvedPayload, err = c.readParquetResolvedPayload(path)
if err != nil {
return err
}
} else {
resolvedPayload, err = os.ReadFile(path)
if err != nil {
return err
}
}

resolvedEntry := &cdctest.TestFeedMessage{Resolved: resolvedPayload}
c.rows = append(c.rows, resolvedEntry)
c.resolved = path
Expand All @@ -1439,12 +1483,6 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return err
}
defer f.Close()
details, err := c.Details()
if err != nil {
return err
}

format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat])

if format == changefeedbase.OptFormatParquet {
envelopeType := changefeedbase.EnvelopeType(details.Opts[changefeedbase.OptEnvelope])
Expand Down

0 comments on commit 07101ad

Please sign in to comment.