Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: support the resolved option with format=parquet #104283

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets changefeedbase.Targets) {
}

const (
jsonMetaSentinel = `__crdb__`
// metaSentinel is a key or prefix used to mark metadata fields or columns
// in rows returned by an encoder.
metaSentinel = `__crdb__`
)

// emitResolvedTimestamp emits a changefeed-level resolved timestamp to the
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6282,8 +6282,10 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (e *versionEncoder) rowAsGoNative(
if !row.HasValues() || (emitDeletedRowAsNull && row.IsDeleted()) {
if meta != nil {
b := json.NewObjectBuilder(1)
b.Add(jsonMetaSentinel, meta)
b.Add(metaSentinel, meta)
return b.Build(), nil
}
return json.NullJSONValue, nil
Expand All @@ -199,7 +199,7 @@ func (e *versionEncoder) rowAsGoNative(
return nil
})
if meta != nil {
keys = append(keys, jsonMetaSentinel)
keys = append(keys, metaSentinel)
}
b, err := json.NewFixedKeysObjectBuilder(keys)
if err != nil {
Expand All @@ -219,7 +219,7 @@ func (e *versionEncoder) rowAsGoNative(
}

if meta != nil {
if err := e.valueBuilder.Set(jsonMetaSentinel, meta); err != nil {
if err := e.valueBuilder.Set(metaSentinel, meta); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (e *jsonEncoder) EncodeResolvedTimestamp(
jsonEntries = meta
} else {
jsonEntries = map[string]interface{}{
jsonMetaSentinel: meta,
metaSentinel: meta,
}
}
return gojson.Marshal(jsonEntries)
Expand Down
14 changes: 9 additions & 5 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func expectResolvedTimestamp(t testing.TB, f cdctest.TestFeed) (hlc.Timestamp, s
return extractResolvedTimestamp(t, m), m.Partition
}

// resolvedRaw is the wire format of a resolved-timestamp notification: a
// JSON object containing the single key "resolved" whose value is the
// resolved timestamp rendered as a string. It is used both to decode
// resolved messages from test feeds and to re-encode resolved timestamps
// read from parquet files.
type resolvedRaw struct {
Resolved string `json:"resolved"`
}

func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Timestamp {
t.Helper()
if m.Key != nil {
Expand All @@ -334,14 +340,12 @@ func extractResolvedTimestamp(t testing.TB, m *cdctest.TestFeedMessage) hlc.Time
t.Fatal(`expected a resolved timestamp notification`)
}

var resolvedRaw struct {
Resolved string `json:"resolved"`
}
if err := gojson.Unmarshal(m.Resolved, &resolvedRaw); err != nil {
var resolved resolvedRaw
if err := gojson.Unmarshal(m.Resolved, &resolved); err != nil {
t.Fatal(err)
}

return parseTimeToHLC(t, resolvedRaw.Resolved)
return parseTimeToHLC(t, resolved.Resolved)
}

func expectResolvedTimestampAvro(t testing.TB, f cdctest.TestFeed) hlc.Timestamp {
Expand Down
28 changes: 19 additions & 9 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,12 @@ func newParquetWriterFromRow(
return nil, err
}

writerConstructor := parquet.NewWriter

if knobs.EnableParquetMetadata {
if knobs != nil && knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
return nil, err
}

// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
writerConstructor = parquet.NewWriterWithReaderMeta
}

writer, err := writerConstructor(schemaDef, sink, opts...)
writer, err := newParquetWriter(schemaDef, sink, knobs, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,3 +209,20 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
}
return orderedKeys, m, nil
}

// newParquetWriter constructs a parquet writer for the given schema
// definition that writes to sink, forwarding any additional parquet
// options. When the testing knobs enable parquet metadata, the returned
// writer is configured with reader metadata instead, so that the parquet
// test utilities can decode the emitted datums.
func newParquetWriter(
	sch *parquet.SchemaDefinition,
	sink io.Writer,
	knobs *TestingKnobs, /* may be nil */
	opts ...parquet.Option,
) (*parquet.Writer, error) {
	metadataEnabled := knobs != nil && knobs.EnableParquetMetadata
	if !metadataEnabled {
		return parquet.NewWriter(sch, sink, opts...)
	}
	// Reader metadata is required by the parquet test utilities to read
	// datums back out of the file.
	return parquet.NewWriterWithReaderMeta(sch, sink, opts...)
}
54 changes: 50 additions & 4 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@
package changefeedccl

import (
"bytes"
"context"
"fmt"
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)

// This is an extra column that will be added to every parquet file which tells
// us about the type of event that generated a particular row. The types are
// defined below.
const parquetCrdbEventTypeColName string = "__crdb_event_type__"
const parquetCrdbEventTypeColName string = "__crdb__event_type__"

type parquetEventType int

Expand Down Expand Up @@ -117,9 +124,48 @@ func (parquetSink *parquetCloudStorageSink) Dial() error {
// EmitResolvedTimestamp does not do anything as of now. It is there to
// implement Sink interface.
func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
return errors.AssertionFailedf("Parquet format does not support emitting resolved timestamp")
ctx context.Context, _ Encoder, resolved hlc.Timestamp,
) (err error) {
// TODO: There should be a better way to check if the sink is closed.
// This is copied from the wrapped sink's EmitResolvedTimestamp()
// method.
if parquetSink.wrapped.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}

defer parquetSink.wrapped.metrics.recordResolvedCallback()()

if err := parquetSink.wrapped.waitAsyncFlush(ctx); err != nil {
return errors.Wrapf(err, "while emitting resolved timestamp")
}

var buf bytes.Buffer
sch, err := parquet.NewSchema([]string{metaSentinel + "resolved"}, []*types.T{types.Decimal})
if err != nil {
return err
}

// TODO: Ideally, we do not create a new schema and writer every time
// we emit a resolved timestamp. Currently, util/parquet does not support it.
writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs)
if err != nil {
return err
}

if err := writer.AddRow([]tree.Datum{eval.TimestampToDecimalDatum(resolved)}); err != nil {
return err
}

if err := writer.Close(); err != nil {
return err
}

part := resolved.GoTime().Format(parquetSink.wrapped.partitionFormat)
filename := fmt.Sprintf(`%s.RESOLVED`, cloudStorageFormatTime(resolved))
if log.V(1) {
log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime())
}
return cloud.WriteFile(ctx, parquetSink.wrapped.es, filepath.Join(part, filename), &buf)
}

// Flush implements the Sink interface.
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -162,3 +164,29 @@ func makeRangefeedReaderAndDecoder(
require.NoError(t, err)
return popRow, cleanup, decoder
}

// TestParquetResolvedTimestamps verifies that a changefeed created with
// format=parquet and the resolved option emits resolved timestamp files,
// and that successive resolved timestamps eventually advance.
func TestParquetResolvedTimestamps(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format=parquet, resolved='10ms'`)
		defer closeFeed(t, foo)

		firstResolved, _ := expectResolvedTimestamp(t, foo)
		testutils.SucceedsSoon(t, func() error {
			nextResolved, _ := expectResolvedTimestamp(t, foo)
			if !firstResolved.Less(nextResolved) {
				// This is a transient, retryable condition (the next resolved
				// message may not have advanced yet), not an internal
				// invariant violation, so return a plain error rather than
				// an assertion failure and let SucceedsSoon retry.
				return errors.Newf(
					"expected resolved timestamp %s to eventually exceed timestamp %s",
					nextResolved, firstResolved)
			}
			return nil
		})
	}

	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ func (s *cloudStorageSink) EmitRow(
func (s *cloudStorageSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
// TODO: There should be a better way to check if the sink is closed.
if s.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}
Expand Down
64 changes: 51 additions & 13 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -1052,7 +1053,11 @@ func (f *cloudFeedFactory) Feed(
// Determine if we can enable the parquet format if the changefeed is not
// being created with incompatible options. If it can be enabled, we will use
// parquet format with a probability of 0.4.
parquetPossible := true
//
// TODO: Consider making this knob a global flag so tests that don't
// initialize testing knobs can use parquet metamorphically.
knobs := f.s.TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed
parquetPossible := knobs != nil && knobs.(*TestingKnobs).EnableParquetMetadata
explicitEnvelope := false
for _, opt := range createStmt.Options {
if string(opt.Key) == changefeedbase.OptEnvelope {
Expand Down Expand Up @@ -1175,19 +1180,19 @@ func extractFieldFromJSONValue(

if isBare {
meta := make(map[string]gojson.RawMessage)
if metaVal, haveMeta := parsed[jsonMetaSentinel]; haveMeta {
if metaVal, haveMeta := parsed[metaSentinel]; haveMeta {
if err := gojson.Unmarshal(metaVal, &meta); err != nil {
return nil, nil, errors.Wrapf(err, "unmarshalling json %v", metaVal)
}
field = meta[fieldName]
delete(meta, fieldName)
if len(meta) == 0 {
delete(parsed, jsonMetaSentinel)
delete(parsed, metaSentinel)
} else {
if metaVal, err = reformatJSON(meta); err != nil {
return nil, nil, err
}
parsed[jsonMetaSentinel] = metaVal
parsed[metaSentinel] = metaVal
}
}
} else {
Expand Down Expand Up @@ -1340,6 +1345,29 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
return nil
}

// readParquetResolvedPayload reads a resolved timestamp from the parquet
// file at the given path and returns it re-encoded as the JSON payload
// that test feed consumers expect ({"resolved": "<timestamp>"}).
func (c *cloudFeed) readParquetResolvedPayload(path string) ([]byte, error) {
	meta, rows, err := parquet.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// A resolved file carries exactly one datum: the timestamp itself.
	if meta.NumRows != 1 || meta.NumCols != 1 {
		return nil, errors.AssertionFailedf("expected one row with one col containing the resolved timestamp")
	}

	payload := resolvedRaw{Resolved: rows[0][0].String()}
	return gojson.Marshal(payload)
}

// Next implements the TestFeed interface.
func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
Expand Down Expand Up @@ -1413,11 +1441,27 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return nil
}

details, err := c.Details()
if err != nil {
return err
}
format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat])

if strings.HasSuffix(path, `RESOLVED`) {
resolvedPayload, err := os.ReadFile(path)
if err != nil {
return err
var resolvedPayload []byte
var err error
if format == changefeedbase.OptFormatParquet {
resolvedPayload, err = c.readParquetResolvedPayload(path)
if err != nil {
return err
}
} else {
resolvedPayload, err = os.ReadFile(path)
if err != nil {
return err
}
}

resolvedEntry := &cdctest.TestFeedMessage{Resolved: resolvedPayload}
c.rows = append(c.rows, resolvedEntry)
c.resolved = path
Expand All @@ -1439,12 +1483,6 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return err
}
defer f.Close()
details, err := c.Details()
if err != nil {
return err
}

format := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat])

if format == changefeedbase.OptFormatParquet {
envelopeType := changefeedbase.EnvelopeType(details.Opts[changefeedbase.OptEnvelope])
Expand Down