Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): use storage api for query jobs #6822

Merged
merged 52 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
bc85c4f
feat(bigquery): use storage api for query jobs
alvarowolfx Oct 6, 2022
8ce3146
fix(bigquery): check for nil rowSource on iterator
alvarowolfx Oct 7, 2022
3837e96
fix(bigquery): pass job config to job.read
alvarowolfx Oct 7, 2022
5a5853b
fix(bigquery): remove internal timeout for storage read
alvarowolfx Oct 7, 2022
b4f91c4
feat(bigquery): move storage api integration to managedreader package
alvarowolfx Oct 13, 2022
35a074e
fix minimal job config test
alvarowolfx Oct 14, 2022
14bd232
feat(bigquery): add managed reader and all arrow types integration tests
alvarowolfx Oct 14, 2022
365dc70
feat(bigquery): allow reading queries, jobs and table with storage api
alvarowolfx Oct 17, 2022
68f8ae2
feat(bigquery): basic ordering detection for storage api
alvarowolfx Oct 17, 2022
6bade1f
fix(bigquery): rename function to parse raw bq values
alvarowolfx Oct 24, 2022
ce887e2
refactor(bigquery): remove some duplication for ValueLoader resolution
alvarowolfx Oct 25, 2022
de3dff2
fix(bigquery): iterator value loader refactor issue
alvarowolfx Oct 25, 2022
a65fa88
refactor(bigquery): rename managed reader to reader
alvarowolfx Oct 25, 2022
a6668b1
feat(bigquery): remove ordering check as backend handles that
alvarowolfx Oct 25, 2022
46ccf6b
fix(bigquery): remove unused files to check query ordering
alvarowolfx Oct 26, 2022
a1c432e
feat(bigquery): add benchmarks for storage read api
alvarowolfx Oct 27, 2022
484d307
feat(bigquery): add arrow raw iterator to storage reader package
alvarowolfx Oct 27, 2022
1cbf868
fix(bigquery): remove bq storage client from core pkg test
alvarowolfx Nov 3, 2022
24db8c3
refactor(bigquery): remove Reader and add ReadSession concept
alvarowolfx Nov 8, 2022
524ad39
feat(bigquery): make arrow iterator internal and expose direct ReadSe…
alvarowolfx Nov 11, 2022
bd79a8c
Merge branch 'main' into bq-query-storage-api
alvarowolfx Nov 11, 2022
4b4af10
feat: move storage read package to core for fetching data
alvarowolfx Nov 17, 2022
db84150
fix: storage api bench
alvarowolfx Nov 17, 2022
9fb17dd
Merge branch 'main' into bq-query-storage-api
alvarowolfx Nov 17, 2022
3f9657e
feat(bigquery): error handling improvements and clone ReadSessionInfo
alvarowolfx Nov 22, 2022
a364c16
fix: rename sClient to storageOptimizedClient
alvarowolfx Nov 22, 2022
f94d934
feat(bigquery): hide storage read client and read session
alvarowolfx Nov 23, 2022
ac83ff0
fix(bigquery): check with unexported field on QueryConfig
alvarowolfx Nov 23, 2022
370def8
feat(bigquery): optimize memory/resource usage on storage api iterator
alvarowolfx Dec 5, 2022
1945387
fix: move forceStorageAPI check to probeFastPath
alvarowolfx Dec 13, 2022
c394375
fix: optimize query path with storage api when more pages available
alvarowolfx Dec 13, 2022
cee24fb
Merge branch 'main' into bq-query-storage-api
alvarowolfx Dec 13, 2022
3a8cca9
feat(bigquery): limit stream consumption, improve error handling and …
alvarowolfx Dec 14, 2022
33703f8
fix: remove t.logf, method comment mismatch and break on context dead…
alvarowolfx Dec 15, 2022
95b155a
fix: potential race condition when starting read stream goroutines
alvarowolfx Dec 15, 2022
e6bf050
refactor: improve readability on storage api integration tests
alvarowolfx Dec 15, 2022
070faad
fix: race condition when streamCount > maxWorker
alvarowolfx Dec 16, 2022
1247364
refactor: allow mock of storage read rpcs methods to improve testing
alvarowolfx Dec 16, 2022
5c88f62
feat(bigquery): handle script jobs queries with storage api
alvarowolfx Dec 30, 2022
68b21d9
fix(bigquery): properly handle script jobs
alvarowolfx Jan 5, 2023
16569aa
fix(bigquery): race condition when setting up wait group
alvarowolfx Jan 5, 2023
635d842
Merge branch 'main' into bq-query-storage-api
alvarowolfx Jan 5, 2023
3f5dfbc
feat(bigquery): use ReadSession.EstimateRowCount for reads using stor…
alvarowolfx Jan 5, 2023
d513981
fix(bigquery): update copyright year to 2023
alvarowolfx Jan 5, 2023
a3c3a0a
test(bigquery): improve retry logic test for storage iterator
alvarowolfx Jan 6, 2023
4b0da4e
feat(bigquery): check for order by clause and limit storage api strea…
alvarowolfx Jan 11, 2023
3e38df5
Merge branch 'main' into bq-query-storage-api
alvarowolfx Jan 11, 2023
777dc07
Merge branch 'main' into bq-query-storage-api
alvarowolfx Jan 18, 2023
083f421
feat(bigquery): bump arrow version to v10.0.1
alvarowolfx Jan 20, 2023
9c43018
fix(bigquery): data race when marking iterator as done
alvarowolfx Jan 20, 2023
4ad7739
fix(bigquery): remove atomic.Bool usage
alvarowolfx Jan 20, 2023
c476cfc
Merge branch 'main' into bq-query-storage-api
alvarowolfx Jan 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions bigquery/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"math/big"

