-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bigquery): expose Apache Arrow data through ArrowIterator #8506
Merged
gcf-merge-on-green
merged 13 commits into
googleapis:main
from
alvarowolfx:bq-arrow-iterator
Oct 23, 2023
Merged
Changes from 3 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
699d262
feat(bigquery): detach storage api iterator from arrow decoding
alvarowolfx 8ad2672
feat(bigquery): make ArrowIterator public and add integration tests
alvarowolfx 6969f67
chore(bigquery): upgrade arrow to v13
alvarowolfx 6765573
feat(bigquery): add ArrowIteratorReader
alvarowolfx 87fb373
fix(bigquery): return err if ArrowIterator is nil
alvarowolfx aee8018
chore: rollback arrow version to upgrade it separately
alvarowolfx d142c53
chore: rollback other lib upgrade to upgrade them separately
alvarowolfx 01fc5e4
Merge branch 'main' into bq-arrow-iterator
alvarowolfx 0762adc
fix: return 0 and io.EOF when iterator is done
alvarowolfx c08080e
fix: release arrow records and avoid extra alloc of array.Int64
alvarowolfx 3c50652
test: add check allocator to check for memory leaks when retaining ar…
alvarowolfx ba17463
Merge branch 'main' into bq-arrow-iterator
alvarowolfx bcb7329
Merge branch 'main' into bq-arrow-iterator
alvarowolfx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,51 +17,72 @@ package bigquery | |
import ( | ||
"bytes" | ||
"encoding/base64" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math/big" | ||
|
||
"cloud.google.com/go/civil" | ||
"github.com/apache/arrow/go/v12/arrow" | ||
"github.com/apache/arrow/go/v12/arrow/array" | ||
"github.com/apache/arrow/go/v12/arrow/ipc" | ||
"github.com/apache/arrow/go/v13/arrow" | ||
"github.com/apache/arrow/go/v13/arrow/array" | ||
"github.com/apache/arrow/go/v13/arrow/ipc" | ||
) | ||
|
||
type arrowDecoder struct { | ||
tableSchema Schema | ||
rawArrowSchema []byte | ||
arrowSchema *arrow.Schema | ||
// ArrowRecordBatch represents an Arrow RecordBatch with the source PartitionID | ||
type ArrowRecordBatch struct { | ||
reader io.Reader | ||
// Serialized Arrow Record Batch. | ||
Data []byte | ||
// Serialized Arrow Schema. | ||
Schema []byte | ||
// Source partition ID. In the Storage API world, it represents the ReadStream. | ||
PartitionID string | ||
} | ||
|
||
func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDecoder, error) { | ||
bqSession := session.bqSession | ||
if bqSession == nil { | ||
return nil, errors.New("read session not initialized") | ||
// Read makes ArrowRecordBatch implements io.Reader | ||
func (r *ArrowRecordBatch) Read(p []byte) (int, error) { | ||
if r.reader == nil { | ||
buf := bytes.NewBuffer(r.Schema) | ||
buf.Write(r.Data) | ||
r.reader = buf | ||
} | ||
arrowSerializedSchema := bqSession.GetArrowSchema().GetSerializedSchema() | ||
return r.reader.Read(p) | ||
} | ||
|
||
// ArrowIterator represents a way to iterate through a stream of arrow records. | ||
// Experimental: this interface is experimental and may be modified or removed in future versions, | ||
// regardless of any other documented package stability guarantees. | ||
type ArrowIterator interface { | ||
Next() (*ArrowRecordBatch, error) | ||
Schema() Schema | ||
SerializedArrowSchema() []byte | ||
} | ||
|
||
type arrowDecoder struct { | ||
tableSchema Schema | ||
arrowSchema *arrow.Schema | ||
} | ||
|
||
func newArrowDecoder(arrowSerializedSchema []byte, schema Schema) (*arrowDecoder, error) { | ||
buf := bytes.NewBuffer(arrowSerializedSchema) | ||
r, err := ipc.NewReader(buf) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer r.Release() | ||
p := &arrowDecoder{ | ||
tableSchema: schema, | ||
rawArrowSchema: arrowSerializedSchema, | ||
arrowSchema: r.Schema(), | ||
tableSchema: schema, | ||
arrowSchema: r.Schema(), | ||
} | ||
return p, nil | ||
} | ||
|
||
func (ap *arrowDecoder) createIPCReaderForBatch(serializedArrowRecordBatch []byte) (*ipc.Reader, error) { | ||
buf := bytes.NewBuffer(ap.rawArrowSchema) | ||
buf.Write(serializedArrowRecordBatch) | ||
return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema)) | ||
func (ap *arrowDecoder) createIPCReaderForBatch(arrowRecordBatch *ArrowRecordBatch) (*ipc.Reader, error) { | ||
return ipc.NewReader(arrowRecordBatch, ipc.WithSchema(ap.arrowSchema)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as above, it would be great (but could be a follow-up) if either |
||
} | ||
|
||
// decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value. | ||
func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([][]Value, error) { | ||
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) | ||
func (ap *arrowDecoder) decodeArrowRecords(arrowRecordBatch *ArrowRecordBatch) ([][]Value, error) { | ||
r, err := ap.createIPCReaderForBatch(arrowRecordBatch) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -79,8 +100,8 @@ func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([ | |
} | ||
|
||
// decodeRetainedArrowRecords decodes BQ ArrowRecordBatch into a list of retained arrow.Record. | ||
func (ap *arrowDecoder) decodeRetainedArrowRecords(serializedArrowRecordBatch []byte) ([]arrow.Record, error) { | ||
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) | ||
func (ap *arrowDecoder) decodeRetainedArrowRecords(arrowRecordBatch *ArrowRecordBatch) ([]arrow.Record, error) { | ||
r, err := ap.createIPCReaderForBatch(arrowRecordBatch) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would probably be worthwhile (though certainly could be done as a follow-up) to allow passing a
memory.Allocator
interface here that would be stored in thearrowDecoder
to allow a user to configure how memory gets allocated for the arrow batches (it would be passed asipc.WithAllocator(mem)
toipc.NewReader
)In most cases users would probalby just use
memory.DefaultAllocator
but in other cases, depending on the constraints of the system, they might want to use a custom allocator such as amalloc
based allocator that uses C memory to avoid garbage collection passes, or any other custom allocation they might want for specialized situations.The other benefit of this would be that you could use
memory.CheckedAllocator
in unit tests to verify that everything properly hasRelease
called if necessary, etc.