Skip to content

Commit

Permalink
Implementation of batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
AnaNek committed Feb 2, 2022
1 parent 76e3374 commit 0a947f0
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 0 deletions.
6 changes: 6 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-- @module crud

local insert = require('crud.insert')
local batch_insert = require('crud.batch_insert')
local replace = require('crud.replace')
local get = require('crud.get')
local update = require('crud.update')
Expand All @@ -24,10 +25,14 @@ local crud = {}
-- @function insert
crud.insert = insert.tuple

-- @refer batch_insert.tuples_batch
-- @function batch_insert
crud.batch_insert = batch_insert.tuples_batch

-- @refer insert.object
-- @function insert_object
crud.insert_object = insert.object

-- @refer batch_insert.objects_batch
-- @function batch_insert_object
crud.batch_insert_object = batch_insert.objects_batch

-- @refer get.call
-- @function get
crud.get = get.call
Expand Down Expand Up @@ -105,6 +110,7 @@ function crud.init_storage()
end

insert.init()
batch_insert.init()
get.init()
replace.init()
update.init()
Expand Down
145 changes: 145 additions & 0 deletions crud/batch_insert.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

-- Error class used for failures reported by this module; created with
-- stack capturing turned off.
local BatchInsertError = errors.new_class('BatchInsertError', {capture_stack = false})

-- Module table: public functions are attached to it below and it is the
-- value returned at the end of the file.
local batch_insert = {}

-- Name under which the storage-side function is invoked through
-- replicaset:call(); batch_insert.init() stores the function in the
-- matching _G._crud field.
local BATCH_INSERT_FUNC_NAME = '_crud.batch_insert_on_storage'

--- Insert a batch of tuples into a space inside a single transaction.
--
-- The whole batch is atomic on this storage: if any insert fails, the
-- transaction is rolled back, so no tuple of the batch is persisted and
-- the caller (which receives `nil`) is never left with partially applied
-- data it has no way to learn about.
--
-- @param space_name  name of the target space (string)
-- @param batch       array of tuples to insert
-- @param opts        optional table; `opts.fields` is forwarded as
--                    `field_names` to schema.wrap_box_space_func_result
-- @return array of insert results (`.res` of every wrapped insert), or
--         `nil` plus an error: a BatchInsertError when the space does
--         not exist, otherwise a table `{err = ..., tuple = ...}`
--         describing the first tuple that failed.
local function batch_insert_on_storage(space_name, batch, opts)
    dev_checks('string', 'table', {
        fields = '?table',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, BatchInsertError:new("Space %q doesn't exist", space_name)
    end

    local inserted_tuples = {}

    box.begin()
    for _, tuple in ipairs(batch) do
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
            field_names = opts.fields,
        })

        if insert_result.err ~= nil then
            -- Undo the inserts already made in this transaction: the
            -- batch is reported as failed, so nothing must be kept.
            box.rollback()
            return nil, {
                err = insert_result.err,
                tuple = tuple,
            }
        end

        -- Collect the result only once the insert is known to have succeeded.
        table.insert(inserted_tuples, insert_result.res)
    end
    box.commit()

    return inserted_tuples
end

--- Expose the storage-side batch insert function through the global
-- `_crud` table so that it can be reached by name.
function batch_insert.init()
    local registry = _G._crud
    registry.batch_insert_on_storage = batch_insert_on_storage
end

