Skip to content

Commit

Permalink
api: add support of a batch insert request
Browse files Browse the repository at this point in the history
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW .

Closes #399
  • Loading branch information
dmyger committed Oct 2, 2024
1 parent 592db69 commit d0ab8ab
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
59 changes: 59 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
if len(arrow) == 0 {
return Arrow{}, fmt.Errorf("no Arrow data")
}
return Arrow{arrow}, nil
}

// ToArrow returns a []byte that contains Arrow raw data.
func (a *Arrow) ToArrow() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, 0, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
if v.IsValid() {
return v.Interface().(Arrow).data, nil
}

return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
1 change: 1 addition & 0 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package arrow_test
44 changes: 44 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,50 @@ func (req *InsertRequest) Context(ctx context.Context) *InsertRequest {
return req
}

// InsertArrowRequest helps you to create an insert request object for execution
// by a Connection.
type InsertArrowRequest struct {
spaceRequest
arrow interface{}
}

// NewInsertArrowRequest returns a new empty InsertArrowRequest.
func NewInsertArrowRequest(space interface{}) *InsertArrowRequest {
req := new(InsertArrowRequest)
req.rtype = iproto.IPROTO_INSERT
req.setSpace(space)
req.arrow = []interface{}{}
return req
}

// Arrow sets the arrow for insertion the insert arrow request.
// Note: default value is nil.
func (req *InsertArrowRequest) Arrow(arrow interface{}) *InsertArrowRequest {
req.arrow = arrow
return req
}

// Body fills an msgpack.Encoder with the insert arrow request body.
func (req *InsertArrowRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
spaceEnc, err := newSpaceEncoder(res, req.space)
if err != nil {
return err
}

return fillInsert(enc, spaceEnc, req.arrow)
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *InsertArrowRequest) Context(ctx context.Context) *InsertArrowRequest {
req.ctx = ctx
return req
}

// ReplaceRequest helps you to create a replace request object for execution
// by a Connection.
type ReplaceRequest struct {
Expand Down

0 comments on commit d0ab8ab

Please sign in to comment.