From d2696edc5ed0bec7ed1e781c4f738dfda6b02331 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 8 Jan 2021 10:21:08 -0600 Subject: [PATCH] in_http: add HTTP input plugin (#18) Signed-off-by: Eduardo Silva --- CMakeLists.txt | 2 +- plugins/in_http/CMakeLists.txt | 55 +--- plugins/in_http/conf/in_http.conf.in | 18 - plugins/in_http/conf/mimetypes.conf.in | 145 --------- plugins/in_http/conf/sites/default.in | 3 - plugins/in_http/http.c | 147 +++++++++ plugins/in_http/{in_http.h => http.h} | 25 +- plugins/in_http/http_config.c | 72 ++++ plugins/in_http/http_config.h | 30 ++ plugins/in_http/http_conn.c | 205 ++++++++++++ plugins/in_http/http_conn.h | 54 +++ plugins/in_http/http_prot.c | 308 ++++++++++++++++++ .../{in_http_info.h.in => http_prot.h} | 9 +- plugins/in_http/in_http.c | 73 ----- 14 files changed, 844 insertions(+), 302 deletions(-) delete mode 100644 plugins/in_http/conf/in_http.conf.in delete mode 100644 plugins/in_http/conf/mimetypes.conf.in delete mode 100644 plugins/in_http/conf/sites/default.in create mode 100644 plugins/in_http/http.c rename plugins/in_http/{in_http.h => http.h} (64%) create mode 100644 plugins/in_http/http_config.c create mode 100644 plugins/in_http/http_config.h create mode 100644 plugins/in_http/http_conn.c create mode 100644 plugins/in_http/http_conn.h create mode 100644 plugins/in_http/http_prot.c rename plugins/in_http/{in_http_info.h.in => http_prot.h} (72%) delete mode 100644 plugins/in_http/in_http.c diff --git a/CMakeLists.txt b/CMakeLists.txt index d067d18802c..318470fa844 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,7 +126,7 @@ option(FLB_IN_DOCKER_EVENTS "Enable Docker events input plugin" Yes) option(FLB_IN_EXEC "Enable Exec input plugin" Yes) option(FLB_IN_FORWARD "Enable Forward input plugin" Yes) option(FLB_IN_HEALTH "Enable Health input plugin" Yes) -option(FLB_IN_HTTP "Enable HTTP input plugin" No) +option(FLB_IN_HTTP "Enable HTTP input plugin" Yes) option(FLB_IN_MEM "Enable Memory input plugin" Yes) option(FLB_IN_KMSG "Enable Kernel log input plugin" Yes) option(FLB_IN_LIB "Enable library mode input plugin" Yes) diff --git a/plugins/in_http/CMakeLists.txt b/plugins/in_http/CMakeLists.txt index f6a62921332..c51ad9573ec 100644 --- a/plugins/in_http/CMakeLists.txt +++ b/plugins/in_http/CMakeLists.txt @@ -1,53 +1,8 @@ set(src - in_http.c) - -FLB_PLUGIN(in_http "${src}" "") -target_link_libraries(flb-plugin-in_http - monkey-core-static - ${CMAKE_DL_LIBS}) - -# Configuration files -# =================== -# Default values for conf/in_http.conf - -set(MK_CONF_LISTEN "8080") -set(MK_CONF_WORKERS "1") -set(MK_CONF_TIMEOUT "15") -set(MK_CONF_INDEXFILE "index.html index.htm index.php") -set(MK_CONF_HIDEVERSION "Off") -set(MK_CONF_RESUME "On") -set(MK_CONF_KA "On") -set(MK_CONF_KA_TIMEOUT "5") -set(MK_CONF_KA_MAXREQ "1000") -set(MK_CONF_REQ_SIZE "32") -set(MK_CONF_SYMLINK "Off") -set(MK_CONF_TRANSPORT "liana") -set(MK_CONF_DEFAULT_MIME "text/plain") -set(MK_CONF_FDT "On") -set(MK_CONF_OVERCAPACITY "Resist") - -# Virtual Host -# ============ -set(MK_VH_SERVERNAME "127.0.0.1") -set(MK_PATH_WWW "${PROJECT_SOURCE_DIR}/lib/monkey/htdocs") -set(IN_HTTP_CONF_PATH "${PROJECT_BINARY_DIR}/conf/") - -configure_file( - "in_http_info.h.in" - "${PROJECT_SOURCE_DIR}/plugins/in_http/in_http_info.h" + http.c + http_conn.c + http_prot.c + http_config.c ) -configure_file( - "conf/in_http.conf.in" - "${PROJECT_BINARY_DIR}/conf/in_http.conf" - ) - -configure_file( - "conf/sites/default.in" - "${PROJECT_BINARY_DIR}/conf/sites/default" - ) - -configure_file( - "conf/mimetypes.conf.in" - "${PROJECT_BINARY_DIR}/conf/mimetypes.conf" - ) +FLB_PLUGIN(in_http "${src}" "") diff --git a/plugins/in_http/conf/in_http.conf.in b/plugins/in_http/conf/in_http.conf.in deleted file mode 100644 index 85c7091ac9e..00000000000 --- a/plugins/in_http/conf/in_http.conf.in +++ /dev/null @@ -1,18 +0,0 @@ -# Monkey HTTP Server - Configuration -# ================================== -[SERVER] - Listen @MK_CONF_LISTEN@ - Workers @MK_CONF_WORKERS@ - Timeout @MK_CONF_TIMEOUT@ - PidFile @MK_PATH_PIDFILE@/@MK_CONF_PIDFILE@ - Indexfile @MK_CONF_INDEXFILE@ - HideVersion @MK_CONF_HIDEVERSION@ - Resume @MK_CONF_RESUME@ - KeepAlive @MK_CONF_KA@ - KeepAliveTimeout @MK_CONF_KA_TIMEOUT@ - MaxKeepAliveRequest @MK_CONF_KA_MAXREQ@ - MaxRequestSize @MK_CONF_REQ_SIZE@ - SymLink @MK_CONF_SYMLINK@ - DefaultMimeType @MK_CONF_DEFAULT_MIME@ - FDT @MK_CONF_FDT@ - OverCapacity @MK_CONF_OVERCAPACITY@ diff --git a/plugins/in_http/conf/mimetypes.conf.in b/plugins/in_http/conf/mimetypes.conf.in deleted file mode 100644 index bf474c504b8..00000000000 --- a/plugins/in_http/conf/mimetypes.conf.in +++ /dev/null @@ -1,145 +0,0 @@ -[MIMETYPES] - html text/html - jpg image/jpeg - png image/png - js application/x-javascript - css text/css - xml text/xml - gif image/gif - flv video/x-flv - jpe image/jpeg - - deb application/x-debian-package - jpeg image/jpeg - htm text/html - bmp image/bmp - ief image/ief - tiff image/tiff - tif image/tiff - wbmp image/vnd.wap.wbmp - ras image/x-cmu-raster - ico image/x-icon - pnm image/x-portable-anymap - pbm image/x-portable-bitmap - pgm image/x-portable-graymap - ppm image/x-portable-pixmap - rgb image/x-rgb - xbm image/x-xbitmap - xpm image/x-xpixmap - xwd image/x-xwindowdump - svg image/svg+xml - svgz image/svg+xml - - json application/json - ez application/andrew-inset - hqx application/mac-binhex40 - cpt application/mac-compactpro - doc application/msword - bin application/octet-stream - dms application/octet-stream - lha application/octet-stream - lhz application/octet-stream - exe application/octet-stream - oda application/oda - pdf application/pdf - api application/postscript - eps application/postscript - ps application/postscript - smi application/smil - smil application/smil - mif application/vnd.mif - xls application/vnd.ms-excel - ppt application/vnd.ms-powerpoint - vbxml application/vnd.wap.wbxml - wmlc application/vnd.wap.wmlc - wmlsc application/vnd.wap.wmlscriptc - bcpio application/x-bcpio - vcd application/x-cdlink - pgn application/x-chess-pgn - cpio application/x-cpio - csh application/x-csh - dcr application/x-director - dir application/x-director - dxr application/x-director - dvi application/x-dvi - spl application/x-futuresplash - gtar application/x-gtar - gz application/x-gzip - hdf application/x-hdf - skp application/x-koan - skd application/x-koan - skt application/x-koan - skm application/x-koan - latex application/x-latex - nc application/x-netcdf - cdf application/x-netcdf - sh application/x-sh - shar application/x-shar - swf application/x-shockwave-flash - sit application/x-stuffit - sv4cpio application/x-sv4cpio - sv4crc application/x-sv4crc - tar application/x-tar - tcl application/x-tcl - tex application/x-tex - texinfo application/x-texinfo - texi application/x-texinfo - t application/x-troff - tr application/x-troff - roff application/x-troff - man application/x-troff-man - me application/x-troff-me - ms application/x-troff-ms - untar application/x-ustar - src application/x-wais-source - zip application/zip - - au audio/basic - snd audio/basic - mid audio/midi - midi audio/midi - kar audio/midi - mpga audio/mpeg - mp2 audio/mpeg - mp3 audio/mpeg - aif audio/x-aiff - aiff audio/x-aiff - aifc audio/x-aiff - ram audio/x-pn-realaudio - rm audio/x-pn-realaudio - rpm audio/x-pn-realaudio-plugin - ra audio/x-realaudio - wav audio/x-wav - - pdb chemical/x-pdb - xyz chemical/x-pdb - - igs model/iges - iges model/iges - msh model/mesh - mesh model/mesh - silo model/mesh - wrl model/vrml - vmrl model/vrml - - asc text/plain - txt text/plain - rtx text/richtext - rtf text/rtf - sgml text/sgml - sgm text/sgml - tsv text/tab-separated-values - wml text/vnd.wap.wml - wmls text/vnd.wap.wmlscript - etx text/x-setext - xsl text/xml - - mpeg video/mpeg - mpg video/mpeg - mpe video/mpeg - qt video/quicktime - mov video/quicktime - avi video/x-msvideo - movie video/x-sgi-movie - - ice x-conference/x-cooltalk diff --git a/plugins/in_http/conf/sites/default.in b/plugins/in_http/conf/sites/default.in deleted file mode 100644 index 565b76e5055..00000000000 --- a/plugins/in_http/conf/sites/default.in +++ /dev/null @@ -1,3 +0,0 @@ -[HOST] - ServerName @MK_VH_SERVERNAME@ - DocumentRoot @MK_PATH_WWW@ diff --git a/plugins/in_http/http.c b/plugins/in_http/http.c new file mode 100644 index 00000000000..e48f7b2c9c2 --- /dev/null +++ b/plugins/in_http/http.c @@ -0,0 +1,147 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include + +#include "http.h" +#include "http_conn.h" +#include "http_config.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int in_http_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int fd; + struct flb_http *ctx = in_context; + struct http_conn *conn; + + /* Accept the new connection */ + fd = flb_net_accept(ctx->server_fd); + if (fd == -1) { + flb_plg_error(ctx->ins, "could not accept new connection"); + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", fd); + conn = http_conn_add(fd, ctx); + if (!conn) { + return -1; + } + return 0; +} + +static int in_http_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + int ret; + struct flb_http *ctx; + + /* Create context and basic conf */ + ctx = http_config_create(ins); + if (!ctx) { + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + ctx->evl = config->evl; + + /* Create HTTP listener */ + ctx->server_fd = flb_net_server(ctx->tcp_port, ctx->listen); + if (ctx->server_fd > 0) { + flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port); + } + else { + flb_plg_error(ctx->ins, "could not bind address %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + http_config_destroy(ctx); + return -1; + } + + /* Set the socket non-blocking */ + flb_net_socket_nonblocking(ctx->server_fd); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + in_http_collect, + ctx->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + http_config_destroy(ctx); + return -1; + } + + return 0; +} + +static int in_http_exit(void *data, struct flb_config *config) +{ + struct flb_http *ctx = data; + (void) config; + + if (!ctx) { + return 0; + } + + http_config_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_http, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_http, buffer_chunk_size), + "" + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_http_plugin = { + .name = "http", + .description = "HTTP", + .cb_init = in_http_init, + .cb_pre_run = NULL, + .cb_collect = in_http_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = in_http_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET, +}; diff --git a/plugins/in_http/in_http.h b/plugins/in_http/http.h similarity index 64% rename from plugins/in_http/in_http.h rename to plugins/in_http/http.h index 526f730a094..292c052acc8 100644 --- a/plugins/in_http/in_http.h +++ b/plugins/in_http/http.h @@ -25,18 +25,25 @@ #include #include -/* Default configuration for the HTTP server */ -#define FLB_HTTP_CONFIG "in_http.conf" -#define FLB_HTTP_SITES "sites/" -#define FLB_HTTP_MIMES "mimetypes.conf" +#include -struct flb_in_http_config { +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" - /* MessagePack buffers */ - msgpack_packer mp_pck; - msgpack_sbuffer mp_sbuf; +struct flb_http { + int server_fd; + flb_sds_t listen; + flb_sds_t tcp_port; + + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + struct mk_list connections; /* linked list of connections */ + struct mk_event_loop *evl; /* Event loop context */ + + struct mk_server *server; + struct flb_input_instance *ins; }; -extern struct flb_input_plugin in_http_plugin; #endif diff --git a/plugins/in_http/http_config.c b/plugins/in_http/http_config.c new file mode 100644 index 00000000000..443155bacab --- /dev/null +++ b/plugins/in_http/http_config.c @@ -0,0 +1,72 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "http.h" +#include "http_conn.h" + +struct flb_http *http_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_http *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_http)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:9880) */ + flb_input_net_default_listener("0.0.0.0", 9880, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + ctx->server->keep_alive = MK_TRUE; + return ctx; +} + +int http_config_destroy(struct flb_http *ctx) +{ + /* release all connections */ + http_conn_release_all(ctx); + + if (ctx->server) { + flb_free(ctx->server); + } + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + return 0; +} diff --git a/plugins/in_http/http_config.h b/plugins/in_http/http_config.h new file mode 100644 index 00000000000..02d6db40ea0 --- /dev/null +++ b/plugins/in_http/http_config.h @@ -0,0 +1,30 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_HTTP_CONFIG_H +#define FLB_IN_HTTP_CONFIG_H + +#include +#include "http.h" + +struct flb_http *http_config_create(struct flb_input_instance *ins); +int http_config_destroy(struct flb_http *ctx); + +#endif diff --git a/plugins/in_http/http_conn.c b/plugins/in_http/http_conn.c new file mode 100644 index 00000000000..1db53de0a49 --- /dev/null +++ b/plugins/in_http/http_conn.c @@ -0,0 +1,205 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "http.h" +#include "http_conn.h" +#include "http_prot.h" + +static int http_conn_event(void *data) +{ + int ret; + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + struct http_conn *conn = data; + struct mk_event *event; + struct flb_http *ctx = conn->ctx; + struct mk_http_session *session; + struct mk_http_request *request; + + event = &conn->event; + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + http_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = recv(conn->fd, + conn->buf_data + conn->buf_len, available, 0); + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + http_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + session = &conn->session; + request = mk_list_entry_first(&session->request_list, + struct mk_http_request, _head); + + status = mk_http_parser(request, &session->parser, + conn->buf_data, conn->buf_len, NULL); + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + ret = http_prot_handle(ctx, conn, session, request); + } + + /* FIXME: add Protocol handler here */ + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + http_conn_del(conn); + return -1; + } + + return 0; + +} + +static void http_conn_session_init(struct mk_http_session *session, + struct mk_server *server) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->socket = -1; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + + /* creation time in unix time */ + session->init_time = time(NULL); + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +struct http_conn *http_conn_add(int fd, struct flb_http *ctx) +{ + int ret; + struct http_conn *conn; + struct mk_event *event; + + conn = flb_calloc(1, sizeof(struct http_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + + /* Set data for the event-loop */ + event = &conn->event; + MK_EVENT_NEW(event); + event->fd = fd; + event->type = FLB_ENGINE_EV_CUSTOM; + event->handler = http_conn_event; + + /* Connection info */ + conn->fd = fd; + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + flb_socket_close(fd); + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(ctx->evl, fd, FLB_ENGINE_EV_CUSTOM, MK_EVENT_READ, conn); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_socket_close(fd); + flb_free(conn->buf_data); + flb_free(conn); + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + http_conn_session_init(&conn->session, ctx->server); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + return conn; +} + +int http_conn_del(struct http_conn *conn) +{ + struct flb_http *ctx; + + ctx = conn->ctx; + + mk_event_del(ctx->evl, &conn->event); + mk_list_del(&conn->_head); + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void http_conn_release_all(struct flb_http *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct http_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct http_conn, _head); + http_conn_del(conn); + } +} diff --git a/plugins/in_http/http_conn.h b/plugins/in_http/http_conn.h new file mode 100644 index 00000000000..288e1247dda --- /dev/null +++ b/plugins/in_http/http_conn.h @@ -0,0 +1,54 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_HTTP_CONN +#define FLB_IN_HTTP_CONN + +#include +#include +#include + +struct http_conn { + struct mk_event event; /* Built-in event data for mk_events */ + int fd; /* socket connection */ + + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_http->connections */ +}; + +struct http_conn *http_conn_add(int fd, struct flb_http *ctx); +int http_conn_del(struct http_conn *conn); +void http_conn_release_all(struct flb_http *ctx); + + +#endif diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c new file mode 100644 index 00000000000..5ad31446b75 --- /dev/null +++ b/plugins/in_http/http_prot.c @@ -0,0 +1,308 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include + +#include "http.h" +#include "http_conn.h" + +#define HTTP_CONTENT_JSON 0 + +static int send_response(struct http_conn *conn, int http_status, char *message) +{ + int len; + flb_sds_t out; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created \r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + write(conn->fd, out, flb_sds_len(out)); + flb_sds_destroy(out); + return 0; +} + +int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) +{ + size_t off = 0; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_unpacked result; + struct flb_time tm; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + flb_time_get(&tm); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type != MSGPACK_OBJECT_MAP) { + flb_plg_warn(ctx->ins, "skip record from invalid type: %i", + result.data.type); + continue; + } + + /* Pack record with timestamp */ + msgpack_pack_array(&mp_pck, 2); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + msgpack_pack_object(&mp_pck, result.data); + } + + /* Ingest real record into the engine */ + if (tag) { + flb_input_chunk_append_raw(ctx->ins, tag, flb_sds_len(tag), + mp_sbuf.data, mp_sbuf.size); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + flb_input_chunk_append_raw(ctx->ins, NULL, 0, mp_sbuf.data, mp_sbuf.size); + } + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + + return 0; +} + +static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, + char *payload, size_t size) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART) { + flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); + return -1; + } + else if (ret == FLB_ERR_JSON_INVAL) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + else if (ret == -1) { + return -1; + } + + /* Process the packaged JSON and return the last byte used */ + process_pack(ctx, tag, pack, out_size); + flb_free(pack); + + return 0; +} + +static int process_payload(struct flb_http *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int type = -1; + struct mk_http_header *header; + + header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; + if (header->key.data == NULL) { + send_response(conn, 400, "error: header 'Content-Type' is not set\n"); + return -1; + } + + if (header->val.len == 16 && + strncasecmp(header->val.data, "application/json", 16) == 0) { + type = HTTP_CONTENT_JSON; + } + + if (type == -1) { + send_response(conn, 400, "error: invalid 'Content-Type'\n"); + return -1; + } + + if (request->data.len <= 0) { + send_response(conn, 400, "error: no payload found\n"); + return -1; + } + + if (type == HTTP_CONTENT_JSON) { + parse_payload_json(ctx, tag, request->data.data, request->data.len); + } + + return 0; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int http_prot_handle(struct flb_http *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + + if (request->uri.data[0] != '/') { + send_response(conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (len == 1) { + tag = NULL; /* use default tag */ + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + mk_mem_free(uri); + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + if (request->method != MK_METHOD_POST) { + flb_sds_destroy(tag); + send_response(conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + ret = process_payload(ctx, conn, tag, session, request); + flb_sds_destroy(tag); + send_response(conn, 201, NULL); + return ret; +} diff --git a/plugins/in_http/in_http_info.h.in b/plugins/in_http/http_prot.h similarity index 72% rename from plugins/in_http/in_http_info.h.in rename to plugins/in_http/http_prot.h index aa2134f7162..12834be04c0 100644 --- a/plugins/in_http/in_http_info.h.in +++ b/plugins/in_http/http_prot.h @@ -2,6 +2,7 @@ /* Fluent Bit * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors * Copyright (C) 2015-2018 Treasure Data Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,9 +18,11 @@ * limitations under the License. */ -#ifndef FLB_IN_HTTP_INFO_H -#define FLB_IN_HTTP_INFO_H +#ifndef FLB_IN_HTTP_PROT +#define FLB_IN_HTTP_PROT -#define FLB_HTTP_CONF_PATH "@IN_HTTP_CONF_PATH@" +int http_prot_handle(struct flb_http *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); #endif diff --git a/plugins/in_http/in_http.c b/plugins/in_http/in_http.c deleted file mode 100644 index 33b8fcfdbe9..00000000000 --- a/plugins/in_http/in_http.c +++ /dev/null @@ -1,73 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2019-2020 The Fluent Bit Authors - * Copyright (C) 2015-2018 Treasure Data Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include -#include -#include - -#include "in_http.h" -#include "in_http_info.h" - -/* Init CPU input */ -int in_http_init(struct flb_config *config) -{ - mk_config = mk_server_init(); - mk_config->server_conf_file = FLB_HTTP_CONFIG; - mk_config->path_config = FLB_HTTP_CONF_PATH; - mk_config->sites_conf_dir = FLB_HTTP_SITES; - mk_config->mimes_conf_file = FLB_HTTP_MIMES; - mk_server_setup(); - - return 0; -} - -/* Callback invoked after setup but before to join the main loop */ -int in_http_pre_run(void *in_context, struct flb_config *config) -{ - /* EXPERIMENTAL!!! */ - mk_server_loop(); - return 0; -} - -/* Callback to gather CPU usage between now and previous snapshot */ -int in_http_collect(struct flb_config *config, void *in_context) -{ - return 0; -} - -void *in_http_flush(void *in_context, int *size) -{ - return NULL; -} - -/* Plugin reference */ -struct flb_input_plugin in_http_plugin = { - .name = "http", - .description = "HTTP Service", - .cb_init = in_http_init, - .cb_pre_run = in_http_pre_run, - .cb_collect = in_http_collect, - .cb_flush_buf = in_http_flush -};