feat(bigquery): expose Apache Arrow data through ArrowIterator #8506
Conversation
```go
		t.Fatal("expected stream to be done")
	}
}

func TestIntegration_StorageReadArrow(t *testing.T) {
```
@k-anshul @zeroshade this integration test shows an example of how that interface would be used.
@alvarowolfx I'm gonna try to get a thorough review of this in the next day or two from the Arrow perspective. Thanks for doing this!
bigquery/storage_integration_test.go (Outdated)
```go
r, err := ipc.NewReader(&arrowIteratorReader{
	it: arrowIt,
})
```
Can we have an interface that doesn't require consumers to implement their own `arrowIteratorReader`?
```go
// Fan out one worker per read stream, bounded by maxWorkerCount.
wg := sync.WaitGroup{}
wg.Add(len(streams))
sem := semaphore.NewWeighted(int64(it.session.settings.maxWorkerCount))
```
Would it be feasible to expose the streams themselves to a consumer, to allow them to control the parallelization instead of forcing them into a specific route like this?
bigquery/arrow.go (Outdated)
```go
if err == io.EOF {
	batch, err := r.it.Next()
	if err == iterator.Done {
		return -1, io.EOF
```
A reading of the `io.Reader` interface contract would suggest we should return 0 rather than a negative value.
Good point, just sent a fix.
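For reference, a minimal sketch of the adapter pattern under discussion with that fix applied (returning `0` with `io.EOF`). The type and field names follow the test snippet above, and the assumption that `*ArrowRecordBatch` itself implements `io.Reader` comes from the decoder changes later in this PR; the details are illustrative, not the committed code:

```go
import (
	"io"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

// arrowIteratorReader adapts an ArrowIterator into an io.Reader over the
// concatenated IPC bytes (sketch only, mirroring the test helper above).
type arrowIteratorReader struct {
	it  bigquery.ArrowIterator
	cur io.Reader // the batch currently being drained
}

func (r *arrowIteratorReader) Read(p []byte) (int, error) {
	for {
		if r.cur == nil {
			batch, err := r.it.Next()
			if err == iterator.Done {
				return 0, io.EOF // 0 with io.EOF, per the io.Reader contract
			}
			if err != nil {
				return 0, err
			}
			r.cur = batch // assumes *ArrowRecordBatch is itself an io.Reader
		}
		n, err := r.cur.Read(p)
		if err == io.EOF {
			r.cur = nil // this batch is drained; advance on the next pass
			if n == 0 {
				continue
			}
			err = nil
		}
		return n, err
	}
}
```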
```go
numrec := 0
for r.Next() {
	rec := r.Record()
	rec.Retain()
```
I'm confused by the releases/retains here, but I've not been spending much time with Arrow recently. If you retain individual records, do you need to release them?
They are going to be released by the `ipc.Reader` later, on the `r.Release()` call right after.
Not exactly. If you call `Retain` on an individual record, then you will need to call `Release` on that record.

The `ipc.Reader` keeps only the current record, reusing that member. When you call `Next()` it will release the record it had before loading the next one. This is why you need to call `Retain` on the records that you put into the slice, so that they aren't deallocated by the `ipc.Reader` calling `Release` on them. However, you should also add a `defer rec.Release()` in the loop to ensure that record gets released; the `ipc.Reader` will not retain any references to those records and therefore will not call `Release` on them.
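Applied to the loop quoted above, the suggestion looks roughly like this (a sketch of the fixed test loop, assuming a `records` slice declared earlier in the test; not the exact committed code):

```go
numrec := 0
for r.Next() {
	rec := r.Record()   // owned by the ipc.Reader; released on its next Next()
	rec.Retain()        // our own reference, so the reader's Release doesn't free it
	defer rec.Release() // balance the Retain when the test function returns
	records = append(records, rec)
	numrec++
}
```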
Interesting, I didn't know that. I'll make the changes to call `rec.Release` on each record.
If you want to verify that everything is released/retained correctly, you could use `memory.CheckedAllocator` and `defer mem.AssertSize(t, 0)`, then pass the checked allocator to everything (like to `ipc.NewReader`) so that it is used for all the memory allocations.

Not absolutely necessary, but an optional way to add some assurances if desired.
I'm gonna add support for changing the allocator just internally for now, for test purposes. I liked the idea of verifying that there are no memory leaks. Thanks for the tip.
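A standalone demonstration of the `memory.CheckedAllocator` pattern being described (Arrow v12 import path assumed; adjust to the module version in use). Every allocation made through `mem` must be released before `AssertSize` runs:

```go
import (
	"testing"

	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func TestCheckedAllocator(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0) // fails the test if anything leaked

	bldr := array.NewInt64Builder(mem)
	defer bldr.Release()
	bldr.AppendValues([]int64{1, 2, 3}, nil)

	arr := bldr.NewInt64Array()
	defer arr.Release() // drop this line and AssertSize reports the leak
}
```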
bigquery/storage_integration_test.go (Outdated)
```go
var totalFromArrow int64
for tr.Next() {
	rec := tr.Record()
	vec := array.NewInt64Data(rec.Column(1).Data())
```
I'm confused here, why `array.NewInt64Data(rec.Column(1).Data())`? Why not just `rec.Column(1).(*array.Int64)`?

You're performing an additional allocation here when you don't need to be, and it's also a potential memory leak, since this new array instance will not get released when the record is released.
I didn't know that `rec.Column` here could be type-asserted to `*array.Int64` directly. It's always hard to know when you can do those type conversions directly, so I was trying to use the API methods instead. I'll make the change here.
Yea, `rec.Column` returns an `arrow.Array` interface value, so as long as you know the column is an int64 column, you can type-assert it to `*array.Int64`. Alternately, you can do a type switch / check the data type, etc. before doing the type assertion. The intent is to minimize copying and minimize allocations when handling arrays and utilizing them for records, etc.
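A sketch of both approaches, assuming a record whose column 1 is INT64 as in the test above (Arrow v12 import path assumed):

```go
import (
	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
)

// sumColumn uses the direct assertion: no extra allocation, and nothing
// new to Release, since the record still owns the column's buffers.
func sumColumn(rec arrow.Record) int64 {
	vec := rec.Column(1).(*array.Int64)
	var total int64
	for i := 0; i < vec.Len(); i++ {
		if vec.IsValid(i) { // skip NULLs
			total += vec.Value(i)
		}
	}
	return total
}

// describeColumn uses a type switch for when the column type isn't
// known statically.
func describeColumn(rec arrow.Record) string {
	switch rec.Column(1).(type) {
	case *array.Int64:
		return "int64"
	default:
		return rec.Column(1).DataType().Name()
	}
}
```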
```go
}
totalFromSQL := sumValues[0].(int64)

tr := array.NewTableReader(arrowTable, arrowTable.NumRows())
```
Add `defer tr.Release()` please.
After adding support for changing the allocator, I found that a memory leak was happening without the `tr.Release`. Good catch, and awesome tips on how to catch those leaks 🎉
```go
	arrowSchema *arrow.Schema
}

func newArrowDecoder(arrowSerializedSchema []byte, schema Schema) (*arrowDecoder, error) {
```
It would probably be worthwhile (though it certainly could be done as a follow-up) to allow passing a `memory.Allocator` interface here that would be stored in the `arrowDecoder`, to allow a user to configure how memory gets allocated for the Arrow batches (it would be passed as `ipc.WithAllocator(mem)` to `ipc.NewReader`).

In most cases users would probably just use `memory.DefaultAllocator`, but in other cases, depending on the constraints of the system, they might want a custom allocator, such as a `malloc`-based allocator that uses C memory to avoid garbage collection passes, or any other custom allocation they might want for specialized situations.

The other benefit of this would be that you could use `memory.CheckedAllocator` in unit tests to verify that everything properly has `Release` called if necessary, etc.
bigquery/arrow.go (Outdated)
```diff
-	buf.Write(serializedArrowRecordBatch)
-	return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema))
+func (ap *arrowDecoder) createIPCReaderForBatch(arrowRecordBatch *ArrowRecordBatch) (*ipc.Reader, error) {
+	return ipc.NewReader(arrowRecordBatch, ipc.WithSchema(ap.arrowSchema))
```
As above, it would be great (but could be a follow-up) if either `newArrowDecoder` accepted a `memory.Allocator` which would get used here as `ipc.WithAllocator(mem)`, or if this method optionally took an allocator (defaulting to `memory.DefaultAllocator` if `nil` was passed).
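A sketch combining both suggestions against the PR's unexported types; the constructor signature and field names here are illustrative, not the committed change:

```go
// arrowDecoder extended with a configurable allocator (illustrative).
type arrowDecoder struct {
	arrowSchema *arrow.Schema
	mem         memory.Allocator
}

func newArrowDecoder(arrowSerializedSchema []byte, schema Schema, mem memory.Allocator) (*arrowDecoder, error) {
	if mem == nil {
		mem = memory.DefaultAllocator // default when the caller has no preference
	}
	// ... deserialize arrowSerializedSchema into the arrowSchema field,
	// as the PR already does ...
	return &arrowDecoder{mem: mem}, nil
}

func (ap *arrowDecoder) createIPCReaderForBatch(b *ArrowRecordBatch) (*ipc.Reader, error) {
	// Thread the allocator through so batch memory comes from it; a
	// memory.CheckedAllocator here lets tests assert nothing leaks.
	return ipc.NewReader(b, ipc.WithSchema(ap.arrowSchema), ipc.WithAllocator(ap.mem))
}
```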
Added a couple comments, but overall this looks good to me, though I would like to point at #8506 (comment) for possible consideration.

ADBC allows a query to return a result set via partition identifiers (arbitrary byte blobs for a given driver) and then lets the consumer retrieve the streams of data from each partition in parallel. Since BigQuery can already return the data as multiple streams, there could be a benefit to allowing this Arrow iterator to return the raw underlying streams of IPC data, instead of only exposing an interface for the collected streams with the parallelization handled in here rather than by the consumer.

It's not something that I believe should block this PR, but it should possibly be picked up in a follow-up as an enhancement.
I'll create another issue to keep track of this request as a future enhancement. Thanks for the deeper review on the PR; I'm gonna push some improvements based on the comments.
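For illustration, a purely hypothetical shape that follow-up could take (none of these names exist in this PR): expose one iterator per read stream and let the consumer fan out, much like ADBC partitions:

```go
import (
	"sync"

	"cloud.google.com/go/bigquery"
)

// streamIterator is a hypothetical per-stream view of the result set.
type streamIterator interface {
	Next() (*bigquery.ArrowRecordBatch, error) // iterator.Done when exhausted
}

// consumeInParallel shows consumer-controlled parallelism: one goroutine
// per stream, with the fan-out policy chosen by the caller, not the library.
func consumeInParallel(streams []streamIterator, handle func(*bigquery.ArrowRecordBatch)) {
	var wg sync.WaitGroup
	for _, s := range streams {
		wg.Add(1)
		go func(s streamIterator) {
			defer wg.Done()
			for {
				batch, err := s.Next()
				if err != nil { // iterator.Done or a transport error
					return
				}
				handle(batch)
			}
		}(s)
	}
	wg.Wait()
}
```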
🤖 I have created a release *beep* *boop*

## [1.57.0](https://togithub.com/googleapis/google-cloud-go/compare/bigquery/v1.56.0...bigquery/v1.57.0) (2023-10-30)

### Features

* **bigquery/biglake:** Promote to GA ([e864fbc](https://togithub.com/googleapis/google-cloud-go/commit/e864fbcbc4f0a49dfdb04850b07451074c57edc8))
* **bigquery/storage/managedwriter:** Support default value controls ([#8686](https://togithub.com/googleapis/google-cloud-go/issues/8686)) ([dfa8e22](https://togithub.com/googleapis/google-cloud-go/commit/dfa8e22edf560211ae2a2ebf1f9a23b86887c7be))
* **bigquery:** Expose Apache Arrow data through ArrowIterator ([#8506](https://togithub.com/googleapis/google-cloud-go/issues/8506)) ([c8e7692](https://togithub.com/googleapis/google-cloud-go/commit/c8e76923621b379fb7deb6dfb944011af1d980bd)), refs [#8100](https://togithub.com/googleapis/google-cloud-go/issues/8100)
* **bigquery:** Introduce query preview features ([#8653](https://togithub.com/googleapis/google-cloud-go/issues/8653)) ([f29683b](https://togithub.com/googleapis/google-cloud-go/commit/f29683bcd06567e4fc2d404f53bedbea5b5f0f90))

### Bug Fixes

* **bigquery:** Handle storage read api Recv call errors ([#8666](https://togithub.com/googleapis/google-cloud-go/issues/8666)) ([c73963f](https://togithub.com/googleapis/google-cloud-go/commit/c73963f64ef667daa8a33a5a4cc2156818fc6914))
* **bigquery:** Update golang.org/x/net to v0.17.0 ([174da47](https://togithub.com/googleapis/google-cloud-go/commit/174da47254fefb12921bbfc65b7829a453af6f5d))
* **bigquery:** Update grpc-go to v1.56.3 ([343cea8](https://togithub.com/googleapis/google-cloud-go/commit/343cea8c43b1e31ae21ad50ad31d3b0b60143f8c))
* **bigquery:** Update grpc-go to v1.59.0 ([81a97b0](https://togithub.com/googleapis/google-cloud-go/commit/81a97b06cb28b25432e4ece595c55a9857e960b7))

---

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
As we have some planned work to support Arrow data fetching on other query APIs, we need to think of an interface that will support all of those query paths and also work as a base for other Arrow projects like ADBC. So this PR detaches the Storage API from the Arrow decoder and creates a new ArrowIterator interface. This new interface is implemented by the Storage iterator and can later be implemented for other query interfaces that support Arrow.

Resolves #8100
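For readers skimming the conversation, the interface ends up with roughly this shape (a sketch based on how it is used in this PR; see the diff for the authoritative definition):

```go
// ArrowIterator yields the raw Arrow data backing a query result.
type ArrowIterator interface {
	Next() (*ArrowRecordBatch, error) // next serialized record batch; iterator.Done at the end
	Schema() Schema                   // BigQuery schema of the result
	SerializedArrowSchema() []byte    // IPC-serialized Arrow schema
}
```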