From d47f9ce599a60dcce946e5b8227a128e70a90374 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Tue, 22 Oct 2024 19:45:28 +0800 Subject: [PATCH 01/12] feat: add doris out plugin Signed-off-by: composer <2789706336@qq.com> --- plugins/CMakeLists.txt | 1 + plugins/out_doris/CMakeLists.txt | 6 + plugins/out_doris/doris.c | 306 +++++++++++++++++++++++++++++++ plugins/out_doris/doris.h | 46 +++++ plugins/out_doris/doris_conf.c | 116 ++++++++++++ plugins/out_doris/doris_conf.h | 32 ++++ 6 files changed, 507 insertions(+) create mode 100644 plugins/out_doris/CMakeLists.txt create mode 100644 plugins/out_doris/doris.c create mode 100644 plugins/out_doris/doris.h create mode 100644 plugins/out_doris/doris_conf.c create mode 100644 plugins/out_doris/doris_conf.h diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 200a09b449c..7bf3f81af19 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -344,6 +344,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write") REGISTER_OUT_PLUGIN("out_s3") REGISTER_OUT_PLUGIN("out_vivo_exporter") REGISTER_OUT_PLUGIN("out_chronicle") +REGISTER_OUT_PLUGIN("out_doris") # FILTERS # ======= diff --git a/plugins/out_doris/CMakeLists.txt b/plugins/out_doris/CMakeLists.txt new file mode 100644 index 00000000000..79338fdbb20 --- /dev/null +++ b/plugins/out_doris/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + doris.c + doris_conf.c + ) + +FLB_PLUGIN(out_doris "${src}" "") \ No newline at end of file diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c new file mode 100644 index 00000000000..49dcc3e3531 --- /dev/null +++ b/plugins/out_doris/doris.c @@ -0,0 +1,306 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "doris.h" +#include "doris_conf.h" + +#include + +static int cb_doris_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_out_doris *ctx = NULL; + (void) data; + + ctx = flb_doris_conf_create(ins, config); + if (!ctx) { + return -1; + } + + /* Set the plugin context */ + flb_output_set_context(ins, ctx); + + /* + * This plugin instance uses the HTTP client interface, let's register + * it debugging callbacks. + */ + flb_output_set_http_debug_callbacks(ins); + + return 0; +} + +static int http_put(struct flb_out_doris *ctx, + const void *body, size_t body_len, + const char *tag, int tag_len) +{ + int ret; + int out_ret = FLB_OK; + size_t b_sent; + void *payload_buf = NULL; + size_t payload_size = 0; + struct flb_upstream *u; + struct flb_connection *u_conn; + struct flb_http_client *c; + + /* Get upstream context and connection */ + u = ctx->u; + u_conn = flb_upstream_conn_get(u); + if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", + u->tcp_host, u->tcp_port); + return FLB_RETRY; + } + + /* Map payload */ + payload_buf = (void *) body; + payload_size = body_len; + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, ctx->uri, + payload_buf, payload_size, + ctx->host, ctx->port, + NULL, 0); + + /* + * Direct assignment of the callback context to the HTTP client context. + * This needs to be improved through a more clean API. + */ + c->cb_ctx = ctx->ins->callback; + + /* Append headers */ + flb_http_add_header(c, "format", 6, "json", 4); + flb_http_add_header(c, "Expect", 6, "100-continue", 12); + flb_http_add_header(c, "strip_outer_array", 17, "true", 4); + flb_http_add_header(c, "columns", 7, ctx->columns, strlen(ctx->columns)); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + if (ctx->timeout_second > 0) { + char timeout[256]; + snprintf(timeout, sizeof(timeout) - 1, "%d", ctx->timeout_second); + flb_http_add_header(c, "timeout", 7, timeout, strlen(timeout)); + } + + /* Basic Auth headers */ + flb_http_basic_auth(c, ctx->user, ctx->password); + + ret = flb_http_do(c, &b_sent); + if (ret == 0) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + ctx->host, ctx->port, + c->resp.status, c->resp.payload); + if (c->resp.payload_size > 0 && + (strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || + strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { + // continue + } + else { + out_ret = FLB_RETRY; + } + } + else { + flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)", + ctx->host, ctx->port, ret); + out_ret = FLB_RETRY; + } + + /* cleanup */ + + /* + * If the payload buffer is different than incoming records in body, means + * we generated a different payload and must be freed. + */ + if (payload_buf != body) { + flb_free(payload_buf); + } + + /* Destroy HTTP client context */ + flb_http_client_destroy(c); + + /* Release the TCP connection */ + flb_upstream_conn_release(u_conn); + + return out_ret; +} + +static int compose_payload(struct flb_out_doris *ctx, + const void *in_body, size_t in_size, + void **out_body, size_t *out_size) +{ + flb_sds_t encoded; + + *out_body = NULL; + *out_size = 0; + + encoded = flb_pack_msgpack_to_json_format(in_body, + in_size, + FLB_PACK_JSON_FORMAT_JSON, + FLB_PACK_JSON_DATE_DOUBLE, + ctx->time_key); + if (encoded == NULL) { + flb_plg_error(ctx->ins, "failed to convert json"); + return FLB_ERROR; + } + *out_body = (void*)encoded; + *out_size = flb_sds_len(encoded); + + flb_plg_info(ctx->ins, "%s", (char*) *out_body); + + return FLB_OK; +} + +static void cb_doris_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret = FLB_ERROR; + struct flb_out_doris *ctx = out_context; + void *out_body; + size_t out_size; + (void) i_ins; + + ret = compose_payload(ctx, event_chunk->data, event_chunk->size, + &out_body, &out_size); + + if (ret != FLB_OK) { + FLB_OUTPUT_RETURN(ret); + } + + ret = http_put(ctx, out_body, out_size, + event_chunk->tag, flb_sds_len(event_chunk->tag)); + flb_sds_destroy(out_body); + + FLB_OUTPUT_RETURN(ret); +} + +static int cb_doris_exit(void *data, struct flb_config *config) +{ + struct flb_out_doris *ctx = data; + + flb_doris_conf_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + // host + // port + // user + { + FLB_CONFIG_MAP_STR, "user", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, user), + "Set HTTP auth user" + }, + // password + { + FLB_CONFIG_MAP_STR, "password", "", + 0, FLB_TRUE, offsetof(struct flb_out_doris, password), + "Set HTTP auth password" + }, + // database + { + FLB_CONFIG_MAP_STR, "database", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, database), + "Set database" + }, + // table + { + FLB_CONFIG_MAP_STR, "table", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, table), + "Set table" + }, + // time_key + { + FLB_CONFIG_MAP_STR, "time_key", "date", + 0, FLB_TRUE, offsetof(struct flb_out_doris, time_key), + "Specify the name of the date field in output" + }, + // columns + { + FLB_CONFIG_MAP_STR, "columns", "date,log", + 0, FLB_TRUE, offsetof(struct flb_out_doris, columns), + "Set columns" + }, + // timeout + { + FLB_CONFIG_MAP_INT, "timeout_second", "60", + 0, FLB_TRUE, offsetof(struct flb_out_doris, timeout_second), + "Set timeout in second" + }, + + /* EOF */ + {0} +}; + +static int cb_doris_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + struct flb_out_doris *ctx = plugin_context; + int ret; + + ret = compose_payload(ctx, data, bytes, out_data, out_size); + if (ret != FLB_OK) { + flb_error("ret=%d", ret); + return -1; + } + return 0; +} + +/* Plugin reference */ +struct flb_output_plugin out_doris_plugin = { + .name = "doris", + .description = "Doris Output", + .cb_init = cb_doris_init, + .cb_pre_run = NULL, + .cb_flush = cb_doris_flush, + .cb_exit = cb_doris_exit, + .config_map = config_map, + + /* for testing */ + .test_formatter.callback = cb_doris_format_test, + + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, + .workers = 2 +}; \ No newline at end of file diff --git a/plugins/out_doris/doris.h b/plugins/out_doris/doris.h new file mode 100644 index 00000000000..51864a7decd --- /dev/null +++ b/plugins/out_doris/doris.h @@ -0,0 +1,46 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_DORIS_H +#define FLB_OUT_DORIS_H + +struct flb_out_doris { + char *host; + int port; + char uri[256]; + + char *user; + char *password; + + flb_sds_t database; + flb_sds_t table; + + flb_sds_t time_key; + flb_sds_t columns; + + int timeout_second; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Plugin instance */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/plugins/out_doris/doris_conf.c b/plugins/out_doris/doris_conf.c new file mode 100644 index 00000000000..0c24a228d87 --- /dev/null +++ b/plugins/out_doris/doris_conf.c @@ -0,0 +1,116 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include "doris.h" +#include "doris_conf.h" + +struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + int io_flags = 0; + struct flb_upstream *upstream; + struct flb_out_doris *ctx = NULL; + + /* Allocate plugin context */ + ctx = flb_calloc(1, sizeof(struct flb_out_doris)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Set default network configuration */ + flb_output_net_default("127.0.0.1", 8030, ins); + + /* Validate */ + if (!ctx->user) { + flb_plg_error(ins, "user is not set"); + } + if (!ctx->database) { + flb_plg_error(ins, "database is not set"); + } + if (!ctx->table) { + flb_plg_error(ins, "table is not set"); + } + + /* Check if SSL/TLS is enabled */ +#ifdef FLB_HAVE_TLS + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } +#else + io_flags = FLB_IO_TCP; +#endif + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, ins->tls); + + if (!upstream) { + flb_free(ctx); + return NULL; + } + + /* url: /api/{database}/{table}/_stream_load */ + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table); + + ctx->u = upstream; + ctx->host = ins->host.name; + ctx->port = ins->host.port; + + /* Set instance flags into upstream */ + flb_output_upstream_set(ctx->u, ins); + + return ctx; +} + +void flb_doris_conf_destroy(struct flb_out_doris *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + flb_free(ctx); +} diff --git a/plugins/out_doris/doris_conf.h b/plugins/out_doris/doris_conf.h new file mode 100644 index 00000000000..5c9eae4e331 --- /dev/null +++ b/plugins/out_doris/doris_conf.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_DORIS_CONF_H +#define FLB_OUT_DORIS_CONF_H + +#include +#include + +#include "doris.h" + +struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +void flb_doris_conf_destroy(struct flb_out_doris *ctx); + +#endif From b81a93d998f4460678b9a13f9fbc96d2c158793b Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Tue, 22 Oct 2024 21:30:16 +0800 Subject: [PATCH 02/12] feat: add unit test Signed-off-by: composer <2789706336@qq.com> --- tests/runtime/CMakeLists.txt | 1 + tests/runtime/out_doris.c | 268 +++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 tests/runtime/out_doris.c diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index e902f7892ff..99323c48817 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -125,6 +125,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") + FLB_RT_TEST(FLB_OUT_DORIS "out_doris.c") endif() diff --git a/tests/runtime/out_doris.c b/tests/runtime/out_doris.c new file mode 100644 index 00000000000..94d6616d9f1 --- /dev/null +++ b/tests/runtime/out_doris.c @@ -0,0 +1,268 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2022 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + +struct test_ctx { + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int f_ffd; /* Filter fd (unused) */ + int o_ffd; /* Output fd */ +}; + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +struct str_list { + size_t size; + char **lists; +}; + +/* Callback to check expected results */ +static void cb_check_str_list(void *ctx, int ffd, int res_ret, + void *res_data, size_t res_size, void *data) +{ + char *p; + flb_sds_t out_line = res_data; + int num = get_output_num(); + size_t i; + struct str_list *l = (struct str_list *)data; + + if (!TEST_CHECK(res_data != NULL)) { + TEST_MSG("res_data is NULL"); + return; + } + + if (!TEST_CHECK(l != NULL)) { + TEST_MSG("l is NULL"); + flb_sds_destroy(out_line); + return; + } + + if(!TEST_CHECK(res_ret == 0)) { + TEST_MSG("callback ret=%d", res_ret); + } + if (!TEST_CHECK(res_data != NULL)) { + TEST_MSG("res_data is NULL"); + flb_sds_destroy(out_line); + return; + } + + for (i=0; isize; i++) { + p = strstr(out_line, l->lists[i]); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG(" Got :%s\n expect:%s", out_line, l->lists[i]); + } + } + set_output_num(num+1); + + flb_sds_destroy(out_line); +} + +static struct test_ctx *test_ctx_create() +{ + int i_ffd; + int o_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_malloc(sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "lib", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "doris", NULL); + ctx->o_ffd = o_ffd; + + return ctx; +} + +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +void flb_test_json() +{ + struct test_ctx *ctx; + int ret; + int num; + + char *buf1 = "[1, {\"msg\":\"hello world\"}]"; + size_t size1 = strlen(buf1); + char *buf2 = "[2, {\"msg\":\"hello world\"}]"; + size_t size2 = strlen(buf2); + + char *expected_strs[] = {"[{\"date\":1.0,\"msg\":\"hello world\"},{\"date\":2.0,\"msg\":\"hello world\"}]"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "user", "admin", + "database", "d_fb", + "table", "t_fb", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set_test(ctx->flb, ctx->o_ffd, + "formatter", cb_check_str_list, + &expected, NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1); + TEST_CHECK(ret >= 0); + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf2, size2); + TEST_CHECK(ret >= 0); + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_ctx_destroy(ctx); +} + +void flb_test_time_key() +{ + struct test_ctx *ctx; + int ret; + int num; + + char *buf1 = "[1, {\"msg\":\"hello world\"}]"; + size_t size1 = strlen(buf1); + + char *expected_strs[] = {"{\"timestamp\":1.0,\"msg\":\"hello world\"}"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "user", "admin", + "database", "d_fb", + "table", "t_fb", + "time_key", "timestamp", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set_test(ctx->flb, ctx->o_ffd, + "formatter", cb_check_str_list, + &expected, NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1); + TEST_CHECK(ret >= 0); + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_ctx_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { + {"json" , flb_test_json}, + {"time_key" , flb_test_time_key}, + {NULL, NULL} +}; \ No newline at end of file From e504e53ce970262e429976fa795d1301ceaf73b2 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 23 Oct 2024 14:23:17 +0800 Subject: [PATCH 03/12] feat: support http redirect Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 64 +++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index 49dcc3e3531..8fead74c6a2 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -66,8 +66,9 @@ static int cb_doris_init(struct flb_output_instance *ins, } static int http_put(struct flb_out_doris *ctx, - const void *body, size_t body_len, - const char *tag, int tag_len) + const char *host, int port, + const void *body, size_t body_len, + const char *tag, int tag_len) { int ret; int out_ret = FLB_OK; @@ -79,7 +80,16 @@ static int http_put(struct flb_out_doris *ctx, struct flb_http_client *c; /* Get upstream context and connection */ - u = ctx->u; + if (strcmp(host, ctx->host) == 0 && port == ctx->port) { + u = ctx->u; + } + else { + u = flb_upstream_create(ctx->u->base.config, + host, + port, + ctx->u->base.flags, + ctx->u->base.tls_context); + } u_conn = flb_upstream_conn_get(u); if (!u_conn) { flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", @@ -94,7 +104,7 @@ static int http_put(struct flb_out_doris *ctx, /* Create HTTP client context */ c = flb_http_client(u_conn, FLB_HTTP_PUT, ctx->uri, payload_buf, payload_size, - ctx->host, ctx->port, + host, port, NULL, 0); /* @@ -120,13 +130,32 @@ static int http_put(struct flb_out_doris *ctx, ret = flb_http_do(c, &b_sent); if (ret == 0) { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", - ctx->host, ctx->port, - c->resp.status, c->resp.payload); - if (c->resp.payload_size > 0 && - (strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || - strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { + flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + host, port, + c->resp.status, c->resp.payload); + if (c->resp.status == 307) { // redict + // example: Location: http://admin:admin@127.0.0.1:8040/api/d_fb/t_fb/_stream_load? + char* location = strstr(c->resp.data, "Location:"); + char* start = strstr(location, "@") + 1; + char* mid = strstr(start, ":"); + char* end = strstr(mid, "/api"); + char redict_host[50] = {0}; + memcpy(redict_host, start, mid - start); + char redict_port[10] = {0}; + memcpy(redict_port, mid + 1, end - (mid + 1)); + + out_ret = http_put(ctx, redict_host, atoi(redict_port), + body, body_len, tag, tag_len); + } + else if (c->resp.status == 200) { + if (c->resp.payload_size > 0 && + (strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || + strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { // continue + } + else { + out_ret = FLB_RETRY; + } } else { out_ret = FLB_RETRY; @@ -154,6 +183,11 @@ static int http_put(struct flb_out_doris *ctx, /* Release the TCP connection */ flb_upstream_conn_release(u_conn); + /* Release flb_upstream */ + if (u != ctx->u) { + flb_upstream_destroy(u); + } + return out_ret; } @@ -169,7 +203,7 @@ static int compose_payload(struct flb_out_doris *ctx, encoded = flb_pack_msgpack_to_json_format(in_body, in_size, FLB_PACK_JSON_FORMAT_JSON, - FLB_PACK_JSON_DATE_DOUBLE, + FLB_PACK_JSON_DATE_EPOCH, ctx->time_key); if (encoded == NULL) { flb_plg_error(ctx->ins, "failed to convert json"); @@ -178,7 +212,7 @@ static int compose_payload(struct flb_out_doris *ctx, *out_body = (void*)encoded; *out_size = flb_sds_len(encoded); - flb_plg_info(ctx->ins, "%s", (char*) *out_body); + flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body); return FLB_OK; } @@ -202,8 +236,8 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } - ret = http_put(ctx, out_body, out_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); + ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size, + event_chunk->tag, flb_sds_len(event_chunk->tag)); flb_sds_destroy(out_body); FLB_OUTPUT_RETURN(ret); @@ -219,8 +253,6 @@ static int cb_doris_exit(void *data, struct flb_config *config) /* Configuration properties map */ static struct flb_config_map config_map[] = { - // host - // port // user { FLB_CONFIG_MAP_STR, "user", NULL, From daa4540f47e924f1acbad91ea0779e4eefb92cd3 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 23 Oct 2024 15:04:01 +0800 Subject: [PATCH 04/12] feat: add doris build config Signed-off-by: composer <2789706336@qq.com> --- cmake/plugins_options.cmake | 1 + cmake/windows-setup.cmake | 1 + 2 files changed, 2 insertions(+) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 4292ef204b5..2d86f69bc9b 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -111,6 +111,7 @@ DEFINE_OPTION(FLB_OUT_CHRONICLE "Enable Google Chronicle output pl DEFINE_OPTION(FLB_OUT_CLOUDWATCH_LOGS "Enable AWS CloudWatch output plugin" ON) DEFINE_OPTION(FLB_OUT_COUNTER "Enable Counter output plugin" ON) DEFINE_OPTION(FLB_OUT_DATADOG "Enable DataDog output plugin" ON) +DEFINE_OPTION(FLB_OUT_DORIS "Enable Apache Doris output plugin" ON) DEFINE_OPTION(FLB_OUT_ES "Enable Elasticsearch output plugin" ON) DEFINE_OPTION(FLB_OUT_EXIT "Enable Exit output plugin" ON) DEFINE_OPTION(FLB_OUT_FILE "Enable file output plugin" ON) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index b947d6cbfa0..e24909211e5 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -66,6 +66,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_OUT_COUNTER Yes) set(FLB_OUT_CHRONICLE Yes) set(FLB_OUT_DATADOG Yes) + set(FLB_OUT_DORIS Yes) set(FLB_OUT_ES Yes) set(FLB_OUT_EXIT No) set(FLB_OUT_FORWARD Yes) From e24c442be418c75bfebf0aa3bfc07ce47239c04c Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 01:17:42 +0800 Subject: [PATCH 05/12] feat: add more doris config Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 130 +++++++++++++++++++++++++++------ plugins/out_doris/doris.h | 19 ++++- plugins/out_doris/doris_conf.c | 79 ++++++++++++++++++++ 3 files changed, 202 insertions(+), 26 deletions(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index 8fead74c6a2..dfcf6e3c8db 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -78,12 +78,27 @@ static int http_put(struct flb_out_doris *ctx, struct flb_upstream *u; struct flb_connection *u_conn; struct flb_http_client *c; + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *key = NULL; + struct flb_slist_entry *val = NULL; + + int i; + int root_type; + char *out_buf; + size_t off = 0; + size_t out_size; + msgpack_unpacked result; + msgpack_object root; + msgpack_object msg_key; + msgpack_object msg_val; /* Get upstream context and connection */ if (strcmp(host, ctx->host) == 0 && port == ctx->port) { u = ctx->u; } else { + // TODO cache u = flb_upstream_create(ctx->u->base.config, host, port, @@ -117,12 +132,15 @@ static int http_put(struct flb_out_doris *ctx, flb_http_add_header(c, "format", 6, "json", 4); flb_http_add_header(c, "Expect", 6, "100-continue", 12); flb_http_add_header(c, "strip_outer_array", 17, "true", 4); - flb_http_add_header(c, "columns", 7, ctx->columns, strlen(ctx->columns)); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - if (ctx->timeout_second > 0) { - char timeout[256]; - snprintf(timeout, sizeof(timeout) - 1, "%d", ctx->timeout_second); - flb_http_add_header(c, "timeout", 7, timeout, strlen(timeout)); + + flb_config_map_foreach(head, mv, ctx->headers) { + key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + flb_http_add_header(c, + key->str, flb_sds_len(key->str), + val->str, flb_sds_len(val->str)); } /* Basic Auth headers */ @@ -130,9 +148,16 @@ static int http_put(struct flb_out_doris *ctx, ret = flb_http_do(c, &b_sent); if (ret == 0) { - flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", - host, port, - c->resp.status, c->resp.payload); + if (ctx->log_request) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + host, port, + c->resp.status, c->resp.payload); + } else { + flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + host, port, + c->resp.status, c->resp.payload); + } + if (c->resp.status == 307) { // redict // example: Location: http://admin:admin@127.0.0.1:8040/api/d_fb/t_fb/_stream_load? char* location = strstr(c->resp.data, "Location:"); @@ -147,15 +172,53 @@ static int http_put(struct flb_out_doris *ctx, out_ret = http_put(ctx, redict_host, atoi(redict_port), body, body_len, tag, tag_len); } - else if (c->resp.status == 200) { - if (c->resp.payload_size > 0 && - (strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || - strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { - // continue + else if (c->resp.status == 200 && c->resp.payload_size > 0) { + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &out_buf, &out_size, &root_type, NULL); + + if (ret == -1) { + out_ret = FLB_RETRY; + } + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf, out_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + out_ret = FLB_RETRY; } - else { + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { out_ret = FLB_RETRY; } + + for (i = 0; i < root.via.map.size; i++) { + msg_key = root.via.map.ptr[i].key; + if (msg_key.type != MSGPACK_OBJECT_STR) { + out_ret = FLB_RETRY; + break; + } + + if (msg_key.via.str.size == 6 && strncmp(msg_key.via.str.ptr, "Status", 6) == 0) { + msg_val = root.via.map.ptr[i].val; + if (msg_val.type != MSGPACK_OBJECT_STR) { + out_ret = FLB_RETRY; + break; + } + + if (msg_val.via.str.size == 7 && strncmp(msg_val.via.str.ptr, "Success", 7) == 0) { + out_ret = FLB_OK; + break; + } + + if (msg_val.via.str.size == 15 && strncmp(msg_val.via.str.ptr, "Publish Timeout", 15) == 0) { + out_ret = FLB_OK; + break; + } + + out_ret = FLB_RETRY; + break; + } + } } else { out_ret = FLB_RETRY; @@ -204,7 +267,7 @@ static int compose_payload(struct flb_out_doris *ctx, in_size, FLB_PACK_JSON_FORMAT_JSON, FLB_PACK_JSON_DATE_EPOCH, - ctx->time_key); + ctx->date_key); if (encoded == NULL) { flb_plg_error(ctx->ins, "failed to convert json"); return FLB_ERROR; @@ -212,7 +275,11 @@ static int compose_payload(struct flb_out_doris *ctx, *out_body = (void*)encoded; *out_size = flb_sds_len(encoded); - flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body); + if (ctx->log_request) { + flb_plg_info(ctx->ins, "http body: %s", (char*) *out_body); + } else { + flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body); + } return FLB_OK; } @@ -233,6 +300,9 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, &out_body, &out_size); if (ret != FLB_OK) { + if (ret == FLB_ERROR) { + __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); + } FLB_OUTPUT_RETURN(ret); } @@ -240,6 +310,12 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag)); flb_sds_destroy(out_body); + if (ret == FLB_OK) { + __sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); + __sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); + } else if (ret == FLB_ERROR) { + __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); + } FLB_OUTPUT_RETURN(ret); } @@ -283,17 +359,23 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_out_doris, time_key), "Specify the name of the date field in output" }, - // columns + // header + { + FLB_CONFIG_MAP_SLIST_1, "header", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_doris, headers), + "Add a doris stream load header key/value pair. Multiple headers can be set" + }, + // log_request { - FLB_CONFIG_MAP_STR, "columns", "date,log", - 0, FLB_TRUE, offsetof(struct flb_out_doris, columns), - "Set columns" + FLB_CONFIG_MAP_BOOL, "log_request", "true", + 0, FLB_TRUE, offsetof(struct flb_out_doris, log_request), + "Specify if the doris stream load request and response should be logged or not" }, - // timeout + // log_progress_interval { - FLB_CONFIG_MAP_INT, "timeout_second", "60", - 0, FLB_TRUE, offsetof(struct flb_out_doris, timeout_second), - "Set timeout in second" + FLB_CONFIG_MAP_INT, "log_progress_interval", "10", + 0, FLB_TRUE, offsetof(struct flb_out_doris, log_progress_interval), + "Specify the interval in seconds to log the progress of the doris stream load" }, /* EOF */ diff --git a/plugins/out_doris/doris.h b/plugins/out_doris/doris.h index 51864a7decd..625cc050aa5 100644 --- a/plugins/out_doris/doris.h +++ b/plugins/out_doris/doris.h @@ -20,6 +20,14 @@ #ifndef FLB_OUT_DORIS_H #define FLB_OUT_DORIS_H +#include + +struct flb_doris_progress_reporter { + size_t total_bytes; + size_t total_rows; + size_t failed_rows; +}; + struct flb_out_doris { char *host; int port; @@ -32,9 +40,16 @@ struct flb_out_doris { flb_sds_t table; flb_sds_t time_key; - flb_sds_t columns; + flb_sds_t date_key; /* internal use */ + + /* doris stream load headers */ + struct mk_list *headers; + + int log_request; + int log_progress_interval; - int timeout_second; + struct flb_doris_progress_reporter *reporter; + pthread_t reporter_thread; /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/plugins/out_doris/doris_conf.c b/plugins/out_doris/doris_conf.c index 0c24a228d87..a45a1ab4805 100644 --- a/plugins/out_doris/doris_conf.c +++ b/plugins/out_doris/doris_conf.c @@ -23,16 +23,60 @@ #include #include #include +#include #include "doris.h" #include "doris_conf.h" +void *report(void *c) { + struct flb_out_doris *ctx = (struct flb_out_doris *) c; + + size_t init_time = cfl_time_now() / 1000000000L; + size_t last_time = init_time; + size_t last_bytes = ctx->reporter->total_bytes; + size_t last_rows = ctx->reporter->total_rows; + + size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps; + size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps; + + flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval); + + while (ctx->log_progress_interval > 0) { + sleep(ctx->log_progress_interval); + + cur_time = cfl_time_now() / 1000000000L; + cur_bytes = ctx->reporter->total_bytes; + cur_rows = ctx->reporter->total_rows; + total_time = cur_time - init_time; + total_speed_mbps = cur_bytes / 1024 / 1024 / total_time; + total_speed_rps = cur_rows / total_time; + + inc_bytes = cur_bytes - last_bytes; + inc_rows = cur_rows - last_rows; + inc_time = cur_time - last_time; + inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time; + inc_speed_rps = inc_rows / inc_time; + + flb_plg_info(ctx->ins, "total %zu MB %zu ROWS, total speed %zu MB/s %zu R/s, last %zu seconds speed %zu MB/s %zu R/s", + cur_bytes/1024/1024, cur_rows, total_speed_mbps, total_speed_rps, + inc_time, inc_speed_mbps, inc_speed_rps); + + last_time = cur_time; + last_bytes = cur_bytes; + last_rows = cur_rows; + } + + return NULL; +} + struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, struct flb_config *config) { int ret; int io_flags = 0; + const char *tmp; struct flb_upstream *upstream; struct flb_out_doris *ctx = NULL; + struct flb_doris_progress_reporter *reporter = NULL; /* Allocate plugin context */ ctx = flb_calloc(1, sizeof(struct flb_out_doris)); @@ -92,6 +136,16 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, /* url: /api/{database}/{table}/_stream_load */ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table); + /* Date key */ + ctx->date_key = ctx->time_key; + tmp = flb_output_get_property("time_key", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->date_key = NULL; + } + } + ctx->u = upstream; ctx->host = ins->host.name; ctx->port = ins->host.port; @@ -99,6 +153,26 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, /* Set instance flags into upstream */ flb_output_upstream_set(ctx->u, ins); + /* create and start the progress reporter */ + if (ctx->log_progress_interval > 0) { + reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter)); + if (!reporter) { + flb_plg_error(ins, "failed to create progress reporter"); + flb_doris_conf_destroy(ctx); + return NULL; + } + reporter->total_bytes = 0; + reporter->total_rows = 0; + reporter->failed_rows = 0; + ctx->reporter = reporter; + + if(pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) { + flb_plg_error(ins, "failed to create progress reporter"); + flb_doris_conf_destroy(ctx); + return NULL; + } + } + return ctx; } @@ -112,5 +186,10 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx) flb_upstream_destroy(ctx->u); } + if (ctx->reporter) { + pthread_cancel(ctx->reporter_thread); + flb_free(ctx->reporter); + } + flb_free(ctx); } From 7b44ce656effe13b33e1298c9d873075891abc19 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 04:35:59 +0800 Subject: [PATCH 06/12] add label config Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 27 ++++++++++++++++++++++++--- plugins/out_doris/doris.h | 3 +++ plugins/out_doris/doris_conf.c | 10 ++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index dfcf6e3c8db..158a78c089c 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -68,7 +68,8 @@ static int cb_doris_init(struct flb_output_instance *ins, static int http_put(struct flb_out_doris *ctx, const char *host, int port, const void *body, size_t body_len, - const char *tag, int tag_len) + const char *tag, int tag_len, + const char *label, int label_len) { int ret; int out_ret = FLB_OK; @@ -134,6 +135,11 @@ static int http_put(struct flb_out_doris *ctx, flb_http_add_header(c, "strip_outer_array", 17, "true", 4); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + if (ctx->add_label) { + flb_http_add_header(c, "label", 5, label, label_len); + flb_plg_debug(ctx->ins, "add label: %s", label); + } + flb_config_map_foreach(head, mv, ctx->headers) { key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); @@ -170,7 +176,7 @@ static int http_put(struct flb_out_doris *ctx, memcpy(redict_port, mid + 1, end - (mid + 1)); out_ret = http_put(ctx, redict_host, atoi(redict_port), - body, body_len, tag, tag_len); + body, body_len, tag, tag_len, label, label_len); } else if (c->resp.status == 200 && c->resp.payload_size > 0) { ret = flb_pack_json(c->resp.payload, c->resp.payload_size, @@ -296,6 +302,9 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, size_t out_size; (void) i_ins; + char label[256] = {0}; + int len = 0; + ret = compose_payload(ctx, event_chunk->data, event_chunk->size, &out_body, &out_size); @@ -306,8 +315,14 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } + if (ctx->add_label) { + len = snprintf(label, sizeof(label) - 1, "%s_%lu_", ctx->label_prefix, cfl_time_now() / 1000000000L); + flb_utils_uuid_v4_gen(label + len); + len += 36; + } + ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); + event_chunk->tag, flb_sds_len(event_chunk->tag), label, len); flb_sds_destroy(out_body); if (ret == FLB_OK) { @@ -353,6 +368,12 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_out_doris, table), "Set table" }, + // label_prefix + { + FLB_CONFIG_MAP_STR, "label_prefix", "flubentbit", + 0, FLB_TRUE, offsetof(struct flb_out_doris, label_prefix), + "Set label prefix" + }, // time_key { FLB_CONFIG_MAP_STR, "time_key", "date", diff --git a/plugins/out_doris/doris.h b/plugins/out_doris/doris.h index 625cc050aa5..b18a48bad2a 100644 --- a/plugins/out_doris/doris.h +++ b/plugins/out_doris/doris.h @@ -39,6 +39,9 @@ struct flb_out_doris { flb_sds_t database; flb_sds_t table; + flb_sds_t label_prefix; + int add_label; + flb_sds_t time_key; flb_sds_t date_key; /* internal use */ diff --git a/plugins/out_doris/doris_conf.c b/plugins/out_doris/doris_conf.c index a45a1ab4805..9dbb315a44c 100644 --- a/plugins/out_doris/doris_conf.c +++ b/plugins/out_doris/doris_conf.c @@ -136,6 +136,16 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, /* url: /api/{database}/{table}/_stream_load */ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table); + /* label prefix */ + ctx->add_label = 1; + tmp = flb_output_get_property("label_prefix", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->add_label = 0; + } + } + /* Date key */ ctx->date_key = ctx->time_key; tmp = flb_output_get_property("time_key", ins); From e8a2a609a92df8bc522d89c959c935397b8872aa Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 04:42:18 +0800 Subject: [PATCH 07/12] fix: unit test Signed-off-by: composer <2789706336@qq.com> --- tests/runtime/out_doris.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runtime/out_doris.c b/tests/runtime/out_doris.c index 94d6616d9f1..d91ab581a2d 100644 --- a/tests/runtime/out_doris.c +++ b/tests/runtime/out_doris.c @@ -156,7 +156,7 @@ void flb_test_json() char *buf2 = "[2, {\"msg\":\"hello world\"}]"; size_t size2 = strlen(buf2); - char *expected_strs[] = {"[{\"date\":1.0,\"msg\":\"hello world\"},{\"date\":2.0,\"msg\":\"hello world\"}]"}; + char *expected_strs[] = {"[{\"date\":1,\"msg\":\"hello world\"},{\"date\":2,\"msg\":\"hello world\"}]"}; struct str_list expected = { .size = sizeof(expected_strs)/sizeof(char*), .lists = &expected_strs[0], @@ -213,7 +213,7 @@ void flb_test_time_key() char *buf1 = "[1, {\"msg\":\"hello world\"}]"; size_t size1 = strlen(buf1); - char *expected_strs[] = {"{\"timestamp\":1.0,\"msg\":\"hello world\"}"}; + char *expected_strs[] = {"{\"timestamp\":1,\"msg\":\"hello world\"}"}; struct str_list expected = { .size = sizeof(expected_strs)/sizeof(char*), .lists = &expected_strs[0], From c02e1c7f3b49b8b12affe3bbbe487abe59739a00 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 05:10:06 +0800 Subject: [PATCH 08/12] fix: mem leak Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 3 +++ plugins/out_doris/doris_conf.c | 2 ++ 2 files changed, 5 insertions(+) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index 158a78c089c..407f4f0936b 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -225,6 +225,9 @@ static int http_put(struct flb_out_doris *ctx, break; } } + + flb_free(out_buf); + msgpack_unpacked_destroy(&result); } else { out_ret = FLB_RETRY; diff --git a/plugins/out_doris/doris_conf.c b/plugins/out_doris/doris_conf.c index 9dbb315a44c..4730a1bd604 100644 --- a/plugins/out_doris/doris_conf.c +++ b/plugins/out_doris/doris_conf.c @@ -38,6 +38,8 @@ void *report(void *c) { size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps; size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps; + pthread_detach(pthread_self()); + flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval); while (ctx->log_progress_interval > 0) { From 14f25266f5589040608cec56baa5c423acd3a099 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 05:20:19 +0800 Subject: [PATCH 09/12] fix: typo Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index 407f4f0936b..34a34b103ca 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -373,7 +373,7 @@ static struct flb_config_map config_map[] = { }, // label_prefix { - FLB_CONFIG_MAP_STR, "label_prefix", "flubentbit", + FLB_CONFIG_MAP_STR, "label_prefix", "fluentbit", 0, FLB_TRUE, offsetof(struct flb_out_doris, label_prefix), "Set label prefix" }, From affcdc51649d6925b98d28401ff10c27aedc198f Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Thu, 31 Oct 2024 14:49:31 +0800 Subject: [PATCH 10/12] fix: only calculate speed when interval > 0 Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index 34a34b103ca..dcac70bd3cc 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -312,7 +312,7 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, &out_body, &out_size); if (ret != FLB_OK) { - if (ret == FLB_ERROR) { + if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); } FLB_OUTPUT_RETURN(ret); @@ -328,10 +328,10 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag), label, len); flb_sds_destroy(out_body); - if (ret == FLB_OK) { + if (ret == FLB_OK && ctx->log_progress_interval > 0) { __sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); __sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); - } else if (ret == FLB_ERROR) { + } else if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); } FLB_OUTPUT_RETURN(ret); From 0dbeebf98f7559be5460f537e8a27b271d10c3af Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Sat, 2 Nov 2024 11:07:19 +0800 Subject: [PATCH 11/12] fix: windows atomic Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index dcac70bd3cc..b9025f80c08 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -42,6 +42,10 @@ #include +#ifdef FLB_SYSTEM_WINDOWS +#include +#endif + static int cb_doris_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { @@ -313,7 +317,11 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, if (ret != FLB_OK) { if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); +#else __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); +#endif } FLB_OUTPUT_RETURN(ret); } @@ -329,10 +337,19 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, flb_sds_destroy(out_body); if (ret == FLB_OK && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->total_bytes, out_size); + InterlockedAdd(&ctx->reporter->total_rows, event_chunk->total_events); +#else __sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); __sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); +#endif } else if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); +#else __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); +#endif } FLB_OUTPUT_RETURN(ret); } From 0bee5f4954a05596ff72530b1796919c698f7056 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Wed, 6 Nov 2024 23:44:19 +0800 Subject: [PATCH 12/12] refactor: atomic add Signed-off-by: composer <2789706336@qq.com> --- plugins/out_doris/doris.c | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c index b9025f80c08..d0e2c986118 100644 --- a/plugins/out_doris/doris.c +++ b/plugins/out_doris/doris.c @@ -46,6 +46,14 @@ #include #endif +static inline void sync_fetch_and_add(size_t *dest, size_t value) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(dest, value); +#else + __sync_fetch_and_add(dest, value); +#endif +} + static int cb_doris_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { @@ -317,11 +325,7 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, if (ret != FLB_OK) { if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { -#ifdef FLB_SYSTEM_WINDOWS - InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); -#else - __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); -#endif + sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); } FLB_OUTPUT_RETURN(ret); } @@ -337,19 +341,10 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, flb_sds_destroy(out_body); if (ret == FLB_OK && ctx->log_progress_interval > 0) { -#ifdef FLB_SYSTEM_WINDOWS - InterlockedAdd(&ctx->reporter->total_bytes, out_size); - InterlockedAdd(&ctx->reporter->total_rows, event_chunk->total_events); -#else - __sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); - __sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); -#endif + sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); + sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); } else if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { -#ifdef FLB_SYSTEM_WINDOWS - InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); -#else - __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); -#endif + sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); } FLB_OUTPUT_RETURN(ret); }