forked from marksteele/collectd-amqp-opentsdb-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
amqp_consumer_opentsdb.pl
executable file
·80 lines (73 loc) · 2.38 KB
/
amqp_consumer_opentsdb.pl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/perl
## TODO: error checking around dead socket....
use strict;
use Data::Dumper;
use Net::RabbitMQ;
use IO::Socket;
use Getopt::Long;
use JSON;
my %options;
$options{'amqp_host'} = '127.0.0.1';
$options{'amqp_port'} = 5672;
$options{'amqp_user'} = 'amqpuser';
$options{'amqp_password'} = 'amqppass';
$options{'amqp_vhost'} = '/';
$options{'amqp_exchange'} = 'stats';
$options{'amqp_queue'} = 'consumerqueue';
$options{'opentsdb_host'} = '127.0.0.1';
$options{'opentsdb_port'} = 4242;
$options{'compress'} = 0;
$options{'debug'} = 0;
$options{'input_format'} = 'json';
GetOptions ("var=s" => \%options);
if ($options{'compress'}) {
use Compress::Snappy;
}
my $sock = IO::Socket::INET->new(PeerAddr => $options{'opentsdb_host'}, PeerPort => $options{'opentsdb_port'}, Proto => 'tcp', Timeout => 1, Blocking => 0);
$sock->sockopt(SO_KEEPALIVE);
die("Unable to connect") unless $sock->connected();
my $mq = Net::RabbitMQ->new();
$mq->connect($options{'amqp_host'} , { port => $options{'amqp_port'}, user => $options{'amqp_user'}, password => $options{'amqp_password'}, vhost => $options{'amqp_vhost'} });
$mq->channel_open(1);
$mq->queue_declare(1, $options{'amqp_queue'});
$mq->queue_bind(1, $options{'amqp_queue'}, $options{'amqp_exchange'}, '#');
$mq->consume(1,$options{'amqp_queue'});
while(1) {
my $msg = $mq->recv();
if ($msg) {
if ($options{'compress'}) {
$msg->{'body'} = decompress($msg->{'body'});
}
foreach my $line (split(/\n/, $msg->{'body'})) {
my $metric = {};
eval {
$metric = decode_json($line);
};
if ($@) {
print "Error parsing input data: $line\n";
next;
}
my $output = sprintf("put %s %d %s", $metric->{'plugin'},$metric->{'time'},$metric->{'value'});
delete($metric->{'plugin'});
delete($metric->{'time'});
delete($metric->{'value'});
foreach my $k (keys %{$metric}) {
my $tk = $k;
$tk =~ s/[ =]//g;
$metric->{$k} =~ s/[ =]//g;
$output .= ' ' . $tk . '=' . $metric->{$k};
}
$output .= "\n";
## Should be using select to see if the socket is ready.... Ah well....
my $len = length($output);
my $written = 0;
while ($written != $len) {
$written += syswrite($sock,$output,$len - $written, $written);
}
}
} else {
sleep(1);
}
## Drain inbound data from socket
sysread($sock,my $blarh,8192);
}