Skip to content

Commit

Permalink
out_gelf: port the plugin to Windows (#2574)
Browse files Browse the repository at this point in the history
With this patch, the GELF Output plugin can be compiled and linked
on Windows.

This commit also fixes a few bugs in GELF plugin:

 * The message header was not constructed properly. In particular,
   11-12th bytes were filled in the reversed order.

 * The message id generation was bogus (e.g. it did "tv_nsec*1000000
   + tm.tm.tv_nsec" to generate a timestamp)

Fix these glitches and add detailed documentation to each function.

Signed-off-by: Fujimoto Seiji <[email protected]>
  • Loading branch information
fujimotos authored and edsiper committed Sep 24, 2020
1 parent 3626f63 commit c8c6932
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ set(FLB_OUT_DATADOG Yes)
set(FLB_OUT_ES Yes)
set(FLB_OUT_EXIT No)
set(FLB_OUT_FORWARD Yes)
set(FLB_OUT_GELF No)
set(FLB_OUT_GELF Yes)
set(FLB_OUT_HTTP Yes)
set(FLB_OUT_INFLUXDB Yes)
set(FLB_OUT_NATS No)
Expand Down
102 changes: 68 additions & 34 deletions plugins/out_gelf/gelf.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,51 @@
* server nodes omit this field automatically.
*/

/*
* Generate a unique message ID. The upper 48-bit is milliseconds
* since the Epoch, the lower 16-bit is a random nonce.
*/
static uint64_t message_id(void)
{
uint64_t now;
uint16_t nonce;
struct flb_time tm;

if (flb_time_get(&tm) != -1) {
now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000;
}
else {
now = (uint64_t) time(NULL) * 1000;
}
nonce = (uint16_t) rand();

return (now << 16) | nonce;
}

/*
* A GELF header is 12 bytes in size. It has the following
* structure:
*
* +---+---+---+---+---+---+---+---+---+---+---+---+
* | MAGIC | MESSAGE ID |SEQ|NUM|
* +---+---+---+---+---+---+---+---+---+---+---+---+
*
* NUM is the total number of packets to send. SEQ is the
* unique sequence number for each packet (zero-indexed).
*/
#define GELF_MAGIC "\x1e\x0f"
#define GELF_HEADER_SIZE 12

static void init_chunk_header(uint8_t *buf, int count)
{
uint64_t msgid = message_id();

memcpy(buf, GELF_MAGIC, 2);
memcpy(buf + 2, &msgid, 8);
buf[10] = 0;
buf[11] = count;
}

/*
* Chunked GELF
* Prepend the following structure to your GELF message to make it chunked:
Expand All @@ -89,66 +134,45 @@
* already arrived and still arriving chunks.
* A message MUST NOT consist of more than 128 chunks.
*/

static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg,
size_t msg_size)
{
int ret;
uint8_t header[12];
uint8_t n;
size_t chunks;
size_t offset;
struct flb_time tm;
uint64_t messageid;
struct msghdr msghdr;
struct iovec iov[2];
size_t len;
uint8_t *buf = (uint8_t *) ctx->pckt_buf;

chunks = msg_size / ctx->pckt_size;
if ((msg_size % ctx->pckt_size) != 0)
if (msg_size % ctx->pckt_size != 0) {
chunks++;
}

if (chunks > 128) {
flb_plg_error(ctx->ins, "message too big: %zd bytes, too many chunks",
msg_size);
flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size);
return -1;
}

flb_time_get(&tm);

messageid = ((uint64_t)(tm.tm.tv_nsec*1000000 + tm.tm.tv_nsec) << 32) |
(uint64_t)rand_r(&(ctx->seed));

header[0] = 0x1e;
header[1] = 0x0f;
memcpy (header+2, &messageid, 8);
header[10] = chunks;

iov[0].iov_base = header;
iov[0].iov_len = 12;

memset(&msghdr, 0, sizeof(struct msghdr));
msghdr.msg_iov = iov;
msghdr.msg_iovlen = 2;
init_chunk_header(buf, chunks);

offset = 0;
for (n = 0; n < chunks; n++) {
header[11] = n;
buf[10] = n;

iov[1].iov_base = msg + offset;
if ((msg_size - offset) < ctx->pckt_size) {
iov[1].iov_len = msg_size - offset;
}
else {
iov[1].iov_len = ctx->pckt_size;
len = msg_size - offset;
if (ctx->pckt_size < len) {
len = ctx->pckt_size;
}
memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len);

ret = sendmsg(ctx->fd, &msghdr, MSG_DONTWAIT | MSG_NOSIGNAL);
ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE,
MSG_DONTWAIT | MSG_NOSIGNAL);
if (ret == -1) {
flb_errno();
}
offset += ctx->pckt_size;
}

return 0;
}

Expand Down Expand Up @@ -399,14 +423,23 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf
}
close(fd);
}
srand(ctx->seed);

ctx->fd = -1;
ctx->pckt_buf = NULL;

if (ctx->mode == FLB_GELF_UDP) {
ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port);
if (ctx->fd < 0) {
flb_free(ctx);
return -1;
}
ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size);
if (ctx->pckt_buf == NULL) {
flb_socket_close(ctx->fd);
flb_free(ctx);
return -1;
}
}
else {
int io_flags = FLB_IO_TCP;
Expand Down Expand Up @@ -449,6 +482,7 @@ static int cb_gelf_exit(void *data, struct flb_config *config)
flb_sds_destroy(ctx->fields.full_message_key);
flb_sds_destroy(ctx->fields.level_key);

flb_free(ctx->pckt_buf);
flb_free(ctx);

return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/out_gelf/gelf.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct flb_out_gelf_config {
flb_sockfd_t fd;

int pckt_size;
char *pckt_buf;
int compress;
unsigned int seed;

Expand Down

0 comments on commit c8c6932

Please sign in to comment.