Skip to content

Commit

Permalink
Add detailed statistics for select/pairs
Browse files Browse the repository at this point in the history
After this patch, statistics `select` section additionally contains
`details` collectors.

```
crud.stats('my_space').select.details
---
- map_reduces: 4
  tuples_fetched: 10500
  tuples_lookup: 238000
...
```

`map_reduces` is a count of planned map reduces
(including those not executed successfully). `tuples_fetched`
is a count of tuples fetched from storages during execution,
`tuples_lookup` is a count of tuples looked up on storages
while collecting response for call.

Fiber storages used to pass additional context info for stats where
possible. Cursor for select/pairs passes statistics info between
router and storage. Info about fetched and looked up tuples updated
through callbacks.

Part of #224
  • Loading branch information
DifferentialOrange committed Dec 20, 2021
1 parent 5685009 commit fe50f30
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 8 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,21 @@ returns. `count` is total requests count since instance start
or stats restart. `latency` is average time of requests execution,
`time` is total time of requests execution.

`select` section additionally contains `details` collectors.
```lua
crud.stats('my_space').select.details
---
- map_reduces: 4
tuples_fetched: 10500
tuples_lookup: 238000
...
```
`map_reduces` is a count of planned map reduces
(including those not executed successfully). `tuples_fetched`
is a count of tuples fetched from storages during execution,
`tuples_lookup` is a count of tuples looked up on storages
while collecting response for call.

## Cartridge roles

`cartridge.roles.crud-storage` is a Tarantool Cartridge role that depends on the
Expand Down
21 changes: 21 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local errors = require('errors')
local ffi = require('ffi')
local fiber = require('fiber')
local vshard = require('vshard')
local fun = require('fun')

Expand Down Expand Up @@ -610,4 +611,24 @@ function utils.space_doesnt_exist_msg(space_name)
return ("Space %q doesn't exist"):format(space_name)
end

-- Do nothing.
function utils.pass() end

function utils.init_context_section(section)
local storage = fiber.self().storage
if storage[section] == nil then
storage[section] = {}
end

return storage[section]
end

function utils.get_context_section(section)
return fiber.self().storage[section]
end

function utils.drop_context_section(section)
fiber.self().storage[section] = nil
end

return utils
3 changes: 3 additions & 0 deletions crud/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ local function select_on_storage(space_name, index_id, conditions, opts)
cursor = make_cursor(tuples)
end

cursor.stats = utils.get_context_section('storage_stats')
utils.drop_context_section('storage_stats')

-- getting tuples with user defined fields (if `fields` option is specified)
-- and fields that are needed for comparison on router (primary key + scan key)
return cursor, schema.filter_tuples_fields(tuples, opts.field_names)
Expand Down
11 changes: 10 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats.module')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -111,6 +112,9 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
local context_stats = utils.init_context_section('router_stats')
context_stats.map_reduces = 1
end

local tuples_limit = opts.first
Expand Down Expand Up @@ -142,7 +146,12 @@ local function build_select_iterator(space_name, user_conditions, opts)
local merger = Merger.new(replicasets_to_select, space, plan.index_id,
common.SELECT_FUNC_NAME,
{space_name, plan.index_id, plan.conditions, select_opts},
{tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts}
{
tarantool_iter = plan.tarantool_iter,
field_names = plan.field_names,
call_opts = opts.call_opts,
stats_callback = stats.get_fetch_callback(),
}
)

-- filter space format by plan.field_names (user defined fields + primary key + scan key)
Expand Down
13 changes: 13 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats.module')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand All @@ -30,6 +31,7 @@ local function select_iteration(space_name, plan, opts)
})

local call_opts = opts.call_opts
local stats_callback = stats.get_fetch_callback()

-- call select on storages
local storage_select_opts = {
Expand Down Expand Up @@ -59,6 +61,14 @@ local function select_iteration(space_name, plan, opts)

local tuples = {}
for replicaset_uuid, replicaset_results in pairs(results) do
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
local cursor = replicaset_results[1]
if cursor.stats ~= nil then
stats_callback(cursor.stats, space_name)
end

tuples[replicaset_uuid] = replicaset_results[2]
end

Expand Down Expand Up @@ -137,6 +147,9 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
local context_stats = utils.init_context_section('router_stats')
context_stats.map_reduces = 1
end

-- generate tuples comparator
Expand Down
10 changes: 7 additions & 3 deletions crud/select/executor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ function executor.execute(space, index, filter_func, opts)

opts = opts or {}

local stats = utils.init_context_section('storage_stats')
stats.tuples_lookup = 0
stats.tuples_fetched = 0

if opts.limit == 0 then
return {}
end

local tuples = {}
local tuples_count = 0

local value = opts.scan_value
if opts.after_tuple ~= nil then
Expand Down Expand Up @@ -111,16 +114,17 @@ function executor.execute(space, index, filter_func, opts)

if matched then
table.insert(tuples, tuple)
tuples_count = tuples_count + 1
stats.tuples_fetched = stats.tuples_fetched + 1

if opts.limit ~= nil and tuples_count >= opts.limit then
if opts.limit ~= nil and stats.tuples_fetched >= opts.limit then
break
end
elseif early_exit then
break
end

gen.state, tuple = gen(gen.param, gen.state)
stats.tuples_lookup = stats.tuples_lookup + 1
end

return tuples
Expand Down
13 changes: 13 additions & 0 deletions crud/select/merger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ local function fetch_chunk(context, state)
local replicaset = context.replicaset
local vshard_call_name = context.vshard_call_name
local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local space_name = context.space_name
local stats_callback = context.stats_callback
local future = state.future

-- The source was entirely drained.
Expand All @@ -109,6 +111,14 @@ local function fetch_chunk(context, state)
-- Decode metainfo, leave data to be processed by the merger.
local cursor = decode_metainfo(buf)

-- Extract stats info.
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
if cursor.stats ~= nil and stats_callback ~= nil then
stats_callback(cursor.stats, space_name)
end

-- Check whether we need the next call.
if cursor.is_end then
local next_state = {}
Expand Down Expand Up @@ -137,6 +147,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
opts = opts or {}
local call_opts = opts.call_opts
local mode = call_opts.mode or 'read'
local stats_callback = opts.stats_callback
local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance)