"cloud.google.com/go/civil"
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/ipc"
)

type arrowDecoder struct {
tableSchema Schema
rawArrowSchema []byte
arrowSchema *arrow.Schema
}

func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDecoder, error) {
bqSession := session.bqSession
if bqSession == nil {
return nil, errors.New("read session not initialized")
}
arrowSerializedSchema := bqSession.GetArrowSchema().GetSerializedSchema()
buf := bytes.NewBuffer(arrowSerializedSchema)
r, err := ipc.NewReader(buf)
if err != nil {
return nil, err
}
defer r.Release()
p := &arrowDecoder{
tableSchema: schema,
rawArrowSchema: arrowSerializedSchema,
arrowSchema: r.Schema(),
}
return p, nil
}

func (ap *arrowDecoder) createIPCReaderForBatch(serializedArrowRecordBatch []byte) (*ipc.Reader, error) {
buf := bytes.NewBuffer(ap.rawArrowSchema)
buf.Write(serializedArrowRecordBatch)
return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema))
}

// decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value.
func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([][]Value, error) {
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch)
if err != nil {
return nil, err
}
defer r.Release()
rs := make([][]Value, 0)
for r.Next() {
rec := r.Record()
values, err := ap.convertArrowRecordValue(rec)
if err != nil {
return nil, err
}
rs = append(rs, values...)
}
return rs, nil
}

// decodeRetainedArrowRecords decodes BQ ArrowRecordBatch into a list of retained arrow.Record.
func (ap *arrowDecoder) decodeRetainedArrowRecords(serializedArrowRecordBatch []byte) ([]arrow.Record, error) {
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch)
if err != nil {
return nil, err
}
defer r.Release()
records := []arrow.Record{}
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
for r.Next() {
rec := r.Record()
rec.Retain()
records = append(records, rec)
}
return records, nil
}

// convertArrowRows converts an arrow.Record into a series of Value slices.
func (ap *arrowDecoder) convertArrowRecordValue(record arrow.Record) ([][]Value, error) {
rs := make([][]Value, record.NumRows())
for i := range rs {
rs[i] = make([]Value, record.NumCols())
}
for j, col := range record.Columns() {
fs := ap.tableSchema[j]
ft := ap.arrowSchema.Field(j).Type
for i := 0; i < col.Len(); i++ {
v, err := convertArrowValue(col, i, ft, fs)
if err != nil {
return nil, fmt.Errorf("found arrow type %s, but could not convert value: %v", ap.arrowSchema.Field(j).Type, err)
}
rs[i][j] = v
}
}
return rs, nil
}

