release-23.1: cdc: support the parquet format in changefeeds #105287

Conversation

@jayshrivastava (Contributor) commented Jun 21, 2023

Backport 3/3 commits from #99288.

/cc @cockroachdb/release

Release justification: This change adds a new feature, running changefeeds with the parquet format, which is additive-only and opt-in for users.


cdc: add apache arrow parquet library

This commit installs the Apache Arrow parquet library for Go
at version 11. The release can be found here:
https://github.com/apache/arrow/releases/tag/go%2Fv11.0.0

This library is licensed under the Apache License 2.0.

Informs: #99028
Epic: None
Release note: None


util/parquet: create parquet writer library

This change implements a Writer struct in the new util/parquet package.
This Writer writes datums to the io.Writer sink
using a configurable parquet version (defaults to v2.6).

The package implements several features internally required to write in the parquet format:

  • schema creation
  • row group / column page management
  • encoding/decoding of CRDB datums to parquet datums
    Currently, the writer only supports types found in the TPCC workload, namely INT, DECIMAL, STRING,
    UUID, TIMESTAMP, and BOOL.

This change also adds a benchmark and tests which verify the correctness of the
writer and test utils for reading datums from parquet files.
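
For illustration, a minimal sketch of how a caller might drive the writer. The
names used here (NewSchema, NewWriter, AddRow, Close) are assumptions based on
the description above, not verified exports:

```go
// Sketch only: assumed util/parquet API shapes.
sch, err := parquet.NewSchema(
	[]string{"id", "name"},              // column names
	[]*types.T{types.Int, types.String}, // CRDB column types
)
if err != nil {
	return err
}
var buf bytes.Buffer // any io.Writer can serve as the sink
w, err := parquet.NewWriter(sch, &buf) // defaults to parquet v2.6
if err != nil {
	return err
}
if err := w.AddRow([]tree.Datum{tree.NewDInt(1), tree.NewDString("a")}); err != nil {
	return err
}
// Close flushes buffered row groups and writes the file footer.
if err := w.Close(); err != nil {
	return err
}
```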

Informs: #99028
Epic: None
Release note: None


changefeedccl: add parquet writer

This change adds the file parquet.go which contains
helper functions to create parquet writers
and export data via cdcevent.Row structs.

This change also adds tests to ensure rows are written
to parquet files correctly.

Epic: None
Release note: None


util/parquet: add support for arrays
This change extends and refactors the util/parquet library to
be able to read and write arrays.

Release note: None

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071


util/parquet: add support for all types
This change adds support for the following type families in the util/parquet
library:
types.INetFamily, types.JsonFamily, types.FloatFamily, types.BytesFamily,
types.BitFamily, types.EnumFamily, types.Box2DFamily, types.GeographyFamily,
types.GeometryFamily, types.DateFamily, types.TimeFamily, types.TimeTZFamily,
types.IntervalFamily, types.TimestampTZFamily.

Release note: None

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071


util/parquet: add compression options
This change updates the parquet writer to be able to use
GZIP, ZSTD, SNAPPY, and BROTLI compression codecs. By
default, no compression is used. LZO and LZ4 are unsupported
by the library.
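
As a sketch, the codec would plausibly be chosen via a writer option at
construction time (the option name and codec constants are assumptions):

```go
// Sketch: compression selected via a functional option (names assumed).
// The default remains uncompressed; LZO and LZ4 are not supported.
w, err := parquet.NewWriter(sch, &buf,
	parquet.WithCompressionCodec(parquet.CompressionGZIP))
```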

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Informs: #99028
Release note: None


util/parquet: add option to write kv metadata to files
This change adds an option to the writer which allows the caller
to write arbitrary kv metadata to parquet files. This is useful
for testing purposes.
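
A sketch of what the option might look like (option name assumed):

```go
// Sketch: attach arbitrary kv metadata to the file (option name assumed).
w, err := parquet.NewWriter(sch, &buf,
	parquet.WithMetadata(map[string]string{"testKey": "testValue"}))
```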

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None


util/parquet: remove dependency on writer to read parquet files
Previously, the test utils used to read parquet files
would require the writer as an argument. The main reason
the writer was required is that the writer contained
crdb-specific type information which could be used to
decode raw data into crdb datums.

With this change, the writer is updated to write this
crdb-specific type information to the parquet file in
its metadata. The reader is updated to read the type
information from the file metadata. There is a new
test utility function ReadFile(parquetFile string)
which can be used to read all datums from a parquet
file without providing any additional type information.
The function also returns metadata since it is possible
for users of the Writer to write arbitrary metadata
and such users may need this metadata in testing.
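
A sketch of the resulting test flow; the ReadFile signature is taken from the
text above, while the return shape is an assumption:

```go
// Sketch: read every datum back without supplying any type information.
// The crdb-specific types are recovered from the file's kv metadata.
meta, rows, err := parquet.ReadFile("/path/to/file.parquet")
if err != nil {
	t.Fatal(err)
}
_ = meta // also surfaces any caller-written kv metadata
for _, row := range rows {
	_ = row // each row is a []tree.Datum decoded via the stored types
}
```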

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None


changefeedccl: use new parquet library
This change updates changefeeds to use the new parquet library
added in pkg/util/parquet when using format=parquet.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None


util/parquet: refactor random testing types
This change refactors randomized testing to use randgen.RandType.
randgen.RandType is better as it takes into account all allowable
types which can appear in CRDB (e.g. array of tuple). The previous
code only generated random types which are supported by the writer
which leaves a gap when new types are added. Now, the code defaults
to all types and filters out unsupported ones.
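
Roughly, the filtering loop looks like this (parquetSupported is a
hypothetical stand-in for the writer's actual support predicate):

```go
// Sketch: sample from all CRDB types, then reject unsupported ones.
typ := randgen.RandType(rng)
for !parquetSupported(typ) {
	typ = randgen.RandType(rng)
}
```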

The previous code also unnecessarily duplicated code from randgen.
For example, generating a random tuple can be done by calling one
method in randgen. Generating a random tuple using the
previous code would require more complex work.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None


util/parquet: support tuples
This change adds support for writing tuples. Implementation details below.

The standard way to write a tuple in parquet is to use a group:

```
message schema {                 -- toplevel schema
   optional group a (LIST) {
       optional T1 element;       -- physical column for the first field
       ...
       optional Tn element;       -- physical column for the nth field
   }
}
```

Because parquet has a very strict format, it does not write such groups
as one column with all the fields adjacent to each other. Instead, it
writes each field in the tuple as its own column. This 1:N mapping
from CRDB datum to physical column in parquet violates the assumption
used in this library that the mapping is 1:1.

This change updates the library to remove that assumption. Firstly,
there is now a clear distinction between a "datum column" and a "physical
column". Also, the Writer is updated to be able to write to multiple
physical columns for a given datum, and the reader is updated
to "squash" physical columns into single tuple datums if needed. Finally,
randomized testing and benchmarking is extended to cover tuples.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None


changefeedccl: refactor parquet column iterators
Previously, some tests would fail due to invalid conversion
from parquet to JSON in testing. These failures showed two
underlying issues with how columns were written:

  • in tests watching a column family not containing the primary
    key of the table, we would not write the primary key to
    the parquet file
  • in tests where the primary key was defined with a column
    order different from the table's, the parquet testing
    code would not order the keys correctly

This change fixes the two above issues by (a) using the correct
iterators on cdcevent.Row and (b) writing more verbose
metadata to the parquet file for use in testing.

Epic: None
Release note: None


changefeedccl: support the resolved option with format=parquet
Previously, format=parquet and resolved could not be used
together when running changefeeds. This change adds support for
using them together.

The release note is being left intentionally blank for a future
commit.

Informs: #103129
Release note: None


changefeedccl: use buildutil.CrdbTestBuild for parquet testing
Parquet testing requires that extra metadata be written to parquet files
so tests can create CDC rows from the raw data. Previously, the production
parquet code relied on a testing knob to be passed to write this extra
metadata. This is problematic as not all tests would pass the testing knob,
making it so that we could not randomly use parquet in those tests for
metamorphic testing purposes. With this change, the parquet production code
uses buildutil.CrdbTestBuild, which is a global flag enabled in tests.
Now, metamorphic parquet testing can be applied to more tests.
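
Conceptually, the knob check becomes a build-flag check, roughly as follows
(the option plumbing and the testingTypeMeta helper are assumptions):

```go
// Sketch: gate test-only metadata on the global test-build flag instead
// of a per-test knob. testingTypeMeta is a hypothetical helper.
opts := []parquet.Option{parquet.WithCompressionCodec(codec)}
if buildutil.CrdbTestBuild {
	// Extra type metadata lets tests rebuild CDC rows from raw parquet data.
	opts = append(opts, parquet.WithMetadata(testingTypeMeta(row)))
}
w, err := parquet.NewWriter(sch, sink, opts...)
```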

Epic: None
Release note: None


changefeedccl: support the updated and mvcc timestamp options with parquet format

Previously, using the parquet format in changefeeds did not support
using the mvcc or updated options. This change adds support for using
these options with parquet.

Epic: None
Release note: None


util/parquet: copy bytes from fmtCtx when writing byte arrays
While running CI, it was discovered that there is a data race when
writing bytes with the fmtCtx in this pkg. The reason for the
data race is that when a byte array is written to the underlying
library, it creates a shallow copy and buffers it to be flushed
later. While it's being buffered, the Writer in this package closes
the fmtCtx and returns it to the pool. Then, any code running
concurrently may access the buffer in the fmtCtx.
The data race can be seen [here](https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_BazelExtendedCi/10458378?showRootCauses=true&expandBuildChangesSection=true&expandBuildProblemsSection=true&expandBuildTestsSection=true). To fix this, we now create a copy of the byte array when using the fmtCtx before passing it to the underlying library.

In this particular failure, the problem is in page statistics.
The underlying library buffers the min and max byte array datums
until it needs to flush stats to the output file. See [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/file/column_writer_types.gen.go#L1428).

It's worth noting that some encoders copy byte slices before
buffering them and some do not. For example, the plain encoder
copies slices (see [UnsafeWrite](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#LL39C1-L46C2)) and the dict encoder does not (see [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#L98) and [appendVal](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/internal/hashing/xxh3_memo_table.go#L248)). If the problem with statistics
above were solved, we could consider using a non-dictionary encoding
when writing byte slices. For now, we cannot.
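
A self-contained illustration of the bug pattern and the fix (generic code,
not the actual pkg internals):

```go
package main

import (
	"bytes"
	"sync"
)

var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// writeDatum shows the race and its fix: the pooled buffer may be reset
// and reused after we return, so a consumer that buffers the bytes (as
// the parquet encoder does with page statistics) needs its own copy.
func writeDatum(sink func([]byte)) {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() { buf.Reset(); bufPool.Put(buf) }()

	buf.WriteString("formatted datum bytes")

	// Racy: sink may retain buf.Bytes() after the buffer returns to the pool.
	// sink(buf.Bytes())

	// Fixed: hand the consumer an owned copy of the bytes.
	owned := append([]byte(nil), buf.Bytes()...)
	sink(owned)
}

func main() {
	writeDatum(func(b []byte) { _ = b })
}
```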

As expected, this change causes more allocations when writing datums:

Before: BenchmarkParquetWriter/bytes-10    162480    7136 ns/op    1913 B/op    64 allocs/op
After:  BenchmarkParquetWriter/bytes-10    181750    6422 ns/op    1148 B/op    32 allocs/op

The affected types are bit, box2d, date, decimal, inet, json, interval, timestamp, timestamptz and timetz.

Informs: #99028
Epic: CRDB-27372
Release note: None


changefeedccl: support key_in_value with parquet format
Previously, the option key_in_value was disallowed with
format=parquet. This change allows these settings to be
used together. Note that key_in_value is enabled by
default with cloudstorage sinks and format=parquet is
only allowed with cloudstorage sinks, so key_in_value is
enabled for parquet by default.

Informs: #103129
Informs: #99028
Epic: CRDB-27372
Release note: None


changefeedccl: add test coverage for parquet event types
When using format=parquet, an additional column is produced to
indicate the type of operation corresponding to the row: create,
update, or delete. This change adds coverage for this in unit
testing.

Additionally, the test modified in this change is simplified
by reducing the number of rows and distinct types; this
complexity is unnecessary because all types are already tested
within the util/parquet package.

Informs: #99028
Epic: CRDB-27372
Release note: None


util/parquet: support tuple labels in util/parquet testutils
Previously, the test utilities in util/parquet would not reconstruct
tuples read from files with their labels. This change updates the
package to do so. This is required for testing by users of this
package, such as CDC.

Informs: #99028
Epic: CRDB-27372
Release note: None


changefeedccl: support diff option with parquet format
This change adds support for the diff changefeed
option when using format=parquet. Enabling diff also adds
support for CDC Transformations with parquet.

Informs: #103129
Informs: #99028
Epic: CRDB-27372
Release note: None


changefeedccl: support end_time option with parquet format
This change adds support for the end_time changefeed
option when using format=parquet. No significant code
changes were needed to enable this feature.

Closes: #103129
Closes: #99028
Epic: CRDB-27372
Release note (enterprise change): Changefeeds now officially
support the parquet format at specification version 2.6.
It is only usable with the cloudstorage sink.

The syntax to use parquet is the following:
`CREATE CHANGEFEED FOR foo INTO ... WITH format=parquet`

It supports all standard changefeed options and features
including CDC transformations, except it does not support the
topic_in_value option.


changefeedccl: use parquet with 50% probability in nemeses test
Informs: #99028
Epic: CRDB-27372
Release note: None


changefeedccl: tweak structs in TestParquetRows to appease linter
Me: shakes fist at linter. "Damn you, linter! Damn you!"

Epic: None
Release note: None

@blathers-crl (bot) commented Jun 21, 2023

Thanks for opening a backport.

Please check the backport criteria before merging:

  • Patches should only be created for serious issues or test-only changes.
  • Patches should not break backwards-compatibility.
  • Patches should change as little code as possible.
  • Patches should not change on-disk formats or node communication protocols.
  • Patches should not add new functionality.
  • Patches must not add, edit, or otherwise modify cluster versions; or add version gates.
If some of the basic criteria cannot be satisfied, ensure that the exceptional criteria are satisfied within.
  • There is a high priority need for the functionality that cannot wait until the next release and is difficult to address in another way.
  • The new functionality is additive-only and only runs for clusters which have specifically “opted in” to it (e.g. by a cluster setting).
  • New code is protected by a conditional check that is trivial to verify and ensures that it only runs for opt-in clusters.
  • The PM and TL on the team that owns the changed code have signed off that the change obeys the above rules.

Add a brief release justification to the body of your PR to justify this backport.

Some other things to consider:

  • What did we do to ensure that a user that doesn’t know & care about this backport, has no idea that it happened?
  • Will this work in a cluster of mixed patch versions? Did we test that?
  • If a user upgrades a patch version, uses this feature, and then downgrades, what happens?

@cockroach-teamcity (Member) commented:

This change is Reviewable

@jayshrivastava changed the title from "release-23.1: cdc: add apache arrow parquet library and writer" to "release-23.1: cdc: support the parquet format in changefeeds" on Jun 21, 2023
@jayshrivastava requested a review from miretskiy June 21, 2023
@jayshrivastava marked this pull request as ready for review June 21, 2023
@jayshrivastava requested reviews from a team as code owners June 21, 2023
@yuzefovich (Member) commented:
This is now too stale

@yuzefovich closed this Jun 11, 2024