From b0e034773f87b24fc2bf907b2c870ee673b8a178 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Thu, 11 Nov 2021 12:32:26 +0300 Subject: [PATCH] Implementation of batch insert Batch insert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch insert is more efficient then inserting tuple-by-tuple. Right now CRUD cannot provide batch insert with full consistency. CRUD offers batch insert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Part of #193 --- README.md | 138 ++ crud.lua | 10 + crud/common/batching_utils.lua | 38 + crud/common/call.lua | 51 +- crud/common/map_call_cases/base_iter.lua | 76 + .../map_call_cases/base_postprocessor.lua | 84 + .../map_call_cases/batch_insert_iter.lua | 82 + .../map_call_cases/batch_postprocessor.lua | 70 + crud/common/schema.lua | 12 + crud/common/sharding/init.lua | 72 + crud/common/utils.lua | 37 + crud/insert_many.lua | 288 +++ test/entrypoint/srv_batch_operations.lua | 54 + test/integration/ddl_sharding_func_test.lua | 50 + .../ddl_sharding_info_reload_test.lua | 115 +- test/integration/ddl_sharding_key_test.lua | 51 + test/integration/insert_many_test.lua | 1954 +++++++++++++++++ 17 files changed, 3141 insertions(+), 41 deletions(-) create mode 100644 crud/common/batching_utils.lua create mode 100644 crud/common/map_call_cases/base_iter.lua create mode 100644 crud/common/map_call_cases/base_postprocessor.lua create mode 100644 crud/common/map_call_cases/batch_insert_iter.lua create mode 100644 crud/common/map_call_cases/batch_postprocessor.lua create mode 100644 crud/insert_many.lua create mode 100755 test/entrypoint/srv_batch_operations.lua create mode 100644 test/integration/insert_many_test.lua diff --git a/README.md b/README.md index bd5669a46..73c22a004 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ It also provides the `crud-storage` and `crud-router` roles for - [Quickstart](#quickstart) - [API](#api) - [Insert](#insert) + - [Insert many](#insert-many) - [Get](#get) - [Update](#update) - [Delete](#delete) @@ -233,6 +234,143 @@ crud.insert_object('customers', { ... ``` +### Insert many + +```lua +-- Insert batch of tuples +local result, err = crud.insert_many(space_name, tuples, opts) +-- Insert batch of objects +local result, err = crud.insert_object_many(space_name, objects, opts) +``` + +where: + +* `space_name` (`string`) - name of the space to insert an object +* `tuples` / `objects` (`table`) - array of tuples/objects to insert +* `opts`: + * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `fields` (`?table`) - field names for getting only a subset of fields + * `stop_on_error` (`?boolean`) - stop on a first error and report error + regarding the failed operation and error about what tuples were not + performed, default is `false` + * `rollback_on_error` (`?boolean`) - any failed operation will lead to + rollback on a storage, where the operation is failed, report error + about what tuples were rollback, default is `false` + +Returns metadata and array with inserted rows, array of errors. +Each error object can contain field `operation_data`. + +`operation_data` field can contain: +* tuple for which the error occurred; +* object with an incorrect format; +* tuple the operation on which was performed but + operation was rollback; +* tuple the operation on which was not performed + because operation was stopped by error. + +Right now CRUD cannot provide batch insert with full consistency. +CRUD offers batch insert with partial consistency. That means +that full consistency can be provided only on single replicaset +using `box` transactions. + +**Example:** + +```lua +crud.insert_many('customers', { + {1, box.NULL, 'Elizabeth', 23}, + {2, box.NULL, 'Anastasia', 22}, +}) +--- +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [1, 477, 'Elizabeth', 23] + - [2, 401, 'Anastasia', 22] +... +crud.insert_object_many('customers', { + {id = 3, name = 'Elizabeth', age = 24}, + {id = 10, name = 'Anastasia', age = 21}, +}) +--- +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [3, 2804, 'Elizabeth', 24] + - [10, 569, 'Anastasia', 21] + +-- Partial success +local res, errs = crud.insert_object_many('customers', { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, +}) +--- +res +- metadata: + - {'name': 'id', 'type': 'unsigned'} + - {'name': 'bucket_id', 'type': 'unsigned'} + - {'name': 'name', 'type': 'string'} + - {'name': 'age', 'type': 'number'} + rows: + - [5, 1172, 'Sergey', 25], + - [22, 655, 'Alex', 34], + +#errs -- 1 +errs[1].class_name -- BatchInsertError +errs[1].err -- 'Duplicate key exists <...>' +errs[1].tuple -- {3, 2804, 'Anastasia', 22} +... + +-- Partial success with stop and rollback on error +-- stop_on_error = true, rollback_on_error = true +-- two error on one storage with rollback, inserts +-- stop by error on this storage inserts before +-- error are rollback +local res, errs = crud.insert_object_many('customers', { + {id = 6, name = 'Alex', age = 34}, + {id = 92, name = 'Artur', age = 29}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 4, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + {id = 71, name = 'Oksana', age = 29}, +}, { + stop_on_error = true, + rollback_on_error = true, +}}) +--- +res +- metadata: +- {'name': 'id', 'type': 'unsigned'} +- {'name': 'bucket_id', 'type': 'unsigned'} +- {'name': 'name', 'type': 'string'} +- {'name': 'age', 'type': 'number'} +rows: +- [4, 1161, 'Sergey', 25], +- [6, 1064, 'Alex', 34], +#errs -- 4 +errs[1].class_name -- BatchInsertError +errs[1].err -- 'Duplicate key exists <...>' +errs[1].tuple -- {3, 2804, 'Anastasia', 22} + +errs[2].class_name -- NotPerformedError +errs[2].err -- 'Operation with tuple was not performed' +errs[2].tuple -- {9, 1644, "Anna", 30} + +errs[3].class_name -- NotPerformedError +errs[3].err -- 'Operation with tuple was not performed' +errs[3].tuple -- {71, 1802, "Oksana", 29} + +errs[4].class_name -- NotPerformedError +errs[4].err -- 'Operation with tuple was rollback' +errs[4].tuple -- {92, 2040, "Artur", 29} +``` + ### Get ```lua diff --git a/crud.lua b/crud.lua index 3f2b5c599..9a12115e2 100644 --- a/crud.lua +++ b/crud.lua @@ -4,6 +4,7 @@ local cfg = require('crud.cfg') local insert = require('crud.insert') +local insert_many = require('crud.insert_many') local replace = require('crud.replace') local get = require('crud.get') local update = require('crud.update') @@ -31,6 +32,14 @@ crud.insert = stats.wrap(insert.tuple, stats.op.INSERT) -- @function insert_object crud.insert_object = stats.wrap(insert.object, stats.op.INSERT) +-- @refer insert_many.tuples +-- @function insert_many +crud.insert_many = insert_many.tuples + +-- @refer insert_many.objects +-- @function insert_object_many +crud.insert_object_many = insert_many.objects + -- @refer get.call -- @function get crud.get = stats.wrap(get.call, stats.op.GET) @@ -124,6 +133,7 @@ function crud.init_storage() end insert.init() + insert_many.init() get.init() replace.init() update.init() diff --git a/crud/common/batching_utils.lua b/crud/common/batching_utils.lua new file mode 100644 index 000000000..0ea9403a9 --- /dev/null +++ b/crud/common/batching_utils.lua @@ -0,0 +1,38 @@ +local errors = require('errors') +local dev_checks = require('crud.common.dev_checks') +local sharding_utils = require('crud.common.sharding.utils') + +local NotPerformedError = errors.new_class('NotPerformedError', {capture_stack = false}) + +local batching_utils = {} + +batching_utils.stop_on_error_msg = "Operation with tuple was not performed" +batching_utils.rollback_on_error_msg = "Operation with tuple was rollback" + +function batching_utils.construct_sharding_hash_mismatch_errors(err_msg, tuples) + dev_checks('string', 'table') + + local errs = {} + + for _, tuple in ipairs(tuples) do + local err_obj = sharding_utils.ShardingHashMismatchError:new(err_msg) + err_obj.operation_data = tuple + table.insert(errs, err_obj) + end + + return errs +end + +function batching_utils.complement_batching_errors(errs, err_msg, tuples) + dev_checks('table', 'string', 'table') + + for _, tuple in ipairs(tuples) do + local err_obj = NotPerformedError:new(err_msg) + err_obj.operation_data = tuple + table.insert(errs, err_obj) + end + + return errs +end + +return batching_utils diff --git a/crud/common/call.lua b/crud/common/call.lua index f3a1b997d..a4659e4a2 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -6,6 +6,9 @@ local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') local fiber_clock = require('fiber').clock +local BaseIterator = require('crud.common.map_call_cases.base_iter') +local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') + local CallError = errors.new_class('CallError') local call = {} @@ -71,6 +74,8 @@ function call.map(func_name, func_args, opts) balance = '?boolean', timeout = '?number', replicasets = '?table', + iter = '?table', + postprocessor = '?table', }) opts = opts or {} @@ -81,24 +86,27 @@ function call.map(func_name, func_args, opts) local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT - local replicasets, err - if opts.replicasets ~= nil then - replicasets = opts.replicasets - else - replicasets, err = vshard.router.routeall() - if replicasets == nil then - return nil, CallError:new("Failed to get all replicasets: %s", err.err) + local iter = opts.iter + if iter == nil then + iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets}) + if err ~= nil then + return nil, err end end + local postprocessor = opts.postprocessor + if postprocessor == nil then + postprocessor = BasePostprocessor:new() + end + local futures_by_replicasets = {} local call_opts = {is_async = true} - for _, replicaset in pairs(replicasets) do - local future = replicaset[vshard_call_name](replicaset, func_name, func_args, call_opts) + while iter:has_next() do + local args, replicaset = iter:get() + local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts) futures_by_replicasets[replicaset.uuid] = future end - local results = {} local deadline = fiber_clock() + timeout for replicaset_uuid, future in pairs(futures_by_replicasets) do local wait_timeout = deadline - fiber_clock() @@ -107,18 +115,25 @@ function call.map(func_name, func_args, opts) end local result, err = future:wait_result(wait_timeout) - if err == nil and result[1] == nil then - err = result[2] - end - if err ~= nil then - return nil, wrap_vshard_err(err, func_name, replicaset_uuid) - end + local result_info = { + key = replicaset_uuid, + value = result, + } - results[replicaset_uuid] = result + local err_info = { + err_wrapper = wrap_vshard_err, + err = err, + wrapper_args = {func_name, replicaset_uuid}, + } + + local early_exit = postprocessor:collect(result_info, err_info) + if early_exit then + break + end end - return results + return postprocessor:get() end function call.single(bucket_id, func_name, func_args, opts) diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua new file mode 100644 index 000000000..bb732a754 --- /dev/null +++ b/crud/common/map_call_cases/base_iter.lua @@ -0,0 +1,76 @@ +local errors = require('errors') +local vshard = require('vshard') + +local dev_checks = require('crud.common.dev_checks') +local GetReplicasetsError = errors.new_class('GetReplicasetsError') + +local BaseIterator = {} + +--- Create new base iterator for map call +-- +-- @function new +-- +-- @tparam[opt] table opts +-- Options of BaseIterator:new +-- @tparam[opt] table opts.func_args +-- Function arguments to call +-- @tparam[opt] table opts.replicasets +-- Replicasets to call +-- +-- @return[1] table iterator +-- @treturn[2] nil +-- @treturn[2] table of tables Error description +function BaseIterator:new(opts) + dev_checks('table', { + func_args = '?table', + replicasets = '?table', + }) + + local replicasets, err + if opts.replicasets ~= nil then + replicasets = opts.replicasets + else + replicasets, err = vshard.router.routeall() + if replicasets == nil then + return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err) + end + end + + local next_index, next_replicaset = next(replicasets) + + local iter = { + func_args = opts.func_args, + replicasets = replicasets, + next_replicaset = next_replicaset, + next_index = next_index + } + + setmetatable(iter, self) + self.__index = self + + return iter +end + +--- Check there is next replicaset to call +-- +-- @function has_next +-- +-- @return[1] boolean +function BaseIterator:has_next() + return self.next_index ~= nil +end + +--- Get function arguments and next replicaset +-- +-- @function get +-- +-- @return[1] table func_args +-- @return[2] table replicaset +function BaseIterator:get() + local replicaset = self.next_replicaset + self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) + + return self.func_args, replicaset +end + +return BaseIterator diff --git a/crud/common/map_call_cases/base_postprocessor.lua b/crud/common/map_call_cases/base_postprocessor.lua new file mode 100644 index 000000000..569637705 --- /dev/null +++ b/crud/common/map_call_cases/base_postprocessor.lua @@ -0,0 +1,84 @@ +local dev_checks = require('crud.common.dev_checks') + +local BasePostprocessor = {} + +--- Create new base postprocessor for map call +-- +-- @function new +-- +-- @return[1] table postprocessor +function BasePostprocessor:new() + local postprocessor = { + results = {}, + early_exit = false, + errs = nil + } + + setmetatable(postprocessor, self) + self.__index = self + + return postprocessor +end + +--- Collect data after call +-- +-- @function collect +-- +-- @tparam[opt] table result_info +-- Data of function call result +-- @tparam[opt] result_info.key +-- Key for collecting result +-- @tparam[opt] result_info.value +-- Value for collecting result by result_info.key +-- +-- @tparam[opt] table err_info +-- Data of function call error +-- @tparam[opt] function|table err_info.err_wrapper +-- Wrapper for error formatting +-- @tparam[opt] table|cdata err_info.err +-- Err of function call +-- @tparam[opt] table err_info.wrapper_args +-- Additional args for error wrapper +-- +-- @return[1] boolean early_exit +function BasePostprocessor:collect(result_info, err_info) + dev_checks('table', { + key = '?', + value = '?', + },{ + err_wrapper = 'function|table', + err = '?table|cdata', + wrapper_args = '?table', + }) + + local err = err_info.err + if err == nil and result_info.value[1] == nil then + err = result_info.value[2] + end + + if err ~= nil then + self.results = nil + self.errs = err_info.err_wrapper(err, unpack(err_info.wrapper_args)) + self.early_exit = true + + return self.early_exit + end + + if self.early_exit ~= true then + self.results[result_info.key] = result_info.value + end + + return self.early_exit +end + +--- Get collected data +-- +-- @function get +-- +-- @return[1] table results +-- @return[2] table errs +function BasePostprocessor:get() + return self.results, self.errs +end + +return BasePostprocessor diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua new file mode 100644 index 000000000..24e07a878 --- /dev/null +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -0,0 +1,82 @@ +local errors = require('errors') + +local dev_checks = require('crud.common.dev_checks') +local sharding = require('crud.common.sharding') + +local BaseIterator = require('crud.common.map_call_cases.base_iter') + +local SplitTuplesError = errors.new_class('SplitTuplesError') + +local BatchInsertIterator = {} +-- inheritance from BaseIterator +setmetatable(BatchInsertIterator, {__index = BaseIterator}) + +--- Create new batch insert iterator for map call +-- +-- @function new +-- +-- @tparam[opt] table opts +-- Options of BatchInsertIterator:new +-- @tparam[opt] table opts.tuples +-- Tuples to be inserted +-- @tparam[opt] table opts.space +-- Space to be inserted into +-- @tparam[opt] table opts.execute_on_storage_opts +-- Additional opts for call on storage +-- +-- @return[1] table iterator +-- @treturn[2] nil +-- @treturn[2] table of tables Error description +function BatchInsertIterator:new(opts) + dev_checks('table', { + tuples = 'table', + space = 'table', + execute_on_storage_opts = 'table', + }) + + local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space) + if err ~= nil then + return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) + end + + local next_replicaset, next_batch = next(sharding_data.batches) + + local execute_on_storage_opts = opts.execute_on_storage_opts + execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash + execute_on_storage_opts.sharding_key_hash = sharding_data.sharding_key_hash + execute_on_storage_opts.skip_sharding_hash_check = sharding_data.skip_sharding_hash_check + + local iter = { + space_name = opts.space.name, + opts = execute_on_storage_opts, + batches_by_replicasets = sharding_data.batches, + next_index = next_replicaset, + next_batch = next_batch, + } + + setmetatable(iter, self) + self.__index = self + + return iter +end + +--- Get function arguments and next replicaset +-- +-- @function get +-- +-- @return[1] table func_args +-- @return[2] table replicaset +function BatchInsertIterator:get() + local replicaset = self.next_index + local func_args = { + self.space_name, + self.next_batch, + self.opts, + } + + self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) + + return func_args, replicaset +end + +return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_postprocessor.lua b/crud/common/map_call_cases/batch_postprocessor.lua new file mode 100644 index 000000000..2217854de --- /dev/null +++ b/crud/common/map_call_cases/batch_postprocessor.lua @@ -0,0 +1,70 @@ +local dev_checks = require('crud.common.dev_checks') +local utils = require('crud.common.utils') +local sharding_utils = require('crud.common.sharding.utils') + +local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') + +local BatchPostprocessor = {} +-- inheritance from BasePostprocessor +setmetatable(BatchPostprocessor, {__index = BasePostprocessor}) + +--- Collect data after call +-- +-- @function collect +-- +-- @tparam[opt] table result_info +-- Data of function call result +-- @tparam[opt] result_info.key +-- Key for collecting result +-- @tparam[opt] result_info.value +-- Value for collecting result by result_info.key +-- +-- @tparam[opt] table err_info +-- Data of function call error +-- @tparam[opt] function|table err_info.err_wrapper +-- Wrapper for error formatting +-- @tparam[opt] table|cdata err_info.err +-- Err of function call +-- @tparam[opt] table err_info.wrapper_args +-- Additional args for error wrapper +-- +-- @return[1] boolean early_exit +function BatchPostprocessor:collect(result_info, err_info) + dev_checks('table', { + key = '?', + value = '?', + },{ + err_wrapper = 'function|table', + err = '?table|cdata', + wrapper_args = '?table', + }) + + local errs = {err_info.err} + if err_info.err == nil then + errs = result_info.value[2] + end + + if errs ~= nil then + for _, err in pairs(errs) do + local err_to_wrap = err + if err.class_name ~= sharding_utils.ShardingHashMismatchError.name and err.err then + err_to_wrap = err.err + end + + local err_obj = err_info.err_wrapper(err_to_wrap, unpack(err_info.wrapper_args)) + err_obj.operation_data = err.operation_data + err_obj.space_schema_hash = err.space_schema_hash + + self.errs = self.errs or {} + table.insert(self.errs, err_obj) + end + end + + if result_info.value ~= nil and result_info.value[1] ~= nil then + self.results = utils.list_extend(self.results, result_info.value[1]) + end + + return self.early_exit +end + +return BatchPostprocessor diff --git a/crud/common/schema.lua b/crud/common/schema.lua index 85487d7fa..0979a5442 100644 --- a/crud/common/schema.lua +++ b/crud/common/schema.lua @@ -244,4 +244,16 @@ function schema.result_needs_reload(space, result) return result.space_schema_hash ~= get_space_schema_hash(space) end +function schema.batching_result_needs_reload(space, results, tuples_count) + local storage_errs_count = 0 + local space_schema_hash = get_space_schema_hash(space) + for _, result in ipairs(results) do + if result.space_schema_hash ~= nil and result.space_schema_hash ~= space_schema_hash then + storage_errs_count = storage_errs_count + 1 + end + end + + return storage_errs_count == tuples_count +end + return schema diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 48bf980c1..db3b9b463 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -130,6 +130,17 @@ function sharding.result_needs_sharding_reload(err) return err.class_name == sharding_utils.ShardingHashMismatchError.name end +function sharding.batching_result_needs_sharding_reload(errs, tuples_count) + local sharding_errs_count = 0 + for _, err in ipairs(errs) do + if err.class_name == sharding_utils.ShardingHashMismatchError.name then + sharding_errs_count = sharding_errs_count + 1 + end + end + + return sharding_errs_count == tuples_count +end + function sharding.wrap_method(method, space_name, ...) local i = 0 @@ -185,4 +196,65 @@ function sharding.wrap_select_method(method, space_name, ...) return res, err end +--- Split tuples by replicaset for specified space +-- +-- @function split_tuples_by_replicaset +-- +-- @param table tuples +-- Tuples to split +-- +-- @param table space +-- Specified space +-- +-- @return[1] batches +-- Map where key is a replicaset and value +-- is table of tuples related to this replicaset +function sharding.split_tuples_by_replicaset(tuples, space) + dev_checks('table', 'table') + + local batches = {} + + local sharding_func_hash + local sharding_key_hash + local skip_sharding_hash_check + local sharding_data + local err + for _, tuple in ipairs(tuples) do + sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space) + if err ~= nil then + return nil, BucketIDError:new("Failed to get bucket ID: %s", err) + end + + if sharding_data.sharding_func_hash ~= nil then + sharding_func_hash = sharding_data.sharding_func_hash + end + + if sharding_data.sharding_key_hash ~= nil then + sharding_key_hash = sharding_data.sharding_key_hash + end + + if sharding_data.skip_sharding_hash_check ~= true then + skip_sharding_hash_check = false + end + + local replicaset, err = vshard.router.route(sharding_data.bucket_id) + if replicaset == nil then + return nil, GetReplicasetsError:new( + "Failed to get replicaset for bucket_id %s: %s", + sharding_data.bucket_id, err.err) + end + + local tuples_by_replicaset = batches[replicaset] or {} + table.insert(tuples_by_replicaset, tuple) + batches[replicaset] = tuples_by_replicaset + end + + return { + batches = batches, + sharding_func_hash = sharding_func_hash, + sharding_key_hash = sharding_key_hash, + skip_sharding_hash_check = skip_sharding_hash_check, + } +end + return sharding diff --git a/crud/common/utils.lua b/crud/common/utils.lua index bcc961a30..5b3614b25 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -711,4 +711,41 @@ function utils.update_storage_call_error_description(err, func_name, replicaset_ return err end +--- Insert each value from values to list +-- +-- @function list_extend +-- +-- @param table list +-- List to be extended +-- +-- @param table values +-- Values to be inserted to list +-- +-- @return[1] list +-- List with old values and inserted values +function utils.list_extend(list, values) + dev_checks('table', 'table') + + for _, value in ipairs(values) do + table.insert(list, value) + end + + return list +end + +function utils.list_slice(list, start_index, end_index) + dev_checks('table', 'number', '?number') + + if end_index == nil then + end_index = table.maxn(list) + end + + local slice = {} + for i = start_index, end_index do + table.insert(slice, list[i]) + end + + return slice +end + return utils diff --git a/crud/insert_many.lua b/crud/insert_many.lua new file mode 100644 index 000000000..6d076a6a5 --- /dev/null +++ b/crud/insert_many.lua @@ -0,0 +1,288 @@ +local checks = require('checks') +local errors = require('errors') +local vshard = require('vshard') + +local call = require('crud.common.call') +local const = require('crud.common.const') +local utils = require('crud.common.utils') +local batching_utils = require('crud.common.batching_utils') +local sharding = require('crud.common.sharding') +local dev_checks = require('crud.common.dev_checks') +local schema = require('crud.common.schema') + +local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter') +local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') + +local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false}) + +local insert_many = {} + +local INSERT_MANY_FUNC_NAME = '_crud.insert_many_on_storage' + +local function insert_many_on_storage(space_name, tuples, opts) + dev_checks('string', 'table', { + add_space_schema_hash = '?boolean', + fields = '?table', + stop_on_error = '?boolean', + rollback_on_error = '?boolean', + sharding_key_hash = '?number', + sharding_func_hash = '?number', + skip_sharding_hash_check = '?boolean', + }) + + opts = opts or {} + + local space = box.space[space_name] + if space == nil then + return nil, {InsertManyError:new("Space %q doesn't exist", space_name)} + end + + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples) + end + + local inserted_tuples = {} + local errs = {} + + box.begin() + for i, tuple in ipairs(tuples) do + -- add_space_schema_hash is true only in case of insert_object_many + -- the only one case when reloading schema can avoid insert error + -- is flattening object on router + local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, { + add_space_schema_hash = opts.add_space_schema_hash, + field_names = opts.fields, + }) + + if insert_result.err ~= nil then + local err = { + err = insert_result.err, + space_schema_hash = insert_result.space_schema_hash, + operation_data = tuple, + } + + table.insert(errs, err) + + if opts.stop_on_error == true then + local left_tuples = utils.list_slice(tuples, i + 1) + if next(left_tuples) then + errs = batching_utils.complement_batching_errors(errs, + batching_utils.stop_on_error_msg, left_tuples) + end + + if opts.rollback_on_error == true then + box.rollback() + if next(inserted_tuples) then + errs = batching_utils.complement_batching_errors(errs, + batching_utils.rollback_on_error_msg, inserted_tuples) + end + + return nil, errs + end + + box.commit() + + return inserted_tuples, errs + end + end + + table.insert(inserted_tuples, insert_result.res) + end + + if next(errs) ~= nil then + if opts.rollback_on_error == true then + box.rollback() + if next(inserted_tuples) then + errs = batching_utils.complement_batching_errors(errs, + batching_utils.rollback_on_error_msg, inserted_tuples) + end + + return nil, errs + end + + box.commit() + + return inserted_tuples, errs + end + + box.commit() + + return inserted_tuples +end + +function insert_many.init() + _G._crud.insert_many_on_storage = insert_many_on_storage +end + +-- returns result, err, need_reload +-- need_reload indicates if reloading schema could help +-- see crud.common.schema.wrap_func_reload() +local function call_insert_many_on_router(space_name, original_tuples, opts) + dev_checks('string', 'table', { + timeout = '?number', + fields = '?table', + add_space_schema_hash = '?boolean', + stop_on_error = '?boolean', + rollback_on_error = '?boolean', + }) + + opts = opts or {} + + local space = utils.get_space(space_name, vshard.router.routeall()) + if space == nil then + return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD + end + + local tuples = table.deepcopy(original_tuples) + + local batch_insert_on_storage_opts = { + add_space_schema_hash = opts.add_space_schema_hash, + fields = opts.fields, + stop_on_error = opts.stop_on_error, + rollback_on_error = opts.rollback_on_error, + } + + local iter, err = BatchInsertIterator:new({ + tuples = tuples, + space = space, + execute_on_storage_opts = batch_insert_on_storage_opts, + }) + if err ~= nil then + return nil, {err}, const.NEED_SCHEMA_RELOAD + end + + local postprocessor = BatchPostprocessor:new() + + local rows, errs = call.map(INSERT_MANY_FUNC_NAME, nil, { + timeout = opts.timeout, + mode = 'write', + iter = iter, + postprocessor = postprocessor, + }) + + if errs ~= nil then + local tuples_count = table.maxn(tuples) + if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then + return nil, errs, const.NEED_SHARDING_RELOAD + end + + if schema.batching_result_needs_reload(space, errs, tuples_count) then + return nil, errs, const.NEED_SCHEMA_RELOAD + end + end + + if next(rows) == nil then + return nil, errs + end + + local res, err = utils.format_result(rows, space, opts.fields) + if err ~= nil then + errs = errs or {} + table.insert(errs, err) + return nil, errs + end + + return res, errs +end + +--- Inserts batch of tuples to the specified space +-- +-- @function tuples +-- +-- @param string space_name +-- A space name +-- +-- @param table tuples +-- Tuples +-- +-- @tparam ?table opts +-- Options of batch_insert.tuples_batch +-- +-- @return[1] tuples +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function insert_many.tuples(space_name, tuples, opts) + checks('string', 'table', { + timeout = '?number', + fields = '?table', + add_space_schema_hash = '?boolean', + stop_on_error = '?boolean', + rollback_on_error = '?boolean', + }) + + return schema.wrap_func_reload(sharding.wrap_method, + call_insert_many_on_router, space_name, tuples, opts) +end + +--- Inserts batch of objects to the specified space +-- +-- @function objects +-- +-- @param string space_name +-- A space name +-- +-- @param table objs +-- Objects +-- +-- @tparam ?table opts +-- Options of batch_insert.tuples_batch +-- +-- @return[1] objects +-- @treturn[2] nil +-- @treturn[2] table of tables Error description + +function insert_many.objects(space_name, objs, opts) + checks('string', 'table', { + timeout = '?number', + fields = '?table', + stop_on_error = '?boolean', + rollback_on_error = '?boolean', + }) + + -- insert can fail if router uses outdated schema to flatten object + opts = utils.merge_options(opts, {add_space_schema_hash = true}) + + local tuples = {} + local format_errs = {} + + for _, obj in ipairs(objs) do + + local tuple, err = utils.flatten_obj_reload(space_name, obj) + if err ~= nil then + local err_obj = InsertManyError:new("Failed to flatten object: %s", err) + err_obj.operation_data = obj + + if opts.stop_on_error == true then + return nil, {err_obj} + end + + table.insert(format_errs, err_obj) + end + + table.insert(tuples, tuple) + end + + if next(tuples) == nil then + return nil, format_errs + end + + local res, errs = insert_many.tuples(space_name, tuples, opts) + + if next(format_errs) ~= nil then + if errs == nil then + errs = format_errs + else + errs = utils.list_extend(errs, format_errs) + end + end + + return res, errs +end + +return insert_many diff --git a/test/entrypoint/srv_batch_operations.lua b/test/entrypoint/srv_batch_operations.lua new file mode 100755 index 000000000..24aa05f3e --- /dev/null +++ b/test/entrypoint/srv_batch_operations.lua @@ -0,0 +1,54 @@ +#!/usr/bin/env tarantool + +require('strict').on() +_G.is_initialized = function() return false end + +local log = require('log') +local errors = require('errors') +local cartridge = require('cartridge') + +package.preload['customers-storage'] = function() + return { + role_name = 'customers-storage', + init = function() + local engine = os.getenv('ENGINE') or 'memtx' + local customers_space = box.schema.space.create('customers', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }, + if_not_exists = true, + engine = engine, + }) + customers_space:create_index('id', { + parts = { {field = 'id'} }, + if_not_exists = true, + }) + customers_space:create_index('bucket_id', { + parts = { {field = 'bucket_id'} }, + unique = false, + if_not_exists = true, + }) + end, + } +end + +local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, { + advertise_uri = 'localhost:3301', + http_port = 8081, + bucket_count = 3000, + roles = { + 'customers-storage', + 'cartridge.roles.crud-router', + 'cartridge.roles.crud-storage', + }, +}) + +if not ok then + log.error('%s', err) + os.exit(1) +end + +_G.is_initialized = cartridge.is_healthy diff --git a/test/integration/ddl_sharding_func_test.lua b/test/integration/ddl_sharding_func_test.lua index a4114e139..d062290cb 100644 --- a/test/integration/ddl_sharding_func_test.lua +++ b/test/integration/ddl_sharding_func_test.lua @@ -127,6 +127,56 @@ pgroup.test_insert = function(g) t.assert_equals(result, nil) end +pgroup.test_insert_object_many = function(g) + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_object_many', {g.params.space_name, {{id = 158, name = 'Augustus', age = 48}}}) + t.assert_equals(err, nil) + + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 158, bucket_id = 8, name = 'Augustus', age = 48}}) + + local conn_s1 = g.cluster:server('s1-master').net_box + -- There is no tuple on s1 that we inserted before using crud.insert_object(). + local result = conn_s1.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + -- There is a tuple on s2 that we inserted before using crud.insert_object(). + local result = conn_s2.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, {158, 8, 'Augustus', 48}) +end + +pgroup.test_insert_many = function(g) + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_many', {g.params.space_name, {{27, box.NULL, 'Ivan', 25}}}) + t.assert_equals(err, nil) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {27, 7, 'Ivan', 25}) + + -- There is a tuple on s2 that we inserted before using crud.insert(). + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, {27, 7, 'Ivan', 25}) + + -- There is no tuple on s1 that we inserted before using crud.insert(). + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, nil) +end + pgroup.test_replace_object = function(g) -- bucket_id is 596, storage is s-2 local tuple = {8, 596, 'Dimitrion', 20} diff --git a/test/integration/ddl_sharding_info_reload_test.lua b/test/integration/ddl_sharding_info_reload_test.lua index fe307779f..83ed6822e 100644 --- a/test/integration/ddl_sharding_info_reload_test.lua +++ b/test/integration/ddl_sharding_info_reload_test.lua @@ -30,6 +30,8 @@ local pgroup_func_change = t.group('ddl_sharding_func_reload_after_schema_change {engine = 'vinyl'}, }) +local select_limit = 100 + local function start_cluster(g) g.cluster = helpers.Cluster:new({ datadir = fio.tempdir(), @@ -111,6 +113,8 @@ pgroup_key_change.before_each(function(g) local conn_s2 = g.cluster:server('s2-master').net_box local result = conn_s2.space['customers']:get(0) t.assert_equals(result, nil) + + conn_s1.space['customers']:delete(0) end) pgroup_func_change.before_each(function(g) @@ -135,6 +139,8 @@ pgroup_func_change.before_each(function(g) local conn_s2 = g.cluster:server('s2-master').net_box local result = conn_s2.space['customers_pk']:get(0) t.assert_equals(result, {0, 1, 'Emma', 22}) + + conn_s2.space['customers_pk']:delete(0) end) @@ -286,10 +292,26 @@ end local test_tuple = {1, box.NULL, 'Emma', 22} local test_object = { id = 1, bucket_id = box.NULL, name = 'Emma', age = 22 } +local test_tuples_batch = { + {1, box.NULL, 'Emma', 22}, + {2, box.NULL, 'Anton', 19}, + {3, box.NULL, 'Petra', 27}, +} +local test_objects_batch = { + { id = 1, bucket_id = box.NULL, name = 'Emma', age = 22 }, + { id = 2, bucket_id = box.NULL, name = 'Anton', age = 19 }, + { id = 3, bucket_id = box.NULL, name = 'Petra', age = 27 }, +} + -- Sharded by "name" and computed with custom sharding function. local test_customers_new_result = { - s1 = {1, 2861, 'Emma', 22}, - s2 = nil, + s1 = {{1, 2861, 'Emma', 22}}, + s2 = {}, +} + +local test_customers_new_batching_result = { + s1 = {{1, 2861, 'Emma', 22}, {2, 2276, 'Anton', 19}}, + s2 = {{3, 910, 'Petra', 27}}, } local new_space_cases = { @@ -303,6 +325,16 @@ local new_space_cases = { input = {'customers_new', test_object}, result = test_customers_new_result, }, + insert_many = { + func = 'crud.insert_many', + input = {'customers_new', test_tuples_batch}, + result = test_customers_new_batching_result, + }, + insert_object_many = { + func = 'crud.insert_object_many', + input = {'customers_new', test_objects_batch}, + result = test_customers_new_batching_result, + }, replace = { func = 'crud.replace', input = {'customers_new', test_tuple}, @@ -341,11 +373,11 @@ for name, case in pairs(new_space_cases) do -- Assert it is sharded based on updated ddl info. local conn_s1 = g.cluster:server('s1-master').net_box - local result = conn_s1.space['customers_new']:get(1) + local result = conn_s1.space['customers_new']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s1) local conn_s2 = g.cluster:server('s2-master').net_box - local result = conn_s2.space['customers_new']:get(1) + local result = conn_s2.space['customers_new']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s2) end end @@ -356,18 +388,23 @@ end -- Sharded by "age". local test_customers_age_tuple = {1, 655, 'Emma', 22} local test_customers_age_result = { - s1 = nil, - s2 = test_customers_age_tuple, + s1 = {}, + s2 = {test_customers_age_tuple}, +} + +local test_customers_age_batching_result = { + s1 = {{3, 1811, 'Petra', 27}}, + s2 = {{1, 655, 'Emma', 22}, {2, 1325, 'Anton', 19}}, } local function setup_customers_migrated_data(g) - if test_customers_age_result.s1 ~= nil then + if test_customers_age_result.s1 ~= nil and next(test_customers_age_result.s1) then local conn_s1 = g.cluster:server('s1-master').net_box - conn_s1.space['customers']:insert(test_customers_age_result.s1) + conn_s1.space['customers']:insert(unpack(test_customers_age_result.s1)) end - if test_customers_age_result.s2 ~= nil then + if test_customers_age_result.s2 ~= nil and next(test_customers_age_result.s2) then local conn_s2 = g.cluster:server('s2-master').net_box - conn_s2.space['customers']:insert(test_customers_age_result.s2) + conn_s2.space['customers']:insert(unpack(test_customers_age_result.s2)) end end @@ -382,6 +419,16 @@ local schema_change_sharding_key_cases = { input = {'customers', test_object}, result = test_customers_age_result, }, + insert_many = { + func = 'crud.insert_many', + input = {'customers', test_tuples_batch}, + result = test_customers_age_batching_result, + }, + insert_object_many = { + func = 'crud.insert_object_many', + input = {'customers', test_objects_batch}, + result = test_customers_age_batching_result, + }, replace = { func = 'crud.replace', input = {'customers', test_tuple}, @@ -419,11 +466,11 @@ for name, case in pairs(schema_change_sharding_key_cases) do t.assert_equals(err, nil) local conn_s1 = g.cluster:server('s1-master').net_box - local result = conn_s1.space['customers']:get(1) + local result = conn_s1.space['customers']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s1) local conn_s2 = g.cluster:server('s2-master').net_box - local result = conn_s2.space['customers']:get(1) + local result = conn_s2.space['customers']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s2) end end @@ -510,18 +557,27 @@ end -- Sharded by 'id' with custom sharding function. local test_customers_pk_func_tuple = {1, 44, "Emma", 22} local test_customers_pk_func = { - s1 = nil, - s2 = test_customers_pk_func_tuple, + s1 = {}, + s2 = {test_customers_pk_func_tuple}, +} + +local test_customers_pk_batching_func = { + s1 = {}, + s2 = { + {1, 44, 'Emma', 22}, + {2, 45, 'Anton', 19}, + {3, 46, 'Petra', 27}, + }, } local function setup_customers_pk_migrated_data(g) - if test_customers_pk_func.s1 ~= nil then + if test_customers_pk_func.s1 ~= nil and next(test_customers_pk_func.s1) then local conn_s1 = g.cluster:server('s1-master').net_box - conn_s1.space['customers_pk']:insert(test_customers_pk_func.s1) + conn_s1.space['customers_pk']:insert(unpack(test_customers_pk_func.s1)) end - if test_customers_pk_func.s2 ~= nil then + if test_customers_pk_func.s2 ~= nil and next(test_customers_pk_func.s2) then local conn_s2 = g.cluster:server('s2-master').net_box - conn_s2.space['customers_pk']:insert(test_customers_pk_func.s2) + conn_s2.space['customers_pk']:insert(unpack(test_customers_pk_func.s2)) end end @@ -536,6 +592,16 @@ local schema_change_sharding_func_cases = { input = {'customers_pk', test_object}, result = test_customers_pk_func, }, + insert_many = { + func = 'crud.insert_many', + input = {'customers_pk', test_tuples_batch}, + result = test_customers_pk_batching_func, + }, + insert_object_many = { + func = 'crud.insert_object_many', + input = {'customers_pk', test_objects_batch}, + result = test_customers_pk_batching_func, + }, replace = { func = 'crud.replace', input = {'customers_pk', test_tuple}, @@ -560,15 +626,18 @@ local schema_change_sharding_func_cases = { before_test = setup_customers_pk_migrated_data, func = 'crud.delete', input = {'customers_pk', 1}, - result = {}, + result = { + s1 = {}, + s2 = {}, + }, }, update = { before_test = setup_customers_pk_migrated_data, func = 'crud.update', input = {'customers_pk', 1, {{'+', 4, 1}}}, result = { - s1 = nil, - s2 = {1, 44, "Emma", 23}, + s1 = {}, + s2 = {{1, 44, "Emma", 23}}, }, }, } @@ -593,11 +662,11 @@ for name, case in pairs(schema_change_sharding_func_cases) do t.assert_equals(err, nil) local conn_s1 = g.cluster:server('s1-master').net_box - local result = conn_s1.space['customers_pk']:get(1) + local result = conn_s1.space['customers_pk']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s1) local conn_s2 = g.cluster:server('s2-master').net_box - local result = conn_s2.space['customers_pk']:get(1) + local result = conn_s2.space['customers_pk']:select(nil, {limit = select_limit}) t.assert_equals(result, case.result.s2) end end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 6ac321b19..ac7b831b4 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -103,6 +103,57 @@ pgroup.test_insert = function(g) t.assert_equals(result, nil) end +pgroup.test_insert_object_many = function(g) + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_object_many', {'customers_name_key', {{id = 1, name = 'Augustus', age = 48}}}) + t.assert_equals(err, nil) + + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 1, bucket_id = 782, name = 'Augustus', age = 48}}) + + local conn_s1 = g.cluster:server('s1-master').net_box + -- There is no tuple on s1 that we inserted before using crud.insert_object_many(). + local result = conn_s1.space['customers_name_key']:get({1, 'Augustus'}) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + -- There is a tuple on s2 that we inserted before using crud.insert_object_many(). + local result = conn_s2.space['customers_name_key']:get({1, 'Augustus'}) + t.assert_equals(result, {1, 782, 'Augustus', 48}) + +end + +pgroup.test_insert_many = function(g) + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_many', {'customers_name_key', {{2, box.NULL, 'Ivan', 20}}}) + t.assert_equals(err, nil) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {2, 1366, 'Ivan', 20}) + + -- There is a tuple on s2 that we inserted before using crud.insert_many(). + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 20}) + + -- There is no tuple on s1 that we inserted before using crud.insert_many(). + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, nil) +end + pgroup.test_replace = function(g) -- bucket_id is 596, storage is s-2 local tuple = {7, 596, 'Dimitrion', 20} diff --git a/test/integration/insert_many_test.lua b/test/integration/insert_many_test.lua new file mode 100644 index 000000000..e5f34faff --- /dev/null +++ b/test/integration/insert_many_test.lua @@ -0,0 +1,1954 @@ +local fio = require('fio') + +local t = require('luatest') +local crud = require('crud') + +local helpers = require('test.helper') + +local batching_utils = require('crud.common.batching_utils') + +local pgroup = t.group('insert_many', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_batch_operations'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + + g.cluster:start() +end) + +pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +pgroup.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup.test_non_existent_space = function(g) + -- insert_many + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'non_existent_space', + { + {1, box.NULL, 'Alex', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') + + -- insert_object_many + -- default: stop_on_error == false + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'non_existent_space', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + + -- we got 3 errors about non existent space, because it caused by flattening objects + t.assert_equals(#errs, 3) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') + t.assert_str_contains(errs[2].err, 'Space "non_existent_space" doesn\'t exist') + t.assert_str_contains(errs[3].err, 'Space "non_existent_space" doesn\'t exist') + + -- insert_object_many + -- stop_on_error == true + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'non_existent_space', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + + -- we got 1 error about non existent space, because stop_on_error == true + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space "non_existent_space" doesn\'t exist') +end + +pgroup.test_object_bad_format = function(g) + -- bad format + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna'}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[1].operation_data, {id = 2, name = 'Anna'}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 1, name = 'Fedor', age = 59, bucket_id = 477}, + }) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- bad format + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {1, 477, "Fedor", 59}) + + t.assert_str_contains(errs[2].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[2].operation_data, {id = 2, name = 'Anna'}) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- bad format + -- two errors, default: stop_on_error == false + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor'}, + {id = 2, name = 'Anna'}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.operation_data.id < err2.operation_data.id end) + + t.assert_str_contains(errs[1].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[1].operation_data, {id = 1, name = 'Fedor'}) + + t.assert_str_contains(errs[2].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[2].operation_data, {id = 2, name = 'Anna'}) +end + +pgroup.test_all_success = function(g) + -- insert_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 1, name = 'Fedor', age = 59, bucket_id = 477}, + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + }) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) +end + +pgroup.test_object_all_success = function(g) + -- batch_insert_object + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 1, name = 'Fedor', age = 59, bucket_id = 477}, + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + }) + + -- get + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, {1, 477, 'Fedor', 59}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) +end + +pgroup.test_one_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- insert_many + -- default: stop_on_error = false, rollback_on_error = false + -- one error on one storage without rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 9, name = 'Anna', age = 30, bucket_id = 1644}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Anna', 30}) +end + +pgroup.test_object_one_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + -- default: stop_on_error = false, rollback_on_error = false + -- one error on one storage without rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 9, name = 'Anna', age = 30, bucket_id = 1644}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Anna', 30}) +end + +pgroup.test_many_errors = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- insert_many + -- fails for both: s1-master s2-master + -- one error on each storage, one success on each storage + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {10, box.NULL, 'Sergey', 25}, + {92, box.NULL, 'Artur', 29}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 10, name = 'Sergey', age = 25, bucket_id = 569}, + {id = 92, name = 'Artur', age = 29, bucket_id = 2040}, + }) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_object_many_errors = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- batch_insert_object again + -- fails for both: s1-master s2-master + -- one error on each storage, one success on each storage + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 10, name = 'Sergey', age = 25}, + {id = 92, name = 'Artur', age = 29}, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 10, name = 'Sergey', age = 25, bucket_id = 569}, + {id = 92, name = 'Artur', age = 29, bucket_id = 2040}, + }) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_no_success = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({10, 569, 'Sergey', 25}) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({92, 2040, 'Artur', 29}) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- insert_many again + -- fails for both: s1-master s2-master + -- no success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {10, box.NULL, 'Vlad', 25}, + {92, box.NULL, 'Mark', 29}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 4) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[3].err, 'Duplicate key exists') + t.assert_equals(errs[3].operation_data, {10, 569, 'Vlad', 25}) + + t.assert_str_contains(errs[4].err, 'Duplicate key exists') + t.assert_equals(errs[4].operation_data, {92, 2040, 'Mark', 29}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_object_no_success = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({10, 569, 'Sergey', 25}) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({92, 2040, 'Artur', 29}) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- batch_insert_object again + -- fails for both: s1-master s2-master + -- no success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 10, name = 'Vlad', age = 25}, + {id = 92, name = 'Mark', age = 29}, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 4) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[3].err, 'Duplicate key exists') + t.assert_equals(errs[3].operation_data, {10, 569, 'Vlad', 25}) + + t.assert_str_contains(errs[4].err, 'Duplicate key exists') + t.assert_equals(errs[4].operation_data, {92, 2040, 'Mark', 29}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, {10, 569, 'Sergey', 25}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_object_bad_format_stop_on_error = function(g) + -- bad format + -- two errors, stop_on_error == true + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor'}, + {id = 2, name = 'Anna'}, + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + + t.assert_str_contains(errs[1].err, 'Field \"age\" isn\'t nullable') + t.assert_equals(errs[1].operation_data, {id = 1, name = 'Fedor'}) +end + +pgroup.test_all_success_stop_on_error = function(g) + -- insert_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18}, + {71, box.NULL, 'Oleg', 32} + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = 'Oleg', age = 32, bucket_id = 1802}, + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_object_all_success_stop_on_error = function(g) + -- batch_insert_object + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18}, + {id = 71, name = 'Oleg', age = 32} + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = 'Oleg', age = 32, bucket_id = 1802}, + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_partial_success_stop_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({9, 1644, 'Nicolo', 35}) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) + + -- insert_many + -- stop_on_error = true, rollback_on_error = false + -- one error on one storage without rollback, inserts stop by error on this storage + -- inserts before error are successful + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {92, box.NULL, 'Artur', 29}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + }, + { + stop_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[2].operation_data, {9, 1644, "Anna", 30}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + {id = 92, name = 'Artur', age = 29, bucket_id = 2040}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) +end + +pgroup.test_object_partial_success_stop_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({9, 1644, 'Nicolo', 35}) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) + + -- insert_object_many + -- stop_on_error = true, rollback_on_error = false + -- one error on one storage without rollback, inserts stop by error on this storage + -- inserts before error are successful + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 92, name = 'Artur', age = 29}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + }, + { + stop_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 2) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[2].operation_data, {9, 1644, "Anna", 30}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + {id = 92, name = 'Artur', age = 29, bucket_id = 2040}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) +end + +pgroup.test_no_success_stop_on_error = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({92, 2040, 'Artur', 29}) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- insert_many + -- fails for both: s1-master s2-master + -- one error on each storage, all inserts stop by error + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + {10, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + {92, box.NULL, 'Leo', 29}, + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 5) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[3].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[3].operation_data, {9, 1644, "Anna", 30}) + + t.assert_str_contains(errs[4].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[4].operation_data, {10, 569, "Sergey", 25}) + + t.assert_str_contains(errs[5].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[5].operation_data, {92, 2040, "Leo", 29}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, nil) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_object_no_success_stop_on_error = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({92, 2040, 'Artur', 29}) + t.assert_equals(result, {92, 2040, 'Artur', 29}) + + -- insert_object_many + -- fails for both: s1-master s2-master + -- one error on each storage, all inserts stop by error + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 10, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + {id = 92, name = 'Leo', age = 29}, + }, + { + stop_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 5) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[3].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[3].operation_data, {9, 1644, "Anna", 30}) + + t.assert_str_contains(errs[4].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[4].operation_data, {10, 569, "Sergey", 25}) + + t.assert_str_contains(errs[5].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[5].operation_data, {92, 2040, "Leo", 29}) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 10 -> bucket_id = 569 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(10) + t.assert_equals(result, nil) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, nil) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, {92, 2040, 'Artur', 29}) +end + +pgroup.test_all_success_rollback_on_error = function(g) + -- insert_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18}, + {71, box.NULL, 'Oleg', 32} + }, + { + rollback_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = 'Oleg', age = 32, bucket_id = 1802}, + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_object_all_success_rollback_on_error = function(g) + -- insert_object_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18}, + {id = 71, name = 'Oleg', age = 32} + }, + { + rollback_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = 'Oleg', age = 32, bucket_id = 1802}, + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_partial_success_rollback_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({9, 1644, 'Nicolo', 35}) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) + + -- insert_many + -- stop_on_error = false, rollback_on_error = true + -- two error on one storage with rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {92, box.NULL, 'Artur', 29}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + }, + { + rollback_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 3) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {9, 1644, 'Anna', 30}) + + t.assert_str_contains(errs[3].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[3].operation_data, {92, 2040, 'Artur', 29}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) +end + +pgroup.test_object_partial_success_rollback_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({9, 1644, 'Nicolo', 35}) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) + + -- insert_object_many + -- stop_on_error = false, rollback_on_error = true + -- two error on one storage with rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 92, name = 'Artur', age = 29}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + }, + { + rollback_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 3) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {9, 1644, 'Anna', 30}) + + t.assert_str_contains(errs[3].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[3].operation_data, {92, 2040, "Artur", 29}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, {9, 1644, 'Nicolo', 35}) +end + +pgroup.test_no_success_rollback_on_error = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({5, 1172, 'Sergey', 25}) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({71, 1802, 'Oleg', 32}) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) + + -- insert_many + -- fails for both: s1-master s2-master + -- two errors on each storage with rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {1, box.NULL, 'Olga', 27}, + {92, box.NULL, 'Oleg', 32}, + {71, box.NULL, 'Sergey', 25}, + {5, box.NULL, 'Anna', 30}, + {2, box.NULL, 'Alex', 34}, + {3, box.NULL, 'Anastasia', 22}, + }, + { + rollback_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 6) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[1].operation_data, {1, 477, "Olga", 27}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[3].err, 'Duplicate key exists') + t.assert_equals(errs[3].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[4].err, 'Duplicate key exists') + t.assert_equals(errs[4].operation_data, {5, 1172, "Anna", 30}) + + t.assert_str_contains(errs[5].err, 'Duplicate key exists') + t.assert_equals(errs[5].operation_data, {71, 1802, "Sergey", 25}) + + t.assert_str_contains(errs[6].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[6].operation_data, {92, 2040, "Oleg", 32}) + + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, nil) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_object_no_success_rollback_on_error = function(g) + -- insert + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({2, 401, 'Anna', 23}) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:insert({5, 1172, 'Sergey', 25}) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({71, 1802, 'Oleg', 32}) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) + + -- insert_object_many + -- fails for both: s1-master s2-master + -- two errors on each storage with rollback + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Olga', age = 27}, + {id = 92, name = 'Oleg', age = 32}, + {id = 71, name = 'Sergey', age = 25}, + {id = 5, name = 'Anna', age = 30}, + {id = 2, name = 'Alex', age = 34}, + {id = 3, name = 'Anastasia', age = 22}, + }, + { + rollback_on_error = true, + } + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 6) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[1].operation_data, {1, 477, "Olga", 27}) + + t.assert_str_contains(errs[2].err, 'Duplicate key exists') + t.assert_equals(errs[2].operation_data, {2, 401, 'Alex', 34}) + + t.assert_str_contains(errs[3].err, 'Duplicate key exists') + t.assert_equals(errs[3].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[4].err, 'Duplicate key exists') + t.assert_equals(errs[4].operation_data, {5, 1172, "Anna", 30}) + + t.assert_str_contains(errs[5].err, 'Duplicate key exists') + t.assert_equals(errs[5].operation_data, {71, 1802, "Sergey", 25}) + + t.assert_str_contains(errs[6].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[6].operation_data, {92, 2040, "Oleg", 32}) + + -- primary key = 1 -> bucket_id = 477 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, nil) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_all_success_rollback_and_stop_on_error = function(g) + -- insert_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18}, + {71, box.NULL, 'Oleg', 32} + }, + { + stop_on_error = true, + rollback_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = "Oleg", age = 32, bucket_id = 1802} + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_object_all_success_rollback_and_stop_on_error = function(g) + -- insert_object_many + -- all success + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18}, + {id = 71, name = 'Oleg', age = 32} + }, + { + stop_on_error = true, + rollback_on_error = true, + } + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 2, name = 'Anna', age = 23, bucket_id = 401}, + {id = 3, name = 'Daria', age = 18, bucket_id = 2804}, + {id = 71, name = "Oleg", age = 32, bucket_id = 1802} + }) + + -- get + -- primary key = 2 -> bucket_id = 401 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(2) + t.assert_equals(result, {2, 401, 'Anna', 23}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_partial_success_rollback_and_stop_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({71, 1802, 'Oleg', 32}) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) + + -- insert_many + -- stop_on_error = true, rollback_on_error = true + -- two error on one storage with rollback, inserts stop by error on this storage + -- inserts before error are rollbacked + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {22, box.NULL, 'Alex', 34}, + {92, box.NULL, 'Artur', 29}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + {71, box.NULL, 'Oksana', 29}, + }, + { + stop_on_error = true, + rollback_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 4) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[2].operation_data, {9, 1644, "Anna", 30}) + + t.assert_str_contains(errs[3].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[3].operation_data, {71, 1802, "Oksana", 29}) + + t.assert_str_contains(errs[4].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[4].operation_data, {92, 2040, "Artur", 29}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, nil) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_object_partial_success_rollback_and_stop_on_error = function(g) + -- insert + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({3, 2804, 'Daria', 18}) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:insert({71, 1802, 'Oleg', 32}) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) + + -- insert_object_many + -- stop_on_error = true, rollback_on_error = true + -- two error on one storage with rollback, inserts stop by error on this storage + -- inserts before error are rollbacked + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 22, name = 'Alex', age = 34}, + {id = 92, name = 'Artur', age = 29}, + {id = 3, name = 'Anastasia', age = 22}, + {id = 5, name = 'Sergey', age = 25}, + {id = 9, name = 'Anna', age = 30}, + {id = 71, name = 'Oksana', age = 29}, + }, + { + stop_on_error = true, + rollback_on_error = true, + } + }) + + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 4) + + table.sort(errs, function(err1, err2) return err1.operation_data[1] < err2.operation_data[1] end) + + t.assert_str_contains(errs[1].err, 'Duplicate key exists') + t.assert_equals(errs[1].operation_data, {3, 2804, 'Anastasia', 22}) + + t.assert_str_contains(errs[2].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[2].operation_data, {9, 1644, "Anna", 30}) + + t.assert_str_contains(errs[3].err, batching_utils.stop_on_error_msg) + t.assert_equals(errs[3].operation_data, {71, 1802, "Oksana", 29}) + + t.assert_str_contains(errs[4].err, batching_utils.rollback_on_error_msg) + t.assert_equals(errs[4].operation_data, {92, 2040, "Artur", 29}) + + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'number'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, { + {id = 5, name = 'Sergey', age = 25, bucket_id = 1172}, + {id = 22, name = 'Alex', age = 34, bucket_id = 655}, + }) + + -- get + -- primary key = 22 -> bucket_id = 655 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(22) + t.assert_equals(result, {22, 655, 'Alex', 34}) + + -- primary key = 92 -> bucket_id = 2040 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(92) + t.assert_equals(result, nil) + + -- primary key = 5 -> bucket_id = 1172 -> s2-master + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(5) + t.assert_equals(result, {5, 1172, 'Sergey', 25}) + + -- primary key = 3 -> bucket_id = 2804 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(3) + t.assert_equals(result, {3, 2804, 'Daria', 18}) + + -- primary key = 9 -> bucket_id = 1644 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(9) + t.assert_equals(result, nil) + + -- primary key = 71 -> bucket_id = 1802 -> s1-master + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(71) + t.assert_equals(result, {71, 1802, 'Oleg', 32}) +end + +pgroup.test_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {15, box.NULL, 'Fedor', 59}, + {25, box.NULL, 'Anna', 23}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- insert_many + local result, errs = g.cluster.main_server.net_box:call('crud.insert_many', { + 'customers', + { + {1, box.NULL, 'Fedor', 59}, + {2, box.NULL, 'Anna', 23}, + {3, box.NULL, 'Daria', 18} + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, {{id = 1, name = 'Fedor'}, {id = 2, name = 'Anna'}, {id = 3, name = 'Daria'}}) +end + +pgroup.test_object_partial_result = function(g) + -- bad fields format + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 15, name = 'Fedor', age = 59}, + {id = 25, name = 'Anna', age = 23}, + }, + {fields = {'id', 'invalid'}}, + }) + + t.assert_equals(result, nil) + t.assert_not_equals(errs, nil) + t.assert_equals(#errs, 1) + t.assert_str_contains(errs[1].err, 'Space format doesn\'t contain field named "invalid"') + + -- insert_object_many + local result, errs = g.cluster.main_server.net_box:call('crud.insert_object_many', { + 'customers', + { + {id = 1, name = 'Fedor', age = 59}, + {id = 2, name = 'Anna', age = 23}, + {id = 3, name = 'Daria', age = 18} + }, + {fields = {'id', 'name'}}, + }) + + t.assert_equals(errs, nil) + t.assert_equals(result.metadata, { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + }) + + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_items_equals(objects, {{id = 1, name = 'Fedor'}, {id = 2, name = 'Anna'}, {id = 3, name = 'Daria'}}) +end + +pgroup.test_opts_not_damaged = function(g) + -- insert_many + local batch_insert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_insert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_insert_opts = ... + + local _, err = crud.insert_many('customers', { + {1, box.NULL, 'Alex', 59} + }, batch_insert_opts) + + return batch_insert_opts, err + ]], {batch_insert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_insert_opts, batch_insert_opts) + + -- insert_object_many + local batch_insert_opts = {timeout = 1, fields = {'name', 'age'}} + local new_batch_insert_opts, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local batch_insert_opts = ... + + local _, err = crud.insert_object_many('customers', { + {id = 2, name = 'Fedor', age = 59} + }, batch_insert_opts) + + return batch_insert_opts, err + ]], {batch_insert_opts}) + + t.assert_equals(err, nil) + t.assert_equals(new_batch_insert_opts, batch_insert_opts) +end