-- Request a first data chunk and create merger sources.
Expand All @@ -157,6 +168,8 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
replicaset = replicaset,
vshard_call_name = vshard_call_name,
timeout = call_opts.timeout,
stats_callback = stats_callback,
space_name = space.name,
}
local state = {future = future}
local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
Expand Down
53 changes: 53 additions & 0 deletions crud/stats/local_registry.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local dev_checks = require('crud.common.dev_checks')
local op_module = require('crud.stats.operation')
local registry_common = require('crud.stats.registry_common')

local registry = {}
Expand Down Expand Up @@ -121,4 +122,56 @@ function registry.observe_space_not_found()
return true
end

--- Increase statistics of storage select/pairs calls
--
-- @function observe_fetch
--
-- @tparam string space_name
-- Name of space.
--
-- @tparam number tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @tparam number tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @treturn boolean Returns true.
--
function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name)
dev_checks('number', 'number', 'string')

local op = op_module.SELECT
registry_common.init_collectors_if_required(internal_registry.spaces, space_name, op)
local collectors = internal_registry.spaces[space_name][op].details

collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched
collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup

return true
end

--- Increase statistics of planned map reduces during select/pairs
--
-- @function observe_map_reduces
--
-- @tparam number count
-- Count of map reduces planned.
--
-- @tparam string space_name
-- Name of space.
--
-- @treturn boolean Returns true.
--
function registry.observe_map_reduces(count, space_name)
dev_checks('number', 'string')

local op = op_module.SELECT
registry_common.init_collectors_if_required(internal_registry.spaces, space_name, op)
local collectors = internal_registry.spaces[space_name][op].details

collectors.map_reduces = collectors.map_reduces + count

return true
end

return registry
59 changes: 59 additions & 0 deletions crud/stats/module.lua
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ local function wrap_tail(space_name, op, opts, start_time, call_status, ...)
err = second_return_val
end

local context_stats = utils.get_context_section('router_stats')

-- If space not exists, do not build a separate collector for it.
-- Call request for non-existing space will always result in error.
-- The resulting overhead is insignificant for existing spaces:
Expand Down Expand Up @@ -144,6 +146,13 @@ local function wrap_tail(space_name, op, opts, start_time, call_status, ...)

registry.observe(latency, space_name, op, status)

if context_stats ~= nil then
if context_stats.map_reduces ~= nil then
registry.observe_map_reduces(context_stats.map_reduces, space_name)
end
utils.drop_context_section('router_stats')
end

:: return_values ::

if call_status == false then
Expand Down Expand Up @@ -202,6 +211,56 @@ function stats.wrap(func, op, opts)
end
end

local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' }
--- Callback to collect storage tuples stats (select/pairs)
--
-- @function update_fetch_stats
--
-- @tparam table storage_stats
-- Statistics from select storage call.
--
-- @tfield number tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @tfield number tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @tparam string space_name
-- Name of space.
--
-- @treturn boolean Returns true.
--
local function update_fetch_stats(storage_stats, space_name)
dev_checks(storage_stats_schema, 'string')

if not is_enabled then
return true
end

registry.observe_fetch(
storage_stats.tuples_fetched,
storage_stats.tuples_lookup,
space_name
)

return true
end

--- Returns callback to collect storage tuples stats (select/pairs)
--
-- @function get_fetch_callback
--
-- @treturn[1] function `update_fetch_stats` function to collect tuples stats.
-- @treturn[2] function Dummy function, if stats disabled.
--
function stats.get_fetch_callback()
if not is_enabled then
return utils.pass
end

return update_fetch_stats
end

--- Table with CRUD operation lables
--
-- @table label
Expand Down
Loading

0 comments on commit fe50f30

Please sign in to comment.