Skip to content

Commit

Permalink
internal: collect extended on graphite export
Browse files Browse the repository at this point in the history
After this patch, the only metrics method that plugin export uses
is metrics.collect{invoke_callbacks = true, extended_format = true}.
Process API is also separated, so now a developer can reuse export
handles if he already has an output (for example, processing historical
data in Flight Recorder).

Older version was preserved for test purposes to ensure that output is
the same in new implementation.

Part of tarantool/tarantool#7725
Part of tarantool/tarantool#7728
  • Loading branch information
DifferentialOrange committed Feb 15, 2023
1 parent a6d1cbb commit b2f41ee
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 12 deletions.
66 changes: 54 additions & 12 deletions metrics/plugins/graphite.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local socket = require('socket')
local fiber = require('fiber')
local metrics = require('metrics')
local string_utils = require('metrics.string_utils')
local checks = require('checks')
local log = require('log')
local fun = require('fun')
Expand Down Expand Up @@ -36,22 +37,63 @@ function graphite.format_observation(prefix, obs)
return graph
end

local function graphite_worker(opts)
fiber.name('metrics_graphite_worker')
graphite.internal = {}
function graphite.internal.collect_and_push_v1(opts)
metrics.invoke_callbacks()
for _, c in pairs(metrics.collectors()) do
for _, obs in ipairs(c:collect()) do
local data = graphite.format_observation(opts.prefix, obs)
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
if numbytes == nil then
log.error('Error while sending to host %s port %s data %s',
opts.host, opts.port, data)
end
end
end
end

while true do
metrics.invoke_callbacks()
for _, c in pairs(metrics.collectors()) do
for _, obs in ipairs(c:collect()) do
local data = graphite.format_observation(opts.prefix, obs)
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
if numbytes == nil then
log.error('Error while sending to host %s port %s data %s',
opts.host, opts.port, data)
end
function graphite.format_output(output, opts)
local result = {}
for _, coll_obs in pairs(output) do
for group_name, obs_group in pairs(coll_obs.observations) do
local metric_name = string_utils.build_name(coll_obs.name, group_name)
for _, obs in pairs(obs_group) do
local formatted_obs = graphite.format_observation(opts.prefix,
{
metric_name = metric_name,
label_pairs = obs.label_pairs,
timestamp = coll_obs.timestamp,
value = obs.value
})
table.insert(result, formatted_obs)
end
end
end

return result
end

function graphite.send_formatted(formatted_output, opts)
for _, data in ipairs(formatted_output) do
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
if numbytes == nil then
log.error('Error while sending to host %s port %s data %s',
opts.host, opts.port, data)
end
end
end

function graphite.internal.collect_and_push_v2(opts)
local output = metrics.collect{invoke_callbacks = true, extended_format = true}
local formatted_output = graphite.format_output(output, opts)
graphite.send_formatted(formatted_output, opts)
end

local function graphite_worker(opts)
fiber.name('metrics_graphite_worker')

while true do
graphite.internal.collect_and_push_v2(opts)
fiber.sleep(opts.send_interval)
end
end
Expand Down
66 changes: 66 additions & 0 deletions test/plugins/graphite_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,69 @@ g.test_graphite_kills_previous_fibers_on_init = function()
fiber.yield() -- let cancelled fibers disappear from fiber.info()
t.assert_equals(count_workers(), 1)
end

g.test_collect_and_push_preseves_format = function(group)
-- Prepare some data for all collector types.
metrics.cfg{include = 'all', exclude = {}, labels = {alias = 'router-3'}}

local c = metrics.counter('cnt', nil, {my_useful_info = 'here'})
c:inc(3, {mylabel = 'myvalue1'})
c:inc(2, {mylabel = 'myvalue2'})

c = metrics.gauge('gauge', nil, {my_useful_info = 'here'})
c:set(3, {mylabel = 'myvalue1'})
c:set(2, {mylabel = 'myvalue2'})

c = metrics.histogram('histogram', nil, {2, 4}, {my_useful_info = 'here'})
c:observe(3, {mylabel = 'myvalue1'})
c:observe(2, {mylabel = 'myvalue2'})

local port_v1 = 22003
group.sock_v1 = socket('AF_INET', 'SOCK_DGRAM', 'udp')
group.sock_v1:bind('127.0.0.1', port_v1)
local port_v2 = 22004
group.sock_v2 = socket('AF_INET', 'SOCK_DGRAM', 'udp')
group.sock_v2:bind('127.0.0.1', port_v2)

graphite.internal.collect_and_push_v1({
prefix = 'tarantool',
host = '127.0.0.1',
port = port_v1,
sock = group.sock_v1,
})
graphite.internal.collect_and_push_v2({
prefix = 'tarantool',
host = '127.0.0.1',
port = port_v2,
sock = group.sock_v2,
})

local output_v2 = ''
while true do
local output_v2_part = group.sock_v2:recvfrom(200)
if output_v2_part == nil or output_v2_part == '' then
break
end

output_v2 = output_v2 .. ' ' .. output_v2_part
end

while true do
local output_v1_part = group.sock_v1:recvfrom(200)
if output_v1_part == nil or output_v1_part == '' then
break
end

t.assert_str_contains(output_v2, output_v1_part:split(' ')[1])
end
end

g.after_test('test_collect_and_push_preseves_format', function(group)
if group.sock_v1 then
group.sock_v1:close()
end

if group.sock_v2 then
group.sock_v2:close()
end
end)

0 comments on commit b2f41ee

Please sign in to comment.