feat(bigquery): expose Apache Arrow data #8500
Conversation
bigquery/iterator.go
Outdated
// NextRecord returns the next batch of rows as a serialised arrow.Record.
func (a *ArrowIterator) NextRecord() ([]byte, error) {
	return a.r.arrowIterator.next()
}

// Schema is available after the first call to NextRecord.
func (a *ArrowIterator) Schema() []byte {
	return a.r.arrowIterator.decoder.rawArrowSchema
}
It might be more interoperable with the Arrow libraries if you could expose a direct io.Reader for the raw IPC stream.
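For illustration, a minimal sketch of what that would enable on the consumer side, assuming a hypothetical per-stream reader (the stream argument below); ipc.NewReader is the existing Arrow Go API:

package example

import (
	"fmt"
	"io"

	"github.com/apache/arrow/go/v13/arrow/ipc"
)

// consumeIPCStream drains one raw Arrow IPC stream using the standard reader.
func consumeIPCStream(stream io.Reader) error {
	rdr, err := ipc.NewReader(stream) // reads the schema message up front
	if err != nil {
		return err
	}
	defer rdr.Release()
	for rdr.Next() {
		rec := rdr.Record() // valid until the next call to Next
		fmt.Println(rec.NumRows())
	}
	return rdr.Err()
}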
Actually I am using this to construct an array.RecordReader. Sample here (condensed into the sketch below): https://github.com/rilldata/rill/blob/09726146ec04415e68f7e41efa64e5b5db12aa7e/runtime/drivers/bigquery/arrow.go#L19
I can move this code to the SDK and make ArrowIterator conform to array.RecordReader, as proposed in other comments as well. Is array.RecordReader locked for further modifications?
It seemed tricky to expose an io.Reader that can read from multiple IPC streams.
Also, I was looking at the spec for io.Reader and saw this statement: "If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more." I am wondering what the behaviour should be here: should we wait for the next API call to succeed, or return whatever is available?
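For reference, the linked sample condensed into a sketch against the methods in the diff above; error handling beyond the first fetch is trimmed, and the ipc.Reader it returns satisfies array.RecordReader:

package example

import (
	"bytes"

	"cloud.google.com/go/bigquery"
	"github.com/apache/arrow/go/v13/arrow/ipc"
)

// newRecordReader stitches the raw schema and record bytes back into a
// single Arrow IPC stream.
func newRecordReader(it *bigquery.ArrowIterator) (*ipc.Reader, error) {
	first, err := it.NextRecord() // schema is only populated after the first fetch
	if err != nil {
		return nil, err
	}
	buf := bytes.NewBuffer(it.Schema()) // raw serialised schema message
	buf.Write(first)
	// remaining batches would be appended the same way, or streamed
	// through an io.Pipe to avoid buffering everything in memory
	return ipc.NewReader(buf)
}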
The difficulty with having ArrowIterator conform to array.RecordReader is that it would require explicitly exposing the major-versioned arrow.Record interface, which I believe was meant to be avoided. If it's okay to directly include the versioned arrow.Record interface in the API, then I would agree that conforming to the array.RecordReader interface would be best.
I agree you wouldn't expose an io.Reader that reads from multiple IPC streams; rather, you'd expose an individual io.Reader for each IPC stream, and then allow the consumer to construct an array.RecordReader for each stream. That's the common way of doing it, as it allows the consumer to control the backpressure etc. It's also how I implemented the Snowflake Arrow stream handling: it provides an io.Reader per stream.
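In that per-stream model, the consumer side might look like this sketch (streams and process are hypothetical placeholders):

package example

import (
	"io"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/ipc"
)

// consumeStreams builds one reader per IPC stream, so the consumer decides
// how many streams to drain, and how fast (i.e. it controls backpressure).
func consumeStreams(streams []io.Reader, process func(arrow.Record)) error {
	for _, s := range streams {
		rdr, err := ipc.NewReader(s)
		if err != nil {
			return err
		}
		for rdr.Next() {
			process(rdr.Record())
		}
		err = rdr.Err()
		rdr.Release()
		if err != nil {
			return err
		}
	}
	return nil
}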
Thanks for the inputs @zeroshade. Should the io.Reader implementation wait for the next batch to be available, or return whatever bytes are available as per the io.Reader spec?
It should return whatever bytes are available, as per the io.Reader spec. Just make sure you don't return an io.EOF unless you actually are at the end 😄
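A sketch of that contract, with fetchNextBatch as a hypothetical stand-in for the underlying API call:

package example

import (
	"bytes"
	"io"
)

type streamReader struct {
	buf            bytes.Buffer
	done           bool
	fetchNextBatch func() ([]byte, error) // hypothetical: returns io.EOF once exhausted
}

// Read hands back whatever is buffered rather than waiting for len(p) bytes,
// and reports io.EOF only once the source is truly exhausted.
func (r *streamReader) Read(p []byte) (int, error) {
	for r.buf.Len() == 0 && !r.done {
		batch, err := r.fetchNextBatch()
		if err == io.EOF {
			r.done = true
			break
		}
		if err != nil {
			return 0, err
		}
		r.buf.Write(batch)
	}
	if r.buf.Len() == 0 {
		return 0, io.EOF // actually at the end
	}
	return r.buf.Read(p)
}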
bigquery/query.go
Outdated
if rowIter.arrowIterator == nil {
	return nil, errors.New("bigquery: results not available as arrow records")
}
Is there a particular kind of query / results that can't be returned as an Arrow record batch stream? Can we document those?
As I understood, this is not possible right now, since we enforce the usage of storageAPI. I added this as a defensive check; I can remove it if it is adding confusion.
The forceStorageAPI flag was meant only for testing, because in some cases for smaller result sets there is extra overhead involved in using the Storage API for fetching data, so it might be faster in those scenarios to use the "old" path with jobs.query/getQueryResults.
@zeroshade is there an easy way to convert a JSON stream to an Arrow stream? That way we can backport the REST API in those cases of the faster paths or small result sets.
Also, even when forcing the Storage API with the flag, if an error happens while opening a read stream (for example if you hit one of the limitations or quota limits, like the number of CreateReadSession calls per minute), we still fall back to the REST/JSON API.
@alvarowolfx As long as you have a schema, you can use https://pkg.go.dev/github.com/apache/arrow/go/v13/arrow/array#NewJSONReader or the individual FromJSON functions etc.
In addition, all of the Builders have an UnmarshalJSON method which will append the unmarshalled JSON to the builder.
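For example, a minimal sketch with a made-up schema and rows:

package example

import (
	"strings"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
)

func jsonToArrow() error {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String},
		{Name: "age", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	// NewJSONReader consumes newline-delimited JSON, one object per row
	rdr := array.NewJSONReader(strings.NewReader(
		"{\"name\": \"alice\", \"age\": 30}\n{\"name\": \"bob\", \"age\": 25}"), schema)
	defer rdr.Release()
	for rdr.Next() {
		rec := rdr.Record()
		_ = rec // hand the arrow.Record to downstream code
	}
	return rdr.Err()
}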
Can some integration tests be added to show how the ArrowIterator interface is being used?
bigquery/query.go
Outdated
// ReadAsArrowObjects submits a query for execution and returns the results via an ArrowIterator.
// As a prerequisite, the storage read client must be enabled.
// It will always use the jobs.insert path, so it may not be efficient for small queries.
func (q *Query) ReadAsArrowObjects(ctx context.Context) (it *ArrowIterator, err error) {
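A usage sketch of the method as it stands at this point in the review; the query text is arbitrary and the end-of-iteration sentinel is an assumption, not confirmed by the diff:

package example

import (
	"context"

	"cloud.google.com/go/bigquery"
)

func queryAsArrow(ctx context.Context, client *bigquery.Client) error {
	q := client.Query("SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100")
	it, err := q.ReadAsArrowObjects(ctx) // requires the storage read client to be enabled
	if err != nil {
		return err
	}
	for {
		rec, err := it.NextRecord() // raw serialised arrow.Record bytes
		if err != nil {
			break // assumption: iteration ends with a sentinel error such as iterator.Done
		}
		_ = rec // e.g. stitch into an IPC stream as in the earlier sketches
	}
	return nil
}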
What about other query/data paths, like reading from a Job or a Table? I feel like they should have the same interface.
I will add this for those paths as well.
#8100
This PR exposes an iterator to fetch serialised Apache Arrow records.