Skip to content

Commit

Permalink
GH-38255: [Go][C++] Implement Flight SQL Bulk Ingestion (#38385)
Browse files Browse the repository at this point in the history
### Rationale for this change

It was suggested in the discussion around apache/arrow-adbc#1107 for the Flight SQL ADBC driver that an "Ingest" command would be a helpful addition to the Flight SQL specification. This command would enable a Flight SQL client to provide a FlightData stream to the server without needing to know its SQL syntax, and have that stream loaded into a target table by whichever means the server deems appropriate.

### What changes are included in this PR?

- Format:
  - Add CommandStatementIngest message type to Flight SQL proto definition
  - Add FLIGHT_SQL_SERVER_BULK_INGESTION and FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED options for SqlInfo
- Go:
  - Generate pb
  - Server-side implementation
  - Client-side implementation
  - Unit + integration tests
- C++:
  - Server-side implementation
  - Client-side implementation
  - Integration tests

### Are these changes tested?

Yes, see `server_test.go`, `scenario.go`, and `test_integration.cc`.

### Are there any user-facing changes?

Yes, new Flight SQL client and server functionality is being added. Changes are not expected to break existing users.

* Closes: #38255

Lead-authored-by: Joel Lubinitsky <[email protected]>
Co-authored-by: Joel Lubinitsky <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
3 people authored and kou committed Aug 30, 2024
1 parent 3e03031 commit 4fcdd01
Show file tree
Hide file tree
Showing 8 changed files with 1,440 additions and 589 deletions.
79 changes: 79 additions & 0 deletions arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,85 @@ func (c *Client) ExecuteSubstraitUpdate(ctx context.Context, plan SubstraitPlan,
return updateResult.GetRecordCount(), nil
}

// ExecuteIngest is for executing a bulk ingestion and only returns the number of affected rows.
// The provided RecordReader will be retained for the duration of the call, but it is the caller's
// responsibility to release the original reference.
func (c *Client) ExecuteIngest(ctx context.Context, rdr array.RecordReader, reqOptions *ExecuteIngestOpts, opts ...grpc.CallOption) (int64, error) {
var (
err error
desc *flight.FlightDescriptor
stream pb.FlightService_DoPutClient
wr *flight.Writer
res *pb.PutResult
updateResult pb.DoPutUpdateResult
)

cmd := (*pb.CommandStatementIngest)(reqOptions)

// Servers cannot infer defaults for these parameters, so we validate the request to ensure they are set.
if cmd.GetTableDefinitionOptions() == nil {
return 0, fmt.Errorf("cannot ExecuteIngest: invalid ExecuteIngestOpts, TableDefinitionOptions is required")
}
if cmd.GetTable() == "" {
return 0, fmt.Errorf("cannot ExecuteIngest: invalid ExecuteIngestOpts, Table is required")
}

if desc, err = descForCommand(cmd); err != nil {
return 0, err
}

if stream, err = c.Client.DoPut(ctx, opts...); err != nil {
return 0, err
}

wr = flight.NewRecordWriter(stream, ipc.WithAllocator(c.Alloc), ipc.WithSchema(rdr.Schema()))
defer wr.Close()

wr.SetFlightDescriptor(desc)

for rdr.Next() {
rec := rdr.Record()
err = wr.Write(rec)
if err == io.EOF {
// gRPC returns io.EOF if the error was generated by the server.
// The specific error will be retrieved in the server response.
// ref: https://pkg.go.dev/google.golang.org/grpc#ClientStream
break
}
if err != nil {
return 0, err
}
}

if err = rdr.Err(); err != nil {
return 0, err
}

if err = stream.CloseSend(); err != nil {
return 0, err
}

if res, err = stream.Recv(); err != nil {
return 0, err
}

if err = proto.Unmarshal(res.GetAppMetadata(), &updateResult); err != nil {
return 0, err
}

// Drain the stream. If ingestion was successful, no more messages should arrive.
// If there was a failure, the next message contains the error and the DoPutUpdateResult
// we recieved indicates a partial ingestion if the RecordCount is non-zero.
for {
_, err := stream.Recv()
if err == io.EOF {
return updateResult.GetRecordCount(), nil
} else if err != nil {
return updateResult.GetRecordCount(), err
}
}
}

// GetCatalogs requests the list of catalogs from the server and
// returns a flightInfo object where the response can be retrieved
func (c *Client) GetCatalogs(ctx context.Context, opts ...grpc.CallOption) (*flight.FlightInfo, error) {
Expand Down
38 changes: 38 additions & 0 deletions arrow/flight/flightsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ type ActionEndSavepointRequest interface {
GetAction() EndSavepointRequestType
}

// StatementIngest represents a bulk ingestion request
type StatementIngest interface {
GetTableDefinitionOptions() *TableDefinitionOptions
GetTable() string
GetSchema() string
GetCatalog() string
GetTemporary() bool
GetTransactionId() []byte
GetOptions() map[string]string
}

type getXdbcTypeInfo struct {
*pb.CommandGetXdbcTypeInfo
}
Expand Down Expand Up @@ -507,6 +518,10 @@ func (BaseServer) DoPutPreparedStatementUpdate(context.Context, PreparedStatemen
return 0, status.Error(codes.Unimplemented, "DoPutPreparedStatementUpdate not implemented")
}

func (BaseServer) DoPutCommandStatementIngest(context.Context, StatementIngest, flight.MessageReader) (int64, error) {
return 0, status.Error(codes.Unimplemented, "DoPutCommandStatementIngest not implemented")
}

func (BaseServer) BeginTransaction(context.Context, ActionBeginTransactionRequest) ([]byte, error) {
return nil, status.Error(codes.Unimplemented, "BeginTransaction not implemented")
}
Expand Down Expand Up @@ -694,6 +709,9 @@ type Server interface {
GetSessionOptions(context.Context, *flight.GetSessionOptionsRequest) (*flight.GetSessionOptionsResult, error)
// CloseSession closes/invalidates the current server session.
CloseSession(context.Context, *flight.CloseSessionRequest) (*flight.CloseSessionResult, error)
// DoPutCommandStatementIngest executes a bulk ingestion and returns
// the number of affected rows
DoPutCommandStatementIngest(context.Context, StatementIngest, flight.MessageReader) (int64, error)

mustEmbedBaseServer()
}
Expand Down Expand Up @@ -985,6 +1003,26 @@ func (f *flightSqlServer) DoPut(stream flight.FlightService_DoPutServer) error {
return status.Errorf(codes.Internal, "failed to marshal PutResult: %s", err.Error())
}
return stream.Send(out)
case *pb.CommandStatementIngest:
// Even if there was an error, the server may have ingested some records.
// For this reason we send PutResult{recordCount} no matter what, potentially followed by an error
// if there was one.
recordCount, rpcErr := f.srv.DoPutCommandStatementIngest(stream.Context(), cmd, rdr)

result := pb.DoPutUpdateResult{RecordCount: recordCount}
out := &flight.PutResult{}
if out.AppMetadata, err = proto.Marshal(&result); err != nil {
return status.Errorf(codes.Internal, "failed to marshal PutResult: %s", err.Error())
}

// If we fail to send the recordCount, just return an error outright
if err := stream.Send(out); err != nil {
return err
}

// We successfully sent the recordCount.
// Send the error if one occurred in the RPC, otherwise this is nil.
return rpcErr
default:
return status.Error(codes.InvalidArgument, "the defined request is invalid")
}
Expand Down
155 changes: 155 additions & 0 deletions arrow/flight/flightsql/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ func (*testServer) CloseSession(ctx context.Context, req *flight.CloseSessionReq
return &flight.CloseSessionResult{Status: flight.CloseSessionResultClosed}, nil
}

func (*testServer) DoPutCommandStatementIngest(ctx context.Context, cmd flightsql.StatementIngest, rdr flight.MessageReader) (int64, error) {
var maxRows int64 = 50
var nRows int64
for rdr.Next() {
rec := rdr.Record()
if nRows+rec.NumRows() > maxRows {
return nRows, fmt.Errorf("ingested rows exceeded maximum of %d", maxRows)
}
nRows += rec.NumRows()
}
return nRows, nil
}

type FlightSqlServerSuite struct {
suite.Suite

Expand All @@ -202,9 +215,16 @@ func (s *FlightSqlServerSuite) SetupTest() {
cl, err := flightsql.NewClient(s.s.Addr().String(), nil, nil, dialOpts...)
s.Require().NoError(err)
s.cl = cl

checked := memory.NewCheckedAllocator(s.cl.Alloc)
s.cl.Alloc = checked
}

func (s *FlightSqlServerSuite) TearDownTest() {
checked, ok := s.cl.Alloc.(*memory.CheckedAllocator)
s.Require().True(ok)
checked.AssertSize(s.T(), 0)

s.Require().NoError(s.cl.Close())
s.cl = nil
}
Expand Down Expand Up @@ -281,6 +301,111 @@ func (s *FlightSqlServerSuite) TestExecutePoll() {
s.Len(poll.GetInfo().Endpoint, 2)
}

func (s *FlightSqlServerSuite) TestExecuteIngestNil() {
// Ingest with nil options errors, but does not panic
nRecords, err := s.cl.ExecuteIngest(context.TODO(), nil, nil)
s.Error(err)
s.Equal(int64(0), nRecords)
}

func (s *FlightSqlServerSuite) TestExecuteIngestInvalid() {
reclist := []arrow.Record{}
rdr, err := array.NewRecordReader(arrow.NewSchema([]arrow.Field{}, nil), reclist)
s.NoError(err)
defer rdr.Release()

// Cannot execute ingest without specifying required options
nRecords, err := s.cl.ExecuteIngest(context.TODO(), rdr, &flightsql.ExecuteIngestOpts{})
s.Error(err)
s.Equal(int64(0), nRecords)
}

func (s *FlightSqlServerSuite) TestExecuteIngest() {
nRecords := 3
nRowsPerRecord := 5
reclist := generateRecords(s.cl.Alloc, nRecords, nRowsPerRecord)
for _, rec := range reclist {
defer rec.Release()
}

rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
s.NoError(err)
defer rdr.Release()

nRowsIngested, err := s.cl.ExecuteIngest(
context.TODO(),
rdr,
&flightsql.ExecuteIngestOpts{
TableDefinitionOptions: &flightsql.TableDefinitionOptions{
IfNotExist: flightsql.TableDefinitionOptionsTableNotExistOptionCreate,
IfExists: flightsql.TableDefinitionOptionsTableExistsOptionReplace,
},
Table: "test_table",
},
)
s.NoError(err)

nRowsExpected := int64(nRecords * nRowsPerRecord)
s.Equal(nRowsExpected, nRowsIngested)
}

func (s *FlightSqlServerSuite) TestExecuteIngestWithServerError() {
nRecords := 11 // intentionally exceed maximum number of rows the server can ingest
nRowsPerRecord := 5
reclist := generateRecords(s.cl.Alloc, nRecords, nRowsPerRecord)
for _, rec := range reclist {
defer rec.Release()
}

rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
s.NoError(err)
defer rdr.Release()

nRowsIngested, err := s.cl.ExecuteIngest(
context.TODO(),
rdr,
&flightsql.ExecuteIngestOpts{
TableDefinitionOptions: &flightsql.TableDefinitionOptions{
IfNotExist: flightsql.TableDefinitionOptionsTableNotExistOptionCreate,
IfExists: flightsql.TableDefinitionOptionsTableExistsOptionReplace,
},
Table: "test_table",
},
)
s.Error(err)
s.ErrorContains(err, "ingested rows exceeded maximum")

nRowsExpected := int64(50) // max rows the server can ingest
s.Equal(nRowsExpected, nRowsIngested)
}

func generateRecords(alloc memory.Allocator, nRecords, nRowsPerRecord int) []arrow.Record {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "one", Type: arrow.FixedWidthTypes.Boolean},
{Name: "two", Type: arrow.BinaryTypes.String},
{Name: "three", Type: arrow.PrimitiveTypes.Int64},
},
nil,
)

