Skip to content

Commit

Permalink
api: compute per second rate for counters
Browse files Browse the repository at this point in the history
This patch introduces a tool to compute the per-second rate for counters. No
additional deepcopies are performed, same as in collect.

Part of tarantool/tarantool#7725
Part of tarantool/tarantool#7728
  • Loading branch information
DifferentialOrange committed Feb 15, 2023
1 parent 1619c32 commit d87459c
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `histogram_obj:collect()`;
- `summary_obj:collect()`;
- `metrics.collect()`;
- tools to compute metrics aggregates
- tools to compute metrics aggregates:
- per second rate for counters;

### Changed
- Setup cartridge hotreload inside the role
Expand Down
5 changes: 5 additions & 0 deletions doc/monitoring/api_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ Metrics functions
Each aggregate metric is marked with ``metainfo.aggregate = true``. Aggregates
for aggregates are not computed.

Supported aggregates:

* ``rate`` for counter collectors: per second rate of value change for the last
two observations.

:param table output_with_aggregates_prev: a previous result of this method call.
Use ``nil`` if this is the first invocation. You may use
``metrics.collect{extended_format = true}`` result instead, but in this case
Expand Down
87 changes: 85 additions & 2 deletions metrics/aggregates.lua
Original file line number Diff line number Diff line change
@@ -1,12 +1,93 @@
local default_kind_rules = {}
local string_utils = require('metrics.string_utils')
local Counter = require('metrics.collectors.counter')
local Gauge = require('metrics.collectors.gauge')

local rule_processors = {}
local mksec_in_sec = 1e6

local RATE_SUFFIX = 'per_second'

--- Compute the per-second rate of change for a single observation.
--
-- @param time_delta  time between the two observations, in microseconds
--   (it is divided by mksec_in_sec to get seconds). Expected to be a
--   plain Lua number greater than zero.
-- @param obs_prev  observation from the previous collect, or nil when
--   there is nothing to compare with.
-- @param obs  observation from the current collect.
-- @return table with `label_pairs` (taken from `obs`) and `value`
--   (rate per second), or nil when `obs_prev` is nil.
local function compute_rate_value(time_delta, obs_prev, obs)
    if obs_prev == nil then
        return nil
    end

    -- Values may be 64-bit cdata integers. Subtracting a bigger unsigned
    -- value from a smaller one (for example, after a counter reset) wraps
    -- around to a huge positive number, so always subtract the smaller
    -- value from the bigger one and restore the sign afterwards.
    -- tonumber is applied to work with float deltas instead of cdata.
    local value_delta
    if obs.value < obs_prev.value then
        value_delta = -tonumber(obs_prev.value - obs.value)
    else
        value_delta = tonumber(obs.value - obs_prev.value)
    end

    return {
        label_pairs = obs.label_pairs,
        value = value_delta / (time_delta / mksec_in_sec),
    }
end

