
changefeedccl: improve parquet byte estimates and flushing scheme #111138

Merged
merged 5 commits into cockroachdb:master from bench-parquet
Sep 26, 2023

Conversation

Contributor

@jayshrivastava jayshrivastava commented Sep 22, 2023

changefeedccl: ensure that parquet counts emitted rows in metrics

Previously, the parquet sink would not update the EmittedMessages
metric for emitted rows. This change updates the sink to do so.
This change does not add a test, but there is an open issue to add tests
for metrics: #111042.

Release note: None
Epic: None


roachtest/cdc: add parquet initial scan roachtest

This change adds a simple parquet initial scan roachtest.

Release note: None
Epic: None


util/parquet: support explicit flushing and add byte estimation

This change updates the parquet writer to have two new
APIs BufferedBytesEstimate() int64 and Flush() error.

BufferedBytesEstimate returns the number of bytes currently
buffered by the writer (i.e. bytes which have not been written
to the sink). This value is calculated continuously by the writer
as rows are written.

Flush flushes buffered bytes by closing the current row group writer
and initializing a new one.

These methods can be used by a caller to flush the writer depending on
how much data is buffered in memory.

This change also adds testing for both of these methods. BufferedBytesEstimate
is tested via a unit test. Flush test coverage is added in TestRandomDatums.
The test calls Flush randomly and asserts the output is correct.

Release note: None
Epic: None
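
For illustration, a minimal sketch of how a caller might drive these two APIs. The parquetWriter interface, AddRow signature, and threshold below are stand-ins for this sketch, not the actual util/parquet types or changefeed defaults.

// parquetWriter is a stand-in for the writer described above; only the two
// new methods and a generic AddRow are modeled.
type parquetWriter interface {
	AddRow(row []any) error
	BufferedBytesEstimate() int64
	Flush() error
}

// flushThreshold is an illustrative value.
const flushThreshold = 1 << 20 // 1MB

// writeRows flushes the writer whenever its buffered-bytes estimate grows past
// the threshold, then performs a final flush at the end.
func writeRows(w parquetWriter, rows [][]any) error {
	for _, r := range rows {
		if err := w.AddRow(r); err != nil {
			return err
		}
		if w.BufferedBytesEstimate() >= flushThreshold {
			if err := w.Flush(); err != nil {
				return err
			}
		}
	}
	return w.Flush()
}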


changefeedccl: use byte estimate when flushing parquet files

Previously, the parquet sink/encoder would use a buffer to decide
when to emit files to cloudstorage. This scheme did not take into
account the bytes buffered by the parquet writer library. This change
makes it so that the parquet sink/encoder takes into account
memory buffered by the library when deciding if it needs to perform
a size-based flush.

Release note: None
Epic: None
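
As a rough sketch of the decision this commit describes (the function and parameter names here are illustrative, not the actual sink code):

// shouldFlushFile reports whether a size-based flush is needed, counting both
// the bytes already written to the in-memory file and the bytes still
// buffered inside the parquet writer library.
func shouldFlushFile(fileBytes, writerBufferedBytes, targetFileSize int64) bool {
	return fileBytes+writerBufferedBytes >= targetFileSize
}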


changefeedccl: add changefeed.cloudstorage_buffered_bytes metric

This change adds a metric to track the number of bytes buffered by
the cloudstorage sink.

Release note: None
Epic: None
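
A sketch of how such a gauge is typically declared with pkg/util/metric; the variable names and metadata wording below are illustrative, not necessarily the exact ones in this change.

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// metaCloudstorageBufferedBytes describes the gauge; the sink would increment
// it as bytes are buffered and decrement it when files are flushed.
var metaCloudstorageBufferedBytes = metric.Metadata{
	Name:        "changefeed.cloudstorage_buffered_bytes",
	Help:        "The number of bytes buffered by the cloudstorage sink",
	Measurement: "Bytes",
	Unit:        metric.Unit_BYTES,
}

var cloudstorageBufferedBytes = metric.NewGauge(metaCloudstorageBufferedBytes)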


@jayshrivastava jayshrivastava force-pushed the bench-parquet branch 2 times, most recently from 5d0ddc8 to 30c07d9 on September 22, 2023 21:08
@jayshrivastava jayshrivastava changed the title from "Bench parquet" to "changefeedccl: improve parquet byte estimates and flushing scheme" on Sep 22, 2023
@jayshrivastava jayshrivastava marked this pull request as ready for review September 22, 2023 22:06
@jayshrivastava jayshrivastava requested review from a team as code owners September 22, 2023 22:06
@jayshrivastava jayshrivastava requested review from smg260, DarrylWong and miretskiy and removed request for a team September 22, 2023 22:06
Contributor

@miretskiy miretskiy left a comment


Reviewed 13 of 14 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DarrylWong, @jayshrivastava, and @smg260)


pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go line 237 at r2 (raw file):

	if log.V(1) {
		if parquetSink.everyN.ShouldLog() {

collapse log.V(1) && everyN.ShouldLog?
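
A minimal sketch of the collapsed check (surrounding context elided):

if log.V(1) && parquetSink.everyN.ShouldLog() {
	// ...
}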


pkg/util/parquet/writer.go line 136 at r2 (raw file):

//
// NB: The writer may buffer all the written data in memory until it is
// Flush()ed to the sink.

Thanks for adding this important notice.

I understand why you say "may buffer" -- I think that's because the underlying row group writer has a 64M
limit on how much it buffers before it decides to auto-flush.

I think that, at least in theory, if you allow the underlying row group writer to flush automatically, it's possible
that your byte estimate may get out of whack because of the updateByteEstimate line:
w.bufferedBytesEstimate += internalCw.EstimatedBufferedValueBytes()
This additive update only works if you can guarantee that the underlying row group writer does not flush.

I think you should make this notice a lot stronger and say that this writer buffers all data in memory until
an explicit flush is called. You should use math.MaxInt64 as the row writer parameter to guarantee that it does not
flush (and of course, you should add a note explaining why this is used).

I may be misunderstanding something and if so, please correct me if I am.
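
To make the concern concrete, a sketch of the additive estimate; the interface and loop below follow the snippets quoted in this review, not the exact implementation.

// internalColumnWriter models the method the underlying column chunk writers
// expose for their buffered value bytes.
type internalColumnWriter interface {
	EstimatedBufferedValueBytes() int64
}

// estimateBufferedBytes sums the per-column buffered bytes. The sum only
// reflects what is actually held in memory if the library never flushes a
// row group on its own.
func estimateBufferedBytes(cols []internalColumnWriter) int64 {
	var total int64
	for _, cw := range cols {
		total += cw.EstimatedBufferedValueBytes()
	}
	return total
}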


pkg/util/parquet/writer.go line 248 at r2 (raw file):

	w.bufferedBytesEstimate = 0
	for datumColIdx, d := range datums {
		if err := w.sch.cols[datumColIdx].colWriter.Write(d, w.columnChunkWriterCache[datumColIdx], w.ba, w.updateByteEstimate); err != nil {

I think it would be better if you changed the colWriter interface (Write method) to instead return the number of bytes currently buffered. Update your estimate after calling colWriter.Write explicitly.
There are a few benefits imo: 1. I think you won't need to do type assertions in updateByteEstimate since the underlying writer knows its byte estimate; 2. you won't be bleeding internals of the "writer" implementation into your write functions.
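
Roughly, the suggested shape (parameters elided; only the new return value matters for this point):

type colWriter interface {
	// Write writes the datum(s) to the column chunk writer(s) and returns the
	// number of bytes currently buffered by the underlying writer.
	Write( /* datum, chunk writer(s), alloc */ ) (bytesBuffered int64, err error)
}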


pkg/util/parquet/writer.go line 304 at r2 (raw file):

// internalColumnWriter is an interface used to expose methods on the column
// chunk writers below which are not a part of the file.ColumnChunkWriter
// interface.

nit: augment the comment a bit to indicate that the underlying parquet types (BytesWrite, IntWriter, etc.) all implement this method.


pkg/ccl/changefeedccl/changefeed_test.go line 8909 at r2 (raw file):

		// Because checkpoints are disabled, we should have some bytes build up
		// in the sink.
		// Parquet is a much more efficient format, so the buffered files will

nit: move parquet part of the comment inside if?


pkg/ccl/changefeedccl/changefeed_test.go line 8924 at r2 (raw file):

		// Allow checkpoints to pass through and flush the sink. We should see
		// zero bytes buffered after that.

nice.


pkg/ccl/changefeedccl/testfeed_test.go line 1089 at r2 (raw file):

		}
		randNum := rand.Intn(5)
		if randNum < 0 {

did you mean to include this change?


pkg/util/parquet/writer_test.go line 705 at r2 (raw file):

		tree.NewDTuple(tupleTyp, tree.NewDInt(8), tree.NewDInt(9), tree.NewDString("uiop")),
		da,
	}}

let's add a test case that adds identical tuple too.


pkg/util/parquet/writer_test.go line 737 at r2 (raw file):

	// The first estimate is large because of the overhead of the encoder.
	// The deltas are small because parquet is good at bit packing.
	expectedEstimates := []int64{754, 792, 830, 866, 904}

i'm a bit worried this test may become brittle if the underlying parquet library changes
(its encoder, compressor, etc.)...
I understand what you're trying to do here, and I wonder if we should settle on something that's
a bit less precise. Like: asserting the estimate is >0 when you add something. Asserting that
it increases when you add something new. It must also increase even if you add identical row again --
no such thing as infinite compression.
And of course, estimate should be exactly 0 after flush.
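
For example, the looser assertions could look roughly like this (writer and row construction elided; a sketch, not the final test):

require.Equal(t, int64(0), writer.BufferedBytesEstimate())

require.NoError(t, writer.AddRow(row))
afterFirst := writer.BufferedBytesEstimate()
require.Greater(t, afterFirst, int64(0))

// Adding an identical row again must still grow the estimate -- no such thing
// as infinite compression.
require.NoError(t, writer.AddRow(row))
require.Greater(t, writer.BufferedBytesEstimate(), afterFirst)

require.NoError(t, writer.Flush())
require.Equal(t, int64(0), writer.BufferedBytesEstimate())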

Contributor Author

@jayshrivastava jayshrivastava left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DarrylWong, @miretskiy, and @smg260)


pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go line 237 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

collapse log.V(1) && everyN.ShouldLog?

Done.


pkg/util/parquet/write_functions.go line 227 at r3 (raw file):

			len(cw), len(tw))
	}
	return writeTuple(d, cw, a, tw)

Unlike the arraywriter and scalarwriter, we don't call estimatedBufferedBytesForChunkWriter(cw[0]) because we actually need to call it for each column writer. We push this call down to writeTuple because that is actually doing the iteration.
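
A sketch of that shape inside writeTuple (the surrounding body is elided; estimatedBufferedBytesForChunkWriter is the helper named above):

var bytesBuffered int64
for i := range cw {
	// ... write the i-th tuple field to cw[i] ...
	bytesBuffered += estimatedBufferedBytesForChunkWriter(cw[i])
}
return bytesBuffered, nil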


pkg/util/parquet/writer.go line 136 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Thanks for adding this important notice.

I understand why you say "may buffer" -- I think that's because the underlying row group writer has a 64M
limit on how much it buffers before it decides to auto-flush.

I think that, at least in theory, if you allow the underlying row group writer to flush automatically, it's possible
that your byte estimate may get out of whack because of the updateByteEstimate line:
w.bufferedBytesEstimate += internalCw.EstimatedBufferedValueBytes()
This additive update only works if you can guarantee that the underlying row group writer does not flush.

I think you should make this notice a lot stronger and say that this writer buffers all data in memory until
an explicit flush is called. You should use math.MaxInt64 as the row writer parameter to guarantee that it does not
flush (and of course, you should add a note explaining why this is used).

I may be misunderstanding something and if so, please correct me if I am.

Okay so I think it's ideal to have full control over flushing. Therefore, I pushed what you recommended. Having the data page size uncapped and row group size uncapped means that we can control when exactly to flush.

With my most recent changes, calling Flush() will flush the whole row group. Because all the data is buffered, one data page is created per column. So if we flush every 1MB, we get 1MB row groups and 1/16MB data pages if the table has 16 cols.

There are some recommendations on row group sizes and data page sizes https://parquet.apache.org/docs/file-format/configurations/#:~:text=Data%20Page%20Size,-Data%20pages%20should&text=Larger%20page%20sizes%20incur%20less,recommend%208KB%20for%20page%20sizes, but I don't think this is too important to worry about now. Maybe if a customer has a real concern in the future, we can come up with something else.
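
Spelling out the arithmetic above (illustrative numbers):

const (
	flushThreshold = 1 << 20 // example: flush roughly every 1MB of buffered data
	numCols        = 16
)

// With everything buffered until Flush, each row group contains one data page
// per column, so each page is roughly flushThreshold / numCols bytes.
const approxDataPageSize = flushThreshold / numCols // 65536 bytes, i.e. 1/16 MB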


pkg/util/parquet/writer.go line 248 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I think it would be better if you changed the colWriter interface (Write method) to instead return the number of bytes currently buffered. Update your estimate after calling colWriter.Write explicitly.
There are a few benefits imo: 1. I think you won't need to do type assertions in updateByteEstimate since the underlying writer knows its byte estimate; 2. you won't be bleeding internals of the "writer" implementation into your write functions.

Done. I think it's better too. I put it in the writer so I could pass in testing knobs. Now the testing knob is just a global var.


pkg/ccl/changefeedccl/changefeed_test.go line 8909 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: move parquet part of the comment inside if?

Done


pkg/ccl/changefeedccl/testfeed_test.go line 1089 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

did you mean to include this change?

Nope. Removed.


pkg/util/parquet/writer_test.go line 705 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

let's add a test case that adds identical tuple too.

Done. I added a test which inserts identical rows.


pkg/util/parquet/writer_test.go line 737 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

i'm a bit worried this test may become brittle if the underlying parquet library changes
(its encoder, compressor, etc.)...
I understand what you're trying to do here, and I wonder if we should settle on something that's
a bit less precise. Like: asserting the estimate is >0 when you add something. Asserting that
it increases when you add something new. It must also increase even if you add identical row again --
no such thing as infinite compression.
And of course, estimate should be exactly 0 after flush.

Updated the test. Lmk what you think!

Contributor

@miretskiy miretskiy left a comment


Reviewed 1 of 7 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DarrylWong, @jayshrivastava, and @smg260)


pkg/util/parquet/write_functions.go line 227 at r3 (raw file):

Previously, jayshrivastava (Jayant) wrote…

Unlike the arraywriter and scalarwriter, we don't call estimatedBufferedBytesForChunkWriter(cw[0]) because we actually need to call it for each column writer. We push this call down to writeTuple because that is actually doing the iteration.

ack.


pkg/util/parquet/write_functions.go line 145 at r4 (raw file):

// and writing it to a file.ColumnChunkWriter. For tuples, there is a
// file.ColumnChunkWriter per tuple field. The `callback` is executed
// once per file.ColumnChunkWriter after the respective datums are written.

nit: callback comment no longer applicable.


pkg/util/parquet/write_functions.go line 258 at r4 (raw file):

			defLevel = tupleFieldNilDefLevel
		}
		if err := wFn(dt.D[i], w[i], a, defLevel, newEntryRepLevel); err != nil {

kind of a shame to repeat (almost verbatim) the code here, and on top for the d == tree.DNull case.
What do you think of something like:

var dt *tree.DTuple
if d != tree.DNull {
   if tup, ok := tree.AsDTuple(d); ok {
      dt = tup
   } else {
      return 0, pgerror.Newf(pgcode.DatatypeMismatch, "expected DTuple, found %T", d)
   }
}

// Helper function to return datum at specified tuple position and def level.
tupleDatumAndDefLevel := func(pos int) (tree.Datum, []int16) {
   if dt == nil {
      return tree.DNull, nilTupleDefLevel
   }
   if dt.D[pos] == tree.DNull {
      return tree.DNull, tupleFieldNilDefLevel
   }
   return dt.D[pos], tupleFieldNonNilDefLevel
}

And then, basically, have a single loop that accesses appropriate datum/deflevel for each i.
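
The single loop would then be roughly (names follow the snippets above; the surrounding function context is elided):

for i := range w {
	d, defLevel := tupleDatumAndDefLevel(i)
	if err := wFn(d, w[i], a, defLevel, newEntryRepLevel); err != nil {
		return 0, err
	}
}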


pkg/util/parquet/writer.go line 136 at r2 (raw file):

Previously, jayshrivastava (Jayant) wrote…

Okay so I think it's ideal to have full control over flushing. Therefore, I pushed what you recommended. Having the data page size uncapped and row group size uncapped means that we can control when exactly to flush.

With my most recent changes, calling Flush() will flush the whole row group. Because all the data is buffered, one data page is created per column. So if we flush every 1MB, we get 1MB row groups and 1/16MB data pages if the table has 16 cols.

There are some recommendations on row group sizes and data page sizes https://parquet.apache.org/docs/file-format/configurations/#:~:text=Data%20Page%20Size,-Data%20pages%20should&text=Larger%20page%20sizes%20incur%20less,recommend%208KB%20for%20page%20sizes, but I don't think this is too important to worry about now. Maybe if a customer has a real concern in the future, we can come up with something else.

You could consider making this threshold somewhat dynamic based on the number of columns. But I think it's fine as is.


pkg/util/parquet/writer.go line 168 at r4 (raw file):

	// data page size and row group size are uncapped. This means that the library will not flush
	// automatically when the buffered data size is large. It will only flush when the caller calls Flush().
	// Note that this means there will be one data page per column per row group in the final file.

one data page per column per row group in the final file.... that's if the caller calls Flush only once?


pkg/util/parquet/writer_test.go line 790 at r4 (raw file):

				for i := 0; i < tc.numInsertsPerStep; i++ {
					for _, d := range tc.datums {
						err := writer.AddRow(d)

perhaps a short comment why we add the same thing multiple times.

Contributor Author

@jayshrivastava jayshrivastava left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DarrylWong, @miretskiy, and @smg260)


pkg/util/parquet/write_functions.go line 258 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

kind of a shame to repeat (almost verbatim) the code here, and on top for the d == tree.DNull case.
What do you think of something like:

var dt *tree.DTuple
if d != tree.DNull {
   if tup, ok := tree.AsDTuple(d); ok {
      dt = tup
   } else {
      return 0, pgerror.Newf(pgcode.DatatypeMismatch, "expected DTuple, found %T", d)
   }
}

// Helper function to return datum at specified tuple position and def level.
tupleDatumAndDefLevel := func(pos int) (tree.Datum, []int16) {
   if dt == nil {
      return tree.DNull, nilTupleDefLevel
   }
   if dt.D[pos] == tree.DNull {
      return tree.DNull, tupleFieldNilDefLevel
   }
   return dt.D[pos], tupleFieldNonNilDefLevel
}

And then, basically, have a single loop that accesses appropriate datum/deflevel for each i.

Done.


pkg/util/parquet/writer.go line 136 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

You could consider making this threshold somewhat dynamic based on the number of columns. But I think it's fine as is.

Ack


pkg/util/parquet/writer.go line 168 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

one data page per column per row group in the final file.... that's if the caller calls Flush only once?

My comment doesn't say there is one row group :).


pkg/util/parquet/writer_test.go line 790 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

perhaps a short comment why we add the same thing multiple times.

I had the comment on tc.numInsertsPerStep, but I can move it here.

@jayshrivastava
Contributor Author

bors r+

@craig
Contributor

craig bot commented Sep 26, 2023

Build succeeded:

@craig craig bot merged commit 0453b28 into cockroachdb:master Sep 26, 2023
@jayshrivastava jayshrivastava deleted the bench-parquet branch September 26, 2023 15:30
@jayshrivastava jayshrivastava mentioned this pull request Dec 13, 2023