bldr := array.NewRecordBuilder(alloc, schema)
defer bldr.Release()

var val int
reclist := make([]arrow.Record, nRecords)
for i := 0; i < nRecords; i++ {
for j := 0; j < nRowsPerRecord; j++ {
bldr.Field(0).(*array.BooleanBuilder).Append(val%2 == 0)
bldr.Field(1).(*array.StringBuilder).Append(fmt.Sprint(val))
bldr.Field(2).(*array.Int64Builder).Append(int64(val))
val++
}
reclist[i] = bldr.NewRecord()
}
return reclist
}

type UnimplementedFlightSqlServerSuite struct {
suite.Suite

Expand Down Expand Up @@ -459,6 +584,36 @@ func (s *UnimplementedFlightSqlServerSuite) TestDoGet() {
}
}

func (s *UnimplementedFlightSqlServerSuite) TestExecuteIngest() {
nRecords := 3
nRowsPerRecord := 5
reclist := generateRecords(s.cl.Alloc, nRecords, nRowsPerRecord)
for _, rec := range reclist {
defer rec.Release()
}

rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
s.NoError(err)
defer rdr.Release()

info, err := s.cl.ExecuteIngest(
context.TODO(),
rdr,
&flightsql.ExecuteIngestOpts{
TableDefinitionOptions: &flightsql.TableDefinitionOptions{
IfNotExist: flightsql.TableDefinitionOptionsTableNotExistOptionCreate,
IfExists: flightsql.TableDefinitionOptionsTableExistsOptionReplace,
},
Table: "test_table",
},
)
st, ok := status.FromError(err)
s.True(ok)
s.Equal(codes.Unimplemented, st.Code())
s.Equal("DoPutCommandStatementIngest not implemented", st.Message())
s.Zero(info)
}

