Skip to content

Commit

Permalink
stats: add detailed statistics for select/pairs
Browse files Browse the repository at this point in the history
After this patch, statistics `select` section additionally contains
`details` collectors.

```
crud.stats('my_space').select.details
---
- map_reduces: 4
  tuples_fetched: 10500
  tuples_lookup: 238000
...
```

`map_reduces` is the count of planned map reduces (including those not
executed successfully). `tuples_fetched` is the count of tuples fetched
from storages during execution, `tuples_lookup` is the count of tuples
looked up on storages while collecting responses for calls (including
scrolls for multibatch requests). Details data is updated as part of
the request process, so you may get new details before `select`/`pairs`
call is finished and observed with count, latency and time collectors.

Part of #224
  • Loading branch information
DifferentialOrange committed Feb 18, 2022
1 parent 144deef commit cc763bf
Show file tree
Hide file tree
Showing 13 changed files with 446 additions and 33 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,23 @@ returns. `count` is total requests count since instance start
or stats restart. `latency` is average time of requests execution,
`time` is the total time of requests execution.

`select` section additionally contains `details` collectors.
```lua
crud.stats('my_space').select.details
---
- map_reduces: 4
tuples_fetched: 10500
tuples_lookup: 238000
...
```
`map_reduces` is the count of planned map reduces (including those not
executed successfully). `tuples_fetched` is the count of tuples fetched
from storages during execution, `tuples_lookup` is the count of tuples
looked up on storages while collecting responses for calls (including
scrolls for multibatch requests). Details data is updated as part of
the request process, so you may get new details before `select`/`pairs`
call is finished and observed with count, latency and time collectors.

Since `pairs` request behavior differs from any other crud request, its
statistics collection also has specific behavior. Statistics (`select`
section) are updated after `pairs` cycle is finished: you
Expand Down
3 changes: 3 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,7 @@ function utils.merge_options(opts_a, opts_b)
return fun.chain(opts_a or {}, opts_b or {}):tomap()
end

-- Do nothing.
function utils.do_nothing() end

return utils
10 changes: 6 additions & 4 deletions crud/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ local function select_on_storage(space_name, index_id, conditions, opts)
end

