diff --git a/CMakeLists.txt b/CMakeLists.txt index e12ccb9ea6a..e8ecf62e408 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -199,6 +199,7 @@ option(FLB_IN_EMITTER "Enable emitter input plugin" option(FLB_IN_NODE_EXPORTER_METRICS "Enable node exporter metrics input plugin" Yes) option(FLB_IN_WINDOWS_EXPORTER_METRICS "Enable windows exporter metrics input plugin" Yes) option(FLB_IN_OPENTELEMETRY "Enable OpenTelemetry input plugin" Yes) +option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_KUSTO "Enable Azure Kusto output plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index ca0303a664b..27136be1c96 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -55,6 +55,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_STATSD Yes) set(FLB_IN_STORAGE_BACKLOG Yes) set(FLB_IN_EMITTER Yes) + set(FLB_IN_ELASTICSEARCH Yes) # OUTPUT plugins # ============== diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 584a9c8c2c6..9e2fac3493b 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -184,6 +184,7 @@ REGISTER_IN_PLUGIN("in_http") REGISTER_IN_PLUGIN("in_collectd") REGISTER_IN_PLUGIN("in_statsd") REGISTER_IN_PLUGIN("in_opentelemetry") +REGISTER_IN_PLUGIN("in_elasticsearch") # Test the event loop messaging when used in threaded mode REGISTER_IN_PLUGIN("in_event_test") diff --git a/plugins/in_elasticsearch/CMakeLists.txt b/plugins/in_elasticsearch/CMakeLists.txt new file mode 100644 index 00000000000..50a472f6afb --- /dev/null +++ b/plugins/in_elasticsearch/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "Elasticsearch input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + in_elasticsearch.c + in_elasticsearch_config.c + in_elasticsearch_bulk_conn.c + in_elasticsearch_bulk_prot.c + ) + +FLB_PLUGIN(in_elasticsearch "${src}" "") diff --git a/plugins/in_elasticsearch/in_elasticsearch.c b/plugins/in_elasticsearch/in_elasticsearch.c new file mode 100644 index 00000000000..319d480e83e --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch.c @@ -0,0 +1,237 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include + +#include "in_elasticsearch.h" +#include "in_elasticsearch_config.h" +#include "in_elasticsearch_bulk_conn.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int in_elasticsearch_bulk_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct in_elasticsearch_bulk_conn *conn; + struct flb_in_elasticsearch *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", + connection->fd); + + conn = in_elasticsearch_bulk_conn_add(connection, ctx); + + if (conn == NULL) { + flb_downstream_conn_release(connection); + + return -1; + } + + return 0; +} + +static void bytes_to_groupname(unsigned char *data, char *buf, size_t len) { + int index; + char charset[] = "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + while (len-- > 0) { + index = (int) data[len]; + index = index % (sizeof(charset) - 1); + buf[len] = charset[index]; + } +} + +static void bytes_to_nodename(unsigned char *data, char *buf, size_t len) { + int index; + char charset[] = "0123456789" + "abcdefghijklmnopqrstuvwxyz"; + + while (len-- > 0) { + index = (int) data[len]; + index = index % (sizeof(charset) - 1); + buf[len] = charset[index]; + } +} + +static int in_elasticsearch_bulk_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_in_elasticsearch *ctx; + unsigned char rand[16]; + + (void) data; + + /* Create context and basic conf */ + ctx = in_elasticsearch_config_create(ins); + if (!ctx) { + return -1; + } + + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + in_elasticsearch_config_destroy(ctx); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + ctx->evl = config->evl; + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + if (flb_random_bytes(rand, 16)) { + flb_plg_error(ctx->ins, "cannot generate cluster name"); + return -1; + } + + bytes_to_groupname(rand, ctx->cluster_name, 16); + + if (flb_random_bytes(rand, 12)) { + flb_plg_error(ctx->ins, "cannot generate node name"); + return -1; + } + + bytes_to_nodename(rand, ctx->node_name, 12); + + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + in_elasticsearch_config_destroy(ctx); + + return -1; + } + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + in_elasticsearch_bulk_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + in_elasticsearch_config_destroy(ctx); + + return -1; + } + + ctx->collector_id = ret; + + return 0; +} + +static int in_elasticsearch_bulk_exit(void *data, struct flb_config *config) +{ + struct flb_in_elasticsearch *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + in_elasticsearch_config_destroy(ctx); + } + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_max_size), + "Set the maximum size of buffer" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_chunk_size), + "Set the buffer chunk size" + }, + + { + FLB_CONFIG_MAP_STR, "tag_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, tag_key), + "Specify a key name for extracting as a tag" + }, + + { + FLB_CONFIG_MAP_STR, "meta_key", "@meta", + 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, meta_key), + "Specify a key name for meta information" + }, + + { + FLB_CONFIG_MAP_STR, "hostname", "localhost", + 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, hostname), + "Specify hostname or FQDN. This parameter is effective for sniffering node information." + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_elasticsearch_plugin = { + .name = "elasticsearch", + .description = "HTTP Endpoints for Elasticsearch (Bulk API)", + .cb_init = in_elasticsearch_bulk_init, + .cb_pre_run = NULL, + .cb_collect = in_elasticsearch_bulk_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = in_elasticsearch_bulk_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/plugins/in_elasticsearch/in_elasticsearch.h b/plugins/in_elasticsearch/in_elasticsearch.h new file mode 100644 index 00000000000..faf435f8d78 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_ELASTICSEARCH_H +#define FLB_IN_ELASTICSEARCH_H + +#include +#include +#include +#include + +#include + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_in_elasticsearch { + flb_sds_t listen; + flb_sds_t tcp_port; + const char *tag_key; + const char *meta_key; + flb_sds_t hostname; + char cluster_name[16]; + char node_name[12]; + + int collector_id; + + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + struct mk_event_loop *evl; /* Event loop context */ + + struct mk_server *server; + struct flb_input_instance *ins; +}; + + +#endif diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c new file mode 100644 index 00000000000..5f9a4827dd2 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c @@ -0,0 +1,306 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "in_elasticsearch.h" +#include "in_elasticsearch_bulk_conn.h" +#include "in_elasticsearch_bulk_prot.h" + +static void in_elasticsearch_bulk_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int in_elasticsearch_bulk_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct flb_connection *connection; + struct in_elasticsearch_bulk_conn *conn; + struct mk_event *event; + struct flb_in_elasticsearch *ctx; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + in_elasticsearch_bulk_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + in_elasticsearch_bulk_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + in_elasticsearch_bulk_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + in_elasticsearch_bulk_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); + } + + /* FIXME: add Protocol handler here */ + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + in_elasticsearch_bulk_conn_del(conn); + return -1; + } + + return 0; + +} + +static void in_elasticsearch_bulk_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void in_elasticsearch_bulk_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct in_elasticsearch_bulk_conn *in_elasticsearch_bulk_conn_add(struct flb_connection *connection, + struct flb_in_elasticsearch *ctx) +{ + struct in_elasticsearch_bulk_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct in_elasticsearch_bulk_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = in_elasticsearch_bulk_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(ctx->evl, + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + + flb_free(conn->buf_data); + flb_free(conn); + + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + in_elasticsearch_bulk_conn_session_init(&conn->session, ctx->server, conn->connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. + */ + in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + + return conn; +} + +int in_elasticsearch_bulk_conn_del(struct in_elasticsearch_bulk_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void in_elasticsearch_bulk_conn_release_all(struct flb_in_elasticsearch *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct in_elasticsearch_bulk_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct in_elasticsearch_bulk_conn, _head); + in_elasticsearch_bulk_conn_del(conn); + } +} diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h new file mode 100644 index 00000000000..a5a7593ac67 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h @@ -0,0 +1,55 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_ELASTICSEARCH_BULK_CONN +#define FLB_IN_ELASTICSEARCH_BULK_CONN + +#include +#include + +#include +#include +#include + +struct in_elasticsearch_bulk_conn { + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_es_bulk->connections */ +}; + +struct in_elasticsearch_bulk_conn *in_elasticsearch_bulk_conn_add(struct flb_connection *connection, + struct flb_in_elasticsearch *ctx); +int in_elasticsearch_bulk_conn_del(struct in_elasticsearch_bulk_conn *conn); +void in_elasticsearch_bulk_conn_release_all(struct flb_in_elasticsearch *ctx); + + +#endif diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c new file mode 100644 index 00000000000..4e308170965 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c @@ -0,0 +1,846 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include + +#include "in_elasticsearch.h" +#include "in_elasticsearch_bulk_conn.h" +#include "in_elasticsearch_bulk_prot.h" + +#define HTTP_CONTENT_JSON 0 +#define HTTP_CONTENT_NDJSON 1 + +static int send_empty_response(struct in_elasticsearch_bulk_conn *conn, int http_status) +{ + size_t sent; + flb_sds_t out; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n\r\n"); + } + + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +static int send_json_message_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message) +{ + size_t sent; + int len; + flb_sds_t out; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %i\r\n\r\n%s", + len, message); + } + + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +static int send_dummy_sniffer_response(struct in_elasticsearch_bulk_conn *conn, int http_status, + struct flb_in_elasticsearch *ctx) +{ + size_t sent; + int len; + flb_sds_t out; + flb_sds_t resp; + flb_sds_t hostname; + + if (ctx->hostname != NULL) { + hostname = ctx->hostname; + } + else { + hostname = "localhost"; + } + + out = flb_sds_create_size(384); + if (!out) { + return -1; + } + + resp = flb_sds_create_size(384); + if (!resp) { + return -1; + } + + flb_sds_printf(&resp, + ES_NODES_TEMPLATE, + ctx->cluster_name, ctx->node_name, + hostname, ctx->tcp_port, ctx->buffer_max_size); + + len = flb_sds_len(resp) ; + + if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %i\r\n\r\n%s", + len, resp); + } + + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(resp); + flb_sds_destroy(out); + + return 0; +} + +static int send_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message) +{ + size_t sent; + int len; + flb_sds_t out; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +/* implements functionality to get tag from key in record */ +static flb_sds_t tag_key(struct flb_in_elasticsearch *ctx, msgpack_object *map) +{ + size_t map_size = map->via.map.size; + msgpack_object_kv *kv; + msgpack_object key; + msgpack_object val; + char *key_str = NULL; + char *val_str = NULL; + size_t key_str_size = 0; + size_t val_str_size = 0; + int j; + int check = FLB_FALSE; + int found = FLB_FALSE; + flb_sds_t tag; + + kv = map->via.map.ptr; + + for(j=0; j < map_size; j++) { + check = FLB_FALSE; + found = FLB_FALSE; + key = (kv+j)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) { + val = (kv+j)->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + val_str_size = val.via.str.size; + found = FLB_TRUE; + break; + } + if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + val_str_size = val.via.str.size; + found = FLB_TRUE; + break; + } + } + } + } + + if (found == FLB_TRUE) { + tag = flb_sds_create_len(val_str, val_str_size); + if (!tag) { + flb_errno(); + return NULL; + } + return tag; + } + + + flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key); + return NULL; +} + +static inline void map_pack_each(msgpack_packer *packer, + msgpack_object *map) +{ + int i; + msgpack_object *key; + + for (i = 0; i < map->via.map.size; i++) { + key = &map->via.map.ptr[i].key; + msgpack_pack_object(packer, *key); + msgpack_pack_object(packer, map->via.map.ptr[i].val); + } +} + +static int count_map_elements(struct flb_in_elasticsearch *ctx, char *buf, size_t size, int idx) +{ + msgpack_unpacked result; + int index = 0; + int map_num = 0; + msgpack_object *obj; + size_t off = 0; + + msgpack_unpacked_init(&result); + + /* Iterate each item to know map number */ + while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (idx >= index) { + index++; + continue; + } + + if (result.data.type == MSGPACK_OBJECT_MAP) { + obj = &result.data; + map_num = obj->via.map.size; + break; + } + else if (result.data.type == MSGPACK_OBJECT_ARRAY) { + obj = &result.data; + map_num = obj->via.array.size; + break; + } + } + msgpack_unpacked_destroy(&result); + + return map_num; +} + +static int get_write_op(struct flb_in_elasticsearch *ctx, msgpack_object *map, flb_sds_t *out_write_op, size_t *out_key_size) +{ + char *op_str = NULL; + size_t op_str_size = 0; + msgpack_object_kv *kv; + msgpack_object key; + int check = FLB_FALSE; + + kv = map->via.map.ptr; + key = kv[0].key; + if (key.type == MSGPACK_OBJECT_BIN) { + op_str = (char *) key.via.bin.ptr; + op_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + op_str = (char *) key.via.str.ptr; + op_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + *out_write_op = flb_sds_create_len(op_str, op_str_size); + *out_key_size = op_str_size; + } + + return check; +} + +static int status_buffer_avail(struct flb_in_elasticsearch *ctx, flb_sds_t bulk_statuses, size_t threshold) +{ + if (flb_sds_avail(bulk_statuses) < threshold) { + flb_plg_warn(ctx->ins, "left buffer for bulk status(es) is too small"); + + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int process_ndpack(struct flb_in_elasticsearch *ctx, flb_sds_t tag, char *buf, size_t size, flb_sds_t bulk_statuses) +{ + size_t off = 0; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_unpacked result; + struct flb_time tm; + msgpack_object *obj; + flb_sds_t tag_from_record = NULL; + int map_num = 0; + int idx = 0; + int cursor = 0; + flb_sds_t write_op; + size_t op_str_size = 0; + int op_ret = FLB_FALSE; + int error_op = FLB_FALSE; + + flb_time_get(&tm); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type == MSGPACK_OBJECT_MAP) { + if (idx > 0 && idx % 2 == 0) { + flb_sds_cat(bulk_statuses, ",", 1); + } + if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) { + break; + } + if (idx % 2 == 0) { + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + op_ret = get_write_op(ctx, &result.data, &write_op, &op_str_size); + + if (op_ret) { + if (flb_sds_cmp(write_op, "index", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"index\":", 9); + error_op = FLB_FALSE; + } + else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"create\":", 10); + error_op = FLB_FALSE; + } + else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"update\":", 10); + error_op = FLB_TRUE; + } + else if (flb_sds_cmp(write_op, "delete", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"delete\":{\"status\":404,\"result\":\"not_found\"}}", 46); + error_op = FLB_TRUE; + idx += 1; /* Prepare to adjust to multiple of two + * in the end of the loop. + * Due to delete actions include only one line. */ + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(write_op); + + goto proceed; + } + else { + flb_sds_cat(bulk_statuses, "{\"unknown\":{\"status\":400,\"result\":\"bad_request\"}}", 49); + error_op = FLB_TRUE; + + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(write_op); + + break; + } + } else { + flb_sds_destroy(write_op); + flb_plg_error(ctx->ins, "meta information line is missing"); + error_op = FLB_TRUE; + + break; + } + + if (error_op == FLB_FALSE) { + msgpack_pack_array(&mp_pck, 2); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + + /* Prepare map for records */ + map_num = count_map_elements(ctx, buf, size, cursor); + msgpack_pack_map(&mp_pck, map_num + 1); + + /* Pack meta */ + msgpack_pack_str(&mp_pck, strlen(ctx->meta_key)); + msgpack_pack_str_body(&mp_pck, ctx->meta_key, strlen(ctx->meta_key)); + msgpack_pack_object(&mp_pck, result.data); + } + } + else if (idx % 2 == 1) { + if (error_op == FLB_FALSE) { + /* Pack body */ + map_pack_each(&mp_pck, &result.data); + + tag_from_record = NULL; + if (ctx->tag_key) { + obj = &result.data; + tag_from_record = tag_key(ctx, obj); + } + + if (tag_from_record) { + flb_input_log_append(ctx->ins, tag_from_record, flb_sds_len(tag_from_record), + mp_sbuf.data, mp_sbuf.size); + flb_sds_destroy(tag_from_record); + } + else if (tag) { + flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + mp_sbuf.data, mp_sbuf.size); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + flb_input_log_append(ctx->ins, NULL, 0, mp_sbuf.data, mp_sbuf.size); + } + } + if (op_ret) { + if (flb_sds_cmp(write_op, "index", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34); + } + else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34); + } + else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) { + flb_sds_cat(bulk_statuses, "{\"status\":403,\"result\":\"forbidden\"}}", 36); + } + if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) { + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(write_op); + + break; + } + } + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(write_op); + } + + proceed: + idx++; + cursor++; + } + else { + flb_plg_error(ctx->ins, "skip record from invalid type: %i", + result.data.type); + msgpack_unpacked_destroy(&result); + return -1; + } + } + + if (idx % 2 != 0) { + flb_plg_warn(ctx->ins, "decode payload of Bulk API is failed"); + msgpack_unpacked_destroy(&result); + if (error_op == FLB_FALSE) { + /* On lacking of body case in non-error case, there is no + * releasing memory code paths. We should proceed to do + * it here. */ + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(write_op); + } + + return -1; + } + + msgpack_unpacked_destroy(&result); + + return 0; +} + +static ssize_t parse_payload_ndjson(struct flb_in_elasticsearch *ctx, flb_sds_t tag, + char *payload, size_t size, flb_sds_t bulk_statuses) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART) { + flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); + return -1; + } + else if (ret == FLB_ERR_JSON_INVAL) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + else if (ret == -1) { + return -1; + } + + /* Process the packaged JSON and return the last byte used */ + process_ndpack(ctx, tag, pack, out_size, bulk_statuses); + flb_free(pack); + + return 0; +} + +static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticsearch_bulk_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request, + flb_sds_t bulk_statuses) +{ + int type = -1; + int i = 0; + int ret = 0; + struct mk_http_header *header; + int extra_size = -1; + struct mk_http_header *headers_extra; + int gzip_compressed = FLB_FALSE; + void *gz_data = NULL; + size_t gz_size = -1; + + header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; + if (header->key.data == NULL) { + send_response(conn, 400, "error: header 'Content-Type' is not set\n"); + return -1; + } + + if (header->val.len >= 20 && + strncasecmp(header->val.data, "application/x-ndjson", 20) == 0) { + type = HTTP_CONTENT_NDJSON; + } + + if (header->val.len >= 16 && + strncasecmp(header->val.data, "application/json", 16) == 0) { + type = HTTP_CONTENT_JSON; + } + + if (type == -1) { + send_response(conn, 400, "error: invalid 'Content-Type'\n"); + return -1; + } + + if (request->data.len <= 0) { + send_response(conn, 400, "error: no payload found\n"); + return -1; + } + + extra_size = session->parser.headers_extra_count; + if (extra_size > 0) { + for (i = 0; i < extra_size; i++) { + headers_extra = &session->parser.headers_extra[i]; + if (headers_extra->key.len == 16 && + strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) { + if (headers_extra->val.len == 4 && + strncasecmp(headers_extra->val.data, "gzip", 4) == 0) { + flb_debug("[elasticsearch_bulk_prot] body is gzipped"); + gzip_compressed = FLB_TRUE; + } + } + } + } + + if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) { + if (gzip_compressed == FLB_TRUE) { + ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, + &gz_data, &gz_size); + if (ret == -1) { + flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed"); + return -1; + } + parse_payload_ndjson(ctx, tag, gz_data, gz_size, bulk_statuses); + flb_free(gz_data); + } + else { + parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses); + } + } + + return 0; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx, + struct in_elasticsearch_bulk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + flb_sds_t bulk_statuses; + flb_sds_t bulk_response; + char *error_str = NULL; + + if (request->uri.data[0] != '/') { + send_response(conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (len == 1) { + tag = NULL; /* use default tag */ + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + if (request->method == MK_METHOD_HEAD) { + send_empty_response(conn, 200); + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return 0; + } + + if (request->method == MK_METHOD_PUT) { + send_json_message_response(conn, 200, "{}"); + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return 0; + } + + if (request->method == MK_METHOD_GET) { + if (strncmp(uri, "/_nodes/http", 12) == 0) { + send_dummy_sniffer_response(conn, 200, ctx); + } + else if (strlen(uri) == 1 && strncmp(uri, "/", 1) == 0) { + send_json_message_response(conn, 200, ES_VERSION_RESPONSE); + } + else { + send_json_message_response(conn, 200, "{}"); + } + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return 0; + } + + if (request->method == MK_METHOD_POST) { + if (strncmp(uri, "/_bulk", 6) == 0) { + bulk_statuses = flb_sds_create_size(ctx->buffer_max_size); + if (!bulk_statuses) { + return -1; + } + + bulk_response = flb_sds_create_size(ctx->buffer_max_size); + if (!bulk_response) { + return -1; + } + } else { + flb_sds_destroy(tag); + mk_mem_free(uri); + + send_response(conn, 400, "error: invaild HTTP endpoint\n"); + + return -1; + } + } + + if (request->method != MK_METHOD_POST && + request->method != MK_METHOD_GET && + request->method != MK_METHOD_HEAD && + request->method != MK_METHOD_PUT) { + flb_sds_destroy(tag); + mk_mem_free(uri); + + send_response(conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + ret = process_payload(ctx, conn, tag, session, request, bulk_statuses); + flb_sds_destroy(tag); + + len = flb_sds_len(bulk_statuses); + if (flb_sds_alloc(bulk_response) < len + 27) { + bulk_response = flb_sds_increase(bulk_response, len + 27 - flb_sds_alloc(bulk_response)); + } + error_str = strstr(bulk_statuses, "\"status\":40"); + if (error_str){ + flb_sds_cat(bulk_response, "{\"errors\":true,\"items\":[", 24); + } + else { + flb_sds_cat(bulk_response, "{\"errors\":false,\"items\":[", 25); + } + flb_sds_cat(bulk_response, bulk_statuses, flb_sds_len(bulk_statuses)); + flb_sds_cat(bulk_response, "]}", 2); + send_response(conn, 200, bulk_response); + + mk_mem_free(uri); + flb_sds_destroy(bulk_statuses); + flb_sds_destroy(bulk_response); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. + */ +int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx, + struct in_elasticsearch_bulk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(conn, 400, "error: invalid request\n"); + return -1; +} diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h new file mode 100644 index 00000000000..af8c2c22eed --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_ELASTICSEARCH_BULK_PROT +#define FLB_IN_ELASTICSEARCH_BULK_PROT + +#define ES_VERSION_RESPONSE "{\"version\":{\"number\":\"8.0.0\",\"build_flavor\":\"Fluent Bit OSS\"},\"tagline\":\"Fluent Bit's Bulk API compatible endpoint\"}" + +#define ES_NODES_TEMPLATE "{\"_nodes\":{\"total\":1,\"successful\":1,\"failed\":0}," \ + "\"nodes\":{\"%s\":{\"name\":\"%s\",\"version\":\"8.0.0\"," \ + "\"http\":{\"publish_address\":\"%s:%s\",\"max_content_length_in_bytes\":%ld}}}}" + +int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx, + struct in_elasticsearch_bulk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx, + struct in_elasticsearch_bulk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +#endif diff --git a/plugins/in_elasticsearch/in_elasticsearch_config.c b/plugins/in_elasticsearch/in_elasticsearch_config.c new file mode 100644 index 00000000000..fd603832d28 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_config.c @@ -0,0 +1,86 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "in_elasticsearch.h" +#include "in_elasticsearch_bulk_conn.h" + +struct flb_in_elasticsearch *in_elasticsearch_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_in_elasticsearch *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_in_elasticsearch)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:9200) */ + flb_input_net_default_listener("0.0.0.0", 9200, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! + */ + + return ctx; +} + +int in_elasticsearch_config_destroy(struct flb_in_elasticsearch *ctx) +{ + /* release all connections */ + in_elasticsearch_bulk_conn_release_all(ctx); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->server) { + flb_free(ctx->server); + } + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + return 0; +} diff --git a/plugins/in_elasticsearch/in_elasticsearch_config.h b/plugins/in_elasticsearch/in_elasticsearch_config.h new file mode 100644 index 00000000000..28108723d50 --- /dev/null +++ b/plugins/in_elasticsearch/in_elasticsearch_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_ELASTICSEARCH_CONFIG_H +#define FLB_IN_ELASTICSEARCH_CONFIG_H + +#include +#include "in_elasticsearch.h" + +struct flb_in_elasticsearch *in_elasticsearch_config_create(struct flb_input_instance *ins); +int in_elasticsearch_config_destroy(struct flb_in_elasticsearch *ctx); + +#endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f91b00303aa..8a7e432aa0b 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -43,6 +43,7 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_HEAD "in_head.c") FLB_RT_TEST(FLB_IN_DUMMY "in_dummy.c") FLB_RT_TEST(FLB_IN_HTTP "in_http.c") + FLB_RT_TEST(FLB_IN_ELASTICSEARCH "in_elasticsearch.c") FLB_RT_TEST(FLB_IN_RANDOM "in_random.c") FLB_RT_TEST(FLB_IN_STATSD "in_statsd.c") FLB_RT_TEST(FLB_IN_SYSLOG "in_syslog.c") diff --git a/tests/runtime/data/in_elasticsearch/json_bulk.h b/tests/runtime/data/in_elasticsearch/json_bulk.h new file mode 100644 index 00000000000..ca8f751c087 --- /dev/null +++ b/tests/runtime/data/in_elasticsearch/json_bulk.h @@ -0,0 +1,25 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#define NDJSON_BULK "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" \ + "{ \"field1\" : \"value1\" }\n" \ + "{ \"delete\" : { \"_index\" : \"test\", \"_id\" : \"2\" } }\n" \ + "{ \"create\" : { \"_index\" : \"test\", \"_id\" : \"3\" } }\n" \ + "{ \"field1\" : \"value3\", \"field2\" : \"value4\" }\n" \ + "{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }\n" \ + "{ \"doc\" : {\"field2\" : \"value2\"} }\n" \ + "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"10\" } }\n" \ + "{ \"field1\" : \"value1\", \"a\": \"line\", \"that\" : \"is\", \"long\": \"line\", \"contained\": \"request\" }\n" \ + "{ \"delete\" : { \"_index\" : \"test\", \"_id\" : \"20\" } }\n" \ + "{ \"create\" : { \"_index\" : \"test\", \"_id\" : \"30\" } }\n" \ + "{ \"field10\" : \"value30\", \"field20\" : \"value40\", \"message\": \"ok\" }\n" \ + "{ \"update\" : {\"_id\" : \"10\", \"_index\" : \"test\"} }\n" \ + "{ \"doc\" : {\"field20\" : \"value20\"} }\n" \ + "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"11\" } }\n" \ + "{ \"field11\" : \"value11\", \"nested\": {\"message\":\"ok\"} }\n" \ + "{ \"delete\" : { \"_index\" : \"test\", \"_id\" : \"21\" } }\n" \ + "{ \"create\" : { \"_index\" : \"test\", \"_id\" : \"31\" } }\n" \ + "{ \"field11\" : \"value31\", \"field21\" : \"value41\", \"nested\": { \"multiply\": {\"message\": \"ok\"}} }\n" \ + "{ \"update\" : {\"_id\" : \"11\", \"_index\" : \"test\"} }\n" \ + "{ \"doc\" : {\"field21\" : \"value21\"} }\n" \ + "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"41\" } }\n" \ + "{ \"field41\" : \"value41\", \"nested\": {\"message\": \"ok\"} }\n" diff --git a/tests/runtime/in_elasticsearch.c b/tests/runtime/in_elasticsearch.c new file mode 100644 index 00000000000..ecb9d8b10ca --- /dev/null +++ b/tests/runtime/in_elasticsearch.c @@ -0,0 +1,808 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2023 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + +/* Test data */ +#include "data/in_elasticsearch/json_bulk.h" /* NDBULK_JSON */ + +#define NDJSON_CONTENT_TYPE "application/x-ndjson" + +struct in_elasticsearch_client_ctx { + struct flb_upstream *u; + struct flb_connection *u_conn; + struct flb_config *config; + struct mk_event_loop *evl; +}; + +struct test_ctx { + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int f_ffd; /* Filter fd (unused) */ + int o_ffd; /* Output fd */ + struct in_elasticsearch_client_ctx *httpc; +}; + + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +/* Callback to check expected results */ +static int cb_check_result_json(void *record, size_t size, void *data) +{ + char *p; + char *expected; + char *result; + int num = get_output_num(); + + set_output_num(num+1); + + expected = (char *) data; + result = (char *) record; + + p = strstr(result, expected); + TEST_CHECK(p != NULL); + + if (p==NULL) { + flb_error("Expected to find: '%s' in result '%s'", + expected, result); + } + /* + * If you want to debug your test + * + * printf("Expect: '%s' in result '%s'", expected, result); + */ + flb_free(record); + return 0; +} + +struct in_elasticsearch_client_ctx* in_elasticsearch_client_ctx_create(int port) +{ + struct in_elasticsearch_client_ctx *ret_ctx = NULL; + struct mk_event_loop *evl = NULL; + + ret_ctx = flb_calloc(1, sizeof(struct in_elasticsearch_client_ctx)); + if (!TEST_CHECK(ret_ctx != NULL)) { + flb_errno(); + TEST_MSG("flb_calloc(in_elasticsearch_client_ctx) failed"); + return NULL; + } + + evl = mk_event_loop_create(16); + if (!TEST_CHECK(evl != NULL)) { + TEST_MSG("mk_event_loop failed"); + flb_free(ret_ctx); + return NULL; + } + ret_ctx->evl = evl; + flb_engine_evl_init(); + flb_engine_evl_set(evl); + + ret_ctx->config = flb_config_init(); + if(!TEST_CHECK(ret_ctx->config != NULL)) { + TEST_MSG("flb_config_init failed"); + mk_event_loop_destroy(evl); + flb_free(ret_ctx); + return NULL; + } + + ret_ctx->u = flb_upstream_create(ret_ctx->config, "127.0.0.1", port, 0, NULL); + if (!TEST_CHECK(ret_ctx->u != NULL)) { + TEST_MSG("flb_upstream_create failed"); + flb_config_exit(ret_ctx->config); + mk_event_loop_destroy(evl); + flb_free(ret_ctx); + return NULL; + } + + ret_ctx->u_conn = flb_upstream_conn_get(ret_ctx->u); + TEST_CHECK(ret_ctx->u_conn != NULL); + + ret_ctx->u_conn->upstream = ret_ctx->u; + + return ret_ctx; +} + +static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) +{ + int i_ffd; + int o_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_calloc(1, sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("flb_calloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "elasticsearch", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + + return ctx; +} + +int in_elasticsearch_client_ctx_destroy(struct in_elasticsearch_client_ctx* ctx) +{ + if (!TEST_CHECK(ctx != NULL)) { + return -1; + } + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + if (ctx->config) { + flb_config_exit(ctx->config); + } + if (ctx->evl) { + mk_event_loop_destroy(ctx->evl); + } + + flb_free(ctx); + return 0; +} + +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + if (ctx->httpc) { + in_elasticsearch_client_ctx_destroy(ctx->httpc); + } + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +void flb_test_in_elasticsearch_version() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + char *expected = "\"version\":{\"number\":\"8.0.0\",\"build_flavor\""; + char *buf = NULL; + int port = 9201; + char sport[16]; + + snprintf(sport, 16, "%d", port); + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", port, NULL, 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + buf = strstr(c->resp.payload, expected); + if (!TEST_CHECK(buf != NULL)) { + TEST_MSG("http request for version info failed"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch(char *write_op, int port) +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char buf[64]; + char expected[64]; + char sport[16]; + + snprintf(buf, 64, "{\"%s\":{\"_index\":\"fluent-bit\",\"_id\":1}}\n{\"test\":\"msg\"}\n", write_op); + snprintf(expected, 64, "\"@meta\":{\"%s\":{\"_index\":\"fluent-bit\",\"_id\":1}},\"test\":\"msg\"", write_op); + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/_bulk", buf, strlen(buf), + "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + NDJSON_CONTENT_TYPE, strlen(NDJSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch_index_op() +{ + flb_test_in_elasticsearch("index", 9202); +} + +void flb_test_in_elasticsearch_create_op() +{ + flb_test_in_elasticsearch("create", 9203); +} + +void flb_test_in_elasticsearch_invalid(char *write_op, int status, char *expected_op, int port) +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char buf[64]; + char expected[64]; + char *ret_buf = NULL; + char sport[16]; + + snprintf(buf, 64, "{\"%s\":{\"_index\":\"fluent-bit\",\"_id\":1}}\n{\"test\":\"msg\"}\n", write_op); + snprintf(expected, 64, "{\"%s\":{\"status\":%d", expected_op, status); + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = NULL; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/_bulk", buf, strlen(buf), + "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + NDJSON_CONTENT_TYPE, strlen(NDJSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num == 0)) { + TEST_MSG("invalid ingested requests"); + } + ret_buf = strstr(c->resp.payload, expected); + if (!TEST_CHECK(ret_buf != NULL)) { + TEST_MSG("http request for bulk failed"); + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch_update_op() +{ + flb_test_in_elasticsearch_invalid("update", 403, "update", 9204); +} + +void flb_test_in_elasticsearch_delete_op() +{ + flb_test_in_elasticsearch_invalid("delete", 404, "delete", 9205); +} + +void flb_test_in_elasticsearch_nonexistent_op() +{ + flb_test_in_elasticsearch_invalid("nonexistent", 400, "unknown", 9206); +} + +void flb_test_in_elasticsearch_multi_ops() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + int port = 9207; + char sport[16]; + size_t b_sent; + char *buf = NDJSON_BULK; + char *expected = ":{\"_index\":\"test\",\"_id\":"; + char *ret_buf = NULL; + char *ret_expected = "{\"errors\":true,\"items\":["; + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/_bulk", buf, strlen(buf), + "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + NDJSON_CONTENT_TYPE, strlen(NDJSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + ret_buf = strstr(c->resp.payload, ret_expected); + if (!TEST_CHECK(ret_buf != NULL)) { + TEST_MSG("bulk request for multi write ops failed"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch_multi_ops_gzip() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + int port = 9208; + char sport[16]; + size_t b_sent; + char *buf = NDJSON_BULK; + char *expected = ":{\"_index\":\"test\",\"_id\":"; + char *ret_buf = NULL; + char *ret_expected = "{\"errors\":true,\"items\":["; + void *final_data; + size_t final_bytes; + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + ret = flb_gzip_compress((void *) buf, strlen(buf), &final_data, &final_bytes); + TEST_CHECK(ret != -1); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/_bulk", final_data, final_bytes, + "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + NDJSON_CONTENT_TYPE, strlen(NDJSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + /* Add Content-Encoding: gzip */ + ret = flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); + TEST_CHECK(ret == 0); + + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + flb_free(final_data); + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + ret_buf = strstr(c->resp.payload, ret_expected); + if (!TEST_CHECK(ret_buf != NULL)) { + TEST_MSG("bulk request for multi write ops failed"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch_node_info() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int port = 9208; + char sport[16]; + size_t b_sent; + char *expected = "{\"_nodes\":{\"total\":1,\"successful\":1,\"failed\":0},\"nodes\":{\""; + char *buf = NULL; + + snprintf(sport, 16, "%d", port); + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_GET, "/_nodes/http", NULL, 0, + "127.0.0.1", port, NULL, 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + buf = strstr(c->resp.payload, expected); + if (!TEST_CHECK(buf != NULL)) { + TEST_MSG("http request for version info failed"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_in_elasticsearch_tag_key() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + int port = 9209; + char sport[16]; + + char *buf = "{\"index\":{\"_index\":\"fluent-bit\"}}\n{\"test\":\"msg\",\"tag\":\"new_tag\"}\n"; + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"@meta\":{\"index\":{\"_index\":\"fluent-bit\"}},\"test\":\"msg\",\"tag\":\"new_tag\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + "tag_key", "tag", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "new_tag", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = in_elasticsearch_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/_bulk", buf, strlen(buf), + "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + NDJSON_CONTENT_TYPE, strlen(NDJSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("in_elasticsearch_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +TEST_LIST = { + {"version", flb_test_in_elasticsearch_version}, + {"index_op", flb_test_in_elasticsearch_index_op}, + {"create_op", flb_test_in_elasticsearch_create_op}, + {"update_op", flb_test_in_elasticsearch_update_op}, + {"delete_op", flb_test_in_elasticsearch_delete_op}, + {"nonexistent_op", flb_test_in_elasticsearch_nonexistent_op}, + {"multi_ops", flb_test_in_elasticsearch_multi_ops}, + {"multi_ops_gzip", flb_test_in_elasticsearch_multi_ops_gzip}, + {"node_info", flb_test_in_elasticsearch_node_info}, + {"tag_key", flb_test_in_elasticsearch_tag_key}, + {NULL, NULL} +};