Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge #5

Merged
merged 29 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6a26b66
out_es: add support for record accessor on 'logstash_prefix_key' (#1670)
edsiper Jan 25, 2021
e268365
Merge branch 'master' of github.com:fluent/fluent-bit
edsiper Jan 25, 2021
c7f5593
out_es: complete config map help description
edsiper Jan 25, 2021
01f3a59
out_websocket: set upstream flags using instance flags (#2965)
ginobiliwang Jan 25, 2021
4517086
time: add function to gather diff from local timezone from UTC
edsiper Jan 25, 2021
b0cf76e
out_s3: fix parameter for scheduler timer
edsiper Jan 25, 2021
9b84267
Merge branch 'master' of github.com:fluent/fluent-bit
edsiper Jan 25, 2021
8b1c132
fstore: link stream context before using it
edsiper Jan 25, 2021
b7acf73
tests: internal: fstore: do not continue tests if file creation failed
edsiper Jan 25, 2021
93979d9
ci: disable 'fstore' test on Travis
edsiper Jan 26, 2021
0cbcfd4
upstream: remove unused counter
edsiper Jan 26, 2021
007f93f
tests: parser: fix a test failure due to leap years (#2969)
fujimotos Jan 26, 2021
ea2eaf3
out_loki: fix network setting is not applied (#2971)
PoYuHsu Jan 26, 2021
47e66af
out_azure: register upstream with instance
edsiper Jan 26, 2021
e4919ce
out_azure_blob: register upstream with instance
edsiper Jan 26, 2021
bd30e2b
out_bigquery: register upstream with instance
edsiper Jan 26, 2021
850d468
out_datadog: register upstream with instance
edsiper Jan 26, 2021
be36dd7
output: new flb_output_upstream_ha_set() to setup HA mode
edsiper Jan 26, 2021
52a37cf
out_forward: register upstream HA with instance
edsiper Jan 26, 2021
aa140ed
out_gelf: register upstream with instance
edsiper Jan 26, 2021
b38375d
out_influxdb: register upstream with instance
edsiper Jan 26, 2021
adadea3
out_kafka_rest: register upstream with instance
edsiper Jan 26, 2021
19d339c
out_logdna: register upstream with instance
edsiper Jan 26, 2021
6d374a8
out_nats: register upstream with instance
edsiper Jan 26, 2021
0332cba
out_nrlogs: register upstream with instance
edsiper Jan 26, 2021
7b3d954
out_syslog: register upstream with instance
edsiper Jan 26, 2021
a479713
out_td: register upstream with instance
edsiper Jan 26, 2021
af4b25c
in_tail: use the latest id when finding offset from the db (#2960)
lee-byeoksan Jan 26, 2021
2642626
build: Disable FLB_HTTP_SERVER on Windows build
fujimotos Jan 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci/do-ut
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ then
SKIP="$SKIP -DFLB_WITHOUT_flb-it-parser=1"
fi

# Disable fstore test on Travis
[[ ! -z "$TRAVIS" ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-fstore=1"

# If no v6, disable that test
[[ ! $(ip a) =~ ::1 ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-network=1"

Expand Down
1 change: 1 addition & 0 deletions cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(FLB_EXAMPLES Yes)
set(FLB_PARSER Yes)
set(FLB_TLS Yes)
set(FLB_AWS Yes)
set(FLB_HTTP_SERVER No)

# INPUT plugins
# =============
Expand Down
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/tls/flb_tls.h>
#include <fluent-bit/flb_output_thread.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_upstream_ha.h>

#ifdef FLB_HAVE_REGEX
#include <fluent-bit/flb_regex.h>
Expand Down Expand Up @@ -670,7 +672,11 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context);
int flb_output_instance_destroy(struct flb_output_instance *ins);
int flb_output_init_all(struct flb_config *config);
int flb_output_check(struct flb_config *config);

int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins);
int flb_output_upstream_ha_set(struct flb_upstream_ha *ha,
struct flb_output_instance *ins);

void flb_output_prepare();
int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins);

Expand Down
4 changes: 2 additions & 2 deletions include/fluent-bit/flb_thread_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#ifdef FLB_SYSTEM_WINDOWS
#include <monkey/mk_core/external/winpthreads.h>
#else
#include <pthread.h>
#endif

/* Ideal case when the compiler support direct storage through __thread */
Expand All @@ -36,8 +38,6 @@

#else

#include <pthread.h>

/* Fallback mode using pthread_*() for Thread-Local-Storage usage */
#define FLB_TLS_SET(key, val) pthread_setspecific(key, (void *) val)
#define FLB_TLS_GET(key) pthread_getspecific(key)
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ int flb_time_append_to_msgpack(struct flb_time *tm, msgpack_packer *pk, int fmt)
int flb_time_msgpack_to_time(struct flb_time *time, msgpack_object *obj);
int flb_time_pop_from_msgpack(struct flb_time *time, msgpack_unpacked *upk,
msgpack_object **map);
long flb_time_tz_offset_to_second();

#endif /* FLB_TIME_H */
2 changes: 0 additions & 2 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ struct flb_upstream {
char *proxy_username;
char *proxy_password;

int n_connections;

/* Networking setup for timeouts and network interfaces */
struct flb_net_setup net;

Expand Down
3 changes: 2 additions & 1 deletion plugins/in_tail/tail_sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
" rotated INTEGER DEFAULT 0" \
");"

#define SQL_GET_FILE "SELECT * from in_tail_files WHERE inode=@inode;"
#define SQL_GET_FILE \
"SELECT * from in_tail_files WHERE inode=@inode order by id desc;"

#define SQL_INSERT_FILE \
"INSERT INTO in_tail_files (name, offset, inode, created)" \
Expand Down
1 change: 1 addition & 0 deletions plugins/out_azure/azure_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins,
return NULL;
}
ctx->u = upstream;
flb_output_upstream_set(ctx->u, ins);

/* Compose uri */
ctx->uri = flb_sds_create_size(1024);
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_azure_blob/azure_blob_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
}

/* Compress (gzip) */
tmp = flb_output_get_property("compress", ins);
tmp = (char *) flb_output_get_property("compress", ins);
ctx->compress_gzip = FLB_FALSE;
if (tmp) {
if (strcasecmp(tmp, "gzip") == 0) {
Expand Down Expand Up @@ -149,7 +149,6 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
flb_plg_error(ctx->ins, "invalid endpoint '%s'", ctx->endpoint);
return NULL;
}

ctx->real_endpoint = flb_sds_create(ctx->endpoint);
}
else {
Expand Down Expand Up @@ -180,6 +179,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
return NULL;
}
}
flb_output_upstream_set(ctx->u, ins);

/* Compose base uri */
ctx->base_uri = flb_sds_create_size(256);
Expand Down
1 change: 1 addition & 0 deletions plugins/out_bigquery/bigquery.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ static int cb_bigquery_init(struct flb_output_instance *ins,
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
}
flb_output_upstream_set(ctx->u, ins);

/* Retrief oauth2 token */
token = get_google_token(ctx);
Expand Down
1 change: 1 addition & 0 deletions plugins/out_datadog/datadog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
return NULL;
}
ctx->upstream = upstream;
flb_output_upstream_set(ctx->upstream, ins);

return ctx;
}
Expand Down
104 changes: 60 additions & 44 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_aws_credentials.h>
#include <fluent-bit/flb_record_accessor.h>
#include <msgpack.h>

#include <time.h>
Expand Down Expand Up @@ -335,30 +336,21 @@ static int elasticsearch_format(struct flb_config *config,

es_index_custom_len = 0;
if (ctx->logstash_prefix_key) {
for (i = 0; i < map_size; i++) {
key = map.via.map.ptr[i].key;
if (key.type != MSGPACK_OBJECT_STR) {
continue;
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) {
continue;
else {
memcpy(logstash_index, v, len);
}
if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key,
flb_sds_len(ctx->logstash_prefix_key)) != 0) {
continue;
}
val = map.via.map.ptr[i].val;
if (val.type != MSGPACK_OBJECT_STR) {
continue;
}
if (val.via.str.size >= 128) {
continue;
}
es_index_custom = val.via.str.ptr;
es_index_custom_len = val.via.str.size;
memcpy(logstash_index, es_index_custom, es_index_custom_len);
logstash_index[es_index_custom_len] = '\0';
break;
es_index_custom = v;
es_index_custom_len = len;
flb_sds_destroy(v);
}
}

Expand Down Expand Up @@ -691,6 +683,9 @@ static void cb_es_flush(const void *data, size_t bytes,
}
#endif

/* Map debug callbacks */
flb_http_client_debug(c, ctx->ins->callback);

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
Expand Down Expand Up @@ -725,7 +720,7 @@ static void cb_es_flush(const void *data, size_t bytes,
* input/output to Elasticsearch that caused the problem.
*/
flb_plg_debug(ctx->ins, "error caused by: Input\n%s\n",
pack);
pack);
flb_plg_error(ctx->ins, "error: Output\n%s",
c->resp.payload);
}
Expand Down Expand Up @@ -771,24 +766,24 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
NULL
"Set an index name"
},
{
FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
NULL
"Set the document type property"
},

