diff --git a/metrics/plugins/graphite.lua b/metrics/plugins/graphite.lua index e75355c8..faa78b08 100644 --- a/metrics/plugins/graphite.lua +++ b/metrics/plugins/graphite.lua @@ -1,6 +1,7 @@ local socket = require('socket') local fiber = require('fiber') local metrics = require('metrics') +local string_utils = require('metrics.string_utils') local checks = require('checks') local log = require('log') local fun = require('fun') @@ -36,22 +37,63 @@ function graphite.format_observation(prefix, obs) return graph end -local function graphite_worker(opts) - fiber.name('metrics_graphite_worker') +graphite.internal = {} +function graphite.internal.collect_and_push_v1(opts) + metrics.invoke_callbacks() + for _, c in pairs(metrics.collectors()) do + for _, obs in ipairs(c:collect()) do + local data = graphite.format_observation(opts.prefix, obs) + local numbytes = opts.sock:sendto(opts.host, opts.port, data) + if numbytes == nil then + log.error('Error while sending to host %s port %s data %s', + opts.host, opts.port, data) + end + end + end +end - while true do - metrics.invoke_callbacks() - for _, c in pairs(metrics.collectors()) do - for _, obs in ipairs(c:collect()) do - local data = graphite.format_observation(opts.prefix, obs) - local numbytes = opts.sock:sendto(opts.host, opts.port, data) - if numbytes == nil then - log.error('Error while sending to host %s port %s data %s', - opts.host, opts.port, data) - end +function graphite.format_output(output, opts) + local result = {} + for _, coll_obs in pairs(output) do + for group_name, obs_group in pairs(coll_obs.observations) do + local metric_name = string_utils.build_name(coll_obs.name, group_name) + for _, obs in pairs(obs_group) do + local formatted_obs = graphite.format_observation(opts.prefix, + { + metric_name = metric_name, + label_pairs = obs.label_pairs, + timestamp = coll_obs.timestamp, + value = obs.value + }) + table.insert(result, formatted_obs) end end + end + + return result +end + +function graphite.send_formatted(formatted_output, opts) + for _, data in ipairs(formatted_output) do + local numbytes = opts.sock:sendto(opts.host, opts.port, data) + if numbytes == nil then + log.error('Error while sending to host %s port %s data %s', + opts.host, opts.port, data) + end + end +end + +function graphite.internal.collect_and_push_v2(opts) + local output = metrics.collect{invoke_callbacks = true, extended_format = true} + local formatted_output = graphite.format_output(output, opts) + graphite.send_formatted(formatted_output, opts) +end +local function graphite_worker(opts) + fiber.name('metrics_graphite_worker') + + while true do + graphite.internal.collect_and_push_v2(opts) fiber.sleep(opts.send_interval) end end diff --git a/test/plugins/graphite_test.lua b/test/plugins/graphite_test.lua index 8891dc06..136ece06 100755 --- a/test/plugins/graphite_test.lua +++ b/test/plugins/graphite_test.lua @@ -153,3 +153,69 @@ g.test_graphite_kills_previous_fibers_on_init = function() fiber.yield() -- let cancelled fibers disappear from fiber.info() t.assert_equals(count_workers(), 1) end + +g.test_collect_and_push_preseves_format = function(group) + -- Prepare some data for all collector types. + metrics.cfg{include = 'all', exclude = {}, labels = {alias = 'router-3'}} + + local c = metrics.counter('cnt', nil, {my_useful_info = 'here'}) + c:inc(3, {mylabel = 'myvalue1'}) + c:inc(2, {mylabel = 'myvalue2'}) + + c = metrics.gauge('gauge', nil, {my_useful_info = 'here'}) + c:set(3, {mylabel = 'myvalue1'}) + c:set(2, {mylabel = 'myvalue2'}) + + c = metrics.histogram('histogram', nil, {2, 4}, {my_useful_info = 'here'}) + c:observe(3, {mylabel = 'myvalue1'}) + c:observe(2, {mylabel = 'myvalue2'}) + + local port_v1 = 22003 + group.sock_v1 = socket('AF_INET', 'SOCK_DGRAM', 'udp') + group.sock_v1:bind('127.0.0.1', port_v1) + local port_v2 = 22004 + group.sock_v2 = socket('AF_INET', 'SOCK_DGRAM', 'udp') + group.sock_v2:bind('127.0.0.1', port_v2) + + graphite.internal.collect_and_push_v1({ + prefix = 'tarantool', + host = '127.0.0.1', + port = port_v1, + sock = group.sock_v1, + }) + graphite.internal.collect_and_push_v2({ + prefix = 'tarantool', + host = '127.0.0.1', + port = port_v2, + sock = group.sock_v2, + }) + + local output_v2 = '' + while true do + local output_v2_part = group.sock_v2:recvfrom(200) + if output_v2_part == nil or output_v2_part == '' then + break + end + + output_v2 = output_v2 .. ' ' .. output_v2_part + end + + while true do + local output_v1_part = group.sock_v1:recvfrom(200) + if output_v1_part == nil or output_v1_part == '' then + break + end + + t.assert_str_contains(output_v2, output_v1_part:split(' ')[1]) + end +end + +g.after_test('test_collect_and_push_preseves_format', function(group) + if group.sock_v1 then + group.sock_v1:close() + end + + if group.sock_v2 then + group.sock_v2:close() + end +end)