// convertArrow gets row value in the given column and converts to a Value.
// Arrow is a colunar storage, so we navigate first by column and get the row value.
// More details on conversions can be seen here: https://cloud.google.com/bigquery/docs/reference/storage#arrow_schema_details
func convertArrowValue(col arrow.Array, i int, ft arrow.DataType, fs *FieldSchema) (Value, error) {
if !col.IsValid(i) {
return nil, nil
}
switch ft.(type) {
case *arrow.BooleanType:
v := col.(*array.Boolean).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int8Type:
v := col.(*array.Int8).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int16Type:
v := col.(*array.Int16).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int32Type:
v := col.(*array.Int32).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int64Type:
v := col.(*array.Int64).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Float16Type:
v := col.(*array.Float16).Value(i)
return convertBasicType(fmt.Sprintf("%v", v.Float32()), fs.Type)
case *arrow.Float32Type:
v := col.(*array.Float32).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Float64Type:
v := col.(*array.Float64).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.BinaryType:
v := col.(*array.Binary).Value(i)
encoded := base64.StdEncoding.EncodeToString(v)
return convertBasicType(encoded, fs.Type)
case *arrow.StringType:
v := col.(*array.String).Value(i)
return convertBasicType(v, fs.Type)
case *arrow.Date32Type:
v := col.(*array.Date32).Value(i)
return convertBasicType(v.FormattedString(), fs.Type)
case *arrow.Date64Type:
v := col.(*array.Date64).Value(i)
return convertBasicType(v.FormattedString(), fs.Type)
case *arrow.TimestampType:
v := col.(*array.Timestamp).Value(i)
dft := ft.(*arrow.TimestampType)
t := v.ToTime(dft.Unit)
if dft.TimeZone == "" { // Datetime
return Value(civil.DateTimeOf(t)), nil
}
return Value(t.UTC()), nil // Timestamp
case *arrow.Time32Type:
v := col.(*array.Time32).Value(i)
return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type)
case *arrow.Time64Type:
v := col.(*array.Time64).Value(i)
return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type)
case *arrow.Decimal128Type:
dft := ft.(*arrow.Decimal128Type)
v := col.(*array.Decimal128).Value(i)
rat := big.NewRat(1, 1)
rat.Num().SetBytes(v.BigInt().Bytes())
d := rat.Denom()
d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil)
return Value(rat), nil
case *arrow.Decimal256Type:
dft := ft.(*arrow.Decimal256Type)
v := col.(*array.Decimal256).Value(i)
rat := big.NewRat(1, 1)
rat.Num().SetBytes(v.BigInt().Bytes())
d := rat.Denom()
d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil)
return Value(rat), nil
case *arrow.ListType:
arr := col.(*array.List)
dft := ft.(*arrow.ListType)
values := []Value{}
start, end := arr.ValueOffsets(i)
slice := array.NewSlice(arr.ListValues(), start, end)
for j := 0; j < slice.Len(); j++ {
v, err := convertArrowValue(slice, j, dft.Elem(), fs)
if err != nil {
return nil, err
}
values = append(values, v)
}
return values, nil
case *arrow.StructType:
arr := col.(*array.Struct)
nestedValues := []Value{}
fields := ft.(*arrow.StructType).Fields()
for fIndex, f := range fields {
v, err := convertArrowValue(arr.Field(fIndex), i, f.Type, fs.Schema[fIndex])
if err != nil {
return nil, err
}
nestedValues = append(nestedValues, v)
}
return nestedValues, nil
default:
return nil, fmt.Errorf("unknown arrow type: %v", ft)
}
}
22 changes: 22 additions & 0 deletions bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Client struct {

projectID string
bqs *bq.Service
rc *readClient
}

// DetectProjectID is a sentinel value that instructs NewClient to detect the
Expand Down Expand Up @@ -97,6 +98,21 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
return c, nil
}

// EnableStorageReadClient sets up Storage API connection to be used when fetching
// large datasets from tables, jobs or queries.
// Calling this method twice will return an error.
func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
if c.rc != nil {
return fmt.Errorf("failed: storage read client already set up")
}
rc, err := newReadClient(ctx, c.projectID, opts...)
if err != nil {
return err
}
c.rc = rc
return nil
}

// Project returns the project ID or number for this instance of the client, which may have
// either been explicitly specified or autodetected.
func (c *Client) Project() string {
Expand All @@ -107,6 +123,12 @@ func (c *Client) Project() string {
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
func (c *Client) Close() error {
if c.rc != nil {
err := c.rc.close()
if err != nil {
return err
}
}
return nil
}

Expand Down
23 changes: 23 additions & 0 deletions bigquery/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,29 @@ func ExampleQuery_Read() {
_ = it // TODO: iterate using Next or iterator.Pager.
}

func ExampleQuery_Read_accelerated() {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "project-id")
if err != nil {
// TODO: Handle error.
}

// Enable Storage API usage for fetching data
err = client.EnableStorageReadClient(ctx)
if err != nil {
// TODO: Handle error.
}

sql := fmt.Sprintf(`SELECT name, number, state FROM %s WHERE state = "CA"`, `bigquery-public-data.usa_names.usa_1910_current`)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
// TODO: Handle error.
}

_ = it // TODO: iterate using Next or iterator.Pager.
}

func ExampleRowIterator_Next() {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "project-id")
Expand Down
17 changes: 16 additions & 1 deletion bigquery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
cloud.google.com/go/datacatalog v1.8.1
cloud.google.com/go/iam v0.8.0
cloud.google.com/go/storage v1.28.1
github.com/apache/arrow/go/v10 v10.0.1
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go/v2 v2.7.0
Expand All @@ -22,13 +23,27 @@ require (
require (
cloud.google.com/go/compute v1.14.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/martian/v3 v3.2.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.7 // indirect
)
Loading