/* HTTP Authentication */
{
FLB_CONFIG_MAP_STR, "http_user", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
NULL
"Optional username credential for Elastic X-Pack access"
},
{
FLB_CONFIG_MAP_STR, "http_passwd", "",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
NULL
"Password for user defined in HTTP_User"
},

/* Cloud Authentication */
Expand Down Expand Up @@ -836,94 +831,115 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
NULL
"Enable Logstash format compatibility"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
NULL
"When Logstash_Format is enabled, the Index name is composed using a prefix "
"and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
"become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
"when the data is being generated"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
NULL
"When included: the value in the record that belongs to the key will be looked "
"up and over-write the Logstash_Prefix for index generation. If the key/value "
"is not found in the record then the Logstash_Prefix option will act as a "
"fallback. Nested keys are supported through record accessor pattern"
},
{
FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
NULL
"Time format (based on strftime) to generate the second part of the Index name"
},

/* Custom Time and Tag keys */
{
FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
NULL
"When Logstash_Format is enabled, each record will get a new timestamp field. "
"The Time_Key property defines the name of that field"
},
{
FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
NULL
"When Logstash_Format is enabled, this property defines the format of the "
"timestamp"
},
{
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
NULL
"When Logstash_Format is enabled, enabling this property sends nanosecond "
"precision timestamps"
},
{
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
NULL
"When enabled, it append the Tag name to the record"
},
{
FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key),
NULL
"When Include_Tag_Key is enabled, this property defines the key name for the tag"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size),
NULL
"Specify the buffer size used to read the response from the Elasticsearch HTTP "
"service. This option is useful for debugging purposes where is required to read "
"full responses, note that response size grows depending of the number of records "
"inserted. To set an unlimited amount of memory set this value to 'false', "
"otherwise the value must be according to the Unit Size specification"
},

