Skip to content

Commit

Permalink
Add detailed statistics for select/pairs
Browse files Browse the repository at this point in the history
After this patch, statistics `select` section additionally contains
`details` collectors.

```
crud.stats('my_space').select.details
---
- map_reduces: 4
  tuples_fetched: 10500
  tuples_lookup: 238000
...
```

`map_reduces` is a count of planned map reduces
(including those not executed successfully). `tuples_fetched`
is a count of tuples fetched from storages during execution,
`tuples_lookup` is a count of tuples looked up on storages
while collecting response for call.

Fiber storages used to pass additional context info for stats where
possible. Cursor for select/pairs passes statistics info between
router and storage. Info about fetched and looked up tuples updated
through callbacks.

Part of #224
  • Loading branch information
DifferentialOrange committed Feb 4, 2022
1 parent 67dbd32 commit e1811b8
Show file tree
Hide file tree
Showing 14 changed files with 452 additions and 10 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,21 @@ returns. `count` is total requests count since instance start
or stats restart. `latency` is average time of requests execution,
`time` is total time of requests execution.

`select` section additionally contains `details` collectors.
```lua
crud.stats('my_space').select.details
---
- map_reduces: 4
tuples_fetched: 10500
tuples_lookup: 238000
...
```
`map_reduces` is a count of planned map reduces
(including those not executed successfully). `tuples_fetched`
is a count of tuples fetched from storages during execution,
`tuples_lookup` is a count of tuples looked up on storages
while collecting response for call.

Statistics are preserved between package reloads or [Tarantool Cartridge
role reloads](https://www.tarantool.io/en/doc/latest/book/cartridge/cartridge_api/modules/cartridge.roles/#reload).

Expand Down
22 changes: 22 additions & 0 deletions crud/common/fiber_context.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
local fiber = require('fiber')

local fiber_context = {}

function fiber_context.init_section(section)
local storage = fiber.self().storage
if storage[section] == nil then
storage[section] = {}
end

return storage[section]
end

function fiber_context.get_section(section)
return fiber.self().storage[section]
end

function fiber_context.drop_section(section)
fiber.self().storage[section] = nil
end

return fiber_context
3 changes: 3 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -610,4 +610,7 @@ function utils.space_doesnt_exist_msg(space_name)
return ("Space %q doesn't exist"):format(space_name)
end

-- Do nothing.
function utils.do_nothing() end

return utils
4 changes: 4 additions & 0 deletions crud/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local select_executor = require('crud.select.executor')
local select_filters = require('crud.select.filters')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local fiber_context = require('crud.common.fiber_context')

local SelectError = errors.new_class('SelectError')

Expand Down Expand Up @@ -76,6 +77,9 @@ local function select_on_storage(space_name, index_id, conditions, opts)
cursor = make_cursor(tuples)
end

cursor.stats = fiber_context.get_section('storage_stats')
fiber_context.drop_section('storage_stats')

-- getting tuples with user defined fields (if `fields` option is specified)
-- and fields that are needed for comparison on router (primary key + scan key)
return cursor, schema.filter_tuples_fields(tuples, opts.field_names)
Expand Down
15 changes: 14 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local fiber_context = require('crud.common.fiber_context')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats.module')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -111,6 +113,12 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
local context_stats = fiber_context.init_section('router_stats')
if context_stats.map_reduces == nil then
context_stats.map_reduces = 0
end
context_stats.map_reduces = context_stats.map_reduces + 1
end

local tuples_limit = opts.first
Expand Down Expand Up @@ -142,7 +150,12 @@ local function build_select_iterator(space_name, user_conditions, opts)
local merger = Merger.new(replicasets_to_select, space, plan.index_id,
common.SELECT_FUNC_NAME,
{space_name, plan.index_id, plan.conditions, select_opts},
{tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts}
{
tarantool_iter = plan.tarantool_iter,
field_names = plan.field_names,
call_opts = opts.call_opts,
stats_callback = stats.get_fetch_callback(),
}
)

-- filter space format by plan.field_names (user defined fields + primary key + scan key)
Expand Down
17 changes: 17 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')
local stats = require('crud.stats.module')
local fiber_context = require('crud.common.fiber_context')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand All @@ -30,6 +32,7 @@ local function select_iteration(space_name, plan, opts)
})

local call_opts = opts.call_opts
local stats_callback = stats.get_fetch_callback()

