diff --git a/README.md b/README.md index 974ef702c..10753cdae 100644 --- a/README.md +++ b/README.md @@ -657,6 +657,21 @@ returns. `count` is total requests count since instance start or stats restart. `latency` is average time of requests execution, `time` is total time of requests execution. +The `select` section additionally contains `details` collectors. +```lua +crud.stats('my_space').select.details +--- +- map_reduces: 4 + tuples_fetched: 10500 + tuples_lookup: 238000 +... +``` +`map_reduces` is the number of planned map reduces +(including those not executed successfully). `tuples_fetched` +is the number of tuples fetched from storages during execution, and +`tuples_lookup` is the number of tuples looked up on storages +while collecting the response for a call. + Statistics are preserved between package reloads or [Tarantool Cartridge role reloads](https://www.tarantool.io/en/doc/latest/book/cartridge/cartridge_api/modules/cartridge.roles/#reload). diff --git a/crud/common/fiber_context.lua b/crud/common/fiber_context.lua new file mode 100644 index 000000000..9ba974b2e --- /dev/null +++ b/crud/common/fiber_context.lua @@ -0,0 +1,22 @@ +local fiber = require('fiber') + +local fiber_context = {} + +function fiber_context.init_section(section) + local storage = fiber.self().storage + if storage[section] == nil then + storage[section] = {} + end + + return storage[section] +end + +function fiber_context.get_section(section) + return fiber.self().storage[section] +end + +function fiber_context.drop_section(section) + fiber.self().storage[section] = nil +end + +return fiber_context diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 3ae3503ad..bb2d5044f 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -610,4 +610,7 @@ function utils.space_doesnt_exist_msg(space_name) return ("Space %q doesn't exist"):format(space_name) end +-- Do nothing. 
+function utils.do_nothing() end + return utils diff --git a/crud/select.lua b/crud/select.lua index c01e49410..8c4ea5e46 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -5,6 +5,7 @@ local select_executor = require('crud.select.executor') local select_filters = require('crud.select.filters') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local fiber_context = require('crud.common.fiber_context') local SelectError = errors.new_class('SelectError') @@ -76,6 +77,9 @@ local function select_on_storage(space_name, index_id, conditions, opts) cursor = make_cursor(tuples) end + cursor.stats = fiber_context.get_section('storage_stats') + fiber_context.drop_section('storage_stats') + -- getting tuples with user defined fields (if `fields` option is specified) -- and fields that are needed for comparison on router (primary key + scan key) return cursor, schema.filter_tuples_fields(tuples, opts.field_names) diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index adf165bbe..06521ec73 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -2,12 +2,14 @@ local checks = require('checks') local errors = require('errors') local vshard = require('vshard') +local fiber_context = require('crud.common.fiber_context') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') local sharding_key_module = require('crud.common.sharding_key') +local stats = require('crud.stats.module') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -111,6 +113,12 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + local context_stats = fiber_context.init_section('router_stats') + 
if context_stats.map_reduces == nil then + context_stats.map_reduces = 0 + end + context_stats.map_reduces = context_stats.map_reduces + 1 end local tuples_limit = opts.first @@ -142,7 +150,12 @@ local function build_select_iterator(space_name, user_conditions, opts) local merger = Merger.new(replicasets_to_select, space, plan.index_id, common.SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, - {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} + { + tarantool_iter = plan.tarantool_iter, + field_names = plan.field_names, + call_opts = opts.call_opts, + stats_callback = stats.get_fetch_callback(), + } ) -- filter space format by plan.field_names (user defined fields + primary key + scan key) diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index d2254ed2d..e0ba547f8 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -9,6 +9,8 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') local sharding_key_module = require('crud.common.sharding_key') +local stats = require('crud.stats.module') +local fiber_context = require('crud.common.fiber_context') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -30,6 +32,7 @@ local function select_iteration(space_name, plan, opts) }) local call_opts = opts.call_opts + local stats_callback = stats.get_fetch_callback() -- call select on storages local storage_select_opts = { @@ -59,6 +62,14 @@ local function select_iteration(space_name, plan, opts) local tuples = {} for replicaset_uuid, replicaset_results in pairs(results) do + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. 
+ local cursor = replicaset_results[1] + if cursor.stats ~= nil then + stats_callback(cursor.stats, space_name) + end + tuples[replicaset_uuid] = replicaset_results[2] end @@ -137,6 +148,12 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + local context_stats = fiber_context.init_section('router_stats') + if context_stats.map_reduces == nil then + context_stats.map_reduces = 0 + end + context_stats.map_reduces = context_stats.map_reduces + 1 end -- generate tuples comparator diff --git a/crud/select/executor.lua b/crud/select/executor.lua index 6d6f74837..285bd359d 100644 --- a/crud/select/executor.lua +++ b/crud/select/executor.lua @@ -1,6 +1,7 @@ local errors = require('errors') local dev_checks = require('crud.common.dev_checks') +local fiber_context = require('crud.common.fiber_context') local select_comparators = require('crud.compare.comparators') local compat = require('crud.common.compat') local has_keydef = compat.exists('tuple.keydef', 'key_def') @@ -68,12 +69,15 @@ function executor.execute(space, index, filter_func, opts) opts = opts or {} + local stats = fiber_context.init_section('storage_stats') + stats.tuples_lookup = 0 + stats.tuples_fetched = 0 + if opts.limit == 0 then return {} end local tuples = {} - local tuples_count = 0 local value = opts.scan_value if opts.after_tuple ~= nil then @@ -111,9 +115,9 @@ function executor.execute(space, index, filter_func, opts) if matched then table.insert(tuples, tuple) - tuples_count = tuples_count + 1 + stats.tuples_fetched = stats.tuples_fetched + 1 - if opts.limit ~= nil and tuples_count >= opts.limit then + if opts.limit ~= nil and stats.tuples_fetched >= opts.limit then break end elseif early_exit then @@ -121,6 +125,7 @@ function executor.execute(space, index, filter_func, opts) end gen.state, tuple = gen(gen.param, gen.state) + stats.tuples_lookup = stats.tuples_lookup + 1 end return tuples diff --git 
a/crud/select/merger.lua b/crud/select/merger.lua index fa443b849..a3a361ebe 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -93,6 +93,8 @@ local function fetch_chunk(context, state) local replicaset = context.replicaset local vshard_call_name = context.vshard_call_name local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local space_name = context.space_name + local stats_callback = context.stats_callback local future = state.future -- The source was entirely drained. @@ -109,6 +111,14 @@ local function fetch_chunk(context, state) -- Decode metainfo, leave data to be processed by the merger. local cursor = decode_metainfo(buf) + -- Extract stats info. + -- Stats are reported through the callback here rather than + -- passed out to the wrapper, because a fetch for pairs can be + -- called even after pairs() has returned its generators. + if cursor.stats ~= nil and stats_callback ~= nil then + stats_callback(cursor.stats, space_name) + end + -- Check whether we need the next call. if cursor.is_end then local next_state = {} @@ -137,6 +147,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) opts = opts or {} local call_opts = opts.call_opts local mode = call_opts.mode or 'read' + local stats_callback = opts.stats_callback local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance) -- Request a first data chunk and create merger sources. 
@@ -157,6 +168,8 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) replicaset = replicaset, vshard_call_name = vshard_call_name, timeout = call_opts.timeout, + stats_callback = stats_callback, + space_name = space.name, } local state = {future = future} local source = merger_lib.new_buffer_source(fetch_chunk, context, state) diff --git a/crud/stats/local_registry.lua b/crud/stats/local_registry.lua index d18485efc..c08088358 100644 --- a/crud/stats/local_registry.lua +++ b/crud/stats/local_registry.lua @@ -3,6 +3,7 @@ -- local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') local registry_common = require('crud.stats.registry_common') local stash = require('crud.stats.stash') @@ -127,4 +128,56 @@ function registry.observe_space_not_found() return true end +--- Increase statistics of storage select/pairs calls. +-- +-- @function observe_fetch +-- +-- @number tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name) + dev_checks('number', 'number', 'string') + + local op = op_module.SELECT + registry_common.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched + collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup + + return true +end + +--- Increase statistics of planned map reduces during select/pairs. +-- +-- @function observe_map_reduces +-- +-- @number count +-- Count of map reduces planned. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. 
+-- +function registry.observe_map_reduces(count, space_name) + dev_checks('number', 'string') + + local op = op_module.SELECT + registry_common.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.map_reduces = collectors.map_reduces + count + + return true +end + return registry diff --git a/crud/stats/module.lua b/crud/stats/module.lua index a68219edb..7e185a27a 100644 --- a/crud/stats/module.lua +++ b/crud/stats/module.lua @@ -8,6 +8,7 @@ local errors = require('errors') local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') +local fiber_context = require('crud.common.fiber_context') local utils = require('crud.common.utils') local op_module = require('crud.stats.operation') local registry = require('crud.stats.local_registry') @@ -138,6 +139,10 @@ local function wrap_tail(space_name, op, pairs, start_time, call_status, ...) err = second_return_val end + local context_stats = fiber_context.get_section('router_stats') + -- Describe local variables to use `goto`. + local space_not_found_msg, space + -- If space not exists, do not build a separate collector for it. -- Call request for non-existing space will always result in error. -- The resulting overhead is insignificant for existing spaces: @@ -146,7 +151,7 @@ local function wrap_tail(space_name, op, pairs, start_time, call_status, ...) -- it is treated as unknown as well. if status == 'error' and registry.is_unknown_space(space_name) then if type(err) == 'table' and type(err.err) == 'string' then - local space_not_found_msg = utils.space_doesnt_exist_msg(space_name) + space_not_found_msg = utils.space_doesnt_exist_msg(space_name) if string.find(err.err, space_not_found_msg) ~= nil then registry.observe_space_not_found() goto return_values @@ -156,13 +161,21 @@ local function wrap_tail(space_name, op, pairs, start_time, call_status, ...) 
-- We can't rely only on parsing error value because space existence -- is not always the first check in request validation. -- Check explicitly if space do not exist. - local space = utils.get_space(space_name, vshard.router.routeall()) + space = utils.get_space(space_name, vshard.router.routeall()) if space == nil then registry.observe_space_not_found() goto return_values end end + if context_stats ~= nil then + if context_stats.map_reduces ~= nil then + registry.observe_map_reduces(context_stats.map_reduces, space_name) + end + + fiber_context.drop_section('router_stats') + end + registry.observe(latency, space_name, op, status) :: return_values :: @@ -225,6 +238,57 @@ function stats.wrap(func, op, opts) end end +local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' } +--- Callback to collect storage tuples stats (select/pairs). +-- +-- @function update_fetch_stats +-- @local +-- +-- @tab storage_stats +-- Statistics from select storage call. +-- +-- @number storage_stats.tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number storage_stats.tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns `true`. +-- +local function update_fetch_stats(storage_stats, space_name) + dev_checks(storage_stats_schema, 'string') + + if not stats.is_enabled() then + return true + end + + registry.observe_fetch( + storage_stats.tuples_fetched, + storage_stats.tuples_lookup, + space_name + ) + + return true +end + +--- Returns callback to collect storage tuples stats (select/pairs). +-- +-- @function get_fetch_callback +-- +-- @treturn[1] function `update_fetch_stats` function to collect tuples stats. +-- @treturn[2] function Dummy function, if stats disabled. 
+-- +function stats.get_fetch_callback() + if not stats.is_enabled() then + return utils.do_nothing + end + + return update_fetch_stats +end + --- Table with CRUD operation lables. -- -- @tfield string INSERT diff --git a/crud/stats/registry_common.lua b/crud/stats/registry_common.lua index e15bd7c71..93fa7e579 100644 --- a/crud/stats/registry_common.lua +++ b/crud/stats/registry_common.lua @@ -3,6 +3,7 @@ -- local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') local registry_common = {} @@ -10,10 +11,17 @@ local registry_common = {} -- -- @function build_collectors -- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats.module').op` to pick one. +-- -- @treturn table Returns collectors for success and error requests. --- Collectors store 'count', 'latency' and 'time' values. +-- Collectors store 'count', 'latency' and 'time' values. Also +-- returns additional collectors for select operation. -- -function registry_common.build_collectors() +function registry_common.build_collectors(op) + dev_checks('string') + local collectors = { ok = { count = 0, @@ -27,6 +35,14 @@ function registry_common.build_collectors() }, } + if op == op_module.SELECT then + collectors.details = { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + } + end + return collectors end @@ -53,7 +69,7 @@ function registry_common.init_collectors_if_required(spaces, space_name, op) local space_collectors = spaces[space_name] if space_collectors[op] == nil then - space_collectors[op] = registry_common.build_collectors() + space_collectors[op] = registry_common.build_collectors(op) end end diff --git a/crud/stats/stash.lua b/crud/stats/stash.lua index d0d84febc..dc2cc1db4 100644 --- a/crud/stats/stash.lua +++ b/crud/stats/stash.lua @@ -1,4 +1,4 @@ ----- Module +---- Stash for storing data between package and role reloads. 
-- @module crud.stats.stash -- local dev_checks = require('crud.common.dev_checks') diff --git a/test/integration/stats_test.lua b/test/integration/stats_test.lua index 34164ca96..6ee611e6a 100644 --- a/test/integration/stats_test.lua +++ b/test/integration/stats_test.lua @@ -362,6 +362,76 @@ local unknown_space_cases = { }, } +local prepare_select_data = function(g) + helpers.insert_objects(g, space_name, { + -- Storage is s-2. + { + id = 1, name = "Elizabeth", last_name = "Jackson", + age = 12, city = "New York", + }, + -- Storage is s-2. + { + id = 2, name = "Mary", last_name = "Brown", + age = 46, city = "Los Angeles", + }, + -- Storage is s-1. + { + id = 3, name = "David", last_name = "Smith", + age = 33, city = "Los Angeles", + }, + -- Storage is s-2. + { + id = 4, name = "William", last_name = "White", + age = 81, city = "Chicago", + } + }) +end + +local select_cases = { + select_by_primary_index = { + func = 'crud.select', + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_by_secondary_index = { + func = 'crud.select', + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_full_scan = { + func = 'crud.select', + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, + pairs_by_primary_index = { + eval = eval.pairs, + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + pairs_by_secondary_index = { + eval = eval.pairs, + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + pairs_full_scan = { + eval = eval.pairs, + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, +} + -- Generate non-null stats for all cases. 
local function generate_stats(g) for _, case in pairs(simple_operation_cases) do @@ -387,6 +457,19 @@ local function generate_stats(g) end end + -- Generate non-null select details. + prepare_select_data(g) + for _, case in pairs(select_cases) do + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + end + -- Generate non-null space_not_found stats. local case = unknown_space_cases.insert local _, err = g.router:call(case.func, case.args) @@ -507,6 +590,53 @@ for name, case in pairs(unknown_space_cases) do end +for name, case in pairs(select_cases) do + local test_name = ('test_%s_details'):format(name) + + g.before_test(test_name, prepare_select_data) + + g[test_name] = function(g) + local op = 'select' + local space_name = space_name + + -- Collect stats before call. + local stats_before = g:get_stats(space_name) + t.assert_type(stats_before, 'table') + + -- Call operation. + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + + -- Collect stats after call. 
+ local stats_after = g:get_stats(space_name) + t.assert_type(stats_after, 'table') + + local op_before = set_defaults_if_empty(stats_before, op) + local details_before = op_before.details + local op_after = set_defaults_if_empty(stats_after, op) + local details_after = op_after.details + + local tuples_fetched_diff = details_after.tuples_fetched - details_before.tuples_fetched + t.assert_equals(tuples_fetched_diff, case.tuples_fetched, + 'Expected count of tuples fetched') + + local tuples_lookup_diff = details_after.tuples_lookup - details_before.tuples_lookup + t.assert_equals(tuples_lookup_diff, case.tuples_lookup, + 'Expected count of tuples looked up on storage') + + local map_reduces_diff = details_after.map_reduces - details_before.map_reduces + t.assert_equals(map_reduces_diff, case.map_reduces, + 'Expected count of map reduces planned') + end +end + + g.before_test( 'test_role_reload_do_not_reset_observations', generate_stats) diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua index 71d1b9132..bc60a5b01 100644 --- a/test/unit/stats_test.lua +++ b/test/unit/stats_test.lua @@ -192,6 +192,19 @@ for name, case in pairs(observe_cases) do }, 'Other status collectors initialized after observations' ) + + -- SELECT collectors have additional details section. + if op == stats_module.op.SELECT then + t.assert_equals( + op_stats.details, + { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + }, + 'Detail collectors initialized after select observations' + ) + end end end end @@ -441,3 +454,77 @@ g.test_enabling_stats_on_non_router_throws_error = function(g) local storage = g.cluster:server('s1-master').net_box t.assert_error(storage.eval, storage, " require('crud.stats.module').enable() ") end + +g.test_stats_fetch_callback = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ stats_module.get_fetch_callback()(...) 
]], + { storage_cursor_stats, space_name }) + + local op = stats_module.op.SELECT + local stats = g:get_stats(space_name) + + t.assert_not_equals(stats[op], nil, + 'Fetch stats update inits SELECT collectors') + + local details = stats[op].details + + t.assert_equals(details.tuples_fetched, 5, + 'tuples_fetched is inremented by expected value') + t.assert_equals(details.tuples_lookup, 25, + 'tuples_lookup is inremented by expected value') +end + +g.test_disable_stats_before_fetch_callback_get_do_not_break_call = function(g) + g:disable_stats() + + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + g.router:eval([[ stats_module.get_fetch_callback()(...) ]], + { storage_cursor_stats, space_name }) + + t.success('No unexpected errors') +end + +g.test_disable_stats_after_fetch_callback_get_do_not_break_call = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ + local callback = stats_module.get_fetch_callback() + stats_module.disable() + callback(...) + ]], { storage_cursor_stats, space_name }) + + t.success('No unexpected errors') +end + +local eval_with_fiber_context_setup = [[ + local fiber_context = require('crud.common.fiber_context') + + local op = select(1, ...) + local space_name = select(2, ...) + local context_field = select(3, ...) + local context_value = select(4, ...) + + local func = function(space_name) + local ctx = fiber_context.init_section('router_stats') + ctx[context_field] = context_value + return true + end + + + return stats_module.wrap(func, op)(space_name) +]] + +g.test_map_reduce_increment = function(g) + local op = stats_module.op.SELECT + local inc = 1 + + local _, err = g.router:eval(eval_with_fiber_context_setup, + { op, space_name, 'map_reduces', inc }) + t.assert_equals(err, nil) + + local stats = g:get_stats() + + t.assert_equals(stats.spaces[space_name][op].details.map_reduces, inc, + "Counter of map reduces incremented") +end