diff --git a/ci/docker/conda-integration.dockerfile b/ci/docker/conda-integration.dockerfile index ea1cd672aa165..cce6a287f86be 100644 --- a/ci/docker/conda-integration.dockerfile +++ b/ci/docker/conda-integration.dockerfile @@ -38,7 +38,7 @@ RUN mamba install -q -y \ nodejs=${node} \ yarn \ openjdk=${jdk} \ - zlib=1.2.11 && \ + zlib=1.2.11 && \ mamba clean --all --force-pkgs-dirs # Install Rust with only the needed components diff --git a/dev/release/01-prepare-test.rb b/dev/release/01-prepare-test.rb index b498a29763241..fa00d6290c1f0 100644 --- a/dev/release/01-prepare-test.rb +++ b/dev/release/01-prepare-test.rb @@ -169,6 +169,13 @@ def test_version_pre_tag ], ], }, + { + path: "go/arrow/doc.go", + hunks: [ + ["-const PkgVersion = \"#{@snapshot_version}\"", + "+const PkgVersion = \"#{@release_version}\""], + ], + }, { path: "go/parquet/writer_properties.go", hunks: [ diff --git a/dev/release/post-11-bump-versions-test.rb b/dev/release/post-11-bump-versions-test.rb index 6770ca1c22edd..1c87a9ea45da2 100644 --- a/dev/release/post-11-bump-versions-test.rb +++ b/dev/release/post-11-bump-versions-test.rb @@ -185,6 +185,17 @@ def test_version_post_tag ] Dir.glob("go/**/{go.mod,*.go,*.go.*}") do |path| + if path == "go/arrow/doc.go" + expected_changes << { + path: path, + hunks: [ + [ + "-const PkgVersion = \"#{@snapshot_version}\"", + "+const PkgVersion = \"#{@next_snapshot_version}\"", + ], + ]} + next + end import_path = "github.com/apache/arrow/go/v#{@snapshot_major_version}" lines = File.readlines(path, chomp: true) target_lines = lines.grep(/#{Regexp.escape(import_path)}/) diff --git a/dev/release/utils-prepare.sh b/dev/release/utils-prepare.sh index c68632b9d0487..49af5c608aa1a 100644 --- a/dev/release/utils-prepare.sh +++ b/dev/release/utils-prepare.sh @@ -156,6 +156,9 @@ update_versions() { sed -i.bak -E -e \ "s/\"parquet-go version .+\"/\"parquet-go version ${version}\"/" \ parquet/writer_properties.go + sed -i.bak -E -e \ + "s/const PkgVersion = \".*/const PkgVersion = \"${version}\"/" \ + arrow/doc.go find . -name "*.bak" -exec rm {} \; git add . popd diff --git a/go/arrow/array/booleanbuilder.go b/go/arrow/array/booleanbuilder.go index 83e1fc6d31f2e..760d755314a7a 100644 --- a/go/arrow/array/booleanbuilder.go +++ b/go/arrow/array/booleanbuilder.go @@ -187,6 +187,12 @@ func (b *BooleanBuilder) unmarshalOne(dec *json.Decoder) error { return err } b.Append(val) + case json.Number: + val, err := strconv.ParseBool(v.String()) + if err != nil { + return err + } + b.Append(val) case nil: b.AppendNull() default: @@ -210,6 +216,7 @@ func (b *BooleanBuilder) unmarshal(dec *json.Decoder) error { func (b *BooleanBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() t, err := dec.Token() if err != nil { return err diff --git a/go/arrow/array/builder.go b/go/arrow/array/builder.go index 4733ba9bbee7b..6a2146c080c40 100644 --- a/go/arrow/array/builder.go +++ b/go/arrow/array/builder.go @@ -304,7 +304,7 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder { } case arrow.LIST: typ := dtype.(*arrow.ListType) - return NewListBuilder(mem, typ.Elem()) + return NewListBuilderWithField(mem, typ.ElemField()) case arrow.STRUCT: typ := dtype.(*arrow.StructType) return NewStructBuilder(mem, typ) @@ -319,7 +319,7 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder { return NewDictionaryBuilder(mem, typ) case arrow.LARGE_LIST: typ := dtype.(*arrow.LargeListType) - return NewLargeListBuilder(mem, typ.Elem()) + return NewLargeListBuilderWithField(mem, typ.ElemField()) case arrow.MAP: typ := dtype.(*arrow.MapType) return NewMapBuilder(mem, typ.KeyType(), typ.ItemType(), typ.KeysSorted) diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index a603f7f7ada66..07e38944348ac 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -321,12 +321,31 @@ func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder { } } +// NewListBuilderWithField takes a field to use for the child rather than just +// a datatype to allow for more customization. +func NewListBuilderWithField(mem memory.Allocator, field arrow.Field) *ListBuilder { + offsetBldr := NewInt32Builder(mem) + return &ListBuilder{ + baseListBuilder{ + builder: builder{refCount: 1, mem: mem}, + values: NewBuilder(mem, field.Type), + offsets: offsetBldr, + dt: arrow.ListOfField(field), + appendOffsetVal: func(o int) { offsetBldr.Append(int32(o)) }, + }, + } +} + func (b *baseListBuilder) Type() arrow.DataType { - switch b.dt.ID() { - case arrow.LIST: - return arrow.ListOf(b.values.Type()) - case arrow.LARGE_LIST: - return arrow.LargeListOf(b.values.Type()) + switch dt := b.dt.(type) { + case *arrow.ListType: + f := dt.ElemField() + f.Type = b.values.Type() + return arrow.ListOfField(f) + case *arrow.LargeListType: + f := dt.ElemField() + f.Type = b.values.Type() + return arrow.LargeListOfField(f) } return nil } @@ -346,6 +365,21 @@ func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType) *LargeListB } } +// NewLargeListBuilderWithField takes a field rather than just an element type +// to allow for more customization of the final type of the LargeList Array +func NewLargeListBuilderWithField(mem memory.Allocator, field arrow.Field) *LargeListBuilder { + offsetBldr := NewInt64Builder(mem) + return &LargeListBuilder{ + baseListBuilder{ + builder: builder{refCount: 1, mem: mem}, + values: NewBuilder(mem, field.Type), + offsets: offsetBldr, + dt: arrow.LargeListOfField(field), + appendOffsetVal: func(o int) { offsetBldr.Append(int64(o)) }, + }, + } +} + // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. func (b *baseListBuilder) Release() { @@ -356,15 +390,14 @@ func (b *baseListBuilder) Release() { b.nullBitmap.Release() b.nullBitmap = nil } + b.values.Release() + b.offsets.Release() } - b.values.Release() - b.offsets.Release() } func (b *baseListBuilder) appendNextOffset() { b.appendOffsetVal(b.values.Len()) - // b.offsets.Append(int32(b.values.Len())) } func (b *baseListBuilder) Append(v bool) { @@ -454,9 +487,6 @@ func (b *LargeListBuilder) NewArray() arrow.Array { // NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder // so it can be used to build a new array. func (b *ListBuilder) NewListArray() (a *List) { - if b.offsets.Len() != b.length+1 { - b.appendNextOffset() - } data := b.newData() a = NewListData(data) data.Release() @@ -466,9 +496,6 @@ func (b *ListBuilder) NewListArray() (a *List) { // NewLargeListArray creates a List array from the memory buffers used by the builder and resets the LargeListBuilder // so it can be used to build a new array. func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) { - if b.offsets.Len() != b.length+1 { - b.appendNextOffset() - } data := b.newData() a = NewLargeListData(data) data.Release() @@ -476,6 +503,9 @@ func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) { } func (b *baseListBuilder) newData() (data *Data) { + if b.offsets.Len() != b.length+1 { + b.appendNextOffset() + } values := b.values.NewArray() defer values.Release() diff --git a/go/arrow/array/list_test.go b/go/arrow/array/list_test.go index f493167f76a1c..eb09f655d7e52 100644 --- a/go/arrow/array/list_test.go +++ b/go/arrow/array/list_test.go @@ -33,6 +33,8 @@ func TestListArray(t *testing.T) { }{ {arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOf(arrow.PrimitiveTypes.Int32)}, {arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)}, + {arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOfField(arrow.Field{Name: "item", Type: arrow.PrimitiveTypes.Int32, Nullable: true})}, + {arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOfField(arrow.Field{Name: "item", Type: arrow.PrimitiveTypes.Int32, Nullable: true})}, } for _, tt := range tests { diff --git a/go/arrow/array/record_test.go b/go/arrow/array/record_test.go index d5fbeb8c8e6dc..5deeb27853b73 100644 --- a/go/arrow/array/record_test.go +++ b/go/arrow/array/record_test.go @@ -135,7 +135,7 @@ func TestRecord(t *testing.T) { { schema: schema, cols: nil, - rows: 0, + rows: 0, }, { schema: schema, diff --git a/go/arrow/doc.go b/go/arrow/doc.go index 0af5cd163ab1b..cf73f1a00b3f7 100644 --- a/go/arrow/doc.go +++ b/go/arrow/doc.go @@ -31,6 +31,8 @@ array is valid (not null). If the array has no null entries, it is possible to o */ package arrow +const PkgVersion = "10.0.0-SNAPSHOT" + //go:generate go run _tools/tmpl/main.go -i -data=numeric.tmpldata type_traits_numeric.gen.go.tmpl type_traits_numeric.gen_test.go.tmpl array/numeric.gen.go.tmpl array/numericbuilder.gen.go.tmpl array/bufferbuilder_numeric.gen.go.tmpl //go:generate go run _tools/tmpl/main.go -i -data=datatype_numeric.gen.go.tmpldata datatype_numeric.gen.go.tmpl tensor/numeric.gen.go.tmpl tensor/numeric.gen_test.go.tmpl //go:generate go run _tools/tmpl/main.go -i -data=scalar/numeric.gen.go.tmpldata scalar/numeric.gen.go.tmpl scalar/numeric.gen_test.go.tmpl diff --git a/go/arrow/flight/flightsql/client.go b/go/arrow/flight/flightsql/client.go index 2f57d05484f76..5f7f693d2b2f7 100644 --- a/go/arrow/flight/flightsql/client.go +++ b/go/arrow/flight/flightsql/client.go @@ -42,13 +42,15 @@ func NewClient(addr string, auth flight.ClientAuthHandler, middleware []flight.C if err != nil { return nil, err } - return &Client{cl}, nil + return &Client{cl, memory.DefaultAllocator}, nil } // Client wraps a regular Flight RPC Client to provide the FlightSQL // interface functions and methods. type Client struct { Client flight.Client + + Alloc memory.Allocator } func descForCommand(cmd proto.Message) (*flight.FlightDescriptor, error) { @@ -141,7 +143,7 @@ func (c *Client) DoGet(ctx context.Context, in *flight.Ticket, opts ...grpc.Call return nil, err } - return flight.NewRecordReader(stream) + return flight.NewRecordReader(stream, ipc.WithAllocator(c.Alloc)) } // GetTables requests a list of tables from the server, with the provided @@ -236,6 +238,7 @@ func (c *Client) GetSqlInfo(ctx context.Context, info []SqlInfo, opts ...grpc.Ca // and use the specified allocator for any allocations it needs to perform. func (c *Client) Prepare(ctx context.Context, mem memory.Allocator, query string, opts ...grpc.CallOption) (prep *PreparedStatement, err error) { const actionType = CreatePreparedStatementActionType + var ( cmd, cmdResult anypb.Any res *pb.Result diff --git a/go/arrow/flight/flightsql/column_metadata.go b/go/arrow/flight/flightsql/column_metadata.go index 59c133757d5db..6ad8030d9950f 100644 --- a/go/arrow/flight/flightsql/column_metadata.go +++ b/go/arrow/flight/flightsql/column_metadata.go @@ -142,6 +142,11 @@ func NewColumnMetadataBuilder() *ColumnMetadataBuilder { return &ColumnMetadataBuilder{make([]string, 0), make([]string, 0)} } +func (c *ColumnMetadataBuilder) Clear() { + c.keys = c.keys[:0] + c.vals = c.vals[:0] +} + func (c *ColumnMetadataBuilder) Build() ColumnMetadata { md := c.Metadata() return ColumnMetadata{&md} diff --git a/go/arrow/flight/flightsql/example/sql_batch_reader.go b/go/arrow/flight/flightsql/example/sql_batch_reader.go new file mode 100644 index 0000000000000..5e7fbde1afef7 --- /dev/null +++ b/go/arrow/flight/flightsql/example/sql_batch_reader.go @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package example + +import ( + "database/sql" + "reflect" + "strings" + "sync/atomic" + + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql" + "github.com/apache/arrow/go/v10/arrow/internal/debug" + "github.com/apache/arrow/go/v10/arrow/memory" +) + +func getArrowTypeFromString(dbtype string) arrow.DataType { + dbtype = strings.ToLower(dbtype) + if strings.HasPrefix(dbtype, "varchar") { + return arrow.BinaryTypes.String + } + + switch dbtype { + case "int", "integer": + return arrow.PrimitiveTypes.Int64 + case "real": + return arrow.PrimitiveTypes.Float64 + case "blob": + return arrow.BinaryTypes.Binary + case "text", "date", "char": + return arrow.BinaryTypes.String + default: + panic("invalid sqlite type: " + dbtype) + } +} + +func getArrowType(c *sql.ColumnType) arrow.DataType { + dbtype := strings.ToLower(c.DatabaseTypeName()) + if dbtype == "" { + switch c.ScanType().Kind() { + case reflect.Int, reflect.Int64, reflect.Uint64: + return arrow.PrimitiveTypes.Int64 + case reflect.Float32, reflect.Float64: + return arrow.PrimitiveTypes.Float64 + } + } + return getArrowTypeFromString(dbtype) +} + +const maxBatchSize = 1024 + +type SqlBatchReader struct { + refCount int64 + + schema *arrow.Schema + rows *sql.Rows + record arrow.Record + bldr *array.RecordBuilder + err error + + rowdest []interface{} +} + +func NewSqlBatchReaderWithSchema(mem memory.Allocator, schema *arrow.Schema, rows *sql.Rows) (*SqlBatchReader, error) { + rowdest := make([]interface{}, len(schema.Fields())) + for i, f := range schema.Fields() { + switch f.Type.ID() { + case arrow.UINT8: + if f.Nullable { + rowdest[i] = &sql.NullInt32{} + } else { + rowdest[i] = new(uint8) + } + case arrow.INT32: + if f.Nullable { + rowdest[i] = &sql.NullInt32{} + } else { + rowdest[i] = new(int32) + } + case arrow.INT64: + if f.Nullable { + rowdest[i] = &sql.NullInt64{} + } else { + rowdest[i] = new(int64) + } + case arrow.FLOAT64: + if f.Nullable { + rowdest[i] = &sql.NullFloat64{} + } else { + rowdest[i] = new(float64) + } + case arrow.BINARY: + var b []byte + rowdest[i] = &b + case arrow.STRING: + if f.Nullable { + rowdest[i] = &sql.NullString{} + } else { + rowdest[i] = new(string) + } + } + } + + return &SqlBatchReader{ + refCount: 1, + bldr: array.NewRecordBuilder(mem, schema), + schema: schema, + rowdest: rowdest, + rows: rows}, nil +} + +func NewSqlBatchReader(mem memory.Allocator, rows *sql.Rows) (*SqlBatchReader, error) { + bldr := flightsql.NewColumnMetadataBuilder() + + cols, err := rows.ColumnTypes() + if err != nil { + rows.Close() + return nil, err + } + + rowdest := make([]interface{}, len(cols)) + fields := make([]arrow.Field, len(cols)) + for i, c := range cols { + fields[i].Name = c.Name() + fields[i].Nullable, _ = c.Nullable() + fields[i].Type = getArrowType(c) + fields[i].Metadata = getColumnMetadata(bldr, getSqlTypeFromTypeName(c.DatabaseTypeName()), "") + switch fields[i].Type.ID() { + case arrow.UINT8: + if fields[i].Nullable { + rowdest[i] = &sql.NullInt32{} + } else { + rowdest[i] = new(uint8) + } + case arrow.INT32: + if fields[i].Nullable { + rowdest[i] = &sql.NullInt32{} + } else { + rowdest[i] = new(int32) + } + case arrow.INT64: + if fields[i].Nullable { + rowdest[i] = &sql.NullInt64{} + } else { + rowdest[i] = new(int64) + } + case arrow.FLOAT64: + if fields[i].Nullable { + rowdest[i] = &sql.NullFloat64{} + } else { + rowdest[i] = new(float64) + } + case arrow.BINARY: + var b []byte + rowdest[i] = &b + case arrow.STRING: + if fields[i].Nullable { + rowdest[i] = &sql.NullString{} + } else { + rowdest[i] = new(string) + } + } + } + + schema := arrow.NewSchema(fields, nil) + return &SqlBatchReader{ + refCount: 1, + bldr: array.NewRecordBuilder(mem, schema), + schema: schema, + rowdest: rowdest, + rows: rows}, nil +} + +func (r *SqlBatchReader) Retain() { + atomic.AddInt64(&r.refCount, 1) +} + +func (r *SqlBatchReader) Release() { + debug.Assert(atomic.LoadInt64(&r.refCount) > 0, "too many releases") + + if atomic.AddInt64(&r.refCount, -1) == 0 { + r.rows.Close() + r.rows, r.schema, r.rowdest = nil, nil, nil + r.bldr.Release() + r.bldr = nil + if r.record != nil { + r.record.Release() + r.record = nil + } + } +} +func (r *SqlBatchReader) Schema() *arrow.Schema { return r.schema } + +func (r *SqlBatchReader) Record() arrow.Record { return r.record } + +func (r *SqlBatchReader) Err() error { return r.err } + +func (r *SqlBatchReader) Next() bool { + if r.record != nil { + r.record.Release() + r.record = nil + } + + rows := 0 + for rows < maxBatchSize && r.rows.Next() { + if err := r.rows.Scan(r.rowdest...); err != nil { + r.err = err + return false + } + + for i, v := range r.rowdest { + fb := r.bldr.Field(i) + switch v := v.(type) { + case *uint8: + fb.(*array.Uint8Builder).Append(*v) + case *int64: + fb.(*array.Int64Builder).Append(*v) + case *sql.NullInt64: + if !v.Valid { + fb.AppendNull() + } else { + fb.(*array.Int64Builder).Append(v.Int64) + } + case *int32: + fb.(*array.Int32Builder).Append(*v) + case *sql.NullInt32: + if !v.Valid { + fb.AppendNull() + } else { + switch b := fb.(type) { + case *array.Int32Builder: + b.Append(v.Int32) + case *array.Uint8Builder: + b.Append(uint8(v.Int32)) + } + } + case *float64: + fb.(*array.Float64Builder).Append(*v) + case *sql.NullFloat64: + if !v.Valid { + fb.AppendNull() + } else { + fb.(*array.Float64Builder).Append(v.Float64) + } + case *[]byte: + if v == nil { + fb.AppendNull() + } else { + fb.(*array.BinaryBuilder).Append(*v) + } + case *string: + fb.(*array.StringBuilder).Append(*v) + case *sql.NullString: + if !v.Valid { + fb.AppendNull() + } else { + fb.(*array.StringBuilder).Append(v.String) + } + } + } + + rows++ + } + + r.record = r.bldr.NewRecord() + return rows > 0 +} diff --git a/go/arrow/flight/flightsql/example/sqlite_info.go b/go/arrow/flight/flightsql/example/sqlite_info.go new file mode 100644 index 0000000000000..e4dcd160b0aca --- /dev/null +++ b/go/arrow/flight/flightsql/example/sqlite_info.go @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package example + +import ( + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql" +) + +func SqlInfoResultMap() flightsql.SqlInfoResultMap { + return flightsql.SqlInfoResultMap{ + uint32(flightsql.SqlInfoFlightSqlServerName): "db_name", + uint32(flightsql.SqlInfoFlightSqlServerVersion): "sqlite 3", + uint32(flightsql.SqlInfoFlightSqlServerArrowVersion): arrow.PkgVersion, + uint32(flightsql.SqlInfoFlightSqlServerReadOnly): false, + uint32(flightsql.SqlInfoDDLCatalog): false, + uint32(flightsql.SqlInfoDDLSchema): false, + uint32(flightsql.SqlInfoDDLTable): true, + uint32(flightsql.SqlInfoIdentifierCase): int64(flightsql.SqlCaseSensitivityCaseInsensitive), + uint32(flightsql.SqlInfoIdentifierQuoteChar): `"`, + uint32(flightsql.SqlInfoQuotedIdentifierCase): int64(flightsql.SqlCaseSensitivityCaseInsensitive), + uint32(flightsql.SqlInfoAllTablesAreASelectable): true, + uint32(flightsql.SqlInfoNullOrdering): int64(flightsql.SqlNullOrderingSortAtStart), + uint32(flightsql.SqlInfoKeywords): []string{"ABORT", + "ACTION", + "ADD", + "AFTER", + "ALL", + "ALTER", + "ALWAYS", + "ANALYZE", + "AND", + "AS", + "ASC", + "ATTACH", + "AUTOINCREMENT", + "BEFORE", + "BEGIN", + "BETWEEN", + "BY", + "CASCADE", + "CASE", + "CAST", + "CHECK", + "COLLATE", + "COLUMN", + "COMMIT", + "CONFLICT", + "CONSTRAINT", + "CREATE", + "CROSS", + "CURRENT", + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "DATABASE", + "DEFAULT", + "DEFERRABLE", + "DEFERRED", + "DELETE", + "DESC", + "DETACH", + "DISTINCT", + "DO", + "DROP", + "EACH", + "ELSE", + "END", + "ESCAPE", + "EXCEPT", + "EXCLUDE", + "EXCLUSIVE", + "EXISTS", + "EXPLAIN", + "FAIL", + "FILTER", + "FIRST", + "FOLLOWING", + "FOR", + "FOREIGN", + "FROM", + "FULL", + "GENERATED", + "GLOB", + "GROUP", + "GROUPS", + "HAVING", + "IF", + "IGNORE", + "IMMEDIATE", + "IN", + "INDEX", + "INDEXED", + "INITIALLY", + "INNER", + "INSERT", + "INSTEAD", + "INTERSECT", + "INTO", + "IS", + "ISNULL", + "JOIN", + "KEY", + "LAST", + "LEFT", + "LIKE", + "LIMIT", + "MATCH", + "MATERIALIZED", + "NATURAL", + "NO", + "NOT", + "NOTHING", + "NOTNULL", + "NULL", + "NULLS", + "OF", + "OFFSET", + "ON", + "OR", + "ORDER", + "OTHERS", + "OUTER", + "OVER", + "PARTITION", + "PLAN", + "PRAGMA", + "PRECEDING", + "PRIMARY", + "QUERY", + "RAISE", + "RANGE", + "RECURSIVE", + "REFERENCES", + "REGEXP", + "REINDEX", + "RELEASE", + "RENAME", + "REPLACE", + "RESTRICT", + "RETURNING", + "RIGHT", + "ROLLBACK", + "ROW", + "ROWS", + "SAVEPOINT", + "SELECT", + "SET", + "TABLE", + "TEMP", + "TEMPORARY", + "THEN", + "TIES", + "TO", + "TRANSACTION", + "TRIGGER", + "UNBOUNDED", + "UNION", + "UNIQUE", + "UPDATE", + "USING", + "VACUUM", + "VALUES", + "VIEW", + "VIRTUAL", + "WHEN", + "WHERE", + "WINDOW", + "WITH", + "WITHOUT"}, + uint32(flightsql.SqlInfoNumericFunctions): []string{ + "ACOS", "ACOSH", "ASIN", "ASINH", "ATAN", "ATAN2", "ATANH", "CEIL", + "CEILING", "COS", "COSH", "DEGREES", "EXP", "FLOOR", "LN", "LOG", + "LOG10", "LOG2", "MOD", "PI", "POW", "POWER", "RADIANS", + "SIN", "SINH", "SQRT", "TAN", "TANH", "TRUNC"}, + uint32(flightsql.SqlInfoStringFunctions): []string{"SUBSTR", "TRIM", "LTRIM", "RTRIM", "LENGTH", + "REPLACE", "UPPER", "LOWER", "INSTR"}, + uint32(flightsql.SqlInfoSupportsConvert): map[int32][]int32{ + int32(flightsql.SqlConvertBigInt): {int32(flightsql.SqlConvertInteger)}, + }, + } +} diff --git a/go/arrow/flight/flightsql/example/sqlite_server.go b/go/arrow/flight/flightsql/example/sqlite_server.go new file mode 100644 index 0000000000000..5d2599b52c09f --- /dev/null +++ b/go/arrow/flight/flightsql/example/sqlite_server.go @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +// Package example contains a FlightSQL Server implementation using +// sqlite as the backing engine. +// +// In order to ensure portability we'll use modernc.org/sqlite instead +// of github.com/mattn/go-sqlite3 because modernc is a translation of the +// SQLite source into Go, such that it doesn't require CGO to run and +// doesn't need to link against the actual libsqlite3 libraries. This way +// we don't require CGO or libsqlite3 to run this example or the tests. +// +// That said, since both implement in terms of Go's standard database/sql +// package, it's easy to swap them out if desired as the modernc.org/sqlite +// package is slower than go-sqlite3. +// +// One other important note is that modernc.org/sqlite only works in go +// 1.17+ so this entire package is given the build constraint to only +// build when using go1.17 or higher +package example + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/flight" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql/schema_ref" + "github.com/apache/arrow/go/v10/arrow/memory" + "github.com/apache/arrow/go/v10/arrow/scalar" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + _ "modernc.org/sqlite" +) + +func genRandomString() []byte { + const length = 16 + max := int('z') + min := int('0') + + out := make([]byte, length) + for i := range out { + out[i] = byte(rand.Intn(max-min+1) + min) + } + return out +} + +func prepareQueryForGetTables(cmd flightsql.GetTables) string { + var b strings.Builder + b.WriteString(`SELECT null AS catalog_name, null AS schema_name, + name AS table_name, type AS table_type FROM sqlite_master WHERE 1=1`) + + if cmd.GetCatalog() != nil { + b.WriteString(" and catalog_name = '") + b.WriteString(*cmd.GetCatalog()) + b.WriteByte('\'') + } + + if cmd.GetDBSchemaFilterPattern() != nil { + b.WriteString(" and schema_name LIKE '") + b.WriteString(*cmd.GetDBSchemaFilterPattern()) + b.WriteByte('\'') + } + + if cmd.GetTableNameFilterPattern() != nil { + b.WriteString(" and table_name LIKE '") + b.WriteString(*cmd.GetTableNameFilterPattern()) + b.WriteByte('\'') + } + + if len(cmd.GetTableTypes()) > 0 { + b.WriteString(" and table_type IN (") + for i, t := range cmd.GetTableTypes() { + if i != 0 { + b.WriteByte(',') + } + fmt.Fprintf(&b, "'%s'", t) + } + b.WriteByte(')') + } + + b.WriteString(" order by table_name") + return b.String() +} + +func prepareQueryForGetKeys(filter string) string { + return `SELECT * FROM ( + SELECT + NULL AS pk_catalog_name, + NULL AS pk_schema_name, + p."table" AS pk_table_name, + p."to" AS pk_column_name, + NULL AS fk_catalog_name, + NULL AS fk_schema_name, + m.name AS fk_table_name, + p."from" AS fk_column_name, + p.seq AS key_sequence, + NULL AS pk_key_name, + NULL AS fk_key_name, + CASE + WHEN p.on_update = 'CASCADE' THEN 0 + WHEN p.on_update = 'RESTRICT' THEN 1 + WHEN p.on_update = 'SET NULL' THEN 2 + WHEN p.on_update = 'NO ACTION' THEN 3 + WHEN p.on_update = 'SET DEFAULT' THEN 4 + END AS update_rule, + CASE + WHEN p.on_delete = 'CASCADE' THEN 0 + WHEN p.on_delete = 'RESTRICT' THEN 1 + WHEN p.on_delete = 'SET NULL' THEN 2 + WHEN p.on_delete = 'NO ACTION' THEN 3 + WHEN p.on_delete = 'SET DEFAULT' THEN 4 + END AS delete_rule + FROM sqlite_master m + JOIN pragma_foreign_key_list(m.name) p ON m.name != p."table" + WHERE m.type = 'table') WHERE ` + filter + + ` ORDER BY pk_catalog_name, pk_schema_name, pk_table_name, pk_key_name, key_sequence` +} + +type Statement struct { + stmt *sql.Stmt + params []interface{} +} + +type SQLiteFlightSQLServer struct { + flightsql.BaseServer + db *sql.DB + + prepared sync.Map +} + +func NewSQLiteFlightSQLServer() (*SQLiteFlightSQLServer, error) { + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + return nil, err + } + + _, err = db.Exec(` + CREATE TABLE foreignTable ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + foreignName varchar(100), + value int); + + CREATE TABLE intTable ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + keyName varchar(100), + value int, + foreignId int references foreignTable(id)); + + INSERT INTO foreignTable (foreignName, value) VALUES ('keyOne', 1); + INSERT INTO foreignTable (foreignName, value) VALUES ('keyTwo', 0); + INSERT INTO foreignTable (foreignName, value) VALUES ('keyThree', -1); + INSERT INTO intTable (keyName, value, foreignId) VALUES ('one', 1, 1); + INSERT INTO intTable (keyName, value, foreignId) VALUES ('zero', 0, 1); + INSERT INTO intTable (keyName, value, foreignId) VALUES ('negative one', -1, 1); + INSERT INTO intTable (keyName, value, foreignId) VALUES (NULL, NULL, NULL); + `) + + if err != nil { + return nil, err + } + ret := &SQLiteFlightSQLServer{db: db} + for k, v := range SqlInfoResultMap() { + ret.RegisterSqlInfo(flightsql.SqlInfo(k), v) + } + return ret, nil +} + +func (s *SQLiteFlightSQLServer) flightInfoForCommand(desc *flight.FlightDescriptor, schema *arrow.Schema) *flight.FlightInfo { + return &flight.FlightInfo{ + Endpoint: []*flight.FlightEndpoint{{Ticket: &flight.Ticket{Ticket: desc.Cmd}}}, + FlightDescriptor: desc, + Schema: flight.SerializeSchema(schema, s.Alloc), + TotalRecords: -1, + TotalBytes: -1, + } +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoStatement(ctx context.Context, cmd flightsql.StatementQuery, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + query := cmd.GetQuery() + tkt, err := flightsql.CreateStatementQueryTicket([]byte(query)) + if err != nil { + return nil, err + } + + return &flight.FlightInfo{ + Endpoint: []*flight.FlightEndpoint{{Ticket: &flight.Ticket{Ticket: tkt}}}, + FlightDescriptor: desc, + TotalRecords: -1, + TotalBytes: -1, + }, nil +} + +func (s *SQLiteFlightSQLServer) DoGetStatement(ctx context.Context, cmd flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error) { + return doGetQuery(ctx, s.Alloc, s.db, string(cmd.GetStatementHandle()), nil) +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoCatalogs(_ context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.Catalogs), nil +} + +func (s *SQLiteFlightSQLServer) DoGetCatalogs(context.Context) (*arrow.Schema, <-chan flight.StreamChunk, error) { + // sqlite doesn't support catalogs, this returns an empty record batch + schema := schema_ref.Catalogs + + ch := make(chan flight.StreamChunk) + close(ch) + + return schema, ch, nil +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoSchemas(_ context.Context, cmd flightsql.GetDBSchemas, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.DBSchemas), nil +} + +func (s *SQLiteFlightSQLServer) DoGetDBSchemas(context.Context, flightsql.GetDBSchemas) (*arrow.Schema, <-chan flight.StreamChunk, error) { + // sqlite doesn't support schemas, this returns an empty record batch + schema := schema_ref.DBSchemas + + ch := make(chan flight.StreamChunk) + close(ch) + + return schema, ch, nil +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoTables(_ context.Context, cmd flightsql.GetTables, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + schema := schema_ref.Tables + if cmd.GetIncludeSchema() { + schema = schema_ref.TablesWithIncludedSchema + } + return s.flightInfoForCommand(desc, schema), nil +} + +func (s *SQLiteFlightSQLServer) DoGetTables(ctx context.Context, cmd flightsql.GetTables) (*arrow.Schema, <-chan flight.StreamChunk, error) { + query := prepareQueryForGetTables(cmd) + + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, nil, err + } + + var rdr array.RecordReader + + rdr, err = NewSqlBatchReaderWithSchema(s.Alloc, schema_ref.Tables, rows) + if err != nil { + return nil, nil, err + } + + ch := make(chan flight.StreamChunk, 2) + if cmd.GetIncludeSchema() { + rdr, err = NewSqliteTablesSchemaBatchReader(ctx, s.Alloc, rdr, s.db, query) + if err != nil { + return nil, nil, err + } + } + + schema := rdr.Schema() + go flight.StreamChunksFromReader(rdr, ch) + return schema, ch, nil +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoXdbcTypeInfo(_ context.Context, _ flightsql.GetXdbcTypeInfo, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.XdbcTypeInfo), nil +} + +func (s *SQLiteFlightSQLServer) DoGetXdbcTypeInfo(_ context.Context, cmd flightsql.GetXdbcTypeInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) { + var batch arrow.Record + if cmd.GetDataType() == nil { + batch = GetTypeInfoResult(s.Alloc) + } else { + batch = GetFilteredTypeInfoResult(s.Alloc, *cmd.GetDataType()) + } + + ch := make(chan flight.StreamChunk, 1) + ch <- flight.StreamChunk{Data: batch} + close(ch) + return batch.Schema(), ch, nil +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoTableTypes(_ context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.TableTypes), nil +} + +func (s *SQLiteFlightSQLServer) DoGetTableTypes(ctx context.Context) (*arrow.Schema, <-chan flight.StreamChunk, error) { + query := "SELECT DISTINCT type AS table_type FROM sqlite_master" + return doGetQuery(ctx, s.Alloc, s.db, query, schema_ref.TableTypes) +} + +func (s *SQLiteFlightSQLServer) DoPutCommandStatementUpdate(ctx context.Context, cmd flightsql.StatementUpdate) (int64, error) { + res, err := s.db.ExecContext(ctx, cmd.GetQuery()) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func (s *SQLiteFlightSQLServer) CreatePreparedStatement(ctx context.Context, req flightsql.ActionCreatePreparedStatementRequest) (result flightsql.ActionCreatePreparedStatementResult, err error) { + stmt, err := s.db.PrepareContext(ctx, req.GetQuery()) + if err != nil { + return result, err + } + + handle := genRandomString() + s.prepared.Store(string(handle), Statement{stmt: stmt}) + + result.Handle = handle + // no way to get the dataset or parameter schemas from sql.DB + return +} + +func (s *SQLiteFlightSQLServer) ClosePreparedStatement(ctx context.Context, request flightsql.ActionClosePreparedStatementRequest) error { + handle := request.GetPreparedStatementHandle() + if val, loaded := s.prepared.LoadAndDelete(string(handle)); loaded { + stmt := val.(Statement) + return stmt.stmt.Close() + } + + return status.Error(codes.InvalidArgument, "prepared statement not found") +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoPreparedStatement(_ context.Context, cmd flightsql.PreparedStatementQuery, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + _, ok := s.prepared.Load(string(cmd.GetPreparedStatementHandle())) + if !ok { + return nil, status.Error(codes.InvalidArgument, "prepared statement not found") + } + + return &flight.FlightInfo{ + Endpoint: []*flight.FlightEndpoint{{Ticket: &flight.Ticket{Ticket: desc.Cmd}}}, + FlightDescriptor: desc, + TotalRecords: -1, + TotalBytes: -1, + }, nil +} + +func doGetQuery(ctx context.Context, mem memory.Allocator, db *sql.DB, query string, schema *arrow.Schema, args ...interface{}) (*arrow.Schema, <-chan flight.StreamChunk, error) { + rows, err := db.QueryContext(ctx, query, args...) + if err != nil { + return nil, nil, err + } + + var rdr *SqlBatchReader + if schema != nil { + rdr, err = NewSqlBatchReaderWithSchema(mem, schema, rows) + } else { + rdr, err = NewSqlBatchReader(mem, rows) + if err == nil { + schema = rdr.schema + } + } + + if err != nil { + return nil, nil, err + } + + ch := make(chan flight.StreamChunk) + go flight.StreamChunksFromReader(rdr, ch) + return schema, ch, nil +} + +func (s *SQLiteFlightSQLServer) DoGetPreparedStatement(ctx context.Context, cmd flightsql.PreparedStatementQuery) (*arrow.Schema, <-chan flight.StreamChunk, error) { + val, ok := s.prepared.Load(string(cmd.GetPreparedStatementHandle())) + if !ok { + return nil, nil, status.Error(codes.InvalidArgument, "prepared statement not found") + } + + stmt := val.(Statement) + rows, err := stmt.stmt.QueryContext(ctx, stmt.params...) + if err != nil { + return nil, nil, err + } + + rdr, err := NewSqlBatchReader(s.Alloc, rows) + if err != nil { + return nil, nil, err + } + + schema := rdr.schema + ch := make(chan flight.StreamChunk) + go flight.StreamChunksFromReader(rdr, ch) + return schema, ch, nil +} + +func getParamsForStatement(rdr flight.MessageReader) (params []interface{}, err error) { + for rdr.Next() { + rec := rdr.Record() + + nrows := int(rec.NumRows()) + ncols := int(rec.NumCols()) + + if len(params) < int(ncols) { + params = make([]interface{}, ncols) + } + + for i := 0; i < nrows; i++ { + for c := 0; c < ncols; c++ { + col := rec.Column(c) + sc, err := scalar.GetScalar(col, i) + if err != nil { + return nil, err + } + if r, ok := sc.(scalar.Releasable); ok { + r.Release() + } + + switch v := sc.(*scalar.DenseUnion).Value.(type) { + case *scalar.Int64: + params[c] = v.Value + case *scalar.Float32: + params[c] = v.Value + case *scalar.Float64: + params[c] = v.Value + case *scalar.String: + params[c] = string(v.Value.Bytes()) + case *scalar.Binary: + params[c] = v.Value.Bytes() + default: + return nil, fmt.Errorf("unsupported type: %s", v) + } + } + } + } + + return params, rdr.Err() +} + +func (s *SQLiteFlightSQLServer) DoPutPreparedStatementQuery(_ context.Context, cmd flightsql.PreparedStatementQuery, rdr flight.MessageReader, _ flight.MetadataWriter) error { + val, ok := s.prepared.Load(string(cmd.GetPreparedStatementHandle())) + if !ok { + return status.Error(codes.InvalidArgument, "prepared statement not found") + } + + stmt := val.(Statement) + args, err := getParamsForStatement(rdr) + if err != nil { + return status.Errorf(codes.Internal, "error gathering parameters for prepared statement query: %s", err.Error()) + } + + stmt.params = args + s.prepared.Store(string(cmd.GetPreparedStatementHandle()), stmt) + return nil +} + +func (s *SQLiteFlightSQLServer) DoPutPreparedStatementUpdate(ctx context.Context, cmd flightsql.PreparedStatementUpdate, rdr flight.MessageReader) (int64, error) { + val, ok := s.prepared.Load(string(cmd.GetPreparedStatementHandle())) + if !ok { + return 0, status.Error(codes.InvalidArgument, "prepared statement not found") + } + + stmt := val.(Statement) + args, err := getParamsForStatement(rdr) + if err != nil { + return 0, status.Errorf(codes.Internal, "error gathering parameters for prepared statement: %s", err.Error()) + } + + stmt.params = args + result, err := stmt.stmt.ExecContext(ctx, args...) + if err != nil { + return 0, err + } + + return result.RowsAffected() +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoPrimaryKeys(_ context.Context, cmd flightsql.TableRef, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.PrimaryKeys), nil +} + +func (s *SQLiteFlightSQLServer) DoGetPrimaryKeys(ctx context.Context, cmd flightsql.TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) { + // the field key_name can not be recovered by sqlite so it is + // being set to null following the same pattern for catalog name and schema_name + var b strings.Builder + + b.WriteString(` + SELECT null AS catalog_name, null AS schema_name, table_name, name AS column_name, pk AS key_sequence, null as key_name + FROM pragma_table_info(table_name) + JOIN (SELECT null AS catalog_name, null AS schema_name, name AS table_name, type AS table_type + FROM sqlite_master) where 1=1 AND pk !=0`) + + if cmd.Catalog != nil { + fmt.Fprintf(&b, " and catalog_name LIKE '%s'", *cmd.Catalog) + } + if cmd.DBSchema != nil { + fmt.Fprintf(&b, " and schema_name LIKE '%s'", *cmd.DBSchema) + } + + fmt.Fprintf(&b, " and table_name LIKE '%s'", cmd.Table) + + return doGetQuery(ctx, s.Alloc, s.db, b.String(), schema_ref.PrimaryKeys) +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoImportedKeys(_ context.Context, _ flightsql.TableRef, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.ImportedKeys), nil +} + +func (s *SQLiteFlightSQLServer) DoGetImportedKeys(ctx context.Context, ref flightsql.TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) { + filter := "fk_table_name = '" + ref.Table + "'" + if ref.Catalog != nil { + filter += " AND fk_catalog_name = '" + *ref.Catalog + "'" + } + if ref.DBSchema != nil { + filter += " AND fk_schema_name = '" + *ref.DBSchema + "'" + } + query := prepareQueryForGetKeys(filter) + return doGetQuery(ctx, s.Alloc, s.db, query, schema_ref.ImportedKeys) +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoExportedKeys(_ context.Context, _ flightsql.TableRef, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.ExportedKeys), nil +} + +func (s *SQLiteFlightSQLServer) DoGetExportedKeys(ctx context.Context, ref flightsql.TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) { + filter := "pk_table_name = '" + ref.Table + "'" + if ref.Catalog != nil { + filter += " AND pk_catalog_name = '" + *ref.Catalog + "'" + } + if ref.DBSchema != nil { + filter += " AND pk_schema_name = '" + *ref.DBSchema + "'" + } + query := prepareQueryForGetKeys(filter) + return doGetQuery(ctx, s.Alloc, s.db, query, schema_ref.ExportedKeys) +} + +func (s *SQLiteFlightSQLServer) GetFlightInfoCrossReference(_ context.Context, _ flightsql.CrossTableRef, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + return s.flightInfoForCommand(desc, schema_ref.CrossReference), nil +} + +func (s *SQLiteFlightSQLServer) DoGetCrossReference(ctx context.Context, cmd flightsql.CrossTableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) { + pkref := cmd.PKRef + filter := "pk_table_name = '" + pkref.Table + "'" + if pkref.Catalog != nil { + filter += " AND pk_catalog_name = '" + *pkref.Catalog + "'" + } + if pkref.DBSchema != nil { + filter += " AND pk_schema_name = '" + *pkref.DBSchema + "'" + } + + fkref := cmd.FKRef + filter += " AND fk_table_name = '" + fkref.Table + "'" + if fkref.Catalog != nil { + filter += " AND fk_catalog_name = '" + *fkref.Catalog + "'" + } + if fkref.DBSchema != nil { + filter += " AND fk_schema_name = '" + *fkref.DBSchema + "'" + } + query := prepareQueryForGetKeys(filter) + return doGetQuery(ctx, s.Alloc, s.db, query, schema_ref.ExportedKeys) +} diff --git a/go/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go b/go/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go new file mode 100644 index 0000000000000..851b301c7482f --- /dev/null +++ b/go/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package example + +import ( + "context" + "database/sql" + "strings" + "sync/atomic" + + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/flight" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql" + "github.com/apache/arrow/go/v10/arrow/internal/debug" + "github.com/apache/arrow/go/v10/arrow/memory" + sqlite3 "modernc.org/sqlite/lib" +) + +type SqliteTablesSchemaBatchReader struct { + refCount int64 + + mem memory.Allocator + ctx context.Context + rdr array.RecordReader + stmt *sql.Stmt + schemaBldr *array.BinaryBuilder + record arrow.Record + err error +} + +func NewSqliteTablesSchemaBatchReader(ctx context.Context, mem memory.Allocator, rdr array.RecordReader, db *sql.DB, mainQuery string) (*SqliteTablesSchemaBatchReader, error) { + schemaQuery := `SELECT table_name, name, type, [notnull] + FROM pragma_table_info(table_name) + JOIN (` + mainQuery + `) WHERE table_name = ?` + + stmt, err := db.PrepareContext(ctx, schemaQuery) + if err != nil { + rdr.Release() + return nil, err + } + + return &SqliteTablesSchemaBatchReader{ + refCount: 1, + ctx: ctx, + rdr: rdr, + stmt: stmt, + mem: mem, + schemaBldr: array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary), + }, nil +} + +func (s *SqliteTablesSchemaBatchReader) Err() error { return s.err } + +func (s *SqliteTablesSchemaBatchReader) Retain() { atomic.AddInt64(&s.refCount, 1) } + +func (s *SqliteTablesSchemaBatchReader) Release() { + debug.Assert(atomic.LoadInt64(&s.refCount) > 0, "too many releases") + + if atomic.AddInt64(&s.refCount, -1) == 0 { + s.rdr.Release() + s.stmt.Close() + s.schemaBldr.Release() + if s.record != nil { + s.record.Release() + s.record = nil + } + } +} + +func (s *SqliteTablesSchemaBatchReader) Schema() *arrow.Schema { + fields := append(s.rdr.Schema().Fields(), + arrow.Field{Name: "table_schema", Type: arrow.BinaryTypes.Binary}) + return arrow.NewSchema(fields, nil) +} + +func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.record } + +func getSqlTypeFromTypeName(sqltype string) int { + if sqltype == "" { + return sqlite3.SQLITE_NULL + } + + sqltype = strings.ToLower(sqltype) + + if strings.HasPrefix(sqltype, "varchar") || strings.HasPrefix(sqltype, "char") { + return sqlite3.SQLITE_TEXT + } + + switch sqltype { + case "int", "integer": + return sqlite3.SQLITE_INTEGER + case "real": + return sqlite3.SQLITE_FLOAT + case "blob": + return sqlite3.SQLITE_BLOB + case "text", "date": + return sqlite3.SQLITE_TEXT + default: + return sqlite3.SQLITE_NULL + } +} + +func getPrecisionFromCol(sqltype int) int { + switch sqltype { + case sqlite3.SQLITE_INTEGER: + return 10 + case sqlite3.SQLITE_FLOAT: + return 15 + } + return 0 +} + +func getColumnMetadata(bldr *flightsql.ColumnMetadataBuilder, sqltype int, table string) arrow.Metadata { + defer bldr.Clear() + + bldr.Scale(15).IsReadOnly(false).IsAutoIncrement(false) + if table != "" { + bldr.TableName(table) + } + switch sqltype { + case sqlite3.SQLITE_TEXT, sqlite3.SQLITE_BLOB: + default: + bldr.Precision(int32(getPrecisionFromCol(sqltype))) + } + + return bldr.Metadata() +} + +func (s *SqliteTablesSchemaBatchReader) Next() bool { + if s.record != nil { + s.record.Release() + s.record = nil + } + + if !s.rdr.Next() { + return false + } + + rec := s.rdr.Record() + tableNameArr := rec.Column(rec.Schema().FieldIndices("table_name")[0]).(*array.String) + + bldr := flightsql.NewColumnMetadataBuilder() + columnFields := make([]arrow.Field, 0) + for i := 0; i < tableNameArr.Len(); i++ { + table := tableNameArr.Value(i) + rows, err := s.stmt.QueryContext(s.ctx, table) + if err != nil { + s.err = err + return false + } + + var tableName, name, typ string + var nn int + for rows.Next() { + if err := rows.Scan(&tableName, &name, &typ, &nn); err != nil { + rows.Close() + s.err = err + return false + } + + columnFields = append(columnFields, arrow.Field{ + Name: name, + Type: getArrowTypeFromString(typ), + Nullable: nn == 1, + Metadata: getColumnMetadata(bldr, getSqlTypeFromTypeName(typ), tableName), + }) + } + + rows.Close() + if rows.Err() != nil { + s.err = rows.Err() + return false + } + val := flight.SerializeSchema(arrow.NewSchema(columnFields, nil), s.mem) + s.schemaBldr.Append(val) + + columnFields = columnFields[:0] + } + + schemaCol := s.schemaBldr.NewArray() + defer schemaCol.Release() + + s.record = array.NewRecord(s.Schema(), append(rec.Columns(), schemaCol), rec.NumRows()) + return true +} diff --git a/go/arrow/flight/flightsql/example/type_info.go b/go/arrow/flight/flightsql/example/type_info.go new file mode 100644 index 0000000000000..dcba42b1f847b --- /dev/null +++ b/go/arrow/flight/flightsql/example/type_info.go @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package example + +import ( + "strings" + + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql/schema_ref" + "github.com/apache/arrow/go/v10/arrow/memory" +) + +func GetTypeInfoResult(mem memory.Allocator) arrow.Record { + typeNames, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String, + strings.NewReader(`["bit", "tinyint", "bigint", "longvarbinary", + "varbinary", "text", "longvarchar", "char", + "integer", "smallint", "float", "double", + "numeric", "varchar", "date", "time", "timestamp"]`)) + defer typeNames.Release() + + dataType, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, + strings.NewReader(`[-7, -6, -5, -4, -3, -1, -1, 1, 4, 5, 6, 8, 8, 12, 91, 92, 93]`)) + defer dataType.Release() + + columnSize, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, + strings.NewReader(`[1, 3, 19, 65536, 255, 65536, 65536, 255, 9, 5, 7, 15, 15, 255, 10, 8, 32]`)) + defer columnSize.Release() + + literalPrefix, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String, + strings.NewReader(`[null, null, null, null, null, "'", "'", "'", null, null, null, null, null, "'" ,"'", "'", "'"]`)) + defer literalPrefix.Release() + + literalSuffix, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String, + strings.NewReader(`[null, null, null, null, null, "'", "'", "'", null, null, null, null, null, "'" ,"'", "'", "'"]`)) + defer literalSuffix.Release() + + createParams, _, _ := array.FromJSON(mem, arrow.ListOfField(arrow.Field{Name: "item", Type: arrow.BinaryTypes.String, Nullable: false}), + strings.NewReader(`[[], [], [], [], [], ["length"], ["length"], ["length"], [], [], [], [], [], ["length"], [], [], []]`)) + defer createParams.Release() + + nullable, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, + strings.NewReader(`[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]`)) + defer nullable.Release() + + // reference for creating a boolean() array with only zeros + zeroBoolArray, _, err := array.FromJSON(mem, arrow.FixedWidthTypes.Boolean, + strings.NewReader(`[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]`), array.WithUseNumber()) + if err != nil { + panic(err) + } + defer zeroBoolArray.Release() + caseSensitive := zeroBoolArray + + searchable, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, + strings.NewReader(`[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]`)) + defer searchable.Release() + + unsignedAttribute := zeroBoolArray + fixedPrecScale := zeroBoolArray + autoUniqueVal := zeroBoolArray + + localTypeName := typeNames + + zeroIntArray, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, + strings.NewReader(`[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]`)) + defer zeroIntArray.Release() + + minimalScale := zeroIntArray + maximumScale := zeroIntArray + sqlDataType := dataType + sqlDateTimeSub := zeroIntArray + numPrecRadix := zeroIntArray + intervalPrecision := zeroIntArray + + return array.NewRecord(schema_ref.XdbcTypeInfo, []arrow.Array{ + typeNames, dataType, columnSize, literalPrefix, literalSuffix, + createParams, nullable, caseSensitive, searchable, unsignedAttribute, + fixedPrecScale, autoUniqueVal, localTypeName, minimalScale, maximumScale, + sqlDataType, sqlDateTimeSub, numPrecRadix, intervalPrecision}, 17) +} + +func GetFilteredTypeInfoResult(mem memory.Allocator, filter int32) arrow.Record { + batch := GetTypeInfoResult(mem) + defer batch.Release() + + dataTypeVector := []int32{-7, -6, -5, -4, -3, -1, -1, 1, 4, 5, 6, 8, 8, 12, 91, 92, 93} + start, end := -1, -1 + for i, v := range dataTypeVector { + if filter == v { + if start == -1 { + start = i + } + } else if start != -1 && end == -1 { + end = i + break + } + } + + return batch.NewSlice(int64(start), int64(end)) +} diff --git a/go/arrow/flight/flightsql/server.go b/go/arrow/flight/flightsql/server.go index 3823b84c1d2ad..17bc9e188aa9c 100644 --- a/go/arrow/flight/flightsql/server.go +++ b/go/arrow/flight/flightsql/server.go @@ -164,6 +164,10 @@ func (BaseServer) mustEmbedBaseServer() {} // // Once registered, this value will be returned for any SqlInfo requests. func (b *BaseServer) RegisterSqlInfo(id SqlInfo, result interface{}) error { + if b.sqlInfoToResult == nil { + b.sqlInfoToResult = make(SqlInfoResultMap) + } + switch result.(type) { case string, bool, int64, int32, []string, map[int32][]int32: b.sqlInfoToResult[uint32(id)] = result diff --git a/go/arrow/flight/flightsql/sqlite_server_test.go b/go/arrow/flight/flightsql/sqlite_server_test.go new file mode 100644 index 0000000000000..b6e6335700d83 --- /dev/null +++ b/go/arrow/flight/flightsql/sqlite_server_test.go @@ -0,0 +1,783 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package flightsql_test + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/flight" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql/example" + "github.com/apache/arrow/go/v10/arrow/flight/flightsql/schema_ref" + "github.com/apache/arrow/go/v10/arrow/memory" + "github.com/apache/arrow/go/v10/arrow/scalar" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "google.golang.org/protobuf/proto" + sqlite3 "modernc.org/sqlite/lib" +) + +type FlightSqliteServerSuite struct { + suite.Suite + + srv *example.SQLiteFlightSQLServer + s flight.Server + cl *flightsql.Client + + mem *memory.CheckedAllocator +} + +func (s *FlightSqliteServerSuite) getColMetadata(colType int, table string) arrow.Metadata { + bldr := flightsql.NewColumnMetadataBuilder() + bldr.Scale(15).IsReadOnly(false).IsAutoIncrement(false) + if table != "" { + bldr.TableName(table) + } + switch colType { + case sqlite3.SQLITE_TEXT, sqlite3.SQLITE_BLOB: + case sqlite3.SQLITE_INTEGER: + bldr.Precision(10) + case sqlite3.SQLITE_FLOAT: + bldr.Precision(15) + default: + bldr.Precision(0) + } + return bldr.Metadata() +} + +func (s *FlightSqliteServerSuite) SetupTest() { + var err error + s.mem = memory.NewCheckedAllocator(memory.DefaultAllocator) + s.s = flight.NewServerWithMiddleware(nil) + s.srv, err = example.NewSQLiteFlightSQLServer() + s.Require().NoError(err) + s.srv.Alloc = s.mem + + s.s.RegisterFlightService(flightsql.NewFlightServer(s.srv)) + s.s.Init("localhost:0") + s.s.SetShutdownOnSignals(os.Interrupt, os.Kill) + go s.s.Serve() + s.cl, err = flightsql.NewClient(s.s.Addr().String(), nil, nil, dialOpts...) + s.Require().NoError(err) + s.Require().NotNil(s.cl) + s.cl.Alloc = s.mem +} + +func (s *FlightSqliteServerSuite) TearDownTest() { + s.Require().NoError(s.cl.Close()) + s.s.Shutdown() + s.srv = nil + s.mem.AssertSize(s.T(), 0) +} + +func (s *FlightSqliteServerSuite) fromJSON(dt arrow.DataType, json string) arrow.Array { + arr, _, _ := array.FromJSON(s.mem, dt, strings.NewReader(json)) + return arr +} + +func (s *FlightSqliteServerSuite) execCountQuery(query string) int64 { + info, err := s.cl.Execute(context.Background(), query) + s.NoError(err) + + rdr, err := s.cl.DoGet(context.Background(), info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + rec, err := rdr.Read() + s.NoError(err) + return rec.Column(0).(*array.Int64).Value(0) +} + +func (s *FlightSqliteServerSuite) TestCommandStatementQuery() { + ctx := context.Background() + info, err := s.cl.Execute(ctx, "SELECT * FROM intTable") + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.NotNil(rec) + + expectedSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "keyName", Type: arrow.BinaryTypes.String, Metadata: s.getColMetadata(sqlite3.SQLITE_TEXT, ""), Nullable: true}, + {Name: "value", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "foreignId", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + }, nil) + + s.Truef(expectedSchema.Equal(rec.Schema()), "expected: %s\ngot: %s", expectedSchema, rec.Schema()) + + idarr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 2, 3, 4]`) + defer idarr.Release() + keyarr := s.fromJSON(arrow.BinaryTypes.String, `["one", "zero", "negative one", null]`) + defer keyarr.Release() + valarr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 0, -1, null]`) + defer valarr.Release() + foreignarr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 1, 1, null]`) + defer foreignarr.Release() + + expectedRec := array.NewRecord(expectedSchema, []arrow.Array{idarr, keyarr, valarr, foreignarr}, 4) + defer expectedRec.Release() + + s.Truef(array.RecordEqual(expectedRec, rec), "expected: %s\ngot: %s", expectedRec, rec) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTables() { + ctx := context.Background() + info, err := s.cl.GetTables(ctx, &flightsql.GetTablesOpts{}) + s.NoError(err) + s.NotNil(info) + + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + catalogName := scalar.MakeArrayOfNull(arrow.BinaryTypes.String, 3, s.mem) + defer catalogName.Release() + schemaName := scalar.MakeArrayOfNull(arrow.BinaryTypes.String, 3, s.mem) + defer schemaName.Release() + + tableName := s.fromJSON(arrow.BinaryTypes.String, `["foreignTable", "intTable", "sqlite_sequence"]`) + defer tableName.Release() + + tableType := s.fromJSON(arrow.BinaryTypes.String, `["table", "table", "table"]`) + defer tableType.Release() + + expectedRec := array.NewRecord(schema_ref.Tables, []arrow.Array{catalogName, schemaName, tableName, tableType}, 3) + defer expectedRec.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.NotNil(rec) + rec.Retain() + defer rec.Release() + s.False(rdr.Next()) + + s.Truef(array.RecordEqual(expectedRec, rec), "expected: %s\ngot: %s", expectedRec, rec) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTablesWithTableFilter() { + ctx := context.Background() + info, err := s.cl.GetTables(ctx, &flightsql.GetTablesOpts{ + TableNameFilterPattern: proto.String("int%"), + }) + s.NoError(err) + s.NotNil(info) + + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + catalog := s.fromJSON(arrow.BinaryTypes.String, `[null]`) + schema := s.fromJSON(arrow.BinaryTypes.String, `[null]`) + table := s.fromJSON(arrow.BinaryTypes.String, `["intTable"]`) + tabletype := s.fromJSON(arrow.BinaryTypes.String, `["table"]`) + expected := array.NewRecord(schema_ref.Tables, []arrow.Array{catalog, schema, table, tabletype}, 1) + defer func() { + catalog.Release() + schema.Release() + table.Release() + tabletype.Release() + expected.Release() + }() + + s.True(rdr.Next()) + rec := rdr.Record() + s.NotNil(rec) + rec.Retain() + defer rec.Release() + s.False(rdr.Next()) + s.NoError(rdr.Err()) + + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTablesWithTableTypesFilter() { + ctx := context.Background() + info, err := s.cl.GetTables(ctx, &flightsql.GetTablesOpts{ + TableTypes: []string{"index"}, + }) + s.NoError(err) + + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + s.True(schema_ref.Tables.Equal(rdr.Schema()), rdr.Schema().String()) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTablesWithExistingTableTypeFilter() { + ctx := context.Background() + info, err := s.cl.GetTables(ctx, &flightsql.GetTablesOpts{ + TableTypes: []string{"table"}, + }) + s.NoError(err) + s.NotNil(info) + + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + catalogName := scalar.MakeArrayOfNull(arrow.BinaryTypes.String, 3, s.mem) + defer catalogName.Release() + schemaName := scalar.MakeArrayOfNull(arrow.BinaryTypes.String, 3, s.mem) + defer schemaName.Release() + + tableName := s.fromJSON(arrow.BinaryTypes.String, `["foreignTable", "intTable", "sqlite_sequence"]`) + defer tableName.Release() + + tableType := s.fromJSON(arrow.BinaryTypes.String, `["table", "table", "table"]`) + defer tableType.Release() + + expectedRec := array.NewRecord(schema_ref.Tables, []arrow.Array{catalogName, schemaName, tableName, tableType}, 3) + defer expectedRec.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.NotNil(rec) + rec.Retain() + defer rec.Release() + s.False(rdr.Next()) + + s.Truef(array.RecordEqual(expectedRec, rec), "expected: %s\ngot: %s", expectedRec, rec) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTablesWithIncludedSchemas() { + ctx := context.Background() + info, err := s.cl.GetTables(ctx, &flightsql.GetTablesOpts{ + TableNameFilterPattern: proto.String("int%"), + IncludeSchema: true, + }) + s.NoError(err) + s.NotNil(info) + + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + catalog := s.fromJSON(arrow.BinaryTypes.String, `[null]`) + schema := s.fromJSON(arrow.BinaryTypes.String, `[null]`) + table := s.fromJSON(arrow.BinaryTypes.String, `["intTable"]`) + tabletype := s.fromJSON(arrow.BinaryTypes.String, `["table"]`) + + dbTableName := "intTable" + + tableSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, + Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, dbTableName)}, + {Name: "keyName", Type: arrow.BinaryTypes.String, + Metadata: s.getColMetadata(sqlite3.SQLITE_TEXT, dbTableName)}, + {Name: "value", Type: arrow.PrimitiveTypes.Int64, + Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, dbTableName)}, + {Name: "foreignId", Type: arrow.PrimitiveTypes.Int64, + Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, dbTableName)}, + }, nil) + schemaBuf := flight.SerializeSchema(tableSchema, s.mem) + binaryBldr := array.NewBinaryBuilder(s.mem, arrow.BinaryTypes.Binary) + binaryBldr.Append(schemaBuf) + schemaCol := binaryBldr.NewArray() + + expected := array.NewRecord(schema_ref.TablesWithIncludedSchema, []arrow.Array{catalog, schema, table, tabletype, schemaCol}, 1) + defer func() { + catalog.Release() + schema.Release() + table.Release() + tabletype.Release() + binaryBldr.Release() + schemaCol.Release() + expected.Release() + }() + + s.True(rdr.Next()) + rec := rdr.Record() + s.NotNil(rec) + rec.Retain() + defer rec.Release() + s.False(rdr.Next()) + s.NoError(rdr.Err()) + + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTypeInfo() { + ctx := context.Background() + info, err := s.cl.GetXdbcTypeInfo(ctx, nil) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + expected := example.GetTypeInfoResult(s.mem) + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTypeInfoFiltered() { + ctx := context.Background() + info, err := s.cl.GetXdbcTypeInfo(ctx, proto.Int32(-4)) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + expected := example.GetFilteredTypeInfoResult(s.mem, -4) + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetCatalogs() { + ctx := context.Background() + info, err := s.cl.GetCatalogs(ctx) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + s.True(rdr.Schema().Equal(schema_ref.Catalogs), rdr.Schema().String()) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetDbSchemas() { + ctx := context.Background() + info, err := s.cl.GetDBSchemas(ctx, &flightsql.GetDBSchemasOpts{}) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + s.True(rdr.Schema().Equal(schema_ref.DBSchemas), rdr.Schema().String()) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetTableTypes() { + ctx := context.Background() + info, err := s.cl.GetTableTypes(ctx) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + expected := s.fromJSON(arrow.BinaryTypes.String, `["table"]`) + defer expected.Release() + expectedRec := array.NewRecord(schema_ref.TableTypes, []arrow.Array{expected}, 1) + defer expectedRec.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expectedRec, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandStatementUpdate() { + ctx := context.Background() + result, err := s.cl.ExecuteUpdate(ctx, `INSERT INTO intTable (keyName, value) VALUES + ('KEYNAME1', 1001), ('KEYNAME2', 1002), ('KEYNAME3', 1003)`) + s.NoError(err) + s.EqualValues(3, result) + + result, err = s.cl.ExecuteUpdate(ctx, `UPDATE intTable SET keyName = 'KEYNAME1' + WHERE keyName = 'KEYNAME2' OR keyName = 'KEYNAME3'`) + s.NoError(err) + s.EqualValues(2, result) + + result, err = s.cl.ExecuteUpdate(ctx, `DELETE FROM intTable WHERE keyName = 'KEYNAME1'`) + s.NoError(err) + s.EqualValues(3, result) +} + +func (s *FlightSqliteServerSuite) TestCommandPreparedStatementQuery() { + ctx := context.Background() + prep, err := s.cl.Prepare(ctx, s.mem, "SELECT * FROM intTable") + s.NoError(err) + defer prep.Close(ctx) + + info, err := prep.Execute(ctx) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + + expectedSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "keyName", Type: arrow.BinaryTypes.String, Metadata: s.getColMetadata(sqlite3.SQLITE_TEXT, ""), Nullable: true}, + {Name: "value", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "foreignId", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}}, nil) + + idArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 2, 3, 4]`) + defer idArr.Release() + keyNameArr := s.fromJSON(arrow.BinaryTypes.String, `["one", "zero", "negative one", null]`) + defer keyNameArr.Release() + valueArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 0, -1, null]`) + defer valueArr.Release() + foreignIdArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 1, 1, null]`) + defer foreignIdArr.Release() + + expected := array.NewRecord(expectedSchema, []arrow.Array{idArr, keyNameArr, valueArr, foreignIdArr}, 4) + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandPreparedStatementQueryWithParams() { + ctx := context.Background() + stmt, err := s.cl.Prepare(ctx, s.mem, "SELECT * FROM intTable WHERE keyName LIKE ?") + s.NoError(err) + defer stmt.Close(ctx) + + typeIDs := s.fromJSON(arrow.PrimitiveTypes.Int8, "[0]") + offsets := s.fromJSON(arrow.PrimitiveTypes.Int32, "[0]") + strArray := s.fromJSON(arrow.BinaryTypes.String, `["%one"]`) + bytesArr := s.fromJSON(arrow.BinaryTypes.Binary, "[]") + bigintArr := s.fromJSON(arrow.PrimitiveTypes.Int64, "[]") + dblArr := s.fromJSON(arrow.PrimitiveTypes.Float64, "[]") + paramArr, _ := array.NewDenseUnionFromArraysWithFields(typeIDs, + offsets, []arrow.Array{strArray, bytesArr, bigintArr, dblArr}, + []string{"string", "bytes", "bigint", "double"}) + batch := array.NewRecord(arrow.NewSchema([]arrow.Field{ + {Name: "parameter_1", Type: paramArr.DataType()}}, nil), + []arrow.Array{paramArr}, 1) + defer func() { + typeIDs.Release() + offsets.Release() + strArray.Release() + bytesArr.Release() + bigintArr.Release() + dblArr.Release() + paramArr.Release() + batch.Release() + }() + + stmt.SetParameters(batch) + info, err := stmt.Execute(ctx) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + + expectedSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "keyName", Type: arrow.BinaryTypes.String, Metadata: s.getColMetadata(sqlite3.SQLITE_TEXT, ""), Nullable: true}, + {Name: "value", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}, + {Name: "foreignId", Type: arrow.PrimitiveTypes.Int64, Metadata: s.getColMetadata(sqlite3.SQLITE_INTEGER, ""), Nullable: true}}, nil) + + idArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 3]`) + defer idArr.Release() + keyNameArr := s.fromJSON(arrow.BinaryTypes.String, `["one", "negative one"]`) + defer keyNameArr.Release() + valueArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, -1]`) + defer valueArr.Release() + foreignIdArr := s.fromJSON(arrow.PrimitiveTypes.Int64, `[1, 1]`) + defer foreignIdArr.Release() + + expected := array.NewRecord(expectedSchema, []arrow.Array{idArr, keyNameArr, valueArr, foreignIdArr}, 2) + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandPreparedStatementUpdateWithParams() { + ctx := context.Background() + stmt, err := s.cl.Prepare(ctx, s.mem, "INSERT INTO intTable (keyName, value) VALUES ('new_value', ?)") + s.NoError(err) + defer stmt.Close(ctx) + + typeIDs := s.fromJSON(arrow.PrimitiveTypes.Int8, "[2]") + offsets := s.fromJSON(arrow.PrimitiveTypes.Int32, "[0]") + strArray := s.fromJSON(arrow.BinaryTypes.String, "[]") + bytesArr := s.fromJSON(arrow.BinaryTypes.Binary, "[]") + bigintArr := s.fromJSON(arrow.PrimitiveTypes.Int64, "[999]") + dblArr := s.fromJSON(arrow.PrimitiveTypes.Float64, "[]") + paramArr, err := array.NewDenseUnionFromArraysWithFields(typeIDs, + offsets, []arrow.Array{strArray, bytesArr, bigintArr, dblArr}, + []string{"string", "bytes", "bigint", "double"}) + s.NoError(err) + batch := array.NewRecord(arrow.NewSchema([]arrow.Field{ + {Name: "parameter_1", Type: paramArr.DataType()}}, nil), + []arrow.Array{paramArr}, 1) + defer func() { + typeIDs.Release() + offsets.Release() + strArray.Release() + bytesArr.Release() + bigintArr.Release() + dblArr.Release() + paramArr.Release() + batch.Release() + }() + + stmt.SetParameters(batch) + s.EqualValues(4, s.execCountQuery("SELECT COUNT(*) FROM intTable")) + n, err := stmt.ExecuteUpdate(context.Background()) + s.NoError(err) + s.EqualValues(1, n) + s.EqualValues(5, s.execCountQuery("SELECT COUNT(*) FROM intTable")) + n, err = s.cl.ExecuteUpdate(context.Background(), "DELETE FROM intTable WHERE keyName = 'new_value'") + s.NoError(err) + s.EqualValues(1, n) + s.EqualValues(4, s.execCountQuery("SELECT COUNT(*) FROM intTable")) +} + +func (s *FlightSqliteServerSuite) TestCommandPreparedStatementUpdate() { + ctx := context.Background() + stmt, err := s.cl.Prepare(ctx, s.mem, "INSERT INTO intTable (keyName, value) VALUES ('new_value', 999)") + s.NoError(err) + defer stmt.Close(ctx) + + s.EqualValues(4, s.execCountQuery("SELECT COUNT(*) FROM intTable")) + result, err := stmt.ExecuteUpdate(ctx) + s.NoError(err) + s.EqualValues(1, result) + s.EqualValues(5, s.execCountQuery("SELECT COUNT(*) FROM intTable")) + result, err = s.cl.ExecuteUpdate(ctx, "DELETE FROM intTable WHERE keyName = 'new_value'") + s.NoError(err) + s.EqualValues(1, result) + s.EqualValues(4, s.execCountQuery("SELECT COUNT(*) FROM intTable")) +} + +func (s *FlightSqliteServerSuite) TestCommandGetPrimaryKeys() { + ctx := context.Background() + info, err := s.cl.GetPrimaryKeys(ctx, flightsql.TableRef{Table: "int%"}) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + bldr := array.NewRecordBuilder(s.mem, schema_ref.PrimaryKeys) + defer bldr.Release() + bldr.Field(0).AppendNull() + bldr.Field(1).AppendNull() + bldr.Field(2).(*array.StringBuilder).Append("intTable") + bldr.Field(3).(*array.StringBuilder).Append("id") + bldr.Field(4).(*array.Int32Builder).Append(1) + bldr.Field(5).AppendNull() + expected := bldr.NewRecord() + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetImportedKeys() { + ctx := context.Background() + info, err := s.cl.GetImportedKeys(ctx, flightsql.TableRef{Table: "intTable"}) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + bldr := array.NewRecordBuilder(s.mem, schema_ref.ImportedKeys) + defer bldr.Release() + bldr.Field(0).AppendNull() + bldr.Field(1).AppendNull() + bldr.Field(2).(*array.StringBuilder).Append("foreignTable") + bldr.Field(3).(*array.StringBuilder).Append("id") + bldr.Field(4).AppendNull() + bldr.Field(5).AppendNull() + bldr.Field(6).(*array.StringBuilder).Append("intTable") + bldr.Field(7).(*array.StringBuilder).Append("foreignId") + bldr.Field(8).(*array.Int32Builder).Append(0) + bldr.Field(9).AppendNull() + bldr.Field(10).AppendNull() + bldr.Field(11).(*array.Uint8Builder).Append(3) + bldr.Field(12).(*array.Uint8Builder).Append(3) + expected := bldr.NewRecord() + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetExportedKeys() { + ctx := context.Background() + info, err := s.cl.GetExportedKeys(ctx, flightsql.TableRef{Table: "foreignTable"}) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + bldr := array.NewRecordBuilder(s.mem, schema_ref.ImportedKeys) + defer bldr.Release() + bldr.Field(0).AppendNull() + bldr.Field(1).AppendNull() + bldr.Field(2).(*array.StringBuilder).Append("foreignTable") + bldr.Field(3).(*array.StringBuilder).Append("id") + bldr.Field(4).AppendNull() + bldr.Field(5).AppendNull() + bldr.Field(6).(*array.StringBuilder).Append("intTable") + bldr.Field(7).(*array.StringBuilder).Append("foreignId") + bldr.Field(8).(*array.Int32Builder).Append(0) + bldr.Field(9).AppendNull() + bldr.Field(10).AppendNull() + bldr.Field(11).(*array.Uint8Builder).Append(3) + bldr.Field(12).(*array.Uint8Builder).Append(3) + expected := bldr.NewRecord() + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func (s *FlightSqliteServerSuite) TestCommandGetCrossRef() { + ctx := context.Background() + info, err := s.cl.GetCrossReference(ctx, + flightsql.TableRef{Table: "foreignTable"}, + flightsql.TableRef{Table: "intTable"}) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + bldr := array.NewRecordBuilder(s.mem, schema_ref.ImportedKeys) + defer bldr.Release() + bldr.Field(0).AppendNull() + bldr.Field(1).AppendNull() + bldr.Field(2).(*array.StringBuilder).Append("foreignTable") + bldr.Field(3).(*array.StringBuilder).Append("id") + bldr.Field(4).AppendNull() + bldr.Field(5).AppendNull() + bldr.Field(6).(*array.StringBuilder).Append("intTable") + bldr.Field(7).(*array.StringBuilder).Append("foreignId") + bldr.Field(8).(*array.Int32Builder).Append(0) + bldr.Field(9).AppendNull() + bldr.Field(10).AppendNull() + bldr.Field(11).(*array.Uint8Builder).Append(3) + bldr.Field(12).(*array.Uint8Builder).Append(3) + expected := bldr.NewRecord() + defer expected.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + s.Truef(array.RecordEqual(expected, rec), "expected: %s\ngot: %s", expected, rec) + s.False(rdr.Next()) +} + +func validateSqlInfo(t *testing.T, expected interface{}, sc scalar.Scalar) bool { + switch ex := expected.(type) { + case string: + return assert.Equal(t, ex, sc.String()) + case bool: + return assert.Equal(t, ex, sc.(*scalar.Boolean).Value) + case int64: + return assert.Equal(t, ex, sc.(*scalar.Int64).Value) + case int32: + return assert.Equal(t, ex, sc.(*scalar.Int32).Value) + case []string: + arr := sc.(*scalar.List).Value.(*array.String) + assert.EqualValues(t, len(ex), arr.Len()) + for i, v := range ex { + assert.Equal(t, v, arr.Value(i)) + } + case map[int32][]int32: + // map is a list of structs with key and values + structArr := sc.(*scalar.Map).Value.(*array.Struct) + keys := structArr.Field(0).(*array.Int32) + values := structArr.Field(1).(*array.List) + // assert that the map has the right size + assert.EqualValues(t, len(ex), keys.Len()) + + // for each element, match the argument + for i := 0; i < keys.Len(); i++ { + keyScalar, _ := scalar.GetScalar(keys, i) + infoID := keyScalar.(*scalar.Int32).Value + + // assert the key exists + list, ok := ex[infoID] + assert.True(t, ok) + + // assert the int32list is the right size + start, end := values.ValueOffsets(i) + assert.EqualValues(t, len(list), end-start) + + // for each element make sure it matches + for j, v := range list { + listItem, err := scalar.GetScalar(values.ListValues(), int(start)+j) + assert.NoError(t, err) + assert.Equal(t, v, listItem.(*scalar.Int32).Value) + } + } + } + return true +} + +func (s *FlightSqliteServerSuite) TestCommandGetSqlInfo() { + expectedResults := example.SqlInfoResultMap() + infoIDs := make([]flightsql.SqlInfo, 0, len(expectedResults)) + for k := range expectedResults { + infoIDs = append(infoIDs, flightsql.SqlInfo(k)) + } + + ctx := context.Background() + info, err := s.cl.GetSqlInfo(ctx, infoIDs) + s.NoError(err) + rdr, err := s.cl.DoGet(ctx, info.Endpoint[0].Ticket) + s.NoError(err) + defer rdr.Release() + + s.True(rdr.Next()) + rec := rdr.Record() + rec.Retain() + defer rec.Release() + s.False(rdr.Next()) + + s.EqualValues(2, rec.NumCols()) + s.EqualValues(len(expectedResults), rec.NumRows()) + + colName := rec.Column(0).(*array.Uint32) + colValue := rec.Column(1) + for i := 0; i < int(rec.NumRows()); i++ { + expected := expectedResults[colName.Value(i)] + sc, err := scalar.GetScalar(colValue, i) + s.NoError(err) + + s.True(validateSqlInfo(s.T(), expected, sc.(*scalar.DenseUnion).ChildValue())) + + sc.(*scalar.DenseUnion).Release() + } +} + +func TestSqliteServer(t *testing.T) { + suite.Run(t, new(FlightSqliteServerSuite)) +} diff --git a/go/arrow/flight/record_batch_reader.go b/go/arrow/flight/record_batch_reader.go index 75e09f2008f6b..035ba9c4bbeff 100644 --- a/go/arrow/flight/record_batch_reader.go +++ b/go/arrow/flight/record_batch_reader.go @@ -153,12 +153,13 @@ func NewRecordReader(r DataStreamReader, opts ...ipc.Option) (*Reader, error) { return nil, err } - rdr := &Reader{dmr: &dataMessageReader{rdr: r}} + rdr := &Reader{dmr: &dataMessageReader{rdr: r, refCount: 1}} rdr.dmr.descr = data.FlightDescriptor if len(data.DataHeader) > 0 { rdr.dmr.peeked = data } + rdr.dmr.Retain() if rdr.Reader, err = ipc.NewReaderFromMessageReader(rdr.dmr, opts...); err != nil { return nil, fmt.Errorf("arrow/flight: could not create flight reader: %w", err) } @@ -201,6 +202,10 @@ type MessageReader interface { LatestAppMetadata() []byte } +type haserr interface { + Err() error +} + // StreamChunksFromReader is a convenience function to populate a channel // from a record reader. It is intended to be run using a separate goroutine // by calling `go flight.StreamChunksFromReader(rdr, ch)`. @@ -222,4 +227,10 @@ func StreamChunksFromReader(rdr array.RecordReader, ch chan<- StreamChunk) { rec.Retain() ch <- StreamChunk{Data: rec} } + + if e, ok := rdr.(haserr); ok { + if e.Err() != nil { + ch <- StreamChunk{Err: e.Err()} + } + } } diff --git a/go/go.mod b/go/go.mod index 95a5538d0a5f9..c27de3029f9bd 100644 --- a/go/go.mod +++ b/go/go.mod @@ -29,6 +29,7 @@ require ( github.com/klauspost/asmfmt v1.3.2 github.com/klauspost/compress v1.15.9 github.com/kr/pretty v0.1.0 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 github.com/pierrec/lz4/v4 v4.1.15 @@ -36,11 +37,18 @@ require ( github.com/zeebo/xxh3 v1.0.2 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 - golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 + golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 golang.org/x/tools v0.1.12 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f gonum.org/v1/gonum v0.11.0 google.golang.org/grpc v1.48.0 google.golang.org/protobuf v1.28.1 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + lukechampine.com/uint128 v1.2.0 // indirect + modernc.org/cc/v3 v3.36.1 // indirect + modernc.org/ccgo/v3 v3.16.8 // indirect + modernc.org/libc v1.16.19 // indirect + modernc.org/opt v0.1.3 // indirect + modernc.org/sqlite v1.18.0 + modernc.org/strutil v1.1.2 // indirect ) diff --git a/go/go.sum b/go/go.sum index 7653c2cc1b917..58e2996e4351f 100644 --- a/go/go.sum +++ b/go/go.sum @@ -32,6 +32,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -80,15 +82,20 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= @@ -101,6 +108,11 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= +github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= @@ -115,6 +127,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= @@ -198,6 +212,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -205,11 +220,13 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 h1:9vYwv7OjYaky/tlAeD7C4oC9EsPTlaFl1H2jS++V+ME= -golang.org/x/sys v0.0.0-20220804214406-8e32c043e418/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs= +golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -227,6 +244,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= @@ -287,4 +305,46 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= +lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= +lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= +modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= +modernc.org/cc/v3 v3.36.1 h1:CICrjwr/1M4+6OQ4HJZ/AHxjcwe67r5vPUF518MkO8A= +modernc.org/cc/v3 v3.36.1/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= +modernc.org/ccgo/v3 v3.0.0-20220428102840-41399a37e894/go.mod h1:eI31LL8EwEBKPpNpA4bU1/i+sKOwOrQy8D87zWUcRZc= +modernc.org/ccgo/v3 v3.0.0-20220430103911-bc99d88307be/go.mod h1:bwdAnOoaIt8Ax9YdWGjxWsdkPcZyRPHqrOvJxaKAKGw= +modernc.org/ccgo/v3 v3.16.4/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ= +modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ= +modernc.org/ccgo/v3 v3.16.8 h1:G0QNlTqI5uVgczBWfGKs7B++EPwCfXPWGD2MdeKloDs= +modernc.org/ccgo/v3 v3.16.8/go.mod h1:zNjwkizS+fIFDrDjIAgBSCLkWbJuHF+ar3QRn+Z9aws= +modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= +modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= +modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= +modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= +modernc.org/libc v0.0.0-20220428101251-2d5f3daf273b/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= +modernc.org/libc v1.16.0/go.mod h1:N4LD6DBE9cf+Dzf9buBlzVJndKr/iJHG97vGLHYnb5A= +modernc.org/libc v1.16.1/go.mod h1:JjJE0eu4yeK7tab2n4S1w8tlWd9MxXLRzheaRnAKymU= +modernc.org/libc v1.16.7/go.mod h1:hYIV5VZczAmGZAnG15Vdngn5HSF5cSkbvfz2B7GRuVU= +modernc.org/libc v1.16.17/go.mod h1:hYIV5VZczAmGZAnG15Vdngn5HSF5cSkbvfz2B7GRuVU= +modernc.org/libc v1.16.19 h1:S8flPn5ZeXx6iw/8yNa986hwTQDrY8RXU7tObZuAozo= +modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= +modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU= +modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= +modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sqlite v1.18.0 h1:ef66qJSgKeyLyrF4kQ2RHw/Ue3V89fyFNbGL073aDjI= +modernc.org/sqlite v1.18.0/go.mod h1:B9fRWZacNxJBHoCJZQr1R54zhVn3fjfl0aszflrTSxY= +modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= +modernc.org/strutil v1.1.2 h1:iFBDH6j1Z0bN/Q9udJnnFoFpENA4252qe/7/5woE5MI= +modernc.org/strutil v1.1.2/go.mod h1:OYajnUAcI/MX+XD/Wx7v1bbdvcQSvxgtb0gC+u3d3eg= +modernc.org/tcl v1.13.1 h1:npxzTwFTZYM8ghWicVIX1cRWzj7Nd8i6AqqX2p+IYao= +modernc.org/tcl v1.13.1/go.mod h1:XOLfOwzhkljL4itZkK6T72ckMgvj0BDsnKNdZVUOecw= +modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk= +modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +modernc.org/z v1.5.1 h1:RTNHdsrOpeoSeOF4FbzTo8gBYByaJ5xT7NgZ9ZqRiJM= +modernc.org/z v1.5.1/go.mod h1:eWFB510QWW5Th9YGZT81s+LwvaAs3Q2yr4sP0rmLkv8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=