-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cdc: add apache arrow parquet library and writer #99288
Conversation
e6caf6e
to
af6cc60
Compare
af6cc60
to
062bca4
Compare
a10630a
to
cffa798
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lots of comments, but most are nits. I think this is an excellent start.
Reviewed 4 of 5 files at r1, 3 of 5 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @samiskin)
-- commits
line 26 at r2:
is the version mention important?
pkg/ccl/changefeedccl/parquet.go
line 31 at r2 (raw file):
// MaxParquetRowGroupSize is the maximal number of rows which can be written out to // a single row group in a parquet file. // TODO(jayant): customers may want to adjust the row group size based on rows instead of bytes
Meh... It's a bit of math.. I don't know if you need this todo.
pkg/ccl/changefeedccl/parquet.go
line 34 at r2 (raw file):
var MaxParquetRowGroupSize = settings.RegisterIntSetting( settings.TenantWritable, "changefeed.format.parquet.max_row_group_size",
Do we want this to be cluster setting? Customers already specify format=parquet option.... we can just have
a format specific option rows_per_group
or some such.
pkg/ccl/changefeedccl/parquet.go
line 75 at r2 (raw file):
// files. It should not be required for reading files since all the necessary // metadata to interpret columns will be written out to the file and read by a // reader.
❤️ this comment.
pkg/ccl/changefeedccl/parquet.go
line 94 at r2 (raw file):
func newParquetSchema(row cdcevent.Row) (*parquetSchemaDefinition, error) { fields := make([]schema.Node, 0) cols := make([]parquetColumn, 0)
you can probably allocate both of these as
cols := make([]parquetColumn, 0, len(row.ResultColumns()))
pkg/ccl/changefeedccl/parquet.go
line 125 at r2 (raw file):
}) groupNode, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, defaultParquetSchemaFieldID)
you probably don't need to build up fields as you're building up columns, do you?
You could just iterate columns and extract node.
pkg/ccl/changefeedccl/parquet.go
line 161 at r2 (raw file):
} opts := []parquet.WriterProperty{parquet.WithCreatedBy("cockroachdb"), parquet.WithMaxRowGroupLength(1)}
I know we have discussed the replacement of the existing parquet encoding library for export some time in the future.
And, I certainly do not expect this replacement to happen as part of this PR. I do think though, that we can make
this future replacement much easier by introducing a standalone parquet encoder/writer package that does not
have any dependency on CDC code.
There are multiple benefits to this approach:
- As mentioned above, easy to swap out export implementation
- Better defined API -- parquet library deals with taking cockroach specific types (tree.Datum) and producing parquet files.
- Better testing -- it's just a lot easier to test encoding/decoding by throwing bunch of tree.Datums and seeing what happens.
The final benefit is that I don't think it's such a tall order -- the code as it exists right now is already pretty clean and nice.
With that in mind, here are my concrete suggestions:
-
Create parquet directory where parquet writer and related stuff will live (under changefeeedccl for now). This entire package will eventually be moved
under sql or util package. -
Just like the underlying arrow library supports options (parquet.WithMaxRowGroupLength), your NewParquetWriter should also take options:
type config struct {
maxRowGroupLength int
}
type Option interface {
apply(*config)
}
Note: by moving this stuff under parquet package, you can clean up naming a lot -- by removing parquet
prefixes on many variables.
- You will need to define small
Schema
struct which just has a methodAdd(name, *types.T)
to add columns of specific type.
(the newParquetSchema will of course use the above schema object to convert cdcevent.Row -> Schema)
This function will take the Schema when constructing writer. - Similar change to the
AddData
method -- we don't want to take cdcevent.Row -- either taketree.Datums
or evenEncDatumRow
, or define an iterator like interface.
This file -- i.e. changefeed specific wrappers for parquet will still remain because we do want to AddRow, and add event type stuff, etc. But that's a higher level package
that will be used by changefeeds.
pkg/ccl/changefeedccl/parquet.go
line 171 at r2 (raw file):
} // nonNilDefLevel represents a def level of 1, meaning that the value is non-nil.
would be nice to add a bit more context as to what those def levels are.
Perhaps link https://github.com/apache/parquet-format/blob/master/README.md#nested-encoding
pkg/ccl/changefeedccl/parquet.go
line 178 at r2 (raw file):
// writeColBatch writes a value to the provided column chunk writer. func writeColBatch(colWriter file.ColumnChunkWriter, value interface{}) (int64, error) {
what's the int result? number of items written?
pkg/ccl/changefeedccl/parquet.go
line 191 at r2 (raw file):
return w.WriteBatch([]parquet.FixedLenByteArray{value.(parquet.FixedLenByteArray)}, nonNilDefLevel, nil) default: panic("unimplemented")
let's return assertion failed error instead of panic.
pkg/ccl/changefeedccl/parquet.go
line 208 at r2 (raw file):
return w.WriteBatch([]parquet.FixedLenByteArray{}, nilDefLevel, nil) default: panic("unimplemented")
s/panic/error/
pkg/ccl/changefeedccl/parquet.go
line 238 at r2 (raw file):
// AddData writes the updatedRow. There is no guarantee that the row will // immediately be flushed to the output sink. func (w *ParquetWriter) AddData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
would it be better to pass in event type instead of the full prevRow?
pkg/ccl/changefeedccl/parquet.go
line 270 at r2 (raw file):
func getEventTypeDatum(updatedRow cdcevent.Row, prevRow cdcevent.Row) tree.Datum { eventTypeDatum := tree.NewDString(parquetEventInsert)
you probably want to define insert/update dstrings as variables here to avoid creating new dstrings for those.
Also, see setupContextForRow
in expr_eval where cdc expression computes op
type.
It's okay to repeat the code, but maybe there is a way not to... More importantly, see that function regarding
differentiating between update and insert -- which requires withDiff option.
pkg/ccl/changefeedccl/parquet.go
line 291 at r2 (raw file):
// By default, a column's repetitions are set to parquet.Repetitions.Optional, // which means that the column is nullable. func newParquetColumn(column cdcevent.ResultColumn) (parquetColumn, error) {
nit: perhaps makeParquetColumn is a better name (usually new returns *)
pkg/ccl/changefeedccl/parquet.go
line 310 at r2 (raw file):
case types.BoolFamily: result.node = schema.NewBooleanNode(colName, defaultRepetitions, defaultParquetSchemaFieldID) result.node.LogicalType()
is this a no-op call?
pkg/ccl/changefeedccl/parquet.go
line 394 at r2 (raw file):
var parquetBoolEncoder parquetEncodeFn = func(d tree.Datum) (interface{}, error) { return bool(*d.(*tree.DBool)), nil
let's be a lot more verbose. Let's make sure we return an error if d is not DBool.
When we AddData, we are adding tree.Datums -- those are interfaces, so it's quite possible to make a mistake
(that's why say EncDatumRow has EnsureDecoded to guaranteed that encoded datum can be decoded as target type).
Same for all of the below decoders.
(Another option: if this is only intended to be used in tests... you could move these to test function... and then
keep those type assertions. but: I do think having encoder/decoder support is good)
pkg/ccl/changefeedccl/parquet_test.go
line 122 at r2 (raw file):
}, }, } {
nice, high level tests. I think if the underlying parquet writer only concerned itself with datum -> parquet conversion, the
a lower level test, plus a lower level benchmark could be written using randgen.RandDatum to generate random datums for testing.
1d47bf5
to
9831f4d
Compare
98c6e10
to
f015156
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 17 of 22 files at r4, 1 of 1 files at r5.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @samiskin)
pkg/util/parquet/writer.go
line 23 at r5 (raw file):
// Config stores configurable options for the Writer. type Config struct {
config does not need to be exported.
External callers should use WithXXX to construct config.
pkg/util/parquet/writer.go
line 35 at r5 (raw file):
} type option interface {
since option will be used by external callers, please export it.
apply does not need to be exported though.
pkg/util/parquet/writer.go
line 56 at r5 (raw file):
type WithVersion string func (v WithVersion) apply(c *Config) error {
it's an interesting way to define options... I guess it's fine, though it's quite common to have something like:
type funcOpt func(c *Config) error
func (f funcOpt) apply(c *Config) error {
return f(c)
}
func WithVersion(v string) option {
return func(c *Config) error {
....
}
}
pkg/util/parquet/writer.go
line 156 at r5 (raw file):
w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() w.currentRowGroupSize = 0 }
I was thinking about how to better integrate this AddData method with the existing iterator in cdcevent.
One way is to .. change cdcevent iterator somehow...
Here is another idea: instead of AddData(), we do something like:
type Row struct {
w *Writer
}
func (r Row) SetCol(idx int, tree.Datum) error {}
func (r Row) Close() error {
w.currentRowGroupSize += 1
if err := w.currentRowGroupWriter.Close(); err != nil {
return err
}
w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup()
w.currentRowGroupSize = 0
return nil
}
func (w *Writer) AddRow() (Row, func()) {
if w.currentRowGroupWriter == nil {
w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup()
}
return Row{w: w}
}
pkg/util/parquet/writer_bench_test.go
line 16 at r4 (raw file):
Previously, jayshrivastava (Jayant) wrote…
Here's the benchmark output.
BenchmarkParquetWriter-10 3896122 3112 ns/op 2958 B/op 48 allocs/op
Per our discussion, you have a plan on how to reduce allocs.
Use unsafe string to get bytes from datum; and use idea similar to
tree.DatumAlloc to remove allocations for single element batch arrays.
pkg/util/parquet/writer_bench_test.go
line 35 at r5 (raw file):
for i := 0; i < numCols; i++ { sch.columnTypes[i] = types.String sch.columnNames[i] = fmt.Sprintf("col%d", i)
Do we want to test different types? I suspect we do, and I also suspect this isn't done yet because we don't support all types.
Perhaps allocate few types you already support. Or... leave a todo.
716083d
to
4ab442b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @samiskin)
pkg/util/parquet/writer.go
line 23 at r5 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
config does not need to be exported.
External callers should use WithXXX to construct config.
Done.
pkg/util/parquet/writer.go
line 35 at r5 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
since option will be used by external callers, please export it.
apply does not need to be exported though.
Done.
pkg/util/parquet/writer.go
line 56 at r5 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
it's an interesting way to define options... I guess it's fine, though it's quite common to have something like:
type funcOpt func(c *Config) error func (f funcOpt) apply(c *Config) error { return f(c) } func WithVersion(v string) option { return func(c *Config) error { .... } }
Done.
pkg/util/parquet/writer.go
line 156 at r5 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I was thinking about how to better integrate this AddData method with the existing iterator in cdcevent.
One way is to .. change cdcevent iterator somehow...Here is another idea: instead of AddData(), we do something like:
type Row struct { w *Writer } func (r Row) SetCol(idx int, tree.Datum) error {} func (r Row) Close() error { w.currentRowGroupSize += 1 if err := w.currentRowGroupWriter.Close(); err != nil { return err } w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() w.currentRowGroupSize = 0 return nil } func (w *Writer) AddRow() (Row, func()) { if w.currentRowGroupWriter == nil { w.currentRowGroupWriter = w.writer.AppendBufferedRowGroup() } return Row{w: w} }
Done. I added many assertions and tests that break them. I ended up using a []uint64 bitmap because reseting the util.FastIntMap
would require reallocating it... I think. Using an []uint64 instead of uint64 lets us support more than 64 cols.
pkg/util/parquet/writer_bench_test.go
line 16 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Per our discussion, you have a plan on how to reduce allocs.
Use unsafe string to get bytes from datum; and use idea similar to
tree.DatumAlloc to remove allocations for single element batch arrays.
Done.
pkg/util/parquet/writer_bench_test.go
line 35 at r5 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Do we want to test different types? I suspect we do, and I also suspect this isn't done yet because we don't support all types.
Perhaps allocate few types you already support. Or... leave a todo.
Added a TODO.
bd19035
to
b824800
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 9 files at r7.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @samiskin)
pkg/ccl/changefeedccl/parquet.go
line 54 at r7 (raw file):
func AddData( writer *parquet.Writer, updatedRow cdcevent.Row, prevRow cdcevent.Row, returnDatums bool, ) ([]tree.Datum, error) {
I only see returnDatums parameter being used in the tests.
I'd rather not. It's okay if in the test you have a helper that iterates the row again to extract datums.
pkg/util/json/parser.go
line 335 at r7 (raw file):
// (i.e. jsonString([]byte)). // See https://groups.google.com/g/golang-nuts/c/Zsfk-VMd_fU/m/O1ru4fO-BgAJ func UnsafeGetBytes(s string) ([]byte, error) {
Let's not export this. It's fine to copy this utility method into your library.
While doing this, we should add a todo to replace it w/ less unsafe version once we switch to go 1.20
pkg/util/parquet/writer.go
line 97 at r7 (raw file):
// compression schemes, allocator, batch size, page size etc func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer, error) { cfg := newConfig()
I would inline it; and I don't think you need to make a "new config" Just regular, stack allocated value is fine
(you can pass opt.apply(&cfg) below)
pkg/util/parquet/writer.go
line 154 at r7 (raw file):
func (r *RowWriter) writeIdx(idx int) { r.colIdxMap[idx/64] = r.colIdxMap[idx/64] | (1 << (idx % 64))
r.colIdxMap[idx >> 6 ] |= 1<<(idx & 64)
pkg/util/parquet/writer.go
line 170 at r7 (raw file):
} if r.idxIsWritten(idx) { return errors.AssertionFailedf("previously wrote datum to row at idx %d", idx)
I don't know if this is worth it... I mean, if the only reason we keep this bitmap is to tell
caller not to override the same column multiple times -- I'd say let the caller go for it.
Why not?
If you don't set anything, or you only set some columns, you wind up writing tree.DNull...
It's one thing to check bounds, and/or types. But I feel like this is a bit too much. And, there could be an argument
that if the caller knows that it only has 1 column, why force it to set more columns?
pkg/util/parquet/writer.go
line 200 at r7 (raw file):
// returned RowWriter must be used to write all datums in the row before // the next call to AddData. func (w *Writer) AddData() (*RowWriter, error) {
rename to AddRow() perhaps?
pkg/util/parquet/write_functions.go
line 34 at r7 (raw file):
// // This means any array below will not be in use outside the writeBatch // function below.
very nice.
b824800
to
e9d3b84
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @samiskin)
pkg/ccl/changefeedccl/parquet.go
line 54 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I only see returnDatums parameter being used in the tests.
I'd rather not. It's okay if in the test you have a helper that iterates the row again to extract datums.
Done.
pkg/ccl/changefeedccl/parquet.go
line 74 at r8 (raw file):
// populateDatums writes the appropriate datums into the datumAlloc slice. func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
This is its own function so we can use this production code in tests rather than copy the code in tests.
pkg/ccl/changefeedccl/parquet.go
line 75 at r8 (raw file):
// populateDatums writes the appropriate datums into the datumAlloc slice. func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error { datums := datumAlloc[:0]
I prefer this only because I dislike using an idx
when you have an iterator like so
idx := 0
if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums[idx] = d
idx += 1
})....
datums[idx] = eventType
pkg/util/parquet/writer.go
line 97 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I would inline it; and I don't think you need to make a "new config" Just regular, stack allocated value is fine
(you can pass opt.apply(&cfg) below)
Done.
pkg/util/parquet/writer.go
line 154 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
r.colIdxMap[idx >> 6 ] |= 1<<(idx & 64)
Done (and deleted)
pkg/util/parquet/writer.go
line 170 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I don't know if this is worth it... I mean, if the only reason we keep this bitmap is to tell
caller not to override the same column multiple times -- I'd say let the caller go for it.
Why not?
If you don't set anything, or you only set some columns, you wind up writing tree.DNull...
It's one thing to check bounds, and/or types. But I feel like this is a bit too much. And, there could be an argument
that if the caller knows that it only has 1 column, why force it to set more columns?
Per our discussion, we will now pass an []tree.Datum
pkg/util/parquet/writer.go
line 200 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
rename to AddRow() perhaps?
Done.
Code quote:
AddData
pkg/util/json/parser.go
line 335 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Let's not export this. It's fine to copy this utility method into your library.
While doing this, we should add a todo to replace it w/ less unsafe version once we switch to go 1.20
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 16 files at r3, 1 of 22 files at r4, 1 of 9 files at r7, 7 of 8 files at r8.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @samiskin)
pkg/ccl/changefeedccl/parquet.go
line 21 at r8 (raw file):
type parquetWriter struct { inner *parquet.Writer
you could just embed *parquet.Writer if you want (so that you don't need to reference inner)
pkg/ccl/changefeedccl/parquet.go
line 65 at r8 (raw file):
} return w.inner.AddRow(w.datumAlloc)
perfect.
pkg/sql/sem/tree/datum.go
line 1043 at r8 (raw file):
// AsDDecimal attempts to retrieve a DDecimal from an Expr, returning a DDecimal and // a flag signifying whether the assertion was successful. func AsDDecimal(e Expr) (*DDecimal, bool) {
surprised it wasn't there before.
pkg/util/parquet/write_functions.go
line 123 at r8 (raw file):
return []byte{}, nil } const maxStrLen = 1 << 30 // Really, can't see us supporting input JSONs that big.
s/JSONs/string/
e9d3b84
to
1c002e8
Compare
This commit installs the apache arrow parquet library for Go at version 11. The release can be found here: https://github.com/apache/arrow/releases/tag/go%2Fv11.0.0 This library is licensed under the Apache License 2.0. Informs: cockroachdb#99028 Epic: None Release note: None
1c002e8
to
77771c1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reviews and for teaching me your ways 🙏
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @samiskin)
bors r=miretskiy |
bors r- |
Canceled. |
This change implements a `Writer` struct in the new `util/parquet` package. This `Writer` writes datums the `io.Writer` sink using a configurable parquet version (defaults to v2.6). The package implements several features internally required to write in the parquet format: - schema creation - row group / column page management - encoding/decoding of CRDB datums to parquet datums Currently, the writer only supports types found in the TPCC workload, namely INT, DECIMAL, STRING UUID, TIMESTAMP and BOOL. This change also adds a benchmark and tests which verify the correctness of the writer and test utils for reading datums from parquet files. Informs: cockroachdb#99028 Epic: None Release note: None
This change adds the file `parquet.go` which contains helper functions to help create parquet writers and export data via `cdcevent.Row` structs. This change also adds tests to ensure rows are written to parquet files correctly. Epic: None Release note: None
77771c1
to
d6acc93
Compare
bors r=miretskiy |
Build succeeded: |
cdc: add apache arrow parquet library
This commit installs the apache arrow parquet library for Go
at version 11. The release can be found here:
https://github.com/apache/arrow/releases/tag/go%2Fv11.0.0
This library is licensed under the Apache License 2.0.
Informs: #99028
Epic: None
Release note: None
util/parquet: create parquet writer library
This change implements a
Writer
struct in the newutil/parquet
package.This
Writer
writes datums to theio.Writer
sinkusing a configurable parquet version (defaults to v2.6).
The package implements several features internally required to write in the parquet format:
Currently, the writer only supports types found in the TPCC workload, namely INT, DECIMAL, STRING
UUID, TIMESTAMP and BOOL.
This change also adds a benchmark and tests which verify the correctness of the
writer and test utils for reading datums from parquet files.
Informs: #99028
Epic: None
Release note: None
changefeedccl: add parquet writer
This change adds the file
parquet.go
which containshelper functions to help create parquet writers
and export data via
cdcevent.Row
structs.This change also adds tests to ensure rows are written
to parquet files correctly.
Epic: None
Release note: None