--- Add a per-second rate aggregate for one counter collector to `output`.
--
-- The aggregate is stored as a gauge whose name is built from the counter
-- name prefix and RATE_SUFFIX. `output` is returned untouched when there is
-- no previous entry for this collector, when the aggregate registry key is
-- already occupied, or when the current timestamp is not later than the
-- previous one.
--
-- @param output_with_aggregates_prev  previous result, indexed by the same
--   collector keys as `output`.
-- @param output  current collect result; the aggregate is written into it.
-- @param coll_key  key of the counter collector.
-- @param coll_obs  current entry of the counter collector.
-- @return `output`
local function compute_counter_rate(output_with_aggregates_prev, output, coll_key, coll_obs)
    local previous = output_with_aggregates_prev[coll_key]
    if previous == nil then
        return output
    end

    local rate_name = string_utils.build_name(coll_obs.name_prefix, RATE_SUFFIX)
    -- Derivative of monotonic is not monotonic, so the rate is a gauge.
    local rate_kind = Gauge.kind
    local rate_key = string_utils.build_registry_key(rate_name, rate_kind)

    -- If, for any reason, a registry collision has happened, assume that
    -- an aggregate metric with a similar meaning is already there.
    if output[rate_key] ~= nil then
        return output
    end

    -- Compare before subtracting: ULL subtraction on older Tarantools
    -- yields a big ULL.
    if coll_obs.timestamp <= previous.timestamp then
        return output
    end

    -- tonumber to work with float deltas instead of cdata integers.
    local elapsed = tonumber(coll_obs.timestamp - previous.timestamp)
    if elapsed <= 0 then
        return output
    end

    -- Observations missing from the previous result produce nil and are
    -- therefore absent from the rate observations.
    local rates = {}
    for label_key, current in pairs(coll_obs.observations['']) do
        local earlier = previous.observations[''][label_key]
        rates[label_key] = compute_rate_value(elapsed, earlier, current)
    end

    local rate_metainfo = table.deepcopy(coll_obs.metainfo)
    rate_metainfo.aggregate = true

    local rate_entry = {
        name = rate_name,
        name_prefix = coll_obs.name_prefix,
        help = "Average per second rate of change of " .. coll_obs.name,
        kind = rate_kind,
        metainfo = rate_metainfo,
        timestamp = coll_obs.timestamp,
        observations = {[''] = rates},
    }
    output[rate_key] = rate_entry

    return output
end


-- Default mapping from a collector kind to the list of aggregate rule names
-- applied to collectors of that kind. compute() falls back to this table
-- when the caller does not pass kind_rules.
local default_kind_rules = {
[Counter.kind] = { 'rate' },
}

-- Aggregate rule name -> processor function. compute() invokes a processor
-- as processor(output_with_aggregates_prev, output, coll_key, coll_obs) and
-- uses the returned table as the new output.
local rule_processors = {
rate = compute_counter_rate,
}

local function compute(output_with_aggregates_prev, output, kind_rules)
output_with_aggregates_prev = output_with_aggregates_prev or {}
kind_rules = kind_rules or default_kind_rules

for coll_key, coll_obs in pairs(output) do
if coll_obs.metainfo.aggregate then
goto continue
end

local coll_rules = kind_rules[coll_obs.kind] or {}
for _, rule in ipairs(coll_rules) do
if rule_processors[rule] == nil then
Expand All @@ -15,6 +96,8 @@ local function compute(output_with_aggregates_prev, output, kind_rules)

output = rule_processors[rule](output_with_aggregates_prev, output, coll_key, coll_obs)
end

:: continue ::
end

return output
Expand Down
83 changes: 83 additions & 0 deletions test/aggregates_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local t = require('luatest')
local g = t.group('metrics_aggregates')

local metrics = require('metrics')
local utils = require('test.utils')

