Skip to content

Commit

Permalink
Implementation of batch insert
Browse files Browse the repository at this point in the history
Batch insert is mostly used for operation with
one bucket / one Tarantool node in a transaction.
In this case batch insert is more efficient
than inserting tuple-by-tuple.
Right now CRUD cannot provide batch insert with full consistency.
CRUD offers batch insert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

Part of #193
  • Loading branch information
AnaNek committed Jun 27, 2022
1 parent 8e00652 commit 5e5bbc0
Show file tree
Hide file tree
Showing 17 changed files with 3,097 additions and 41 deletions.
95 changes: 95 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ It also provides the `crud-storage` and `crud-router` roles for
- [Quickstart](#quickstart)
- [API](#api)
- [Insert](#insert)
- [Insert many](#insert-many)
- [Get](#get)
- [Update](#update)
- [Delete](#delete)
Expand Down Expand Up @@ -233,6 +234,100 @@ crud.insert_object('customers', {
...
```

### Insert many

```lua
-- Insert batch of tuples
local result, err = crud.insert_many(space_name, tuples, opts)
-- Insert batch of objects
local result, err = crud.insert_object_many(space_name, objects, opts)
```

where:

* `space_name` (`string`) - name of the space to insert tuples/objects into
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields
* `stop_on_error` (`?boolean`) - stop on the first error and report both
the error for the failed operation and errors for the tuples whose
operations were not performed, default is `false`
* `rollback_on_error` (`?boolean`) - any failed operation leads to a
rollback on the storage where the operation failed; errors are reported
for the tuples that were rolled back, default is `false`

Returns metadata and an array of inserted rows, plus an array of errors.
Each error object can contain the field `operation_data`.

This field can contain:
* tuple for which the error occurred;
* object with an incorrect format;
* tuple on which the operation was performed but
then rolled back;
* tuple on which the operation was not performed
because the operation was stopped by an error.

Right now CRUD cannot provide batch insert with full consistency.
CRUD offers batch insert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

**Example:**

```lua
crud.insert_many('customers', {
{1, box.NULL, 'Elizabeth', 23},
{2, box.NULL, 'Anastasia', 22},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [1, 477, 'Elizabeth', 23]
- [2, 401, 'Anastasia', 22]
...
crud.insert_object_many('customers', {
{id = 3, name = 'Elizabeth', age = 24},
{id = 10, name = 'Anastasia', age = 21},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [3, 2804, 'Elizabeth', 24]
- [10, 569, 'Anastasia', 21]

-- Partial success
local res, errs = crud.insert_object_many('customers', {
{id = 22, name = 'Alex', age = 34},
{id = 3, name = 'Anastasia', age = 22},
{id = 5, name = 'Sergey', age = 25},
})
---
res
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [5, 1172, 'Sergey', 25],
- [22, 655, 'Alex', 34],

#errs -- 1
errs[1].class_name -- BatchInsertError
errs[1].err -- 'Duplicate key exists <...>'
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
...
```

### Get

```lua
Expand Down
10 changes: 10 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

local cfg = require('crud.cfg')
local insert = require('crud.insert')
local insert_many = require('crud.insert_many')
local replace = require('crud.replace')
local get = require('crud.get')
local update = require('crud.update')
Expand Down Expand Up @@ -31,6 +32,14 @@ crud.insert = stats.wrap(insert.tuple, stats.op.INSERT)
-- @function insert_object
crud.insert_object = stats.wrap(insert.object, stats.op.INSERT)

-- @refer insert_many.tuples
-- @function insert_many
crud.insert_many = insert_many.tuples

-- @refer insert_many.objects
-- @function insert_object_many
crud.insert_object_many = insert_many.objects

-- @refer get.call
-- @function get
crud.get = stats.wrap(get.call, stats.op.GET)
Expand Down Expand Up @@ -124,6 +133,7 @@ function crud.init_storage()
end

insert.init()
insert_many.init()
get.init()
replace.init()
update.init()
Expand Down
38 changes: 38 additions & 0 deletions crud/common/batching_utils.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
local errors = require('errors')
local dev_checks = require('crud.common.dev_checks')
local sharding_utils = require('crud.common.sharding.utils')

-- Error class instantiated by complement_batching_errors for each tuple it
-- is given; created with stack capturing disabled (capture_stack = false).
local NotPerformedError = errors.new_class('NotPerformedError', {capture_stack = false})

-- Module table; it is the value returned by this file.
local batching_utils = {}

-- Message texts exposed as module fields. They are not referenced by the
-- functions of this module; presumably callers pass them as `err_msg`
-- to complement_batching_errors -- TODO confirm against callers.
batching_utils.stop_on_error_msg = "Operation with tuple was not performed"
batching_utils.rollback_on_error_msg = "Operation with tuple was rollback"

--- Build one sharding hash mismatch error per tuple
--
-- @function construct_sharding_hash_mismatch_errors
--
-- @param string err_msg
--  Message given to every created error object
-- @param table tuples
--  Array of tuples, traversed with ipairs
--
-- @return[1] table
--  New array of ShardingHashMismatchError objects, in the order of `tuples`;
--  each object carries its tuple in the `operation_data` field
function batching_utils.construct_sharding_hash_mismatch_errors(err_msg, tuples)
    dev_checks('string', 'table')

    local mismatch_errors = {}

    for _, failed_tuple in ipairs(tuples) do
        local mismatch_error = sharding_utils.ShardingHashMismatchError:new(err_msg)
        mismatch_error.operation_data = failed_tuple
        mismatch_errors[#mismatch_errors + 1] = mismatch_error
    end

    return mismatch_errors
end

--- Append a NotPerformedError for every given tuple to an error array
--
-- @function complement_batching_errors
--
-- @param table errs
--  Array of errors; it is extended in place
-- @param string err_msg
--  Message given to every appended error object
-- @param table tuples
--  Array of tuples, traversed with ipairs
--
-- @return[1] table
--  The same `errs` table that was passed in; each appended error
--  carries its tuple in the `operation_data` field
function batching_utils.complement_batching_errors(errs, err_msg, tuples)
    dev_checks('table', 'string', 'table')

    for _, skipped_tuple in ipairs(tuples) do
        local not_performed = NotPerformedError:new(err_msg)
        not_performed.operation_data = skipped_tuple
        table.insert(errs, not_performed)
    end

    return errs
end

return batching_utils
51 changes: 33 additions & 18 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')

local CallError = errors.new_class('CallError')

local call = {}
Expand Down Expand Up @@ -71,6 +74,8 @@ function call.map(func_name, func_args, opts)
balance = '?boolean',
timeout = '?number',
replicasets = '?table',
iter = '?table',
postprocessor = '?table',
})
opts = opts or {}

Expand All @@ -81,24 +86,27 @@ function call.map(func_name, func_args, opts)

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err
if opts.replicasets ~= nil then
replicasets = opts.replicasets
else
replicasets, err = vshard.router.routeall()
if replicasets == nil then
return nil, CallError:new("Failed to get all replicasets: %s", err.err)
local iter = opts.iter
if iter == nil then
iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets})
if err ~= nil then
return nil, err
end
end

local postprocessor = opts.postprocessor
if postprocessor == nil then
postprocessor = BasePostprocessor:new()
end

local futures_by_replicasets = {}
local call_opts = {is_async = true}
for _, replicaset in pairs(replicasets) do
local future = replicaset[vshard_call_name](replicaset, func_name, func_args, call_opts)
while iter:has_next() do
local args, replicaset = iter:get()
local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts)
futures_by_replicasets[replicaset.uuid] = future
end

local results = {}
local deadline = fiber_clock() + timeout
for replicaset_uuid, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
Expand All @@ -107,18 +115,25 @@ function call.map(func_name, func_args, opts)
end

local result, err = future:wait_result(wait_timeout)
if err == nil and result[1] == nil then
err = result[2]
end

if err ~= nil then
return nil, wrap_vshard_err(err, func_name, replicaset_uuid)
end
local result_info = {
key = replicaset_uuid,
value = result,
}

results[replicaset_uuid] = result
local err_info = {
err_wrapper = wrap_vshard_err,
err = err,
wrapper_args = {func_name, replicaset_uuid},
}

local early_exit = postprocessor:collect(result_info, err_info)
if early_exit then
break
end
end

return results
return postprocessor:get()
end

function call.single(bucket_id, func_name, func_args, opts)
Expand Down
76 changes: 76 additions & 0 deletions crud/common/map_call_cases/base_iter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
local errors = require('errors')
local vshard = require('vshard')

local dev_checks = require('crud.common.dev_checks')
local GetReplicasetsError = errors.new_class('GetReplicasetsError')

local BaseIterator = {}

--- Create new base iterator for map call
--
-- @function new
--
-- @tparam[opt] table opts
--  Options of BaseIterator:new; nil is treated as an empty table
-- @tparam[opt] table opts.func_args
--  Function arguments to call
-- @tparam[opt] table opts.replicasets
--  Replicasets to call; when omitted, the result of
--  vshard.router.routeall() is used
--
-- @return[1] table iterator
-- @treturn[2] nil
-- @treturn[2] table Error description
function BaseIterator:new(opts)
    dev_checks('table', {
        func_args = '?table',
        replicasets = '?table',
    })

    -- `opts` is documented as optional, so never index nil below.
    -- Done after dev_checks so the argument check still sees the value
    -- the caller actually passed.
    opts = opts or {}

    local replicasets, err
    if opts.replicasets ~= nil then
        replicasets = opts.replicasets
    else
        replicasets, err = vshard.router.routeall()
        if replicasets == nil then
            return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err)
        end
    end

    -- Pre-fetch the first entry so has_next() can answer without advancing.
    local next_index, next_replicaset = next(replicasets)

    local iter = {
        func_args = opts.func_args,
        replicasets = replicasets,
        next_replicaset = next_replicaset,
        next_index = next_index
    }

    -- `self` is the table `new` was called on; it becomes the metatable
    -- and the method lookup table of the created iterator.
    setmetatable(iter, self)
    self.__index = self

    return iter
end

--- Tell whether a replicaset is still pending for the map call
--
-- @function has_next
--
-- @return[1] boolean
--  false when the stored `next_index` is nil, true otherwise
function BaseIterator:has_next()
    if self.next_index == nil then
        return false
    end

    return true
end

--- Return the call arguments with the pending replicaset and advance
--
-- The iterator keeps the upcoming (index, replicaset) pair pre-fetched;
-- this method hands out the stored replicaset and fetches the following
-- pair with next(). Check has_next() before calling: Lua's next() starts
-- from the beginning of the table when given a nil index.
--
-- @function get
--
-- @return[1] table func_args
-- @return[2] table replicaset
function BaseIterator:get()
    local current_replicaset = self.next_replicaset

    local following_index, following_replicaset = next(self.replicasets, self.next_index)
    self.next_index = following_index
    self.next_replicaset = following_replicaset

    return self.func_args, current_replicaset
end

return BaseIterator
Loading

0 comments on commit 5e5bbc0

Please sign in to comment.