-- Router-side part of batch insert.
--
-- Splits the tuples into per-replicaset batches, starts one asynchronous
-- storage call per replicaset and gathers the outcomes with call.batch().
--
-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
--
-- On the early-failure paths (unknown space, tuples could not be split)
-- need_reload is true. On the final path the second returned value is
-- the `errs` value produced by call.batch() and no third value is returned.
local function call_batch_insert_on_router(space_name, tuples, opts)
    dev_checks('string', 'table', {
        timeout = '?number',
        bucket_id = '?number|cdata',
        fields = '?table',
    })

    opts = opts or {}

    local space = utils.get_space(space_name, vshard.router.routeall())
    if space == nil then
        return nil, BatchInsertError:new("Space %q doesn't exist", space_name), true
    end

    local batches_by_replicasets, err = sharding.split_tuples_by_replicaset(tuples, space)
    if err ~= nil then
        return nil, err, true
    end

    -- Both option tables are shared by every per-replicaset call.
    local storage_opts = {fields = opts.fields}
    local async_call_opts = {timeout = opts.timeout, is_async = true}

    -- Fire all the requests first; the futures are awaited afterwards.
    local futures = {}
    for replicaset, batch in pairs(batches_by_replicasets) do
        local storage_args = {space_name, batch, storage_opts}
        local future = replicaset:call(BATCH_INSERT_FUNC_NAME, storage_args, async_call_opts)
        futures[replicaset.uuid] = future
    end

    local results, errs = call.batch(futures, BATCH_INSERT_FUNC_NAME, opts.timeout)

    -- Flatten the per-replicaset results into a single list of rows.
    local rows = {}
    for _, storage_result in pairs(results) do
        rows = utils.table_extend(rows, storage_result[1])
    end

    -- Only the first value of format_result() is passed on, as before.
    local formatted = utils.format_result(rows, space, opts.fields)
    return formatted, errs
end

--- Insert a batch of tuples.
--
-- Validates the arguments and runs the router-side implementation
-- through schema.wrap_func_reload().
--
-- @param space_name  name of the space (string)
-- @param tuples      table of tuples to insert
-- @param opts        optional table; accepted keys: `timeout`,
--                    `bucket_id`, `add_space_schema_hash`, `fields`
-- @return whatever schema.wrap_func_reload() returns for the call
function batch_insert.tuples_batch(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        bucket_id = '?number|cdata',
        add_space_schema_hash = '?boolean',
        fields = '?table',
    })

    local router_func = call_batch_insert_on_router
    return schema.wrap_func_reload(router_func, space_name, tuples, opts)
end

--- Insert a batch of objects.
--
-- Every object is converted to a tuple with utils.flatten_obj_reload();
-- the resulting tuples are then handed to batch_insert.tuples_batch().
-- The first object that cannot be flattened aborts the whole call.
--
-- @param space_name  name of the space (string)
-- @param objs        table of objects to insert
-- @param opts        optional table; accepted keys: `timeout`,
--                    `bucket_id`, `add_space_schema_hash`, `fields`
-- @return result of batch_insert.tuples_batch(), or `nil` and a
--         BatchInsertError when an object cannot be flattened
function batch_insert.objects_batch(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        bucket_id = '?number|cdata',
        add_space_schema_hash = '?boolean',
        fields = '?table',
    })

    local flattened = {}
    for _, object in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(space_name, object)
        if flatten_err ~= nil then
            return nil, BatchInsertError:new("Failed to flatten object: %s", flatten_err)
        end

        flattened[#flattened + 1] = tuple
    end

    return batch_insert.tuples_batch(space_name, flattened, opts)
end

return batch_insert
34 changes: 34 additions & 0 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,40 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
))
end

--- Wait for a set of asynchronous per-replicaset calls.
--
-- All futures share one deadline computed from `timeout` (or
-- call.DEFAULT_VSHARD_CALL_TIMEOUT when it is not given); each wait gets
-- whatever time is left, never less than zero.
--
-- @param futures_by_replicasets  table: replicaset uuid -> future
-- @param func_name               name of the called function, used when
--                                wrapping errors
-- @param timeout                 optional overall timeout (number)
-- @return list of successful results, and a list of wrapped errors
--         (`nil` when no call failed); every error object also carries
--         the `tuple` field taken from the original error
function call.batch(futures_by_replicasets, func_name, timeout)
    dev_checks('table', 'string', '?number')

    local total_timeout = timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
    local deadline = fiber_clock() + total_timeout

    local results = {}
    local errs

    for replicaset_uuid, future in pairs(futures_by_replicasets) do
        local time_left = deadline - fiber_clock()
        if time_left < 0 then
            time_left = 0
        end

        local result, err = future:wait_result(time_left)

        -- The wait itself succeeded but the remote function answered
        -- with `nil, err`: take the error from the second position.
        if err == nil and result[1] == nil then
            err = result[2]
        end

        if err == nil then
            table.insert(results, result)
        else
            local wrapped = wrap_vshard_err(err.err or err, func_name, replicaset_uuid)
            wrapped.tuple = err.tuple

            if errs == nil then
                errs = {}
            end
            table.insert(errs, wrapped)
        end
    end

    return results, errs
