From d1bfdbd47d32ab01bf195a14016e358c49415453 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Wed, 15 Dec 2021 15:25:11 +0300 Subject: [PATCH] Add performance tests to compare crud and vshard This patch reworks existing performance tests and adds new cases: select for equal conditions for primary and secondary indexes. It also adds corresponding vshard test cases to compare performance. Comparison may be not exactly precise since vshard test functions use naive mergers, but should at least estimate basic differences. Closes #225 --- test/performance/perf_test.lua | 539 ++++++++++++++++++++++++++ test/performance/select_perf_test.lua | 273 ------------- 2 files changed, 539 insertions(+), 273 deletions(-) create mode 100644 test/performance/perf_test.lua delete mode 100644 test/performance/select_perf_test.lua diff --git a/test/performance/perf_test.lua b/test/performance/perf_test.lua new file mode 100644 index 000000000..9339c03bf --- /dev/null +++ b/test/performance/perf_test.lua @@ -0,0 +1,539 @@ +local fio = require('fio') +local clock = require('clock') +local fiber = require('fiber') +local errors = require('errors') +local net_box = require('net.box') +local log = require('log') + +local t = require('luatest') +local g = t.group('perf') + +local helpers = require('test.helper') + + +local id = 0 +local function gen() + id = id + 1 + return id +end + +local function reset_gen() + id = 0 +end + +g.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_select'), + use_vshard = true, + replicasets = { + { + uuid = helpers.uuid('a'), + alias = 'router', + roles = { 'crud-router' }, + servers = { + { instance_uuid = helpers.uuid('a', 1), alias = 'router' }, + }, + }, + { + uuid = helpers.uuid('b'), + alias = 's-1', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('b', 1), alias = 's1-master' }, + { instance_uuid = helpers.uuid('b', 2), alias = 's1-replica' }, + }, + }, + { + uuid = helpers.uuid('c'), + alias = 's-2', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('c', 1), alias = 's2-master' }, + { instance_uuid = helpers.uuid('c', 2), alias = 's2-replica' }, + }, + }, + { + uuid = helpers.uuid('d'), + alias = 's-2', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('d', 1), alias = 's3-master' }, + { instance_uuid = helpers.uuid('d', 2), alias = 's3-replica' }, + }, + } + }, + }) + g.cluster:start() + + g.router = g.cluster:server('router').net_box + + g.router:eval("crud = require('crud')") + + -- Run real perf tests only with flag, otherwise run short version + -- to test compatibility as part of unit/integration test run. + g.perf_mode_on = os.getenv('PERF_MODE_ON') +end) + +g.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') + reset_gen() +end) + +g.after_all(function(g) + g.cluster:stop() + fio.rmtree(g.cluster.datadir) +end) + +local function generate_customer() + return { gen(), box.NULL, 'David', 'Smith', 33, 'Los Angeles' } +end + +local select_prepare = function(g) + local count + if g.perf_mode_on then + count = 100 + else + count = 10100 + end + + for _ = 1, count do + g.router:call('crud.insert', { 'customers', generate_customer() }) + end +end + +local insert_params = function() + return { 'customers', generate_customer() } +end + +local select_params_pk_eq = function() + return { 'customers', {{'==', 'id', gen() % 10000}} } +end + +local vshard_select_params_pk_eq = function() + return { 'customers', gen() % 10000 } +end + +local select_params_pk_gt = function() + return { 'customers', {{'>', 'id', gen() % 10000}}, { first = 10 } } +end + +local vshard_select_params_pk_gt = function() + return { 'customers', gen() % 10000, { limit = 10 } } +end + +local select_params_secondary_eq = function() + return { 'customers', {{'==', 'age', 33}}, { first = 10 } } +end + +local vshard_select_params_secondary_eq = function() + return { 'customers', 'age', 33, { limit = 10 } } +end + +local stats_cases = { + _with_stats_disabled = { + prepare = function(g) + g.router:call("crud.disable_stats") + end, + }, + _with_local_stats = { + prepare = function(g) + g.router:call("crud.enable_stats", {{ driver = 'local' }}) + end, + }, + _with_metrics_stats = { + prepare = function(g) + local is_metrics_supported = g.router:eval([[ + return require('crud.stats.metrics_registry').is_supported() + ]]) + t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') + g.router:call("crud.enable_stats", {{ driver = 'metrics' }}) + end, + }, +} + +local integration_params = { + timeout = 2, + fiber_count = 5, + connection_count = 2, +} + +local insert_perf = { + timeout = 30, + fiber_count = 600, + connection_count = 10, +} + +-- Higher load may lead to net_msg_max limit break. +local select_perf = { + timeout = 30, + fiber_count = 200, + connection_count = 10, +} + +local cases = { + crud_insert = { + call = 'crud.insert', + params = insert_params, + matrix = stats_cases, + integration_params = integration_params, + perf_params = insert_perf, + }, + + crud_insert_without_stats_wrapper = { + prepare = function(g) + g.router:eval("_plain_insert = require('crud.insert').tuple") + end, + call = '_plain_insert', + params = insert_params, + integration_params = integration_params, + perf_params = insert_perf, + }, + + vshard_insert = { + prepare = function(g) + g.router:eval([[ + local vshard = require('vshard') + local function _vshard_insert(space_name, tuple) + local bucket_id = vshard.router.bucket_id_strcrc32(tuple[1]) + return vshard.router.callrw( + bucket_id, + '_vshard_insert_storage', + { space_name, tuple, bucket_id } + ) + end + + rawset(_G, '_vshard_insert', _vshard_insert) + ]]) + + + for _, server in ipairs(g.cluster.servers) do + server.net_box:eval([[ + local function _vshard_insert_storage(space_name, tuple, bucket_id) + local space = box.space[space_name] + assert(space ~= nil) + + assert(space.index.bucket_id ~= nil) + tuple[space.index.bucket_id.parts[1].fieldno] = bucket_id + + local ok = space:insert(tuple) + assert(ok ~= nil) + end + + rawset(_G, '_vshard_insert_storage', _vshard_insert_storage) + ]]) + end + end, + call = '_vshard_insert', + params = insert_params, + integration_params = integration_params, + perf_params = insert_perf, + }, + + crud_select_pk_eq = { + prepare = select_prepare, + call = 'crud.select', + params = select_params_pk_eq, + matrix = stats_cases, + integration_params = integration_params, + perf_params = select_perf, + }, + + crud_select_without_stats_wrapper_pk_eq = { + prepare = function(g) + g.router:eval("_plain_select = require('crud.select').call") + select_prepare(g) + end, + call = '_plain_select', + params = select_params_pk_eq, + integration_params = integration_params, + perf_params = select_perf, + }, + + vshard_select_pk_eq = { + prepare = function(g) + select_prepare(g) + + g.router:eval([[ + local vshard = require('vshard') + local function _vshard_select(space_name, key) + local bucket_id = vshard.router.bucket_id_strcrc32(key) + return vshard.router.callrw( + bucket_id, + '_vshard_select_storage', + { space_name, key } + ) + end + + rawset(_G, '_vshard_select', _vshard_select) + ]]) + + + for _, server in ipairs(g.cluster.servers) do + server.net_box:eval([[ + local function _vshard_select_storage(space_name, key) + local space = box.space[space_name] + assert(space ~= nil) + + return space:select(key) + end + + rawset(_G, '_vshard_select_storage', _vshard_select_storage) + ]]) + end + end, + call = '_vshard_select', + params = vshard_select_params_pk_eq, + integration_params = integration_params, + perf_params = insert_perf, + }, + + crud_select_pk_gt = { + prepare = select_prepare, + call = 'crud.select', + params = select_params_pk_gt, + matrix = stats_cases, + integration_params = integration_params, + perf_params = select_perf, + }, + + crud_select_without_stats_wrapper_pk_gt = { + prepare = function(g) + g.router:eval("_plain_select = require('crud.select').call") + select_prepare(g) + end, + call = '_plain_select', + params = select_params_pk_gt, + integration_params = integration_params, + perf_params = select_perf, + }, + + vshard_select_pk_gt = { + prepare = function(g) + select_prepare(g) + + g.router:eval([[ + local vshard = require('vshard') + + local function sort(a, b) + return a[1] < b[1] + end + + local function _vshard_select_gt(space, key, opts) + assert(type(opts.limit) == 'number') + assert(opts.limit > 0) + + local storage_response = {} + + for id, replicaset in pairs(vshard.router.routeall()) do + local resp, err = replicaset:call( + '_vshard_select_gt_storage', + { space, key, opts } + ) + if err ~= nil then + error(err) + end + + for _, v in ipairs(resp) + table.insert(storage_response, v) + end + + end + + -- Naive merger. + local response = { } + + table.sort(storage_response, sort) + + for i = 1, opts.limit do + response[ind] = tuples[i] + end + + return response + end + + rawset(_G, '_vshard_select_gt', _vshard_select_gt) + ]]) + + + for _, server in ipairs(g.cluster.servers) do + server.net_box:eval([[ + local function _vshard_select_gt_storage(space_name, key, opts) + local space = box.space[space_name] + assert(space ~= nil) + + return space:select(key, { limit = opts.limit, iterator = 'GT' }) + end + + rawset(_G, '_vshard_select_gt_storage', _vshard_select_gt_storage) + ]]) + end + end, + call = '_vshard_select_gt', + params = vshard_select_params_pk_gt, + integration_params = integration_params, + perf_params = select_perf, + }, + + crud_select_secondary_eq = { + prepare = select_prepare, + call = 'crud.select', + params = select_params_secondary_eq, + matrix = stats_cases, + integration_params = integration_params, + perf_params = select_perf, + }, + + crud_select_without_stats_wrapper_secondary_eq = { + prepare = function(g) + g.router:eval("_plain_select = require('crud.select').call") + select_prepare(g) + end, + call = '_plain_select', + params = select_params_secondary_eq, + integration_params = integration_params, + perf_params = select_perf, + }, + + vshard_select_secondary_eq = { + prepare = function(g) + select_prepare(g) + + g.router:eval([[ + local vshard = require('vshard') + local function _vshard_select_secondary(space, index, key, opts) + assert(type(opts.limit) == 'number') + assert(opts.limit > 0) + + local storage_response = {} + + for id, replicaset in pairs(vshard.router.routeall()) do + local resp, err = replicaset:call( + '_vshard_select_secondary_storage', + { space, index, key, opts } + ) + if err ~= nil then + error(err) + end + + storage_response[id] = resp + end + + -- Naive merger. + local response = { } + + local ind = 0 + for i = 1, opts.limit do + for _, tuples in pairs(storage_response) do + if tuples[i] ~= nil then + response[ind] = tuples[i] + end + end + + if ind == opts.limit then + break + end + end + + return response + end + + rawset(_G, '_vshard_select_secondary', _vshard_select_secondary) + ]]) + + + for _, server in ipairs(g.cluster.servers) do + server.net_box:eval([[ + local function _vshard_select_secondary_storage(space_name, index_name, key, opts) + local space = box.space[space_name] + assert(space ~= nil) + + local index = space.index[index_name] + assert(index ~= nil) + + return space:select(key, { limit = opts.limit }) + end + + rawset(_G, '_vshard_select_secondary_storage', _vshard_select_secondary_storage) + ]]) + end + end, + call = '_vshard_select_secondary', + params = vshard_select_params_secondary_eq, + integration_params = integration_params, + perf_params = select_perf, + }, +} + +local function fiber_generator(conn, call, params, report, timeout) + local start = clock.monotonic() + + while (clock.monotonic() - start) < timeout do + local ok, res, err = pcall(conn.call, conn, call, params()) + if not ok then + log.error(res) + table.insert(report.errors, res) + elseif err ~= nil then + errors.wrap(err) + log.error(err) + table.insert(report.errors, err) + else + report.count = report.count + 1 + end + end +end + +for name, case in pairs(cases) do + local matrix = case.matrix or { [''] = {} } + + for subname, subcase in pairs(matrix) do + local test_name = ('test_%s%s'):format(name, subname) + + g.before_test(test_name, function(g) + if case.prepare ~= nil then + case.prepare(g) + end + + if subcase.prepare ~= nil then + subcase.prepare(g) + end + end) + + g[test_name] = function(g) + local params + if g.perf_mode_on then + params = case.perf_params + else + params = case.integration_params + end + + local connections = {} + + local router = g.cluster:server('router') + for _ = 1, params.connection_count do + local c = net_box:connect(router.net_box_uri, router.net_box_credentials) + if c == nil then + t.fail('Failed to prepare connections') + end + table.insert(connections, c) + end + + local fibers = {} + local report = { errors = {}, count = 0 } + for id = 1, params.fiber_count do + local conn_id = id % params.connection_count + 1 + local conn = connections[conn_id] + local f = fiber.new(fiber_generator, conn, case.call, case.params, report, params.timeout) + f:set_joinable(true) + table.insert(fibers, f) + end + + for i = 1, params.fiber_count do + fibers[i]:join() + end + + log.info('\n%s: requests %d, rps %d, errors %d\n', + test_name, report.count, report.count / params.timeout, #report.errors) + end + end +end diff --git a/test/performance/select_perf_test.lua b/test/performance/select_perf_test.lua deleted file mode 100644 index 8c2f3b1b3..000000000 --- a/test/performance/select_perf_test.lua +++ /dev/null @@ -1,273 +0,0 @@ -local fio = require('fio') -local fiber = require('fiber') -local errors = require('errors') -local net_box = require('net.box') -local log = require('log') - -local t = require('luatest') -local g = t.group('perf') - -local helpers = require('test.helper') - -g.before_all(function(g) - g.cluster = helpers.Cluster:new({ - datadir = fio.tempdir(), - server_command = helpers.entrypoint('srv_select'), - use_vshard = true, - replicasets = { - { - uuid = helpers.uuid('a'), - alias = 'router', - roles = { 'crud-router' }, - servers = { - { instance_uuid = helpers.uuid('a', 1), alias = 'router' }, - }, - }, - { - uuid = helpers.uuid('b'), - alias = 's-1', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('b', 1), alias = 's1-master' }, - { instance_uuid = helpers.uuid('b', 2), alias = 's1-replica' }, - }, - }, - { - uuid = helpers.uuid('c'), - alias = 's-2', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('c', 1), alias = 's2-master' }, - { instance_uuid = helpers.uuid('c', 2), alias = 's2-replica' }, - }, - }, - { - uuid = helpers.uuid('d'), - alias = 's-2', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('d', 1), alias = 's3-master' }, - { instance_uuid = helpers.uuid('d', 2), alias = 's3-replica' }, - }, - } - }, - }) - g.cluster:start() - - g.router = g.cluster:server('router').net_box - - g.router:eval("crud = require('crud')") - - -- Run real perf tests only with flag, otherwise run short version - -- to test compatibility as part of unit/integration test run. - g.perf_mode_on = os.getenv('PERF_MODE_ON') -end) - -g.before_each(function(g) - helpers.truncate_space_on_cluster(g.cluster, 'customers') -end) - -g.after_all(function(g) - g.cluster:stop() - fio.rmtree(g.cluster.datadir) -end) - -local function insert_customers(conn, id, count, timeout, report, call_func) - local customer = {id, box.NULL, 'David', 'Smith', 33, 'Los Angeles'} - local start = fiber.clock() - - while (fiber.clock() - start) < timeout do - local ok, res, err = pcall(conn.call, conn, call_func, {'customers', customer}) - if not ok then - log.error('Insert error: %s', res) - table.insert(report.errors, res) - elseif err ~= nil then - errors.wrap(err) - log.error('Insert error: %s', err) - table.insert(report.errors, err) - else - report.count = report.count + 1 - end - customer[1] = customer[1] + count - end -end - -local insert_cases = { - without_stats_wrapper = { - prepare = function(g) - g.router:eval("plain_insert = require('crud.insert').tuple") - end, - call = "plain_insert", - }, - with_stats_disabled = { - prepare = function(g) - g.cluster:server('router').net_box:call("package.loaded.crud.disable_stats") - end, - call = "package.loaded.crud.insert", - }, - with_local_stats = { - prepare = function(g) - g.cluster:server('router').net_box:call("package.loaded.crud.enable_stats", {{ driver = 'local' }}) - end, - call = "package.loaded.crud.insert", - }, - with_metrics_stats = { - prepare = function(g) - local router = g.cluster:server('router').net_box - local is_metrics_supported = router:eval([[ - return require('crud.stats.metrics_registry').is_supported() - ]]) - t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') - router:call("package.loaded.crud.enable_stats", {{ driver = 'metrics' }}) - end, - call = "package.loaded.crud.insert", - }, -} - -for name, case in pairs(insert_cases) do - local test_name = ('test_insert_%s'):format(name) - - if case.prepare ~= nil then - g.before_test(test_name, case.prepare) - end - - g[test_name] = function(g) - local timeout, fiber_count, connection_count - if g.perf_mode_on then - timeout = 30 - fiber_count = 600 - connection_count = 10 - else - timeout = 2 - fiber_count = 10 - connection_count = 2 - end - - local connections = {} - - local server = g.cluster.main_server - for _ = 1, connection_count do - local c = net_box:connect(server.net_box_uri, server.net_box_credentials) - assert(c) - table.insert(connections, c) - end - - local fibers = {} - local report = {errors = {}, count = 0} - for id = 1, fiber_count do - local conn_id = id % connection_count + 1 - local conn = connections[conn_id] - local f = fiber.new(insert_customers, conn, id, fiber_count, timeout, report, case.call) - f:set_joinable(true) - table.insert(fibers, f) - end - - for i = 1, fiber_count do - fibers[i]:join() - end - - log.error('\n%s: requests %d, rps %d, errors %d', - test_name, report.count, report.count / timeout, #report.errors) - end -end - - -local select_cases = { - without_stats_wrapper = { - prepare = function(g) - g.router:eval("plain_select = require('crud.select').call") - end, - call = "plain_select", - }, - with_stats_disabled = { - prepare = function(g) - g.router:call("package.loaded.crud.disable_stats") - end, - call = "package.loaded.crud.select", - }, - with_local_stats = { - prepare = function(g) - g.router:call("package.loaded.crud.enable_stats", {{ driver = 'local' }}) - end, - call = "package.loaded.crud.select", - }, - with_metrics_stats = { - prepare = function(g) - local router = g.cluster:server('router').net_box - local is_metrics_supported = router:eval([[ - return require('crud.stats.metrics_registry').is_supported() - ]]) - t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') - router:call("package.loaded.crud.enable_stats", {{ driver = 'metrics' }}) - end, - call = "package.loaded.crud.select", - }, -} - -local function select_customers(conn, id, timeout, report, call_func) - local start = fiber.clock() - local ok, err = pcall(function() - while (fiber.clock() - start) < timeout do - local _, err = conn:call(call_func, {'customers', {{'>', 'id', id}}, {first = 10}}) - if err ~= nil then - errors.wrap(err) - log.error(err) - table.insert(report.errors, err) - else - report.count = report.count + 1 - end - end - end) - if not ok then - table.insert(report.errors, err) - log.error(err) - end -end - -for name, case in pairs(select_cases) do - local test_name = ('test_select_%s'):format(name) - - if case.prepare ~= nil then - g.before_test(test_name, case.prepare) - end - - g[test_name] = function(g) - local timeout, fiber_count, connection_count - if g.perf_mode_on then - timeout = 30 - fiber_count = 200 - connection_count = 10 - else - timeout = 2 - fiber_count = 5 - connection_count = 2 - end - - local connections = {} - - local server = g.cluster.main_server - - for _ = 1, connection_count do - local c = net_box:connect(server.net_box_uri, server.net_box_credentials) - assert(c) - table.insert(connections, c) - end - - local fibers = {} - local report = {errors = {}, count = 0} - for id = 1, fiber_count do - local conn_id = id % connection_count + 1 - local conn = connections[conn_id] - local f = fiber.new(select_customers, conn, id, timeout, report, case.call) - f:set_joinable(true) - table.insert(fibers, f) - end - - for i = 1, fiber_count do - fibers[i]:join() - end - - log.error('\n%s: requests %d, rps %d, errors %d', - test_name, report.count, report.count / timeout, #report.errors) - end -end