Commit f25180f

Merge pull request #5 from fluent/master

merge

DrewZhang13 authored Jan 27, 2021
2 parents b69a379 + 2642626 commit f25180f
Showing 32 changed files with 198 additions and 74 deletions.
3 changes: 3 additions & 0 deletions ci/do-ut
@@ -35,6 +35,9 @@ then
SKIP="$SKIP -DFLB_WITHOUT_flb-it-parser=1"
fi

# Disable fstore test on Travis
[[ ! -z "$TRAVIS" ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-fstore=1"

# If no v6, disable that test
[[ ! $(ip a) =~ ::1 ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-network=1"

1 change: 1 addition & 0 deletions cmake/windows-setup.cmake
@@ -10,6 +10,7 @@ set(FLB_EXAMPLES Yes)
set(FLB_PARSER Yes)
set(FLB_TLS Yes)
set(FLB_AWS Yes)
set(FLB_HTTP_SERVER No)

# INPUT plugins
# =============
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_output.h
@@ -45,6 +45,8 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/tls/flb_tls.h>
#include <fluent-bit/flb_output_thread.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_upstream_ha.h>

#ifdef FLB_HAVE_REGEX
#include <fluent-bit/flb_regex.h>
@@ -670,7 +672,11 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context);
int flb_output_instance_destroy(struct flb_output_instance *ins);
int flb_output_init_all(struct flb_config *config);
int flb_output_check(struct flb_config *config);

int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins);
int flb_output_upstream_ha_set(struct flb_upstream_ha *ha,
struct flb_output_instance *ins);

void flb_output_prepare();
int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins);

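Note: these two new prototypes are the hook that the plugin-side changes further down rely on. Once a plugin has created its upstream (or upstream_ha) context, it registers it against the output instance so the instance-level network settings are applied to it. A minimal sketch of the calling pattern, mirroring the azure/bigquery/datadog hunks below — the context struct, host and port here are placeholders, not part of this commit:

    #include <fluent-bit/flb_output.h>
    #include <fluent-bit/flb_upstream.h>
    #include <fluent-bit/flb_io.h>
    #include <fluent-bit/flb_mem.h>

    /* Hypothetical plugin context; only the upstream pointer matters here */
    struct flb_my_out {
        struct flb_upstream *u;
    };

    static int cb_my_out_init(struct flb_output_instance *ins,
                              struct flb_config *config, void *data)
    {
        struct flb_my_out *ctx;

        ctx = flb_calloc(1, sizeof(struct flb_my_out));
        if (!ctx) {
            return -1;
        }

        /* Create the upstream context (plain TCP, placeholder endpoint) */
        ctx->u = flb_upstream_create(config, "example.com", 80, FLB_IO_TCP, NULL);
        if (!ctx->u) {
            flb_free(ctx);
            return -1;
        }

        /* New in this commit: bind the upstream to the output instance so the
         * instance network setup is propagated to it */
        flb_output_upstream_set(ctx->u, ins);

        flb_output_set_context(ins, ctx);
        return 0;
    }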
4 changes: 2 additions & 2 deletions include/fluent-bit/flb_thread_storage.h
@@ -25,6 +25,8 @@

#ifdef FLB_SYSTEM_WINDOWS
#include <monkey/mk_core/external/winpthreads.h>
#else
#include <pthread.h>
#endif

/* Ideal case when the compiler support direct storage through __thread */
@@ -36,8 +38,6 @@

#else

#include <pthread.h>

/* Fallback mode using pthread_*() for Thread-Local-Storage usage */
#define FLB_TLS_SET(key, val) pthread_setspecific(key, (void *) val)
#define FLB_TLS_GET(key) pthread_getspecific(key)
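For context, the pthread.h include is now pulled in for all non-Windows builds instead of only inside the fallback branch; that fallback branch is the pthread-based thread-local-storage path used when the compiler lacks __thread support. A self-contained sketch of what FLB_TLS_SET()/FLB_TLS_GET() reduce to in that fallback — plain POSIX, not Fluent Bit code:

    #include <pthread.h>
    #include <stdio.h>

    static pthread_key_t name_key;

    static void *worker(void *arg)
    {
        /* FLB_TLS_SET(): store a value visible only to the calling thread */
        pthread_setspecific(name_key, arg);

        /* FLB_TLS_GET(): read back this thread's own value */
        printf("worker sees: %s\n", (char *) pthread_getspecific(name_key));
        return NULL;
    }

    int main(void)
    {
        pthread_t t1, t2;

        /* Create the key once; each thread then gets its own slot for it */
        pthread_key_create(&name_key, NULL);

        pthread_create(&t1, NULL, worker, "thread-1");
        pthread_create(&t2, NULL, worker, "thread-2");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        return 0;
    }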
1 change: 1 addition & 0 deletions include/fluent-bit/flb_time.h
@@ -83,5 +83,6 @@ int flb_time_append_to_msgpack(struct flb_time *tm, msgpack_packer *pk, int fmt)
int flb_time_msgpack_to_time(struct flb_time *time, msgpack_object *obj);
int flb_time_pop_from_msgpack(struct flb_time *time, msgpack_unpacked *upk,
msgpack_object **map);
long flb_time_tz_offset_to_second();

#endif /* FLB_TIME_H */
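The new flb_time_tz_offset_to_second() prototype suggests a helper that returns the local timezone's offset from UTC in seconds; its body lives in flb_time.c and is not shown in this diff. One common way to compute such an offset — given here only as a hedged sketch, the real implementation may differ (e.g. around DST edge cases) — is to compare localtime and gmtime for the same instant:

    #include <time.h>

    /* Hypothetical stand-in for flb_time_tz_offset_to_second() */
    static long tz_offset_to_second_sketch(void)
    {
        time_t now = time(NULL);
        struct tm local;
        struct tm utc;

        localtime_r(&now, &local);
        gmtime_r(&now, &utc);

        /* mktime() interprets both structs as local wall-clock time, so the
         * difference between the two results is the UTC offset in seconds */
        return (long) difftime(mktime(&local), mktime(&utc));
    }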
2 changes: 0 additions & 2 deletions include/fluent-bit/flb_upstream.h
@@ -57,8 +57,6 @@ struct flb_upstream {
char *proxy_username;
char *proxy_password;

int n_connections;

/* Networking setup for timeouts and network interfaces */
struct flb_net_setup net;

3 changes: 2 additions & 1 deletion plugins/in_tail/tail_sql.h
@@ -38,7 +38,8 @@
" rotated INTEGER DEFAULT 0" \
");"

#define SQL_GET_FILE "SELECT * from in_tail_files WHERE inode=@inode;"
#define SQL_GET_FILE \
"SELECT * from in_tail_files WHERE inode=@inode order by id desc;"

#define SQL_INSERT_FILE \
"INSERT INTO in_tail_files (name, offset, inode, created)" \
1 change: 1 addition & 0 deletions plugins/out_azure/azure_conf.c
@@ -182,6 +182,7 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins,
return NULL;
}
ctx->u = upstream;
flb_output_upstream_set(ctx->u, ins);

/* Compose uri */
ctx->uri = flb_sds_create_size(1024);
4 changes: 2 additions & 2 deletions plugins/out_azure_blob/azure_blob_conf.c
@@ -113,7 +113,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
}

/* Compress (gzip) */
tmp = flb_output_get_property("compress", ins);
tmp = (char *) flb_output_get_property("compress", ins);
ctx->compress_gzip = FLB_FALSE;
if (tmp) {
if (strcasecmp(tmp, "gzip") == 0) {
@@ -149,7 +149,6 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
flb_plg_error(ctx->ins, "invalid endpoint '%s'", ctx->endpoint);
return NULL;
}

ctx->real_endpoint = flb_sds_create(ctx->endpoint);
}
else {
@@ -180,6 +179,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
return NULL;
}
}
flb_output_upstream_set(ctx->u, ins);

/* Compose base uri */
ctx->base_uri = flb_sds_create_size(256);
1 change: 1 addition & 0 deletions plugins/out_bigquery/bigquery.c
@@ -311,6 +311,7 @@ static int cb_bigquery_init(struct flb_output_instance *ins,
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
}
flb_output_upstream_set(ctx->u, ins);

/* Retrieve oauth2 token */
token = get_google_token(ctx);
1 change: 1 addition & 0 deletions plugins/out_datadog/datadog_conf.c
@@ -208,6 +208,7 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
return NULL;
}
ctx->upstream = upstream;
flb_output_upstream_set(ctx->upstream, ins);

return ctx;
}
104 changes: 60 additions & 44 deletions plugins/out_es/es.c
@@ -26,6 +26,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_aws_credentials.h>
#include <fluent-bit/flb_record_accessor.h>
#include <msgpack.h>

#include <time.h>
@@ -335,30 +336,21 @@ static int elasticsearch_format(struct flb_config *config,

es_index_custom_len = 0;
if (ctx->logstash_prefix_key) {
for (i = 0; i < map_size; i++) {
    key = map.via.map.ptr[i].key;
    if (key.type != MSGPACK_OBJECT_STR) {
        continue;
    }
    if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) {
        continue;
    }
    if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key,
                flb_sds_len(ctx->logstash_prefix_key)) != 0) {
        continue;
    }
    val = map.via.map.ptr[i].val;
    if (val.type != MSGPACK_OBJECT_STR) {
        continue;
    }
    if (val.via.str.size >= 128) {
        continue;
    }
    es_index_custom = val.via.str.ptr;
    es_index_custom_len = val.via.str.size;
    memcpy(logstash_index, es_index_custom, es_index_custom_len);
    logstash_index[es_index_custom_len] = '\0';
    break;
}
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
                               (char *) tag, tag_len,
                               map, NULL);
if (v) {
    len = flb_sds_len(v);
    if (len > 128) {
        len = 128;
        memcpy(logstash_index, v, 128);
    }
    else {
        memcpy(logstash_index, v, len);
    }
    es_index_custom = v;
    es_index_custom_len = len;
    flb_sds_destroy(v);
}
}

@@ -691,6 +683,9 @@ static void cb_es_flush(const void *data, size_t bytes,
}
#endif

/* Map debug callbacks */
flb_http_client_debug(c, ctx->ins->callback);

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
@@ -725,7 +720,7 @@ static void cb_es_flush(const void *data, size_t bytes,
* input/output to Elasticsearch that caused the problem.
*/
flb_plg_debug(ctx->ins, "error caused by: Input\n%s\n",
pack);
flb_plg_error(ctx->ins, "error: Output\n%s",
c->resp.payload);
}
@@ -771,24 +766,24 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
NULL
"Set an index name"
},
{
FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
NULL
"Set the document type property"
},

/* HTTP Authentication */
{
FLB_CONFIG_MAP_STR, "http_user", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
NULL
"Optional username credential for Elastic X-Pack access"
},
{
FLB_CONFIG_MAP_STR, "http_passwd", "",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
NULL
"Password for user defined in HTTP_User"
},

/* Cloud Authentication */
@@ -836,94 +831,115 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
NULL
"Enable Logstash format compatibility"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
NULL
"When Logstash_Format is enabled, the Index name is composed using a prefix "
"and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
"become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
"when the data is being generated"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
NULL
"When included: the value in the record that belongs to the key will be looked "
"up and over-write the Logstash_Prefix for index generation. If the key/value "
"is not found in the record then the Logstash_Prefix option will act as a "
"fallback. Nested keys are supported through record accessor pattern"
},
{
FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
NULL
"Time format (based on strftime) to generate the second part of the Index name"
},

/* Custom Time and Tag keys */
{
FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
NULL
"When Logstash_Format is enabled, each record will get a new timestamp field. "
"The Time_Key property defines the name of that field"
},
{
FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
NULL
"When Logstash_Format is enabled, this property defines the format of the "
"timestamp"
},
{
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
NULL
"When Logstash_Format is enabled, enabling this property sends nanosecond "
"precision timestamps"
},
{
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
NULL
"When enabled, it append the Tag name to the record"
},
{
FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key),
NULL
"When Include_Tag_Key is enabled, this property defines the key name for the tag"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size),
NULL
"Specify the buffer size used to read the response from the Elasticsearch HTTP "
"service. This option is useful for debugging purposes where is required to read "
"full responses, note that response size grows depending of the number of records "
"inserted. To set an unlimited amount of memory set this value to 'false', "
"otherwise the value must be according to the Unit Size specification"
},

/* Elasticsearch specifics */
{
FLB_CONFIG_MAP_STR, "path", NULL,
0, FLB_FALSE, 0,
NULL
"Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
"possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
"option defines such path on the fluent-bit side. It simply adds a path "
"prefix in the indexing HTTP POST URI"
},
{
FLB_CONFIG_MAP_STR, "pipeline", NULL,
0, FLB_FALSE, 0,
NULL
"Newer versions of Elasticsearch allows to setup filters called pipelines. "
"This option allows to define which pipeline the database should use. For "
"performance reasons is strongly suggested to do parsing and filtering on "
"Fluent Bit side, avoid pipelines"
},
{
FLB_CONFIG_MAP_BOOL, "generate_id", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id),
NULL
"When enabled, generate _id for outgoing records. This prevents duplicate "
"records when retrying ES"
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
NULL
"When enabled, replace field name dots with underscore, required by Elasticsearch "
"2.0-2.3."
},

{
FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index),
NULL
"Use current time for index generation instead of message record"
},

/* Trace */
{
FLB_CONFIG_MAP_BOOL, "trace_output", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output),
NULL
"When enabled print the Elasticsearch API calls to stdout (for diag only)"
},
{
FLB_CONFIG_MAP_BOOL, "trace_error", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error),
NULL
"When enabled print the Elasticsearch exception to stderr (for diag only)"
},

/* EOF */
2 changes: 2 additions & 0 deletions plugins/out_es/es.h
@@ -106,6 +106,8 @@ struct flb_elasticsearch {
/* Elasticsearch HTTP API */
char uri[256];

struct flb_record_accessor *ra_prefix_key;

/* Upstream connection to the backend server */
struct flb_upstream *u;

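The new ra_prefix_key member pairs with the es.c change above: logstash_prefix_key is now resolved through the record accessor (flb_ra_translate), which is what enables the nested-key lookups mentioned in the updated option description. The accessor is presumably created from the configured logstash_prefix_key string at init time and released on exit; that part of the change (es_conf.c) is not shown on this page, so the following is only an assumed sketch against the public record-accessor API, ignoring details such as prepending a '$' to form a full accessor pattern:

    #include <fluent-bit/flb_record_accessor.h>

    #include "es.h"

    /* Assumed init-time wiring: build the accessor once from the configured key */
    static int es_prefix_key_setup(struct flb_elasticsearch *ctx)
    {
        if (!ctx->logstash_prefix_key) {
            return 0;
        }

        ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE);
        if (!ctx->ra_prefix_key) {
            return -1;
        }
        return 0;
    }

    /* ...and the matching cleanup when the plugin instance is destroyed */
    static void es_prefix_key_teardown(struct flb_elasticsearch *ctx)
    {
        if (ctx->ra_prefix_key) {
            flb_ra_destroy(ctx->ra_prefix_key);
            ctx->ra_prefix_key = NULL;
        }
    }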