end

function call.map(func_name, func_args, opts)
dev_checks('string', '?table', {
mode = 'string',
Expand Down
23 changes: 23 additions & 0 deletions crud/common/sharding.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local vshard = require('vshard')
local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class("GetReplicasetsError", {capture_stack = false})

local utils = require('crud.common.utils')
local sharding_key_module = require('crud.common.sharding_key')
Expand Down Expand Up @@ -65,4 +66,26 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
return bucket_id
end

--- Group tuples by the replicaset that owns their bucket.
--
-- For every tuple the bucket id is obtained (and set) with
-- sharding.tuple_set_and_return_bucket_id(), then the owning replicaset
-- is looked up with vshard.router.route().
--
-- @param tuples  array of tuples
-- @param space   space the tuples belong to
-- @return table keyed by replicaset object, each value being the array
--         of tuples routed to it; or `nil` and a BucketIDError /
--         GetReplicasetsError on the first failure
function sharding.split_tuples_by_replicaset(tuples, space)
    local grouped = {}

    for _, tuple in ipairs(tuples) do
        local bucket_id, bucket_err = sharding.tuple_set_and_return_bucket_id(tuple, space)
        if bucket_err ~= nil then
            return nil, BucketIDError:new("Failed to get bucket ID: %s", bucket_err)
        end

        local replicaset, route_err = vshard.router.route(bucket_id)
        if replicaset == nil then
            return nil, GetReplicasetsError:new(
                "Failed to get replicaset for bucket_id %s: %s", bucket_id, route_err.err
            )
        end

        -- Create the group lazily the first time a replicaset is seen.
        local group = grouped[replicaset]
        if not group then
            group = {}
            grouped[replicaset] = group
        end
        group[#group + 1] = tuple
    end

    return grouped
end

return sharding
13 changes: 13 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,17 @@ function utils.merge_options(opts_a, opts_b)
return fun.chain(opts_a or {}, opts_b or {}):tomap()
end

--- Append every value of `values` to `list`.
--
-- The values are visited with pairs(), so the order in which they are
-- appended is the pairs() traversal order of `values`.
--
-- @param list    table to append to; a new table is used when it is nil
-- @param values  table whose values are appended; when nil, `list` is
--                returned untouched (possibly nil)
-- @return the extended table
function utils.table_extend(list, values)
    if values == nil then
        return list
    end

    local target = list or {}
    for _, value in pairs(values) do
        table.insert(target, value)
    end

    return target
end

return utils
18 changes: 18 additions & 0 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,22 @@ function helpers.update_cache(cluster, space_name)
]], {space_name})
end

--- Return the leading words of an error message.
--
-- The message is split on single spaces and at most `n` of the leading
-- pieces are joined back with single spaces.
--
-- @param err_msg  message to cut (string)
-- @param n        maximum number of words to keep
-- @return string made of the kept words
function helpers.get_n_words_from_err_message(err_msg, n)
    local kept = {}

    for position, word in ipairs(string.split(err_msg, " ")) do
        -- `position - 1` is the number of words already kept.
        if position - 1 >= n then
            break
        end

        kept[#kept + 1] = word
    end

    return table.concat(kept, " ")
end

return helpers
Loading

0 comments on commit 0a947f0

Please sign in to comment.