func (s *UnimplementedFlightSqlServerSuite) TestDoAction() {
prep, err := s.cl.Prepare(context.TODO(), "IRRELEVANT")
s.Nil(prep)
Expand Down
43 changes: 43 additions & 0 deletions arrow/flight/flightsql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ type (
// the substrait release, e.g. "0.23.0"
Version string
}

// ExecuteIngestOpts contains the options for executing a bulk ingestion:
//
// Required:
// - TableDefinitionOptions: Specifies the behavior for creating or updating table definitions
// - Table: The destination table to load into
//
// Optional:
// - Schema: The DB schema containing the destination table
// - Catalog: The catalog containing the destination table
// - Temporary: Use a temporary table as the destination
// - TransactionId: Ingest as part of this transaction
// - Options: Additional, backend-specific options
ExecuteIngestOpts pb.CommandStatementIngest
)

// SqlInfo enum values
Expand Down Expand Up @@ -198,6 +212,18 @@ const (
// If 0, there is no timeout. Servers should reset the timeout when the handle is used in a command.
SqlInfoFlightSqlServerTransactionTimeout = SqlInfo(pb.SqlInfo_FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT)

// Retrieves a boolean value indicating whether the Flight SQL Server supports executing
// bulk ingestion.
SqlInfoFlightSqlServerBulkIngestion = SqlInfo(pb.SqlInfo_FLIGHT_SQL_SERVER_BULK_INGESTION)
// Retrieves a boolean value indicating whether transactions are supported for bulk ingestion. If not, invoking
// the method commit in the context of a bulk ingestion is a noop, and the isolation level is
// `arrow.flight.protocol.sql.SqlTransactionIsolationLevel.TRANSACTION_NONE`.
//
// Returns:
// - false: if bulk ingestion transactions are unsupported;
// - true: if bulk ingestion transactions are supported.
SqlInfoFlightSqlServerIngestTransactionsSupported = SqlInfo(pb.SqlInfo_FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED)

// SQL Syntax Information
// Values [500-1000): provide information about the supported SQL Syntax

Expand Down Expand Up @@ -854,3 +880,20 @@ const (
)

type CreatePreparedStatementResult = pb.ActionCreatePreparedStatementResult

type (
TableDefinitionOptions = pb.CommandStatementIngest_TableDefinitionOptions
TableDefinitionOptionsTableNotExistOption = pb.CommandStatementIngest_TableDefinitionOptions_TableNotExistOption
TableDefinitionOptionsTableExistsOption = pb.CommandStatementIngest_TableDefinitionOptions_TableExistsOption
)

const (
TableDefinitionOptionsTableNotExistOptionUnspecified = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_UNSPECIFIED
TableDefinitionOptionsTableNotExistOptionCreate = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
TableDefinitionOptionsTableNotExistOptionFail = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL

TableDefinitionOptionsTableExistsOptionUnspecified = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_UNSPECIFIED
TableDefinitionOptionsTableExistsOptionFail = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
TableDefinitionOptionsTableExistsOptionAppend = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
TableDefinitionOptionsTableExistsOptionReplace = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE
)
Loading

0 comments on commit 4fcdd01

Please sign in to comment.