/* Elasticsearch specifics */
{
FLB_CONFIG_MAP_STR, "path", NULL,
0, FLB_FALSE, 0,
NULL
"Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
"possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
"option defines such path on the fluent-bit side. It simply adds a path "
"prefix in the indexing HTTP POST URI"
},
{
FLB_CONFIG_MAP_STR, "pipeline", NULL,
0, FLB_FALSE, 0,
NULL
"Newer versions of Elasticsearch allows to setup filters called pipelines. "
"This option allows to define which pipeline the database should use. For "
"performance reasons is strongly suggested to do parsing and filtering on "
"Fluent Bit side, avoid pipelines"
},
{
FLB_CONFIG_MAP_BOOL, "generate_id", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id),
NULL
"When enabled, generate _id for outgoing records. This prevents duplicate "
"records when retrying ES"
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
NULL
"When enabled, replace field name dots with underscore, required by Elasticsearch "
"2.0-2.3."
},

{
FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index),
NULL
"Use current time for index generation instead of message record"
},

/* Trace */
{
FLB_CONFIG_MAP_BOOL, "trace_output", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output),
NULL
"When enabled print the Elasticsearch API calls to stdout (for diag only)"
},
{
FLB_CONFIG_MAP_BOOL, "trace_error", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error),
NULL
"When enabled print the Elasticsearch exception to stderr (for diag only)"
},

/* EOF */
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ struct flb_elasticsearch {
/* Elasticsearch HTTP API */
char uri[256];

struct flb_record_accessor *ra_prefix_key;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
Loading