Skip to content

Commit

Permalink
api: add support of a batch insert request
Browse files Browse the repository at this point in the history
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW .

Closes #399
  • Loading branch information
dmyger committed Oct 4, 2024
1 parent 592db69 commit c0802b2
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 6 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
59 changes: 59 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
if len(arrow) == 0 {
return Arrow{}, fmt.Errorf("no Arrow data")
}
return Arrow{arrow}, nil
}

// ToArrow returns a []byte that contains Arrow raw data.
func (a *Arrow) ToArrow() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, 0, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
if v.IsValid() {
return v.Interface().(Arrow).data, nil
}

return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
130 changes: 130 additions & 0 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package arrow_test

import (
"encoding/hex"
"log"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/tarantool/go-tarantool/v2/test_helpers"
)

var isArrowSupported = false

var server = "127.0.0.1:3013"
var dialer = tarantool.NetDialer{
Address: server,
User: "test",
Password: "test",
}
var space = "testArrow"

var opts = tarantool.Opts{
//! Timeout: 5 * time.Second,
}

func skipIfArrowUnsupported(t *testing.T) {
t.Helper()
if !isArrowSupported {
t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
}
}

// TestInsert uses Arrow sequence from Tarantool's test .
// nolint:lll
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
func TestInsert_invalid(t *testing.T) {
skipIfArrowUnsupported(t)

arrows := []struct {
arrow string
expected string
}{
{
"",
"no Arrow data",
},
{
"00",
"Failed to decode Arrow IPC data",
},
{
"ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000",
"memtx does not support arrow format",
},
}

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

for i, a := range arrows {
t.Run(strconv.Itoa(i), func(t *testing.T) {
data, err := hex.DecodeString(a.arrow)
require.NoError(t, err)

arr, err := arrow.MakeArrow(data)
if err != nil {
require.ErrorContains(t, err, a.expected)
return
}
req := tarantool.NewInsertArrowRequest(space).Arrow(arr)

_, err = conn.Do(req).Get()
require.ErrorContains(t, err, a.expected)
})
}

}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
// is a separate function, see
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
func runTestMain(m *testing.M) int {
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
if err != nil {
log.Fatalf("Failed to extract Tarantool version: %s", err)
}
isArrowSupported = !isLess

if !isArrowSupported {
log.Println("Skipping insert Arrow tests...")
return m.Run()
}

instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
Dialer: dialer,
InitScript: "config.lua",
Listen: server,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 10,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(instance)

if err != nil {
log.Printf("Failed to prepare test Tarantool: %s", err)
return 1
}

return m.Run()
}

func TestMain(m *testing.M) {
code := runTestMain(m)
os.Exit(code)
}
34 changes: 34 additions & 0 deletions arrow/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--? local uuid = require('uuid')
--? local msgpack = require('msgpack')

-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg{
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
}

box.schema.user.create('test', { password = 'test' , if_not_exists = true })
box.schema.user.grant('test', 'execute', 'universe', nil, { if_not_exists = true })

--? local uuid_msgpack_supported = pcall(msgpack.encode, uuid.new())
--? if not uuid_msgpack_supported then
--? error('UUID unsupported, use Tarantool 2.4.1 or newer')
--? end

local s = box.schema.space.create('testArrow', {
id = 524,
if_not_exists = true,
})
s:create_index('primary', {
type = 'tree',
parts = {{ field = 1, type = 'integer' }},
if_not_exists = true
})
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', { if_not_exists = true })

-- Set listen only when every other thing is configured.
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}
70 changes: 70 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ import (
"github.com/vmihailenco/msgpack/v5"
)

// INSERT Arrow request.
//
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
// https://github.com/tarantool/go-replica/issues/30
const iprotoInsertArrowType = iproto.Type(17)

// The data in Arrow format.
//
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
// https://github.com/tarantool/go-replica/issues/30
const iprotoArrowKey = iproto.Key(0x36)

type spaceEncoder struct {
Id uint32
Name string
Expand Down Expand Up @@ -136,6 +148,20 @@ func fillInsert(enc *msgpack.Encoder, spaceEnc spaceEncoder, tuple interface{})
return enc.Encode(tuple)
}

func fillInsertArrow(enc *msgpack.Encoder, spaceEnc spaceEncoder, arrow interface{}) error {
if err := enc.EncodeMapLen(2); err != nil {
return err
}
if err := spaceEnc.Encode(enc); err != nil {
return err
}

if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
return err
}
return enc.Encode(arrow)
}

func fillSelect(enc *msgpack.Encoder, spaceEnc spaceEncoder, indexEnc indexEncoder,
offset, limit uint32, iterator Iter, key, after interface{}, fetchPos bool) error {
mapLen := 6
Expand Down Expand Up @@ -1156,6 +1182,50 @@ func (req *InsertRequest) Context(ctx context.Context) *InsertRequest {
return req
}

// InsertArrowRequest helps you to create an insert request object for execution
// by a Connection.
type InsertArrowRequest struct {
spaceRequest
arrow interface{}
}

// NewInsertArrowRequest returns a new empty InsertArrowRequest.
func NewInsertArrowRequest(space interface{}) *InsertArrowRequest {
req := new(InsertArrowRequest)
req.rtype = iprotoInsertArrowType
req.setSpace(space)
req.arrow = []interface{}{}
return req
}

// Arrow sets the arrow for insertion the insert arrow request.
// Note: default value is nil.
func (req *InsertArrowRequest) Arrow(arrow interface{}) *InsertArrowRequest {
req.arrow = arrow
return req
}

// Body fills an msgpack.Encoder with the insert arrow request body.
func (req *InsertArrowRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
spaceEnc, err := newSpaceEncoder(res, req.space)
if err != nil {
return err
}

return fillInsertArrow(enc, spaceEnc, req.arrow)
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *InsertArrowRequest) Context(ctx context.Context) *InsertArrowRequest {
req.ctx = ctx
return req
}

// ReplaceRequest helps you to create a replace request object for execution
// by a Connection.
type ReplaceRequest struct {
Expand Down
14 changes: 11 additions & 3 deletions test_helpers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,22 @@ func atoiUint64(str string) (uint64, error) {
return res, nil
}

func getTarantoolExec() string {

if tar_bin := os.Getenv("TARANTOOL_BIN"); tar_bin != "" {
return tar_bin
}

return "tarantool"
}

// IsTarantoolVersionLess checks if tarantool version is less
// than passed <major.minor.patch>. Returns error if failed
// to extract version.
func IsTarantoolVersionLess(majorMin uint64, minorMin uint64, patchMin uint64) (bool, error) {
var major, minor, patch uint64

out, err := exec.Command("tarantool", "--version").Output()
out, err := exec.Command(getTarantoolExec(), "--version").Output()

if err != nil {
return true, err
Expand Down Expand Up @@ -202,8 +211,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) {
return inst, err
}
}

inst.Cmd = exec.Command("tarantool", startOpts.InitScript)
inst.Cmd = exec.Command(getTarantoolExec(), startOpts.InitScript)

inst.Cmd.Env = append(
os.Environ(),
Expand Down

0 comments on commit c0802b2

Please sign in to comment.