-- execute select
local tuples, err = select_executor.execute(space, index, filter_func, {
local resp, err = select_executor.execute(space, index, filter_func, {
scan_value = opts.scan_value,
after_tuple = opts.after_tuple,
tarantool_iter = opts.tarantool_iter,
Expand All @@ -70,15 +70,17 @@ local function select_on_storage(space_name, index_id, conditions, opts)
end

local cursor
if #tuples < opts.limit or opts.limit == 0 then
if resp.stats.tuples_fetched < opts.limit or opts.limit == 0 then
cursor = {is_end = true}
else
cursor = make_cursor(tuples)
cursor = make_cursor(resp.tuples)
end

cursor.stats = resp.stats

-- getting tuples with user defined fields (if `fields` option is specified)
-- and fields that are needed for comparison on router (primary key + scan key)
return cursor, schema.filter_tuples_fields(tuples, opts.field_names)
return cursor, schema.filter_tuples_fields(resp.tuples, opts.field_names)
end

function select_module.init()
Expand Down
10 changes: 9 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -111,6 +112,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
stats.update_map_reduces(space_name)
end

local tuples_limit = opts.first
Expand Down Expand Up @@ -142,7 +145,12 @@ local function build_select_iterator(space_name, user_conditions, opts)
local merger = Merger.new(replicasets_to_select, space, plan.index_id,
common.SELECT_FUNC_NAME,
{space_name, plan.index_id, plan.conditions, select_opts},
{tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts}
{
tarantool_iter = plan.tarantool_iter,
field_names = plan.field_names,
call_opts = opts.call_opts,
stats_callback = stats.get_fetch_callback(),
}
)

-- filter space format by plan.field_names (user defined fields + primary key + scan key)
Expand Down
12 changes: 12 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand All @@ -30,6 +31,7 @@ local function select_iteration(space_name, plan, opts)
})

local call_opts = opts.call_opts
local stats_callback = stats.get_fetch_callback()

-- call select on storages
local storage_select_opts = {
Expand Down Expand Up @@ -59,6 +61,14 @@ local function select_iteration(space_name, plan, opts)

local tuples = {}
for replicaset_uuid, replicaset_results in pairs(results) do
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
local cursor = replicaset_results[1]
if cursor.stats ~= nil then
stats_callback(cursor.stats, space_name)
end

tuples[replicaset_uuid] = replicaset_results[2]
end

Expand Down Expand Up @@ -137,6 +147,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
stats.update_map_reduces(space_name)
end

-- generate tuples comparator
Expand Down
29 changes: 18 additions & 11 deletions crud/select/executor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ local ExecuteSelectError = errors.new_class('ExecuteSelectError')

local executor = {}

local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, after_tuple)
local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, after_tuple, stats)
local primary_index = space.index[0]

local scroll_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
Expand All @@ -32,6 +32,8 @@ local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, aft
return nil
end

stats.tuples_lookup = stats.tuples_lookup + 1

if scroll_comparator(tuple, after_tuple) then
return tuple
end
Expand Down Expand Up @@ -68,13 +70,16 @@ function executor.execute(space, index, filter_func, opts)

opts = opts or {}

local stats = {
tuples_lookup = 0,
tuples_fetched = 0,
}
local resp = { stats = stats, tuples = {} }

if opts.limit == 0 then
return {}
return resp
end

local tuples = {}
local tuples_count = 0

local value = opts.scan_value
if opts.after_tuple ~= nil then
local new_value = generate_value(opts.after_tuple, opts.scan_value, index.parts, opts.tarantool_iter)
Expand All @@ -88,13 +93,13 @@ function executor.execute(space, index, filter_func, opts)

if opts.after_tuple ~= nil then
local err
tuple, err = scroll_to_after_tuple(gen, space, index, opts.tarantool_iter, opts.after_tuple)
tuple, err = scroll_to_after_tuple(gen, space, index, opts.tarantool_iter, opts.after_tuple, resp.stats)
if err ~= nil then
return nil, ExecuteSelectError:new("Failed to scroll to the after_tuple: %s", err)
end

if tuple == nil then
return {}
return resp
end
end

Expand All @@ -107,13 +112,15 @@ function executor.execute(space, index, filter_func, opts)
break
end

stats.tuples_lookup = stats.tuples_lookup + 1

local matched, early_exit = filter_func(tuple)

if matched then
table.insert(tuples, tuple)
tuples_count = tuples_count + 1
table.insert(resp.tuples, tuple)
stats.tuples_fetched = stats.tuples_fetched + 1

if opts.limit ~= nil and tuples_count >= opts.limit then
if opts.limit ~= nil and stats.tuples_fetched >= opts.limit then
break
end
elseif early_exit then
Expand All @@ -123,7 +130,7 @@ function executor.execute(space, index, filter_func, opts)
gen.state, tuple = gen(gen.param, gen.state)
end

return tuples
return resp
end

return executor
13 changes: 13 additions & 0 deletions crud/select/merger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ local function fetch_chunk(context, state)
local replicaset = context.replicaset
local vshard_call_name = context.vshard_call_name
local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local space_name = context.space_name
local stats_callback = context.stats_callback
local future = state.future

-- The source was entirely drained.
Expand All @@ -109,6 +111,14 @@ local function fetch_chunk(context, state)
-- Decode metainfo, leave data to be processed by the merger.
local cursor = decode_metainfo(buf)

-- Extract stats info.
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
if cursor.stats ~= nil and stats_callback ~= nil then
stats_callback(cursor.stats, space_name)
end

-- Check whether we need the next call.
if cursor.is_end then
local next_state = {}
Expand Down Expand Up @@ -137,6 +147,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
opts = opts or {}
local call_opts = opts.call_opts
local mode = call_opts.mode or 'read'
local stats_callback = opts.stats_callback
local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance)

-- Request a first data chunk and create merger sources.
Expand All @@ -157,6 +168,8 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
replicaset = replicaset,
vshard_call_name = vshard_call_name,
timeout = call_opts.timeout,
stats_callback = stats_callback,
space_name = space.name,
}
local state = {future = future}
local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
Expand Down
74 changes: 74 additions & 0 deletions crud/stats/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local fun = require('fun')

local dev_checks = require('crud.common.dev_checks')
local stash = require('crud.common.stash')
local utils = require('crud.common.utils')
local op_module = require('crud.stats.operation')
local registry = require('crud.stats.local_registry')

Expand Down Expand Up @@ -258,6 +259,79 @@ function stats.wrap(func, op, opts)
end
end

local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' }
--- Callback to collect storage tuples stats (select/pairs).
--
-- @function update_fetch_stats
--
-- @local
--
-- @tab storage_stats
-- Statistics from select storage call.
--
-- @number storage_stats.tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @number storage_stats.tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns `true`.
--
local function update_fetch_stats(storage_stats, space_name)
dev_checks(storage_stats_schema, 'string')

if not stats.is_enabled() then
return true
end

registry.observe_fetch(
storage_stats.tuples_fetched,
storage_stats.tuples_lookup,
space_name
)

return true
end

--- Returns callback to collect storage tuples stats (select/pairs).
--
-- @function get_fetch_callback
--
-- @treturn[1] function `update_fetch_stats` function to collect tuples stats.
-- @treturn[2] function Dummy function, if stats disabled.
--
function stats.get_fetch_callback()
if not stats.is_enabled() then
return utils.do_nothing
end

return update_fetch_stats
end

--- Callback to collect planned map reduces stats (select/pairs).
--
-- @function update_map_reduces
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns `true`.
--
function stats.update_map_reduces(space_name)
dev_checks('string')

if not stats.is_enabled() then
return true
end

registry.observe_map_reduces(1, space_name)

return true
end

--- Table with CRUD operation lables.
--
-- @tfield string INSERT
Expand Down
Loading

0 comments on commit cc763bf

Please sign in to comment.