-- call select on storages
local storage_select_opts = {
Expand Down Expand Up @@ -59,6 +62,14 @@ local function select_iteration(space_name, plan, opts)

local tuples = {}
for replicaset_uuid, replicaset_results in pairs(results) do
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
local cursor = replicaset_results[1]
if cursor.stats ~= nil then
stats_callback(cursor.stats, space_name)
end

tuples[replicaset_uuid] = replicaset_results[2]
end

Expand Down Expand Up @@ -137,6 +148,12 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
local context_stats = fiber_context.init_section('router_stats')
if context_stats.map_reduces == nil then
context_stats.map_reduces = 0
end
context_stats.map_reduces = context_stats.map_reduces + 1
end

-- generate tuples comparator
Expand Down
11 changes: 8 additions & 3 deletions crud/select/executor.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local errors = require('errors')

local dev_checks = require('crud.common.dev_checks')
local fiber_context = require('crud.common.fiber_context')
local select_comparators = require('crud.compare.comparators')
local compat = require('crud.common.compat')
local has_keydef = compat.exists('tuple.keydef', 'key_def')
Expand Down Expand Up @@ -68,12 +69,15 @@ function executor.execute(space, index, filter_func, opts)

opts = opts or {}

local stats = fiber_context.init_section('storage_stats')
stats.tuples_lookup = 0
stats.tuples_fetched = 0

if opts.limit == 0 then
return {}
end

local tuples = {}
local tuples_count = 0

local value = opts.scan_value
if opts.after_tuple ~= nil then
Expand Down Expand Up @@ -111,16 +115,17 @@ function executor.execute(space, index, filter_func, opts)

if matched then
table.insert(tuples, tuple)
tuples_count = tuples_count + 1
stats.tuples_fetched = stats.tuples_fetched + 1

if opts.limit ~= nil and tuples_count >= opts.limit then
if opts.limit ~= nil and stats.tuples_fetched >= opts.limit then
break
end
elseif early_exit then
break
end

gen.state, tuple = gen(gen.param, gen.state)
stats.tuples_lookup = stats.tuples_lookup + 1
end

return tuples
Expand Down
13 changes: 13 additions & 0 deletions crud/select/merger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ local function fetch_chunk(context, state)
local replicaset = context.replicaset
local vshard_call_name = context.vshard_call_name
local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local space_name = context.space_name
local stats_callback = context.stats_callback
local future = state.future

-- The source was entirely drained.
Expand All @@ -109,6 +111,14 @@ local function fetch_chunk(context, state)
-- Decode metainfo, leave data to be processed by the merger.
local cursor = decode_metainfo(buf)

-- Extract stats info.
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
if cursor.stats ~= nil and stats_callback ~= nil then
stats_callback(cursor.stats, space_name)
end

-- Check whether we need the next call.
if cursor.is_end then
local next_state = {}
Expand Down Expand Up @@ -137,6 +147,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
opts = opts or {}
local call_opts = opts.call_opts
local mode = call_opts.mode or 'read'
local stats_callback = opts.stats_callback
local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance)

-- Request a first data chunk and create merger sources.
Expand All @@ -157,6 +168,8 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
replicaset = replicaset,
vshard_call_name = vshard_call_name,
timeout = call_opts.timeout,
stats_callback = stats_callback,
space_name = space.name,
}
local state = {future = future}
local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
Expand Down
53 changes: 53 additions & 0 deletions crud/stats/local_registry.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
--

local dev_checks = require('crud.common.dev_checks')
local op_module = require('crud.stats.operation')
local registry_common = require('crud.stats.registry_common')
local stash = require('crud.stats.stash')

Expand Down Expand Up @@ -127,4 +128,56 @@ function registry.observe_space_not_found()
return true
end

--- Increase statistics of storage select/pairs calls
--
-- @function observe_fetch
--
-- @string space_name
-- Name of space.
--
-- @number tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @number tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @treturn boolean Returns true.
--
function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name)
dev_checks('number', 'number', 'string')

local op = op_module.SELECT
registry_common.init_collectors_if_required(internal.registry.spaces, space_name, op)
local collectors = internal.registry.spaces[space_name][op].details

collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched
collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup

return true
end

--- Increase statistics of planned map reduces during select/pairs
--
-- @function observe_map_reduces
--
-- @number count
-- Count of map reduces planned.
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns true.
--
function registry.observe_map_reduces(count, space_name)
dev_checks('number', 'string')

local op = op_module.SELECT
registry_common.init_collectors_if_required(internal.registry.spaces, space_name, op)
local collectors = internal.registry.spaces[space_name][op].details

collectors.map_reduces = collectors.map_reduces + count

return true
end

return registry
Loading

0 comments on commit e1811b8

Please sign in to comment.