From 37a8fa11a511888bfa695d3094927683a5298e94 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 9 Aug 2022 14:04:17 -0400 Subject: [PATCH] add lots of comments --- go/arrow/flight/flightsql/client.go | 76 +++++++- go/arrow/flight/flightsql/client_test.go | 4 +- go/arrow/flight/flightsql/column_metadata.go | 26 ++- .../flightsql/schema_ref/reference_schemas.go | 2 + go/arrow/flight/flightsql/server.go | 165 +++++++++++++++++- go/arrow/flight/flightsql/sql_info.go | 2 + go/arrow/flight/flightsql/types.go | 23 ++- go/arrow/flight/record_batch_reader.go | 24 ++- 8 files changed, 291 insertions(+), 31 deletions(-) diff --git a/go/arrow/flight/flightsql/client.go b/go/arrow/flight/flightsql/client.go index e29c36dc7d124..2f57d05484f76 100644 --- a/go/arrow/flight/flightsql/client.go +++ b/go/arrow/flight/flightsql/client.go @@ -32,6 +32,11 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +// NewClient is a convenience function to automatically construct +// a flight.Client and return a flightsql.Client containing it rather +// than having to manually construct both yourself. It just delegates +// its arguments to flight.NewClientWithMiddleware to create the +// underlying Flight Client. func NewClient(addr string, auth flight.ClientAuthHandler, middleware []flight.ClientMiddleware, opts ...grpc.DialOption) (*Client, error) { cl, err := flight.NewClientWithMiddleware(addr, auth, middleware, opts...) if err != nil { @@ -40,6 +45,8 @@ func NewClient(addr string, auth flight.ClientAuthHandler, middleware []flight.C return &Client{cl}, nil } +// Client wraps a regular Flight RPC Client to provide the FlightSQL +// interface functions and methods. type Client struct { Client flight.Client } @@ -65,7 +72,7 @@ func flightInfoForCommand(ctx context.Context, cl *Client, cmd proto.Message, op if err != nil { return nil, err } - return cl.GetFlightInfo(ctx, desc, opts...) + return cl.getFlightInfo(ctx, desc, opts...) 
} // Execute executes the desired query on the server and returns a FlightInfo @@ -122,7 +129,7 @@ func (c *Client) GetCatalogs(ctx context.Context, opts ...grpc.CallOption) (*fli // GetDBSchemas requests the list of schemas from the database and // returns a FlightInfo object where the response can be retrieved func (c *Client) GetDBSchemas(ctx context.Context, cmdOpts *GetDBSchemasOpts, opts ...grpc.CallOption) (*flight.FlightInfo, error) { - return flightInfoForCommand(ctx, c, cmdOpts, opts...) + return flightInfoForCommand(ctx, c, (*pb.CommandGetDbSchemas)(cmdOpts), opts...) } // DoGet uses the provided flight ticket to request the stream of data. @@ -142,9 +149,12 @@ func (c *Client) DoGet(ctx context.Context, in *flight.Ticket, opts ...grpc.Call // should be returned, etc.). Returns a FlightInfo object where the response // can be retrieved. func (c *Client) GetTables(ctx context.Context, reqOptions *GetTablesOpts, opts ...grpc.CallOption) (*flight.FlightInfo, error) { - return flightInfoForCommand(ctx, c, reqOptions, opts...) + return flightInfoForCommand(ctx, c, (*pb.CommandGetTables)(reqOptions), opts...) } +// GetPrimaryKeys requests the primary keys for a specific table from the +// server, specified using a TableRef. Returns a FlightInfo object where +// the response can be retrieved. func (c *Client) GetPrimaryKeys(ctx context.Context, ref TableRef, opts ...grpc.CallOption) (*flight.FlightInfo, error) { cmd := pb.CommandGetPrimaryKeys{ Catalog: ref.Catalog, @@ -154,6 +164,9 @@ func (c *Client) GetPrimaryKeys(ctx context.Context, ref TableRef, opts ...grpc. return flightInfoForCommand(ctx, c, &cmd, opts...) } +// GetExportedKeys retrieves a description about the foreign key columns +// that reference the primary key columns of the specified table. Returns +// a FlightInfo object where the response can be retrieved. 
func (c *Client) GetExportedKeys(ctx context.Context, ref TableRef, opts ...grpc.CallOption) (*flight.FlightInfo, error) { cmd := pb.CommandGetExportedKeys{ Catalog: ref.Catalog, @@ -163,6 +176,8 @@ func (c *Client) GetExportedKeys(ctx context.Context, ref TableRef, opts ...grpc return flightInfoForCommand(ctx, c, &cmd, opts...) } +// GetImportedKeys returns the foreign key columns for the specified table. +// Returns a FlightInfo object indicating where the response can be retrieved. func (c *Client) GetImportedKeys(ctx context.Context, ref TableRef, opts ...grpc.CallOption) (*flight.FlightInfo, error) { cmd := pb.CommandGetImportedKeys{ Catalog: ref.Catalog, @@ -172,6 +187,11 @@ func (c *Client) GetImportedKeys(ctx context.Context, ref TableRef, opts ...grpc return flightInfoForCommand(ctx, c, &cmd, opts...) } +// GetCrossReference retrieves a description of the foreign key columns +// in the specified ForeignKey table that reference the primary key or +// columns representing a constraint of the parent table (could be the same +// or a different table). Returns a FlightInfo object indicating where +// the response can be retrieved with DoGet. func (c *Client) GetCrossReference(ctx context.Context, pkTable, fkTable TableRef, opts ...grpc.CallOption) (*flight.FlightInfo, error) { cmd := pb.CommandGetCrossReference{ PkCatalog: pkTable.Catalog, @@ -184,14 +204,23 @@ func (c *Client) GetCrossReference(ctx context.Context, pkTable, fkTable TableRe return flightInfoForCommand(ctx, c, &cmd, opts...) } +// GetTableTypes requests a list of the types of tables available on this +// server. Returns a FlightInfo object indicating where the response can +// be retrieved. func (c *Client) GetTableTypes(ctx context.Context, opts ...grpc.CallOption) (*flight.FlightInfo, error) { return flightInfoForCommand(ctx, c, &pb.CommandGetTableTypes{}, opts...) } +// GetXdbcTypeInfo requests the information about all the data types supported +// (dataType == nil) or a specific data type.
Returns a FlightInfo object +// indicating where the response can be retrieved. func (c *Client) GetXdbcTypeInfo(ctx context.Context, dataType *int32, opts ...grpc.CallOption) (*flight.FlightInfo, error) { return flightInfoForCommand(ctx, c, &pb.CommandGetXdbcTypeInfo{DataType: dataType}, opts...) } +// GetSqlInfo returns a list of the requested SQL information corresponding +// to the values in the info slice. Returns a FlightInfo object indicating +// where the response can be retrieved. func (c *Client) GetSqlInfo(ctx context.Context, info []SqlInfo, opts ...grpc.CallOption) (*flight.FlightInfo, error) { cmd := &pb.CommandGetSqlInfo{Info: make([]uint32, len(info))} @@ -201,6 +230,10 @@ func (c *Client) GetSqlInfo(ctx context.Context, info []SqlInfo, opts ...grpc.Ca return flightInfoForCommand(ctx, c, cmd, opts...) } +// Prepare creates a PreparedStatement object for the specified query. +// The resulting PreparedStatement object should be Closed when no longer +// needed. It will maintain a reference to this Client for use to execute +// and use the specified allocator for any allocations it needs to perform. func (c *Client) Prepare(ctx context.Context, mem memory.Allocator, query string, opts ...grpc.CallOption) (prep *PreparedStatement, err error) { const actionType = CreatePreparedStatementActionType var ( @@ -262,12 +295,20 @@ func (c *Client) Prepare(ctx context.Context, mem memory.Allocator, query string return } -func (c *Client) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor, opts ...grpc.CallOption) (*flight.FlightInfo, error) { +func (c *Client) getFlightInfo(ctx context.Context, desc *flight.FlightDescriptor, opts ...grpc.CallOption) (*flight.FlightInfo, error) { return c.Client.GetFlightInfo(ctx, desc, opts...) 
} +// Close will close the underlying flight Client in use by this flightsql.Client func (c *Client) Close() error { return c.Client.Close() } +// PreparedStatement represents a constructed PreparedStatement on the server +// and maintains a reference to the Client that created it along with the +// prepared statement handle. +// +// If the server returned the Dataset Schema or Parameter Binding schemas +// at creation, they will also be accessible from this object. Close +// should be called when no longer needed. type PreparedStatement struct { client *Client opts []grpc.CallOption @@ -278,6 +319,11 @@ type PreparedStatement struct { closed bool } +// Execute executes the prepared statement on the server and returns a FlightInfo +// indicating where to retrieve the response. If SetParameters has been called +// then the parameter bindings will be sent before execution. +// +// Will error if already closed. func (p *PreparedStatement) Execute(ctx context.Context) (*flight.FlightInfo, error) { if p.closed { return nil, errors.New("arrow/flightsql: prepared statement already closed") @@ -312,9 +358,12 @@ func (p *PreparedStatement) Execute(ctx context.Context) (*flight.FlightInfo, er } } - return p.client.GetFlightInfo(ctx, desc, p.opts...) + return p.client.getFlightInfo(ctx, desc, p.opts...) } +// ExecuteUpdate executes the prepared statement update query on the server +// and returns the number of rows affected. If SetParameters was called, +// the parameter bindings will be sent with the request to execute. 
func (p *PreparedStatement) ExecuteUpdate(ctx context.Context) (nrecords int64, err error) { if p.closed { return 0, errors.New("arrow/flightsql: prepared statement already closed") @@ -370,9 +419,21 @@ func (p *PreparedStatement) ExecuteUpdate(ctx context.Context) (nrecords int64, return updateResult.GetRecordCount(), nil } -func (p *PreparedStatement) DatasetSchema() *arrow.Schema { return p.datasetSchema } +// DatasetSchema may be nil if the server did not return it when creating the +// Prepared Statement. +func (p *PreparedStatement) DatasetSchema() *arrow.Schema { return p.datasetSchema } + +// ParameterSchema may be nil if the server did not return it when creating +// the prepared statement. func (p *PreparedStatement) ParameterSchema() *arrow.Schema { return p.paramSchema } +// SetParameters takes a record batch to send as the parameter bindings when +// executing. It should match the schema from ParameterSchema. +// +// This will call Retain on the record to ensure it doesn't get released +// out from under the statement. Release will be called on a previous +// binding record if it existed, and will be called upon calling Close +// on the PreparedStatement. func (p *PreparedStatement) SetParameters(binding arrow.Record) { if p.paramBinding != nil { p.paramBinding.Release() @@ -382,6 +443,9 @@ func (p *PreparedStatement) SetParameters(binding arrow.Record) { p.paramBinding.Retain() } +// Close calls release on any parameter binding record and sends +// a ClosePreparedStatement action to the server. After calling +// Close, the PreparedStatement should not be used again. 
func (p *PreparedStatement) Close(ctx context.Context) error { if p.closed { return errors.New("arrow/flightsql: already closed") diff --git a/go/arrow/flight/flightsql/client_test.go b/go/arrow/flight/flightsql/client_test.go index 9582847fc3224..92e468313910c 100644 --- a/go/arrow/flight/flightsql/client_test.go +++ b/go/arrow/flight/flightsql/client_test.go @@ -163,7 +163,7 @@ func (s *FlightSqlClientSuite) TestGetDBSchemas() { desc := getDesc(cmd) s.mockClient.On("GetFlightInfo", desc.Type, desc.Cmd, s.callOpts).Return(&emptyFlightInfo, nil) - info, err := s.sqlClient.GetDBSchemas(context.Background(), cmd, s.callOpts...) + info, err := s.sqlClient.GetDBSchemas(context.Background(), (*flightsql.GetDBSchemasOpts)(cmd), s.callOpts...) s.NoError(err) s.Equal(&emptyFlightInfo, info) } @@ -186,7 +186,7 @@ func (s *FlightSqlClientSuite) TestGetTables() { } desc := getDesc(cmd) s.mockClient.On("GetFlightInfo", desc.Type, desc.Cmd, s.callOpts).Return(&emptyFlightInfo, nil) - info, err := s.sqlClient.GetTables(context.Background(), cmd, s.callOpts...) + info, err := s.sqlClient.GetTables(context.Background(), (*flightsql.GetTablesOpts)(cmd), s.callOpts...) s.NoError(err) s.Equal(&emptyFlightInfo, info) } diff --git a/go/arrow/flight/flightsql/column_metadata.go b/go/arrow/flight/flightsql/column_metadata.go index fc08fcf0ef121..59c133757d5db 100644 --- a/go/arrow/flight/flightsql/column_metadata.go +++ b/go/arrow/flight/flightsql/column_metadata.go @@ -52,32 +52,38 @@ const ( IsSearchableKey = "ARROW:FLIGHT:SQL:IS_SEARCHABLE" ) +// ColumnMetadata is a helper object for managing and querying the +// standard SQL Column metadata using the expected Metadata Keys. +// It can be created by just Wrapping an existing *arrow.Metadata. +// +// Each of the methods return a value and a boolean indicating if it +// was set in the metadata or not. 
type ColumnMetadata struct { - data *arrow.Metadata + Data *arrow.Metadata } func (c *ColumnMetadata) findStrVal(key string) (string, bool) { - idx := c.data.FindKey(CatalogNameKey) + idx := c.Data.FindKey(key) if idx == -1 { return "", false } - return c.data.Values()[idx], true + return c.Data.Values()[idx], true } func (c *ColumnMetadata) findBoolVal(key string) (bool, bool) { - idx := c.data.FindKey(CatalogNameKey) + idx := c.Data.FindKey(key) if idx == -1 { return false, false } - return strToBool(c.data.Values()[idx]), true + return strToBool(c.Data.Values()[idx]), true } func (c *ColumnMetadata) findInt32Val(key string) (int32, bool) { - idx := c.data.FindKey(CatalogNameKey) + idx := c.Data.FindKey(key) if idx == -1 { return 0, false } - v, err := strconv.ParseInt(c.data.Values()[idx], 10, 32) + v, err := strconv.ParseInt(c.Data.Values()[idx], 10, 32) if err != nil { return 0, false } @@ -112,7 +118,7 @@ func (c *ColumnMetadata) IsAutoIncrement() (bool, bool) { return c.findBoolVal(IsAutoIncrementKey) } -func (c *ColumnMetadata) IsCaseSensitiveKey() (bool, bool) { +func (c *ColumnMetadata) IsCaseSensitive() (bool, bool) { return c.findBoolVal(IsCaseSensitiveKey) } @@ -124,6 +130,10 @@ func (c *ColumnMetadata) IsSearchable() (bool, bool) { return c.findBoolVal(IsSearchableKey) } +// ColumnMetadataBuilder is a convenience builder for constructing +// sql column metadata using the expected standard metadata keys. +// All methods return the builder itself so it can be chained +// to easily construct a final metadata object.
type ColumnMetadataBuilder struct { keys, vals []string } diff --git a/go/arrow/flight/flightsql/schema_ref/reference_schemas.go b/go/arrow/flight/flightsql/schema_ref/reference_schemas.go index a40def2e1b2e8..728280b5dee01 100644 --- a/go/arrow/flight/flightsql/schema_ref/reference_schemas.go +++ b/go/arrow/flight/flightsql/schema_ref/reference_schemas.go @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package schema_ref contains the expected reference Schemas to be used +// by FlightSQL servers and clients. package schema_ref import "github.com/apache/arrow/go/v10/arrow" diff --git a/go/arrow/flight/flightsql/server.go b/go/arrow/flight/flightsql/server.go index 03ca222282bbf..3823b84c1d2ad 100644 --- a/go/arrow/flight/flightsql/server.go +++ b/go/arrow/flight/flightsql/server.go @@ -34,34 +34,57 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +// the following interfaces wrap the Protobuf commands to avoid +// exposing the Protobuf types themselves in the API. 
+ +// StatementQuery represents a Sql Query type StatementQuery interface { GetQuery() string } +// StatementUpdate represents a SQL update query type StatementUpdate interface { GetQuery() string } +// StatementQueryTicket represents a request to execute a query type StatementQueryTicket interface { + // GetStatementHandle returns the server-generated opaque + // identifier for the query GetStatementHandle() []byte } +// PreparedStatementQuery represents a prepared query statement type PreparedStatementQuery interface { + // GetPreparedStatementHandle returns the server-generated opaque + // identifier for the statement GetPreparedStatementHandle() []byte } +// PreparedStatementUpdate represents a prepared update statement type PreparedStatementUpdate interface { + // GetPreparedStatementHandle returns the server-generated opaque + // identifier for the statement GetPreparedStatementHandle() []byte } +// ActionClosePreparedStatementRequest represents a request to close +// a prepared statement type ActionClosePreparedStatementRequest interface { + // GetPreparedStatementHandle returns the server-generated opaque + // identifier for the statement GetPreparedStatementHandle() []byte } +// ActionCreatePreparedStatementRequest represents a request to construct +// a new prepared statement type ActionCreatePreparedStatementRequest interface { GetQuery() string } +// ActionCreatePreparedStatementResult is the result of creating a new +// prepared statement, optionally including the dataset and parameter +// schemas. type ActionCreatePreparedStatementResult struct { Handle []byte DatasetSchema *arrow.Schema @@ -74,11 +97,16 @@ type getXdbcTypeInfo struct { func (c *getXdbcTypeInfo) GetDataType() *int32 { return c.DataType } +// GetXdbcTypeInfo represents a request for SQL Data Type information type GetXdbcTypeInfo interface { + // GetDataType returns either nil (get for all types) + // or a specific SQL type ID to fetch information about. 
GetDataType() *int32 } +// GetSqlInfo represents a request for SQL Information type GetSqlInfo interface { + // GetInfo returns a slice of SqlInfo ids to return information about GetInfo() []uint32 } @@ -89,6 +117,7 @@ type getDBSchemas struct { func (c *getDBSchemas) GetCatalog() *string { return c.Catalog } func (c *getDBSchemas) GetDBSchemaFilterPattern() *string { return c.DbSchemaFilterPattern } +// GetDBSchemas represents a request for a list of database schemas type GetDBSchemas interface { GetCatalog() *string GetDBSchemaFilterPattern() *string @@ -102,6 +131,7 @@ func (c *getTables) GetCatalog() *string { return c.Catalog } func (c *getTables) GetDBSchemaFilterPattern() *string { return c.DbSchemaFilterPattern } func (c *getTables) GetTableNameFilterPattern() *string { return c.TableNameFilterPattern } +// GetTables represents a request to list the database's tables type GetTables interface { GetCatalog() *string GetDBSchemaFilterPattern() *string @@ -110,15 +140,33 @@ type GetTables interface { GetIncludeSchema() bool } +// BaseServer must be embedded into any FlightSQL Server implementation +// and provides default implementations of all methods returning an +// unimplemented error if called. This allows consumers to gradually +// implement methods as they want instead of requiring all consumers to +// boilerplate the same "unimplemented" methods. +// +// The base implementation also contains handling for registering sql info +// and serving it up in response to GetSqlInfo requests. type BaseServer struct { sqlInfoToResult SqlInfoResultMap + // Alloc allows specifying a particular allocator to use for any + // allocations done by the base implementation. + // Will use memory.DefaultAllocator if nil + Alloc memory.Allocator } func (BaseServer) mustEmbedBaseServer() {} -func (b *BaseServer) RegisterSqlInfo(id uint32, result interface{}) error { +// RegisterSqlInfo registers a specific result to return for a given sqlinfo +// id.
The result must be one of the following types: string, bool, int64, +// int32, []string, or map[int32][]int32. +// +// Once registered, this value will be returned for any SqlInfo requests. +func (b *BaseServer) RegisterSqlInfo(id SqlInfo, result interface{}) error { switch result.(type) { case string, bool, int64, int32, []string, map[int32][]int32: + b.sqlInfoToResult[uint32(id)] = result default: return fmt.Errorf("invalid sql info type '%T' registered for id: %d", result, id) } @@ -157,13 +205,35 @@ func (BaseServer) DoGetXdbcTypeInfo(context.Context, GetXdbcTypeInfo) (*arrow.Sc return nil, nil, status.Errorf(codes.Unimplemented, "DoGetXdbcTypeInfo not implemented") } -func (BaseServer) GetFlightInfoSqlInfo(context.Context, GetSqlInfo, *flight.FlightDescriptor) (*flight.FlightInfo, error) { - return nil, status.Errorf(codes.Unimplemented, "GetFlightInfoSqlInfo not implemented") +// GetFlightInfoSqlInfo is a base implementation of GetSqlInfo by using any +// registered sqlinfo (by calling RegisterSqlInfo). Will return an error +// if there is no sql info registered, otherwise a FlightInfo for retrieving +// the Sql info. 
+func (b *BaseServer) GetFlightInfoSqlInfo(_ context.Context, _ GetSqlInfo, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + if len(b.sqlInfoToResult) == 0 { + return nil, status.Error(codes.NotFound, "no sql information available") + } + + if b.Alloc == nil { + b.Alloc = memory.DefaultAllocator + } + + return &flight.FlightInfo{ + Endpoint: []*flight.FlightEndpoint{{Ticket: &flight.Ticket{Ticket: desc.Cmd}}}, + FlightDescriptor: desc, + TotalRecords: -1, + TotalBytes: -1, + Schema: flight.SerializeSchema(schema_ref.SqlInfo, b.Alloc), + }, nil } +// DoGetSqlInfo returns a flight stream containing the list of sqlinfo results func (b *BaseServer) DoGetSqlInfo(_ context.Context, cmd GetSqlInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) { - mem := memory.DefaultAllocator - bldr := array.NewRecordBuilder(mem, schema_ref.SqlInfo) + if b.Alloc == nil { + b.Alloc = memory.DefaultAllocator + } + + bldr := array.NewRecordBuilder(b.Alloc, schema_ref.SqlInfo) defer bldr.Release() nameFieldBldr := bldr.Field(0).(*array.Uint32Builder) @@ -197,6 +267,7 @@ func (b *BaseServer) DoGetSqlInfo(_ context.Context, cmd GetSqlInfo) (*arrow.Sch return nil, nil, status.Errorf(codes.Internal, "error producing record response: %s", err.Error()) } + // StreamChunksFromReader will call release on the reader when done go flight.StreamChunksFromReader(rdr, ch) return schema_ref.SqlInfo, ch, nil } @@ -276,48 +347,124 @@ func (BaseServer) DoPutPreparedStatementUpdate(context.Context, PreparedStatemen return 0, status.Error(codes.Unimplemented, "DoPutPreparedStatementUpdate not implemented") } +// Server is the required interface for a FlightSQL server. It is implemented by +// BaseServer which must be embedded in any implementation. The default BaseServer +// implementation of each method (except GetSqlInfo) returns an unimplemented error. +// +// GetFlightInfo* methods should return the FlightInfo object representing where +// to retrieve the results for a given request.
+// +// DoGet* methods should return the Schema of the resulting stream along with +// a channel to retrieve stream chunks (each chunk is a record batch and optionally +// a descriptor and app metadata). The channel will be read from until it +// closes, sending each chunk on the stream. Since the channel is returned +// from the method, it should be populated within a goroutine to ensure +// there are no deadlocks. type Server interface { + // GetFlightInfoStatement returns a FlightInfo for executing the requested sql query GetFlightInfoStatement(context.Context, StatementQuery, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetStatement returns a stream containing the query results for the + // requested statement handle that was populated by GetFlightInfoStatement DoGetStatement(context.Context, StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoPreparedStatement returns a FlightInfo for executing an already + // prepared statement with the provided statement handle. GetFlightInfoPreparedStatement(context.Context, PreparedStatementQuery, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetPreparedStatement returns a stream containing the results from executing + // a prepared statement query with the provided statement handle. 
DoGetPreparedStatement(context.Context, PreparedStatementQuery) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoCatalogs returns a FlightInfo for the listing of all catalogs GetFlightInfoCatalogs(context.Context, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetCatalogs returns the stream containing the list of catalogs DoGetCatalogs(context.Context) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoXdbcTypeInfo returns a FlightInfo for retrieving data type info GetFlightInfoXdbcTypeInfo(context.Context, GetXdbcTypeInfo, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetXdbcTypeInfo returns a stream containing the information about the + // requested supported datatypes DoGetXdbcTypeInfo(context.Context, GetXdbcTypeInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoSqlInfo returns a FlightInfo for retrieving SqlInfo from the server GetFlightInfoSqlInfo(context.Context, GetSqlInfo, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetSqlInfo returns a stream containing the list of SqlInfo results DoGetSqlInfo(context.Context, GetSqlInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoSchemas returns a FlightInfo for requesting a list of schemas GetFlightInfoSchemas(context.Context, GetDBSchemas, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetDBSchemas returns a stream containing the list of schemas DoGetDBSchemas(context.Context, GetDBSchemas) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoTables returns a FlightInfo for listing the tables available GetFlightInfoTables(context.Context, GetTables, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetTables returns a stream containing the list of tables DoGetTables(context.Context, GetTables) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoTableTypes returns a FlightInfo for retrieving a list + // of table types supported 
GetFlightInfoTableTypes(context.Context, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetTableTypes returns a stream containing the data related to the table types DoGetTableTypes(context.Context) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoPrimaryKeys returns a FlightInfo for extracting information about primary keys GetFlightInfoPrimaryKeys(context.Context, TableRef, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetPrimaryKeys returns a stream containing the data related to primary keys DoGetPrimaryKeys(context.Context, TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoExportedKeys returns a FlightInfo for extracting information about foreign keys GetFlightInfoExportedKeys(context.Context, TableRef, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetExportedKeys returns a stream containing the data related to foreign keys DoGetExportedKeys(context.Context, TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoImportedKeys returns a FlightInfo for extracting information about imported keys GetFlightInfoImportedKeys(context.Context, TableRef, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetImportedKeys returns a stream containing the data related to imported keys DoGetImportedKeys(context.Context, TableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) + // GetFlightInfoCrossReference returns a FlightInfo for extracting data related + // to primary and foreign keys GetFlightInfoCrossReference(context.Context, CrossTableRef, *flight.FlightDescriptor) (*flight.FlightInfo, error) + // DoGetCrossReference returns a stream of data related to foreign and primary keys DoGetCrossReference(context.Context, CrossTableRef) (*arrow.Schema, <-chan flight.StreamChunk, error) + // DoPutCommandStatementUpdate executes a sql update statement and returns + // the number of affected rows DoPutCommandStatementUpdate(context.Context, 
StatementUpdate) (int64, error) - DoPutPreparedStatementQuery(context.Context, PreparedStatementQuery, flight.MessageReader, flight.MetadataWriter) error - DoPutPreparedStatementUpdate(context.Context, PreparedStatementUpdate, flight.MessageReader) (int64, error) + // CreatePreparedStatement constructs a prepared statement from a sql query + // and returns an opaque statement handle for use. CreatePreparedStatement(context.Context, ActionCreatePreparedStatementRequest) (ActionCreatePreparedStatementResult, error) + // ClosePreparedStatement closes the prepared statement identified by the requested + // opaque statement handle. ClosePreparedStatement(context.Context, ActionClosePreparedStatementRequest) error + // DoPutPreparedStatementQuery binds parameters to a given prepared statement + // identified by the provided statement handle. + // + // The provided MessageReader is a stream of record batches with optional + // app metadata and flight descriptors to represent the values to bind + // to the parameters. + // + // Currently anything written to the writer will be ignored. It is in the + // interface for potential future enhancements to avoid having to change + // the interface in the future. + DoPutPreparedStatementQuery(context.Context, PreparedStatementQuery, flight.MessageReader, flight.MetadataWriter) error + // DoPutPreparedStatementUpdate executes an update SQL Prepared statement + // for the specified statement handle. The reader allows providing a sequence + // of uploaded record batches to bind the parameters to. Returns the number + // of affected records. + DoPutPreparedStatementUpdate(context.Context, PreparedStatementUpdate, flight.MessageReader) (int64, error) mustEmbedBaseServer() } +// NewFlightServer constructs a FlightRPC server from the provided +// FlightSQL Server so that it can be passed to RegisterFlightService. 
func NewFlightServer(srv Server) flight.FlightServer { return &flightSqlServer{srv: srv, mem: memory.DefaultAllocator} } +// NewFlightServerWithAllocator constructs a FlightRPC server from +// the provided FlightSQL Server so that it can be passed to +// RegisterFlightService, setting the provided allocator into the server +// for use with any allocations necessary by the routing. +// +// Will default to memory.DefaultAllocator if mem is nil func NewFlightServerWithAllocator(srv Server, mem memory.Allocator) flight.FlightServer { + if mem == nil { + mem = memory.DefaultAllocator + } return &flightSqlServer{srv: srv, mem: mem} } +// flightSqlServer is a wrapper around a FlightSQL server interface to +// perform routing from FlightRPC to FlightSQL. type flightSqlServer struct { flight.BaseFlightServer mem memory.Allocator @@ -420,6 +567,10 @@ func (f *flightSqlServer) DoGet(request *flight.Ticket, stream flight.FlightServ defer wr.Close() for chunk := range cc { + if chunk.Err != nil { + return chunk.Err + } + wr.SetFlightDescriptor(chunk.Desc) if err = wr.WriteWithAppMetadata(chunk.Data, chunk.AppMetadata); err != nil { return err diff --git a/go/arrow/flight/flightsql/sql_info.go b/go/arrow/flight/flightsql/sql_info.go index 03dd438e43502..687f10f6f5af6 100644 --- a/go/arrow/flight/flightsql/sql_info.go +++ b/go/arrow/flight/flightsql/sql_info.go @@ -30,6 +30,8 @@ const ( int32ToInt32ListIdx ) +// sqlInfoResultBldr is a helper for building up the dense union response +// of a SqlInfo request.
type sqlInfoResultBldr struct { valueBldr *array.DenseUnionBuilder diff --git a/go/arrow/flight/flightsql/types.go b/go/arrow/flight/flightsql/types.go index 96bcf8073ef15..5e033d00ee322 100644 --- a/go/arrow/flight/flightsql/types.go +++ b/go/arrow/flight/flightsql/types.go @@ -22,6 +22,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +// Constants for Action types const ( CreatePreparedStatementActionType = "CreatePreparedStatement" ClosePreparedStatementActionType = "ClosePreparedStatement" @@ -66,6 +67,9 @@ func impkToTableRef(cmd *pb.CommandGetImportedKeys) TableRef { } } +// CreateStatementQueryTicket is a helper that constructs a properly +// serialized TicketStatementQuery containing a given opaque binary handle +// for use with constructing a ticket to return from GetFlightInfoStatement. func CreateStatementQueryTicket(handle []byte) ([]byte, error) { query := &pb.TicketStatementQuery{StatementHandle: handle} var ticket anypb.Any @@ -75,11 +79,22 @@ func CreateStatementQueryTicket(handle []byte) ([]byte, error) { } type ( - GetDBSchemasOpts = pb.CommandGetDbSchemas - GetTablesOpts = pb.CommandGetTables - + // GetDBSchemasOpts contains the options to request Database Schemas: + // an optional Catalog and a Schema Name filter pattern. + GetDBSchemasOpts pb.CommandGetDbSchemas + // GetTablesOpts contains the options for retrieving a list of tables: + // optional Catalog, Schema filter pattern, Table name filter pattern, + // a filter of table types, and whether or not to include the schema + // in the response. + GetTablesOpts pb.CommandGetTables + + // SqlInfoResultMap is a mapping of SqlInfo ids to the desired response. + // This is part of a Server and used for registering responses to a + // SqlInfo request. SqlInfoResultMap map[uint32]interface{} + // TableRef is a helpful struct for referencing a specific Table + // by its catalog, schema, and table name. TableRef struct { // Catalog specifies the catalog this table belongs to. 
// An empty string refers to tables without a catalog. @@ -94,6 +109,8 @@ type ( Table string } + // CrossTableRef contains a reference to a Primary Key table + // and a Foreign Key table. CrossTableRef struct { PKRef TableRef FKRef TableRef diff --git a/go/arrow/flight/record_batch_reader.go b/go/arrow/flight/record_batch_reader.go index 14afac41b1322..75e09f2008f6b 100644 --- a/go/arrow/flight/record_batch_reader.go +++ b/go/arrow/flight/record_batch_reader.go @@ -181,6 +181,7 @@ func DeserializeSchema(info []byte, mem memory.Allocator) (*arrow.Schema, error) return rdr.Schema(), nil } +// StreamChunk represents a single chunk of a FlightData stream type StreamChunk struct { Data arrow.Record Desc *FlightDescriptor @@ -188,20 +189,33 @@ type StreamChunk struct { Err error } -type MetadataRecordBatchReader interface { +// MessageReader is an interface representing a RecordReader +// that also provides StreamChunks and/or the ability to retrieve +// FlightDescriptors and AppMetadata from the flight stream +type MessageReader interface { array.RecordReader arrio.Reader Err() error Chunk() StreamChunk -} - -type MessageReader interface { - MetadataRecordBatchReader LatestFlightDescriptor() *FlightDescriptor + LatestAppMetadata() []byte } +// StreamChunksFromReader is a convenience function to populate a channel +// from a record reader. It is intended to be run using a separate goroutine +// by calling `go flight.StreamChunksFromReader(rdr, ch)`. +// +// If the record reader panics, an error chunk will get sent on the channel. +// +// This will close the channel and release the reader when it completes. func StreamChunksFromReader(rdr array.RecordReader, ch chan<- StreamChunk) { defer close(ch) + defer func() { + if err := recover(); err != nil { + ch <- StreamChunk{Err: fmt.Errorf("panic while reading: %s", err)} + } + }() + defer rdr.Release() for rdr.Next() { rec := rdr.Record()