Skip to content

Commit

Permalink
ARROW-17359: [Go][FlightSQL] Create Example with SQLite in-mem and us…
Browse files Browse the repository at this point in the history
…e to test FlightSQL server (apache#13868)

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored and ksuarez1423 committed Aug 15, 2022
1 parent 1d1e766 commit 3c9c4c3
Show file tree
Hide file tree
Showing 22 changed files with 2,334 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ci/docker/conda-integration.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ RUN mamba install -q -y \
nodejs=${node} \
yarn \
openjdk=${jdk} \
zlib=1.2.11 && \
zlib=1.2.11 && \
mamba clean --all --force-pkgs-dirs

# Install Rust with only the needed components
Expand Down
7 changes: 7 additions & 0 deletions dev/release/01-prepare-test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ def test_version_pre_tag
],
],
},
{
path: "go/arrow/doc.go",
hunks: [
["-const PkgVersion = \"#{@snapshot_version}\"",
"+const PkgVersion = \"#{@release_version}\""],
],
},
{
path: "go/parquet/writer_properties.go",
hunks: [
Expand Down
11 changes: 11 additions & 0 deletions dev/release/post-11-bump-versions-test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ def test_version_post_tag
]

Dir.glob("go/**/{go.mod,*.go,*.go.*}") do |path|
if path == "go/arrow/doc.go"
expected_changes << {
path: path,
hunks: [
[
"-const PkgVersion = \"#{@snapshot_version}\"",
"+const PkgVersion = \"#{@next_snapshot_version}\"",
],
]}
next
end
import_path = "github.com/apache/arrow/go/v#{@snapshot_major_version}"
lines = File.readlines(path, chomp: true)
target_lines = lines.grep(/#{Regexp.escape(import_path)}/)
Expand Down
3 changes: 3 additions & 0 deletions dev/release/utils-prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ update_versions() {
sed -i.bak -E -e \
"s/\"parquet-go version .+\"/\"parquet-go version ${version}\"/" \
parquet/writer_properties.go
sed -i.bak -E -e \
"s/const PkgVersion = \".*/const PkgVersion = \"${version}\"/" \
arrow/doc.go
find . -name "*.bak" -exec rm {} \;
git add .
popd
Expand Down
7 changes: 7 additions & 0 deletions go/arrow/array/booleanbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ func (b *BooleanBuilder) unmarshalOne(dec *json.Decoder) error {
return err
}
b.Append(val)
case json.Number:
val, err := strconv.ParseBool(v.String())
if err != nil {
return err
}
b.Append(val)
case nil:
b.AppendNull()
default:
Expand All @@ -210,6 +216,7 @@ func (b *BooleanBuilder) unmarshal(dec *json.Decoder) error {

func (b *BooleanBuilder) UnmarshalJSON(data []byte) error {
dec := json.NewDecoder(bytes.NewReader(data))
dec.UseNumber()
t, err := dec.Token()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions go/arrow/array/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder {
}
case arrow.LIST:
typ := dtype.(*arrow.ListType)
return NewListBuilder(mem, typ.Elem())
return NewListBuilderWithField(mem, typ.ElemField())
case arrow.STRUCT:
typ := dtype.(*arrow.StructType)
return NewStructBuilder(mem, typ)
Expand All @@ -319,7 +319,7 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder {
return NewDictionaryBuilder(mem, typ)
case arrow.LARGE_LIST:
typ := dtype.(*arrow.LargeListType)
return NewLargeListBuilder(mem, typ.Elem())
return NewLargeListBuilderWithField(mem, typ.ElemField())
case arrow.MAP:
typ := dtype.(*arrow.MapType)
return NewMapBuilder(mem, typ.KeyType(), typ.ItemType(), typ.KeysSorted)
Expand Down
58 changes: 44 additions & 14 deletions go/arrow/array/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,31 @@ func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder {
}
}

// NewListBuilderWithField takes a field to use for the child rather than just
// a datatype to allow for more customization.
func NewListBuilderWithField(mem memory.Allocator, field arrow.Field) *ListBuilder {
offsetBldr := NewInt32Builder(mem)
return &ListBuilder{
baseListBuilder{
builder: builder{refCount: 1, mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
dt: arrow.ListOfField(field),
appendOffsetVal: func(o int) { offsetBldr.Append(int32(o)) },
},
}
}

func (b *baseListBuilder) Type() arrow.DataType {
switch b.dt.ID() {
case arrow.LIST:
return arrow.ListOf(b.values.Type())
case arrow.LARGE_LIST:
return arrow.LargeListOf(b.values.Type())
switch dt := b.dt.(type) {
case *arrow.ListType:
f := dt.ElemField()
f.Type = b.values.Type()
return arrow.ListOfField(f)
case *arrow.LargeListType:
f := dt.ElemField()
f.Type = b.values.Type()
return arrow.LargeListOfField(f)
}
return nil
}
Expand All @@ -346,6 +365,21 @@ func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType) *LargeListB
}
}

// NewLargeListBuilderWithField takes a field rather than just an element type
// to allow for more customization of the final type of the LargeList Array
func NewLargeListBuilderWithField(mem memory.Allocator, field arrow.Field) *LargeListBuilder {
offsetBldr := NewInt64Builder(mem)
return &LargeListBuilder{
baseListBuilder{
builder: builder{refCount: 1, mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
dt: arrow.LargeListOfField(field),
appendOffsetVal: func(o int) { offsetBldr.Append(int64(o)) },
},
}
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *baseListBuilder) Release() {
Expand All @@ -356,15 +390,14 @@ func (b *baseListBuilder) Release() {
b.nullBitmap.Release()
b.nullBitmap = nil
}
b.values.Release()
b.offsets.Release()
}

b.values.Release()
b.offsets.Release()
}

func (b *baseListBuilder) appendNextOffset() {
b.appendOffsetVal(b.values.Len())
// b.offsets.Append(int32(b.values.Len()))
}

func (b *baseListBuilder) Append(v bool) {
Expand Down Expand Up @@ -454,9 +487,6 @@ func (b *LargeListBuilder) NewArray() arrow.Array {
// NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder
// so it can be used to build a new array.
func (b *ListBuilder) NewListArray() (a *List) {
if b.offsets.Len() != b.length+1 {
b.appendNextOffset()
}
data := b.newData()
a = NewListData(data)
data.Release()
Expand All @@ -466,16 +496,16 @@ func (b *ListBuilder) NewListArray() (a *List) {
// NewLargeListArray creates a List array from the memory buffers used by the builder and resets the LargeListBuilder
// so it can be used to build a new array.
func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) {
if b.offsets.Len() != b.length+1 {
b.appendNextOffset()
}
data := b.newData()
a = NewLargeListData(data)
data.Release()
return
}

func (b *baseListBuilder) newData() (data *Data) {
if b.offsets.Len() != b.length+1 {
b.appendNextOffset()
}
values := b.values.NewArray()
defer values.Release()

Expand Down
2 changes: 2 additions & 0 deletions go/arrow/array/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func TestListArray(t *testing.T) {
}{
{arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOf(arrow.PrimitiveTypes.Int32)},
{arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)},
{arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOfField(arrow.Field{Name: "item", Type: arrow.PrimitiveTypes.Int32, Nullable: true})},
{arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOfField(arrow.Field{Name: "item", Type: arrow.PrimitiveTypes.Int32, Nullable: true})},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion go/arrow/array/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestRecord(t *testing.T) {
{
schema: schema,
cols: nil,
rows: 0,
rows: 0,
},
{
schema: schema,
Expand Down
2 changes: 2 additions & 0 deletions go/arrow/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ array is valid (not null). If the array has no null entries, it is possible to o
*/
package arrow

const PkgVersion = "10.0.0-SNAPSHOT"

//go:generate go run _tools/tmpl/main.go -i -data=numeric.tmpldata type_traits_numeric.gen.go.tmpl type_traits_numeric.gen_test.go.tmpl array/numeric.gen.go.tmpl array/numericbuilder.gen.go.tmpl array/bufferbuilder_numeric.gen.go.tmpl
//go:generate go run _tools/tmpl/main.go -i -data=datatype_numeric.gen.go.tmpldata datatype_numeric.gen.go.tmpl tensor/numeric.gen.go.tmpl tensor/numeric.gen_test.go.tmpl
//go:generate go run _tools/tmpl/main.go -i -data=scalar/numeric.gen.go.tmpldata scalar/numeric.gen.go.tmpl scalar/numeric.gen_test.go.tmpl
Expand Down
7 changes: 5 additions & 2 deletions go/arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ func NewClient(addr string, auth flight.ClientAuthHandler, middleware []flight.C
if err != nil {
return nil, err
}
return &Client{cl}, nil
return &Client{cl, memory.DefaultAllocator}, nil
}

// Client wraps a regular Flight RPC Client to provide the FlightSQL
// interface functions and methods.
type Client struct {
Client flight.Client

Alloc memory.Allocator
}

func descForCommand(cmd proto.Message) (*flight.FlightDescriptor, error) {
Expand Down Expand Up @@ -141,7 +143,7 @@ func (c *Client) DoGet(ctx context.Context, in *flight.Ticket, opts ...grpc.Call
return nil, err
}

return flight.NewRecordReader(stream)
return flight.NewRecordReader(stream, ipc.WithAllocator(c.Alloc))
}

// GetTables requests a list of tables from the server, with the provided
Expand Down Expand Up @@ -236,6 +238,7 @@ func (c *Client) GetSqlInfo(ctx context.Context, info []SqlInfo, opts ...grpc.Ca
// and use the specified allocator for any allocations it needs to perform.
func (c *Client) Prepare(ctx context.Context, mem memory.Allocator, query string, opts ...grpc.CallOption) (prep *PreparedStatement, err error) {
const actionType = CreatePreparedStatementActionType

var (
cmd, cmdResult anypb.Any
res *pb.Result
Expand Down
5 changes: 5 additions & 0 deletions go/arrow/flight/flightsql/column_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func NewColumnMetadataBuilder() *ColumnMetadataBuilder {
return &ColumnMetadataBuilder{make([]string, 0), make([]string, 0)}
}

func (c *ColumnMetadataBuilder) Clear() {
c.keys = c.keys[:0]
c.vals = c.vals[:0]
}

func (c *ColumnMetadataBuilder) Build() ColumnMetadata {
md := c.Metadata()
return ColumnMetadata{&md}
Expand Down
Loading

0 comments on commit 3c9c4c3

Please sign in to comment.