From 97c29d89fbeeadd9edaaa6a60b6841d6ed1b9f51 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Wed, 15 Feb 2023 12:03:19 +0300 Subject: [PATCH] internal: collect extended on graphite export After this patch, graphite export plugin is based on metrics.collect{invoke_callbacks = true, extended_format = true} result. Process API is also separated, so now a developer can reuse export handles if he already has an output (for example, processing historical data in Flight Recorder). Older version was preserved for test purposes to ensure that output is the same in new implementation. Part of tarantool/tarantool#7725 Part of tarantool/tarantool#7728 --- metrics/plugins/graphite.lua | 66 +++++++++++++++++++++++++++------- test/plugins/graphite_test.lua | 66 ++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 12 deletions(-) diff --git a/metrics/plugins/graphite.lua b/metrics/plugins/graphite.lua index e75355c8..faa78b08 100644 --- a/metrics/plugins/graphite.lua +++ b/metrics/plugins/graphite.lua @@ -1,6 +1,7 @@ local socket = require('socket') local fiber = require('fiber') local metrics = require('metrics') +local string_utils = require('metrics.string_utils') local checks = require('checks') local log = require('log') local fun = require('fun') @@ -36,22 +37,63 @@ function graphite.format_observation(prefix, obs) return graph end -local function graphite_worker(opts) - fiber.name('metrics_graphite_worker') +graphite.internal = {} +function graphite.internal.collect_and_push_v1(opts) + metrics.invoke_callbacks() + for _, c in pairs(metrics.collectors()) do + for _, obs in ipairs(c:collect()) do + local data = graphite.format_observation(opts.prefix, obs) + local numbytes = opts.sock:sendto(opts.host, opts.port, data) + if numbytes == nil then + log.error('Error while sending to host %s port %s data %s', + opts.host, opts.port, data) + end + end + end +end - while true do - metrics.invoke_callbacks() - for _, c in pairs(metrics.collectors()) do - for _, obs in ipairs(c:collect()) do - local data = graphite.format_observation(opts.prefix, obs) - local numbytes = opts.sock:sendto(opts.host, opts.port, data) - if numbytes == nil then - log.error('Error while sending to host %s port %s data %s', - opts.host, opts.port, data) - end +function graphite.format_output(output, opts) + local result = {} + for _, coll_obs in pairs(output) do + for group_name, obs_group in pairs(coll_obs.observations) do + local metric_name = string_utils.build_name(coll_obs.name, group_name) + for _, obs in pairs(obs_group) do + local formatted_obs = graphite.format_observation(opts.prefix, + { + metric_name = metric_name, + label_pairs = obs.label_pairs, + timestamp = coll_obs.timestamp, + value = obs.value + }) + table.insert(result, formatted_obs) end end + end + + return result +end + +function graphite.send_formatted(formatted_output, opts) + for _, data in ipairs(formatted_output) do + local numbytes = opts.sock:sendto(opts.host, opts.port, data) + if numbytes == nil then + log.error('Error while sending to host %s port %s data %s', + opts.host, opts.port, data) + end + end +end + +function graphite.internal.collect_and_push_v2(opts) + local output = metrics.collect{invoke_callbacks = true, extended_format = true} + local formatted_output = graphite.format_output(output, opts) + graphite.send_formatted(formatted_output, opts) +end +local function graphite_worker(opts) + fiber.name('metrics_graphite_worker') + + while true do + graphite.internal.collect_and_push_v2(opts) fiber.sleep(opts.send_interval) end end diff --git a/test/plugins/graphite_test.lua b/test/plugins/graphite_test.lua index 8891dc06..136ece06 100755 --- a/test/plugins/graphite_test.lua +++ b/test/plugins/graphite_test.lua @@ -153,3 +153,69 @@ g.test_graphite_kills_previous_fibers_on_init = function() fiber.yield() -- let cancelled fibers disappear from fiber.info() t.assert_equals(count_workers(), 1) end + +g.test_collect_and_push_preseves_format = function(group) + -- Prepare some data for all collector types. + metrics.cfg{include = 'all', exclude = {}, labels = {alias = 'router-3'}} + + local c = metrics.counter('cnt', nil, {my_useful_info = 'here'}) + c:inc(3, {mylabel = 'myvalue1'}) + c:inc(2, {mylabel = 'myvalue2'}) + + c = metrics.gauge('gauge', nil, {my_useful_info = 'here'}) + c:set(3, {mylabel = 'myvalue1'}) + c:set(2, {mylabel = 'myvalue2'}) + + c = metrics.histogram('histogram', nil, {2, 4}, {my_useful_info = 'here'}) + c:observe(3, {mylabel = 'myvalue1'}) + c:observe(2, {mylabel = 'myvalue2'}) + + local port_v1 = 22003 + group.sock_v1 = socket('AF_INET', 'SOCK_DGRAM', 'udp') + group.sock_v1:bind('127.0.0.1', port_v1) + local port_v2 = 22004 + group.sock_v2 = socket('AF_INET', 'SOCK_DGRAM', 'udp') + group.sock_v2:bind('127.0.0.1', port_v2) + + graphite.internal.collect_and_push_v1({ + prefix = 'tarantool', + host = '127.0.0.1', + port = port_v1, + sock = group.sock_v1, + }) + graphite.internal.collect_and_push_v2({ + prefix = 'tarantool', + host = '127.0.0.1', + port = port_v2, + sock = group.sock_v2, + }) + + local output_v2 = '' + while true do + local output_v2_part = group.sock_v2:recvfrom(200) + if output_v2_part == nil or output_v2_part == '' then + break + end + + output_v2 = output_v2 .. ' ' .. output_v2_part + end + + while true do + local output_v1_part = group.sock_v1:recvfrom(200) + if output_v1_part == nil or output_v1_part == '' then + break + end + + t.assert_str_contains(output_v2, output_v1_part:split(' ')[1]) + end +end + +g.after_test('test_collect_and_push_preseves_format', function(group) + if group.sock_v1 then + group.sock_v1:close() + end + + if group.sock_v2 then + group.sock_v2:close() + end +end)