diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 3ba520559c9..c7015b3c0e8 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -51,7 +51,7 @@ set(FLB_OUT_DATADOG Yes) set(FLB_OUT_ES Yes) set(FLB_OUT_EXIT No) set(FLB_OUT_FORWARD Yes) -set(FLB_OUT_GELF No) +set(FLB_OUT_GELF Yes) set(FLB_OUT_HTTP Yes) set(FLB_OUT_INFLUXDB Yes) set(FLB_OUT_NATS No) diff --git a/plugins/out_gelf/gelf.c b/plugins/out_gelf/gelf.c index b3b7e7fd773..f965d03b29d 100644 --- a/plugins/out_gelf/gelf.c +++ b/plugins/out_gelf/gelf.c @@ -75,6 +75,51 @@ * server nodes omit this field automatically. */ +/* + * Generate a unique message ID. The upper 48-bit is milliseconds + * since the Epoch, the lower 16-bit is a random nonce. + */ +static uint64_t message_id(void) +{ + uint64_t now; + uint16_t nonce; + struct flb_time tm; + + if (flb_time_get(&tm) != -1) { + now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000; + } + else { + now = (uint64_t) time(NULL) * 1000; + } + nonce = (uint16_t) rand(); + + return (now << 16) | nonce; +} + +/* + * A GELF header is 12 bytes in size. It has the following + * structure: + * + * +---+---+---+---+---+---+---+---+---+---+---+---+ + * | MAGIC | MESSAGE ID |SEQ|NUM| + * +---+---+---+---+---+---+---+---+---+---+---+---+ + * + * NUM is the total number of packets to send. SEQ is the + * unique sequence number for each packet (zero-indexed). + */ +#define GELF_MAGIC "\x1e\x0f" +#define GELF_HEADER_SIZE 12 + +static void init_chunk_header(uint8_t *buf, int count) +{ + uint64_t msgid = message_id(); + + memcpy(buf, GELF_MAGIC, 2); + memcpy(buf + 2, &msgid, 8); + buf[10] = 0; + buf[11] = count; +} + /* * Chunked GELF * Prepend the following structure to your GELF message to make it chunked: @@ -89,66 +134,45 @@ * already arrived and still arriving chunks. * A message MUST NOT consist of more than 128 chunks. */ - static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg, size_t msg_size) { int ret; - uint8_t header[12]; uint8_t n; size_t chunks; size_t offset; - struct flb_time tm; - uint64_t messageid; - struct msghdr msghdr; - struct iovec iov[2]; + size_t len; + uint8_t *buf = (uint8_t *) ctx->pckt_buf; chunks = msg_size / ctx->pckt_size; - if ((msg_size % ctx->pckt_size) != 0) + if (msg_size % ctx->pckt_size != 0) { chunks++; + } if (chunks > 128) { - flb_plg_error(ctx->ins, "message too big: %zd bytes, too many chunks", - msg_size); + flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size); return -1; } - flb_time_get(&tm); - - messageid = ((uint64_t)(tm.tm.tv_nsec*1000000 + tm.tm.tv_nsec) << 32) | - (uint64_t)rand_r(&(ctx->seed)); - - header[0] = 0x1e; - header[1] = 0x0f; - memcpy (header+2, &messageid, 8); - header[10] = chunks; - - iov[0].iov_base = header; - iov[0].iov_len = 12; - - memset(&msghdr, 0, sizeof(struct msghdr)); - msghdr.msg_iov = iov; - msghdr.msg_iovlen = 2; + init_chunk_header(buf, chunks); offset = 0; for (n = 0; n < chunks; n++) { - header[11] = n; + buf[10] = n; - iov[1].iov_base = msg + offset; - if ((msg_size - offset) < ctx->pckt_size) { - iov[1].iov_len = msg_size - offset; - } - else { - iov[1].iov_len = ctx->pckt_size; + len = msg_size - offset; + if (ctx->pckt_size < len) { + len = ctx->pckt_size; } + memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len); - ret = sendmsg(ctx->fd, &msghdr, MSG_DONTWAIT | MSG_NOSIGNAL); + ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE, + MSG_DONTWAIT | MSG_NOSIGNAL); if (ret == -1) { flb_errno(); } offset += ctx->pckt_size; } - return 0; } @@ -399,14 +423,23 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf } close(fd); } + srand(ctx->seed); ctx->fd = -1; + ctx->pckt_buf = NULL; + if (ctx->mode == FLB_GELF_UDP) { ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port); if (ctx->fd < 0) { flb_free(ctx); return -1; } + ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size); + if (ctx->pckt_buf == NULL) { + flb_socket_close(ctx->fd); + flb_free(ctx); + return -1; + } } else { int io_flags = FLB_IO_TCP; @@ -449,6 +482,7 @@ static int cb_gelf_exit(void *data, struct flb_config *config) flb_sds_destroy(ctx->fields.full_message_key); flb_sds_destroy(ctx->fields.level_key); + flb_free(ctx->pckt_buf); flb_free(ctx); return 0; diff --git a/plugins/out_gelf/gelf.h b/plugins/out_gelf/gelf.h index a7aef89bdd7..c8ef4150b9d 100644 --- a/plugins/out_gelf/gelf.h +++ b/plugins/out_gelf/gelf.h @@ -35,6 +35,7 @@ struct flb_out_gelf_config { flb_sockfd_t fd; int pckt_size; + char *pckt_buf; int compress; unsigned int seed;