local function get_counter_example(timestamp, value1, value2)
local res = {
Expand Down Expand Up @@ -47,3 +48,85 @@ g.test_no_rules = function()
local original_output = table.deepcopy(output)
t.assert_equals(metrics.compute_aggregates(nil, output), original_output)
end

-- With no previous result there is nothing to compare with, so the output
-- must still contain only the original collector.
g.test_counter_rate_no_previous_data = function()
    local snapshot = get_counter_example(1676364616294847ULL, 14148, 3204)

    local result = metrics.compute_aggregates(nil, snapshot)
    local collectors_count = utils.len(result)

    t.assert_equals(collectors_count, 1,
        "No rate computed for a single observation")
end

-- Two snapshots taken 100 seconds apart: the rate aggregate must appear
-- next to the counter and carry per-label rates.
g.test_counter_rate = function()
    local first = get_counter_example(1676364616294847ULL, 14148, 3204)
    local second = get_counter_example(1676364616294847ULL + 100 * 1e6, 14148 + 200, 3204 + 50)

    local first_result = metrics.compute_aggregates(nil, first)
    local second_result = metrics.compute_aggregates(first_result, second)

    t.assert_equals(utils.len(second_result), 2, "Rate computed")

    local rate = second_result['lj_gc_steps_propagate_per_secondgauge']
    t.assert_not_equals(rate, nil, "Rate computed")

    -- Collector-level fields of the aggregate.
    t.assert_equals(rate.name, 'lj_gc_steps_propagate_per_second')
    t.assert_equals(rate.name_prefix, 'lj_gc_steps_propagate')
    t.assert_equals(rate.kind, 'gauge')
    t.assert_equals(rate.help, 'Average per second rate of change of lj_gc_steps_propagate_total')
    t.assert_equals(rate.metainfo.default, true)
    t.assert_equals(rate.metainfo.aggregate, true)
    t.assert_equals(rate.timestamp, 1676364616294847ULL + 100 * 1e6)

    -- Observation stored under the empty label key.
    local unlabeled = rate.observations['']['']
    t.assert_equals(unlabeled.label_pairs, { alias = 'router' })
    t.assert_almost_equals(unlabeled.value, 200 / 100)

    -- Observation with the `source` label.
    local labeled = rate.observations['']['source\tvinyl_procedures']
    t.assert_equals(labeled.label_pairs,
        { alias = 'router', source = 'vinyl_procedures' })
    t.assert_almost_equals(labeled.value, 50 / 100)
end

-- An observation that is present only in the second snapshot has no
-- previous value, so no rate must be produced for it.
g.test_counter_rate_new_label = function()
    local first = get_counter_example(1676364616294847ULL, 14148, nil)
    local second = get_counter_example(1676364616294847ULL + 100 * 1e6, 14148 + 200, 3204)

    local first_result = metrics.compute_aggregates(nil, first)
    local second_result = metrics.compute_aggregates(first_result, second)

    t.assert_equals(utils.len(second_result), 2, "Rate computed")

    local rate = second_result['lj_gc_steps_propagate_per_secondgauge']
    t.assert_not_equals(rate, nil, "Rate computed")

    local rate_values = rate.observations['']
    t.assert_not_equals(rate_values[''], nil)
    t.assert_equals(rate_values['source\tvinyl_procedures'], nil)
end

-- Feeding the snapshots in reverse order (newer one as "previous") must not
-- produce a rate aggregate.
g.test_counter_rate_wrong_timeline = function()
    local older = get_counter_example(1676364616294847ULL, 14148, 3204)
    local newer = get_counter_example(1676364616294847ULL + 100 * 1e6, 14148 + 200, 3204 + 50)

    local newer_result = metrics.compute_aggregates(nil, newer)
    local older_result = metrics.compute_aggregates(newer_result, older)

    local collectors_count = utils.len(older_result)
    t.assert_equals(collectors_count, 1,
        "No rate computed for reverse observations timeline")
end

-- Two snapshots sharing the same timestamp give a zero time delta, so no
-- rate aggregate must be produced.
g.test_counter_rate_too_high_collect_rate = function()
    local first = get_counter_example(1676364616294847ULL, 14148, 3204)
    local second = get_counter_example(1676364616294847ULL, 14148 + 200, 3204 + 50)

    local first_result = metrics.compute_aggregates(nil, first)
    local second_result = metrics.compute_aggregates(first_result, second)

    local collectors_count = utils.len(second_result)
    t.assert_equals(collectors_count, 1,
        "No rate computed if two observations are for the same time")
end

-- Passing an empty rule list for the counter kind must disable the rate
-- aggregate even when two valid snapshots are available.
g.test_counter_rate_disabled = function()
    local first = get_counter_example(1676364616294847ULL, 14148, 3204)
    local second = get_counter_example(1676364616294847ULL + 100 * 1e6, 14148 + 200, 3204 + 50)

    local no_counter_rules = { counter = {} }
    local first_result = metrics.compute_aggregates(nil, first, no_counter_rules)
    local second_result = metrics.compute_aggregates(first_result, second, no_counter_rules)

    local collectors_count = utils.len(second_result)
    t.assert_equals(collectors_count, 1,
        "No rate computed due to options")
end

0 comments on commit d87459c

Please sign in to comment.