Skip to content

Commit

Permalink
Implementation of batch replace
Browse files Browse the repository at this point in the history
Batch upsert is mostly used for operation with
one bucket / one Tarantool node in a transaction.
In this case batch replace is more efficient
then replacing tuple-by-tuple.
Right now CRUD cannot provide batch replace with full consistency.
CRUD offers batch upsert with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

Part of #193
  • Loading branch information
AnaNek committed May 20, 2022
1 parent 3074100 commit 379a252
Show file tree
Hide file tree
Showing 6 changed files with 2,135 additions and 140 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* Batch insert/upsert operation
`crud.insert_many()`/`crud.insert_object_many()`/
`crud.upsert_many()`/`crud.upsert_object_many()`
`crud.replace_many()`/`crud.replace_object_many()`
with partial consistency

### Changed
Expand Down
89 changes: 89 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,95 @@ crud.replace_object('customers', {
...
```

### Replace many

```lua
-- Batch replace tuples
local result, err = crud.replace_many(space_name, tuples, opts)
-- Batch replace objects
local result, err = crud.replace_object_many(space_name, objects, opts)
```

where:

* `space_name` (`string`) - name of the space to insert/replace an object
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields
* `stop_on_error` (`?boolean`) - stop on a first error and report errors
regarding the failed operation and all not performed ones., default is
`false`
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, default is
`false`

Returns metadata and array contains inserted rows, array of errors
(one error corresponds to one replicaset for which the error occurred).
Error object can contain `tuple` field. This field contains the tuple
for which the error occurred.

Right now CRUD cannot provide batch replace with full consistency.
CRUD offers batch replace with partial consistency. That means
that full consistency can be provided only on single replicaset
using `box` transactions.

**Example:**

```lua
crud.replace_many('developers', {
{1, box.NULL, 'Elizabeth', 'lizaaa'},
{2, box.NULL, 'Anastasia', 'iamnewdeveloper'},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'login', 'type': 'string'}
rows:
- [1, 477, 'Elizabeth', 'lizaaa']
- [2, 401, 'Anastasia', 'iamnewdeveloper']
...
crud.replace_object_many('developers', {
{id = 1, name = 'Inga', login = 'mylogin'},
{id = 10, name = 'Anastasia', login = 'qwerty'},
})
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [1, 477, 'Inga', 'mylogin']
- [10, 569, 'Anastasia', 'qwerty']

-- Partial success
-- Let's say login has unique secondary index
local res, errs = crud.replace_object_many('developers', {
{id = 22, name = 'Alex', login = 'pushkinn'},
{id = 3, name = 'Anastasia', login = 'qwerty'},
{id = 5, name = 'Sergey', login = 'crudisthebest'},
})
---
res
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [5, 1172, 'Sergey', 'crudisthebest'],
- [22, 655, 'Alex', 'pushkinn'],

#errs -- 1
errs[1].class_name -- BatchReplaceError
errs[1].err -- 'Duplicate key exists <...>'
errs[1].tuple -- {3, 2804, 'Anastasia', 'qwerty'}
...
```

### Upsert

```lua
Expand Down
10 changes: 10 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local cfg = require('crud.cfg')
local insert = require('crud.insert')
local batch_insert = require('crud.batch_insert')
local replace = require('crud.replace')
local batch_replace = require('crud.batch_replace')
local get = require('crud.get')
local update = require('crud.update')
local upsert = require('crud.upsert')
Expand Down Expand Up @@ -53,6 +54,14 @@ crud.replace = stats.wrap(replace.tuple, stats.op.REPLACE)
-- @function replace_object
crud.replace_object = stats.wrap(replace.object, stats.op.REPLACE)

-- @refer batch_replace.tuples_batch
-- @function replace_many
crud.replace_many = batch_replace.tuples_batch

-- @refer batch_replace.objects_batch
-- @function replace_object_many
crud.replace_object_many = batch_replace.objects_batch

-- @refer update.call
-- @function update
crud.update = stats.wrap(update.call, stats.op.UPDATE)
Expand Down Expand Up @@ -145,6 +154,7 @@ function crud.init_storage()
batch_insert.init()
get.init()
replace.init()
batch_replace.init()
update.init()
upsert.init()
batch_upsert.init()
Expand Down
246 changes: 246 additions & 0 deletions crud/batch_replace.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local call = require('crud.common.call')
local const = require('crud.common.const')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')

local BatchReplaceError = errors.new_class('BatchReplaceError', {capture_stack = false})

local batch_replace = {}

local BATCH_REPLACE_FUNC_NAME = '_crud.batch_replace_on_storage'

local function batch_replace_on_storage(space_name, tuples, opts)
dev_checks('string', 'table', {
add_space_schema_hash = '?boolean',
fields = '?table',
stop_on_error = '?boolean',
rollback_on_error = '?boolean',
sharding_key_hash = '?number',
sharding_func_hash = '?number',
skip_sharding_hash_check = '?boolean',
})

opts = opts or {}

local space = box.space[space_name]
if space == nil then
return nil, {BatchReplaceError:new("Space %q doesn't exist", space_name)}
end

local _, err = sharding.check_sharding_hash(space_name,
opts.sharding_func_hash,
opts.sharding_key_hash,
opts.skip_sharding_hash_check)

if err ~= nil then
return nil, {err}
end

local inserted_tuples = {}
local errs = {}

box.begin()
for _, tuple in ipairs(tuples) do
-- add_space_schema_hash is true only in case of replace_object_many
-- the only one case when reloading schema can avoid replace error
-- is flattening object on router
local insert_result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, {
add_space_schema_hash = opts.add_space_schema_hash,
field_names = opts.fields,
})

if insert_result.err ~= nil then
local err = {
err = insert_result.err,
tuple = tuple,
}

if opts.stop_on_error == true then
if opts.rollback_on_error == true then
box.rollback()
return nil, {err}
end

box.commit()

return inserted_tuples, {err}
end

table.insert(errs, err)
end

table.insert(inserted_tuples, insert_result.res)
end

if next(errs) ~= nil then
if opts.rollback_on_error == true then
box.rollback()
return nil, errs
end

box.commit()

return inserted_tuples, errs
end

box.commit()

return inserted_tuples
end

function batch_replace.init()
_G._crud.batch_replace_on_storage = batch_replace_on_storage
end

-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_batch_replace_on_router(space_name, original_tuples, opts)
dev_checks('string', 'table', {
timeout = '?number',
fields = '?table',
add_space_schema_hash = '?boolean',
stop_on_error = '?boolean',
rollback_on_error = '?boolean',
})

opts = opts or {}

local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, {BatchReplaceError:new("Space %q doesn't exist", space_name)}, true
end

local tuples = table.deepcopy(original_tuples)

local batch_replace_on_storage_opts = {
add_space_schema_hash = opts.add_space_schema_hash,
fields = opts.fields,
stop_on_error = opts.stop_on_error,
rollback_on_error = opts.rollback_on_error,
}

local iter, err = BatchInsertIterator:new({
tuples = tuples,
space = space,
execute_on_storage_opts = batch_replace_on_storage_opts,
})
if err ~= nil then
return nil, {err}, const.NEED_SCHEMA_RELOAD
end

local postprocessor = BatchPostprocessor:new()

local rows, errs = call.map(BATCH_REPLACE_FUNC_NAME, nil, {
timeout = opts.timeout,
mode = 'write',
iter = iter,
postprocessor = postprocessor,
})

if next(rows) == nil then
return nil, errs
end

local res, err = utils.format_result(rows, space, opts.fields)
if err ~= nil then
return nil, {err}
end

return res, errs
end

--- Batch replace tuples to the specified space
--
-- @function tuples_batch
--
-- @param string space_name
-- A space name
--
-- @param table tuples
-- Tuples
--
-- @tparam ?table opts
-- Options of batch_replace.tuples_batch
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_replace.tuples_batch(space_name, tuples, opts)
checks('string', 'table', {
timeout = '?number',
fields = '?table',
add_space_schema_hash = '?boolean',
stop_on_error = '?boolean',
rollback_on_error = '?boolean',
})

return schema.wrap_func_reload(call_batch_replace_on_router, space_name, tuples, opts)
end

--- Batch replace objects to the specified space
--
-- @function objects_batch
--
-- @param string space_name
-- A space name
--
-- @param table objs
-- Objects
--
-- @tparam ?table opts
-- Options of batch_insert.tuples_batch
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_replace.objects_batch(space_name, objs, opts)
checks('string', 'table', {
timeout = '?number',
fields = '?table',
stop_on_error = '?boolean',
rollback_on_error = '?boolean',
})

-- insert can fail if router uses outdated schema to flatten object
opts = utils.merge_options(opts, {add_space_schema_hash = true})

local tuples = {}
local errs = {}

for _, obj in ipairs(objs) do

local tuple, err = utils.flatten_obj_reload(space_name, obj)
if err ~= nil then
local err_obj = BatchReplaceError:new("Failed to flatten object: %s", err)
err_obj.tuple = obj

if opts.stop_on_error == true then
return nil, {err_obj}
end

table.insert(errs, err_obj)
end

table.insert(tuples, tuple)
end

if next(errs) ~= nil then
return nil, errs
end

return batch_replace.tuples_batch(space_name, tuples, opts)
end

return batch_replace
Loading

0 comments on commit 379a252

Please sign in to comment.