diff --git a/README.md b/README.md index 3fb06975..a2b6932e 100644 --- a/README.md +++ b/README.md @@ -657,6 +657,21 @@ returns. `count` is total requests count since instance start or stats restart. `latency` is average time of requests execution, `time` is total time of requests execution. +`select` section additionally contains `details` collectors. +```lua +crud.stats('my_space').select.details +--- +- map_reduces: 4 + tuples_fetched: 10500 + tuples_lookup: 238000 +... +``` +`map_reduces` is a count of planned map reduces +(including those not executed successfully). `tuples_fetched` +is a count of tuples fetched from storages during execution, +`tuples_lookup` is a count of tuples looked up on storages +while collecting response for call. + ## Cartridge roles `cartridge.roles.crud-storage` is a Tarantool Cartridge role that depends on the diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 3ae3503a..b5df4642 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -1,5 +1,6 @@ local errors = require('errors') local ffi = require('ffi') +local fiber = require('fiber') local vshard = require('vshard') local fun = require('fun') @@ -610,4 +611,24 @@ function utils.space_doesnt_exist_msg(space_name) return ("Space %q doesn't exist"):format(space_name) end +-- Do nothing. 
+function utils.pass() end + +function utils.init_context_section(section) + local storage = fiber.self().storage + if storage[section] == nil then + storage[section] = {} + end + + return storage[section] +end + +function utils.get_context_section(section) + return fiber.self().storage[section] +end + +function utils.drop_context_section(section) + fiber.self().storage[section] = nil +end + return utils diff --git a/crud/select.lua b/crud/select.lua index c01e4941..dcea2456 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -76,6 +76,9 @@ local function select_on_storage(space_name, index_id, conditions, opts) cursor = make_cursor(tuples) end + cursor.stats = utils.get_context_section('storage_stats') + utils.drop_context_section('storage_stats') + -- getting tuples with user defined fields (if `fields` option is specified) -- and fields that are needed for comparison on router (primary key + scan key) return cursor, schema.filter_tuples_fields(tuples, opts.field_names) diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index adf165bb..210b483d 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') local sharding_key_module = require('crud.common.sharding_key') +local stats = require('crud.stats.module') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -111,6 +112,9 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + local context_stats = utils.init_context_section('router_stats') + context_stats.map_reduces = 1 end local tuples_limit = opts.first @@ -142,7 +146,12 @@ local function build_select_iterator(space_name, user_conditions, opts) local merger = Merger.new(replicasets_to_select, space, plan.index_id, 
common.SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, - {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} + { + tarantool_iter = plan.tarantool_iter, + field_names = plan.field_names, + call_opts = opts.call_opts, + stats_callback = stats.get_fetch_callback(), + } ) -- filter space format by plan.field_names (user defined fields + primary key + scan key) diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index d2254ed2..497e5e41 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') local sharding_key_module = require('crud.common.sharding_key') +local stats = require('crud.stats.module') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -30,6 +31,7 @@ local function select_iteration(space_name, plan, opts) }) local call_opts = opts.call_opts + local stats_callback = stats.get_fetch_callback() -- call select on storages local storage_select_opts = { @@ -59,6 +61,14 @@ local function select_iteration(space_name, plan, opts) local tuples = {} for replicaset_uuid, replicaset_results in pairs(results) do + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. 
+ local cursor = replicaset_results[1] + if cursor.stats ~= nil then + stats_callback(cursor.stats, space_name) + end + tuples[replicaset_uuid] = replicaset_results[2] end @@ -137,6 +147,9 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + local context_stats = utils.init_context_section('router_stats') + context_stats.map_reduces = 1 end -- generate tuples comparator diff --git a/crud/select/executor.lua b/crud/select/executor.lua index 6d6f7483..3aed25cb 100644 --- a/crud/select/executor.lua +++ b/crud/select/executor.lua @@ -68,12 +68,15 @@ function executor.execute(space, index, filter_func, opts) opts = opts or {} + local stats = utils.init_context_section('storage_stats') + stats.tuples_lookup = 0 + stats.tuples_fetched = 0 + if opts.limit == 0 then return {} end local tuples = {} - local tuples_count = 0 local value = opts.scan_value if opts.after_tuple ~= nil then @@ -111,9 +114,9 @@ function executor.execute(space, index, filter_func, opts) if matched then table.insert(tuples, tuple) - tuples_count = tuples_count + 1 + stats.tuples_fetched = stats.tuples_fetched + 1 - if opts.limit ~= nil and tuples_count >= opts.limit then + if opts.limit ~= nil and stats.tuples_fetched >= opts.limit then break end elseif early_exit then @@ -121,6 +124,7 @@ function executor.execute(space, index, filter_func, opts) end gen.state, tuple = gen(gen.param, gen.state) + stats.tuples_lookup = stats.tuples_lookup + 1 end return tuples diff --git a/crud/select/merger.lua b/crud/select/merger.lua index fa443b84..a3a361eb 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -93,6 +93,8 @@ local function fetch_chunk(context, state) local replicaset = context.replicaset local vshard_call_name = context.vshard_call_name local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local space_name = context.space_name + local stats_callback = context.stats_callback local future = 
state.future -- The source was entirely drained. @@ -109,6 +111,14 @@ local function fetch_chunk(context, state) -- Decode metainfo, leave data to be processed by the merger. local cursor = decode_metainfo(buf) + -- Extract stats info. + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. + if cursor.stats ~= nil and stats_callback ~= nil then + stats_callback(cursor.stats, space_name) + end + -- Check whether we need the next call. if cursor.is_end then local next_state = {} @@ -137,6 +147,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) opts = opts or {} local call_opts = opts.call_opts local mode = call_opts.mode or 'read' + local stats_callback = opts.stats_callback local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance) -- Request a first data chunk and create merger sources. @@ -157,6 +168,8 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) replicaset = replicaset, vshard_call_name = vshard_call_name, timeout = call_opts.timeout, + stats_callback = stats_callback, + space_name = space.name, } local state = {future = future} local source = merger_lib.new_buffer_source(fetch_chunk, context, state) diff --git a/crud/stats/local_registry.lua b/crud/stats/local_registry.lua index 26b1d6fb..048273c3 100644 --- a/crud/stats/local_registry.lua +++ b/crud/stats/local_registry.lua @@ -1,4 +1,5 @@ local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') local registry_common = require('crud.stats.registry_common') local registry = {} @@ -121,4 +122,56 @@ function registry.observe_space_not_found() return true end +--- Increase statistics of storage select/pairs calls +-- +-- @function observe_fetch +-- +-- @tparam number tuples_fetched +-- Count of tuples fetched during storage call. 
+-- +-- @tparam number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @tparam string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name) + dev_checks('number', 'number', 'string') + + local op = op_module.SELECT + registry_common.init_collectors_if_required(internal_registry.spaces, space_name, op) + local collectors = internal_registry.spaces[space_name][op].details + + collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched + collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup + + return true +end + +--- Increase statistics of planned map reduces during select/pairs +-- +-- @function observe_map_reduces +-- +-- @tparam number count +-- Count of map reduces planned. +-- +-- @tparam string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_map_reduces(count, space_name) + dev_checks('number', 'string') + + local op = op_module.SELECT + registry_common.init_collectors_if_required(internal_registry.spaces, space_name, op) + local collectors = internal_registry.spaces[space_name][op].details + + collectors.map_reduces = collectors.map_reduces + count + + return true +end + return registry diff --git a/crud/stats/module.lua b/crud/stats/module.lua index e7b37741..dceee8ba 100644 --- a/crud/stats/module.lua +++ b/crud/stats/module.lua @@ -117,6 +117,8 @@ local function wrap_tail(space_name, op, opts, start_time, call_status, ...) err = second_return_val end + local context_stats = utils.get_context_section('router_stats') + -- If space not exists, do not build a separate collector for it. -- Call request for non-existing space will always result in error. -- The resulting overhead is insignificant for existing spaces: @@ -144,6 +146,13 @@ local function wrap_tail(space_name, op, opts, start_time, call_status, ...) 
registry.observe(latency, space_name, op, status) + if context_stats ~= nil then + if context_stats.map_reduces ~= nil then + registry.observe_map_reduces(context_stats.map_reduces, space_name) + end + utils.drop_context_section('router_stats') + end + :: return_values :: if call_status == false then @@ -202,6 +211,56 @@ function stats.wrap(func, op, opts) end end +local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' } +--- Callback to collect storage tuples stats (select/pairs) +-- +-- @function update_fetch_stats +-- +-- @tparam table storage_stats +-- Statistics from select storage call. +-- +-- @tfield number tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @tfield number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @tparam string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +local function update_fetch_stats(storage_stats, space_name) + dev_checks(storage_stats_schema, 'string') + + if not is_enabled then + return true + end + + registry.observe_fetch( + storage_stats.tuples_fetched, + storage_stats.tuples_lookup, + space_name + ) + + return true +end + +--- Returns callback to collect storage tuples stats (select/pairs) +-- +-- @function get_fetch_callback +-- +-- @treturn[1] function `update_fetch_stats` function to collect tuples stats. +-- @treturn[2] function Dummy function, if stats disabled. 
+-- +function stats.get_fetch_callback() + if not is_enabled then + return utils.pass + end + + return update_fetch_stats +end + --- Table with CRUD operation lables -- -- @table label diff --git a/crud/stats/registry_common.lua b/crud/stats/registry_common.lua index 084bebb9..48bd30f4 100644 --- a/crud/stats/registry_common.lua +++ b/crud/stats/registry_common.lua @@ -1,4 +1,5 @@ local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') local registry_common = {} @@ -6,10 +7,17 @@ local registry_common = {} -- -- @function build_collectors -- +-- @tparam string op +-- Label of registry collectors. +-- Use `require('crud.stats.module').op` to pick one. +-- -- @treturn table Returns collectors for success and error requests. --- Collectors store 'count', 'latency' and 'time' values. +-- Collectors store 'count', 'latency' and 'time' values. Also +-- returns additional collectors for select operation. -- -function registry_common.build_collectors() +function registry_common.build_collectors(op) + dev_checks('string') + local collectors = { ok = { count = 0, @@ -23,6 +31,14 @@ function registry_common.build_collectors() }, } + if op == op_module.SELECT then + collectors.details = { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + } + end + return collectors end @@ -49,7 +65,7 @@ function registry_common.init_collectors_if_required(spaces, space_name, op) local space_collectors = spaces[space_name] if space_collectors[op] == nil then - space_collectors[op] = registry_common.build_collectors() + space_collectors[op] = registry_common.build_collectors(op) end end diff --git a/test/integration/stats_test.lua b/test/integration/stats_test.lua index 52847429..7cc5c2ee 100644 --- a/test/integration/stats_test.lua +++ b/test/integration/stats_test.lua @@ -421,7 +421,7 @@ for name, case in pairs(unknown_space_cases) do local test_name = ('test_%s_on_unknown_space'):format(name) g[test_name] = function(g) - -- Collect 
statss before call. + -- Collect stats before call. local stats_before = g:get_stats() t.assert_type(stats_before, 'table') @@ -445,3 +445,118 @@ for name, case in pairs(unknown_space_cases) do "Existing spaces stats haven't changed") end end + +local prepare_select_data = function(g) + helpers.insert_objects(g, space_name, { + -- Storage is s-2. + { + id = 1, name = "Elizabeth", last_name = "Jackson", + age = 12, city = "New York", + }, + -- Storage is s-2. + { + id = 2, name = "Mary", last_name = "Brown", + age = 46, city = "Los Angeles", + }, + -- Storage is s-1. + { + id = 3, name = "David", last_name = "Smith", + age = 33, city = "Los Angeles", + }, + -- Storage is s-2. + { + id = 4, name = "William", last_name = "White", + age = 81, city = "Chicago", + } + }) +end + +local select_cases = { + select_by_primary_index = { + func = 'crud.select', + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_by_secondary_index = { + func = 'crud.select', + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_full_scan = { + func = 'crud.select', + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, + pairs_by_primary_index = { + eval = eval.pairs, + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + pairs_by_secondary_index = { + eval = eval.pairs, + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + pairs_full_scan = { + eval = eval.pairs, + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, +} + +for name, case in pairs(select_cases) do + local test_name = ('test_%s_details'):format(name) + + g.before_test(test_name, prepare_select_data) + + g[test_name] = function(g) 
+ local op = 'select' + local space_name = space_name + + -- Collect stats before call. + local stats_before = g:get_stats(space_name) + t.assert_type(stats_before, 'table') + + -- Call operation. + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + + -- Collect stats after call. + local stats_after = g:get_stats(space_name) + t.assert_type(stats_after, 'table') + + local op_before = get_before_stats(stats_before, op) + local details_before = op_before.details + local details_after = stats_after[op].details + + local tuples_fetched_diff = details_after.tuples_fetched - details_before.tuples_fetched + t.assert_equals(tuples_fetched_diff, case.tuples_fetched, + 'Expected count of tuples fetched') + + local tuples_lookup_diff = details_after.tuples_lookup - details_before.tuples_lookup + t.assert_equals(tuples_lookup_diff, case.tuples_lookup, + 'Expected count of tuples looked up on storage') + + local map_reduces_diff = details_after.map_reduces - details_before.map_reduces + t.assert_equals(map_reduces_diff, case.map_reduces, + 'Expected count of map reduces planned') + end +end diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua index 97a8d124..385f1566 100644 --- a/test/unit/stats_test.lua +++ b/test/unit/stats_test.lua @@ -192,6 +192,19 @@ for name, case in pairs(observe_cases) do }, 'Other status collectors initialized after observations' ) + + -- SELECT collectors have additional details section. 
+ if op == stats_module.op.SELECT then + t.assert_equals( + op_stats.details, + { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + }, + 'Detail collectors initialized after select observations' + ) + end end end end @@ -441,3 +454,45 @@ g.test_enabling_stats_on_non_router_throws_error = function(g) local storage = g.cluster:server('s1-master').net_box t.assert_error(storage.eval, storage, " require('crud.stats.module').enable() ") end + +g.test_stats_fetch_callback = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ stats_module.get_fetch_callback()(...) ]], + { storage_cursor_stats, space_name }) + + local op = stats_module.op.SELECT + local stats = g:get_stats(space_name) + + t.assert_not_equals(stats[op], nil, + 'Fetch stats update inits SELECT collectors') + + local details = stats[op].details + + t.assert_equals(details.tuples_fetched, 5, + 'tuples_fetched is inremented by expected value') + t.assert_equals(details.tuples_lookup, 25, + 'tuples_lookup is inremented by expected value') +end + +g.test_disable_stats_before_fetch_callback_get_do_not_break_call = function(g) + g:disable_stats() + + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + g.router:eval([[ stats_module.get_fetch_callback()(...) ]], + { storage_cursor_stats, space_name }) + + t.success('No unexpected errors') +end + +g.test_disable_stats_after_fetch_callback_get_do_not_break_call = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ + local callback = stats_module.get_fetch_callback() + stats_module.disable() + callback(...) + ]], { storage_cursor_stats, space_name }) + + t.success('No unexpected errors') +end