diff --git a/ci/do-ut b/ci/do-ut
index da49d2bbe29..4047bf3f95a 100755
--- a/ci/do-ut
+++ b/ci/do-ut
@@ -35,6 +35,9 @@ then
     SKIP="$SKIP -DFLB_WITHOUT_flb-it-parser=1"
 fi
+# Disable fstore test on Travis
+[[ ! -z "$TRAVIS" ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-fstore=1"
+
 # If no v6, disable that test
 [[ ! $(ip a) =~ ::1 ]] && SKIP="$SKIP -DFLB_WITHOUT_flb-it-network=1"
 
diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake
index 8c011317927..7ae786485d9 100644
--- a/cmake/windows-setup.cmake
+++ b/cmake/windows-setup.cmake
@@ -10,6 +10,7 @@ set(FLB_EXAMPLES Yes)
 set(FLB_PARSER Yes)
 set(FLB_TLS Yes)
 set(FLB_AWS Yes)
+set(FLB_HTTP_SERVER No)
 
 # INPUT plugins
 # =============
diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h
index 975584e2036..75bf4577d19 100644
--- a/include/fluent-bit/flb_output.h
+++ b/include/fluent-bit/flb_output.h
@@ -45,6 +45,8 @@
 #include
 #include
 #include
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_upstream_ha.h>
 
 #ifdef FLB_HAVE_REGEX
 #include
 #endif
@@ -670,7 +672,11 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context);
 int flb_output_instance_destroy(struct flb_output_instance *ins);
 int flb_output_init_all(struct flb_config *config);
 int flb_output_check(struct flb_config *config);
+
 int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins);
+int flb_output_upstream_ha_set(struct flb_upstream_ha *ha,
+                               struct flb_output_instance *ins);
+
 void flb_output_prepare();
 int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins);
diff --git a/include/fluent-bit/flb_thread_storage.h b/include/fluent-bit/flb_thread_storage.h
index f94325a25e4..c0e08f058f5 100644
--- a/include/fluent-bit/flb_thread_storage.h
+++ b/include/fluent-bit/flb_thread_storage.h
@@ -25,6 +25,8 @@
 
 #ifdef FLB_SYSTEM_WINDOWS
 #include
+#else
+#include <pthread.h>
 #endif
 
 /* Ideal case when the compiler support direct storage through __thread */
@@ -36,8 +38,6 @@
 
 #else
 
-#include <pthread.h>
-
 /* Fallback mode using pthread_*() for Thread-Local-Storage usage */
 #define FLB_TLS_SET(key, val)    pthread_setspecific(key, (void *) val)
 #define FLB_TLS_GET(key)         pthread_getspecific(key)
diff --git a/include/fluent-bit/flb_time.h b/include/fluent-bit/flb_time.h
index ba68ea84910..2d36edf21ef 100644
--- a/include/fluent-bit/flb_time.h
+++ b/include/fluent-bit/flb_time.h
@@ -83,5 +83,6 @@ int flb_time_append_to_msgpack(struct flb_time *tm, msgpack_packer *pk, int fmt)
 int flb_time_msgpack_to_time(struct flb_time *time, msgpack_object *obj);
 int flb_time_pop_from_msgpack(struct flb_time *time, msgpack_unpacked *upk,
                               msgpack_object **map);
+long flb_time_tz_offset_to_second();
 
 #endif /* FLB_TIME_H */
diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h
index 23cf5fdc8d6..c3f125ca5cb 100644
--- a/include/fluent-bit/flb_upstream.h
+++ b/include/fluent-bit/flb_upstream.h
@@ -57,8 +57,6 @@ struct flb_upstream {
     char *proxy_username;
     char *proxy_password;
 
-    int n_connections;
-
     /* Networking setup for timeouts and network interfaces */
     struct flb_net_setup net;
diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h
index 6b7f00b0960..15db168af6e 100644
--- a/plugins/in_tail/tail_sql.h
+++ b/plugins/in_tail/tail_sql.h
@@ -38,7 +38,8 @@
     "  rotated INTEGER DEFAULT 0"                                      \
     ");"
 
-#define SQL_GET_FILE "SELECT * from in_tail_files WHERE inode=@inode;"
+#define SQL_GET_FILE \
+    "SELECT * from in_tail_files WHERE inode=@inode order by id desc;"
 
 #define SQL_INSERT_FILE                                                \
     "INSERT INTO in_tail_files (name, offset, inode, created)"         \
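A side note on the in_tail change above: ordering by id DESC means that when the same inode appears more than once in the database (e.g. after rotation), the newest row is returned first. A minimal sketch of how such a named-parameter query can be executed with the SQLite C API; the helper name, db handle, and column positions are illustrative, not taken from the in_tail code:

#include <stdio.h>
#include <sqlite3.h>

#define SQL_GET_FILE \
    "SELECT * from in_tail_files WHERE inode=@inode order by id desc;"

/* Fetch the newest row for a given inode; returns 0 on success */
static int query_file_by_inode(sqlite3 *db, sqlite3_int64 inode)
{
    sqlite3_stmt *stmt;
    int idx;

    if (sqlite3_prepare_v2(db, SQL_GET_FILE, -1, &stmt, NULL) != SQLITE_OK) {
        return -1;
    }

    /* Bind the @inode named parameter */
    idx = sqlite3_bind_parameter_index(stmt, "@inode");
    sqlite3_bind_int64(stmt, idx, inode);

    /* ORDER BY id DESC guarantees the first row is the newest entry */
    if (sqlite3_step(stmt) == SQLITE_ROW) {
        printf("id=%lld name=%s\n",
               (long long) sqlite3_column_int64(stmt, 0),
               (const char *) sqlite3_column_text(stmt, 1));
    }

    sqlite3_finalize(stmt);
    return 0;
}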
diff --git a/plugins/out_azure/azure_conf.c b/plugins/out_azure/azure_conf.c
index 5795d5a63db..aadf01000d5 100644
--- a/plugins/out_azure/azure_conf.c
+++ b/plugins/out_azure/azure_conf.c
@@ -182,6 +182,7 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins,
         return NULL;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
 
     /* Compose uri */
     ctx->uri = flb_sds_create_size(1024);
diff --git a/plugins/out_azure_blob/azure_blob_conf.c b/plugins/out_azure_blob/azure_blob_conf.c
index 38ae79d9051..3319246d6c1 100644
--- a/plugins/out_azure_blob/azure_blob_conf.c
+++ b/plugins/out_azure_blob/azure_blob_conf.c
@@ -113,7 +113,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
     }
 
     /* Compress (gzip) */
-    tmp = flb_output_get_property("compress", ins);
+    tmp = (char *) flb_output_get_property("compress", ins);
     ctx->compress_gzip = FLB_FALSE;
     if (tmp) {
         if (strcasecmp(tmp, "gzip") == 0) {
@@ -149,7 +149,6 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
             flb_plg_error(ctx->ins, "invalid endpoint '%s'", ctx->endpoint);
             return NULL;
         }
-
         ctx->real_endpoint = flb_sds_create(ctx->endpoint);
     }
     else {
@@ -180,6 +179,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
             return NULL;
         }
     }
+    flb_output_upstream_set(ctx->u, ins);
 
     /* Compose base uri */
     ctx->base_uri = flb_sds_create_size(256);
diff --git a/plugins/out_bigquery/bigquery.c b/plugins/out_bigquery/bigquery.c
index 716363a4c7a..3c4bfdb6d5c 100644
--- a/plugins/out_bigquery/bigquery.c
+++ b/plugins/out_bigquery/bigquery.c
@@ -311,6 +311,7 @@ static int cb_bigquery_init(struct flb_output_instance *ins,
         flb_plg_error(ctx->ins, "upstream creation failed");
         return -1;
     }
+    flb_output_upstream_set(ctx->u, ins);
 
     /* Retrief oauth2 token */
     token = get_google_token(ctx);
diff --git a/plugins/out_datadog/datadog_conf.c b/plugins/out_datadog/datadog_conf.c
index 9d65dbbcbb9..aa8cfbf321c 100644
--- a/plugins/out_datadog/datadog_conf.c
+++ b/plugins/out_datadog/datadog_conf.c
@@ -208,6 +208,7 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
         return NULL;
     }
     ctx->upstream = upstream;
+    flb_output_upstream_set(ctx->upstream, ins);
 
     return ctx;
 }
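Each output plugin touched in this patch follows the same two-step convention: create the upstream, then hand it to flb_output_upstream_set() so instance-level networking flags propagate to it. A schematic sketch of that pattern, not standalone code: it assumes the fluent-bit headers, a TLS-less TCP upstream, and a hypothetical plugin context with a single backend:

#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_upstream.h>

/* Hypothetical plugin context: one upstream to a fixed backend */
struct my_ctx {
    struct flb_upstream *u;
};

static int my_plugin_init(struct flb_output_instance *ins,
                          struct flb_config *config, struct my_ctx *ctx)
{
    ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port,
                                 FLB_IO_TCP, NULL);
    if (!ctx->u) {
        return -1;
    }

    /* Propagate instance flags (TLS, network settings) to the upstream */
    flb_output_upstream_set(ctx->u, ins);
    return 0;
}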
diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c
index 0096c425a24..caccd5ceb0e 100644
--- a/plugins/out_es/es.c
+++ b/plugins/out_es/es.c
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <fluent-bit/flb_record_accessor.h>
 #include
 #include
 
@@ -335,30 +336,21 @@ static int elasticsearch_format(struct flb_config *config,
     es_index_custom_len = 0;
     if (ctx->logstash_prefix_key) {
-        for (i = 0; i < map_size; i++) {
-            key = map.via.map.ptr[i].key;
-            if (key.type != MSGPACK_OBJECT_STR) {
-                continue;
+        flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
+                                       (char *) tag, tag_len,
+                                       map, NULL);
+        if (v) {
+            len = flb_sds_len(v);
+            if (len > 128) {
+                len = 128;
+                memcpy(logstash_index, v, 128);
             }
-            if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) {
-                continue;
+            else {
+                memcpy(logstash_index, v, len);
             }
-            if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key,
-                        flb_sds_len(ctx->logstash_prefix_key)) != 0) {
-                continue;
-            }
-            val = map.via.map.ptr[i].val;
-            if (val.type != MSGPACK_OBJECT_STR) {
-                continue;
-            }
-            if (val.via.str.size >= 128) {
-                continue;
-            }
-            es_index_custom = val.via.str.ptr;
-            es_index_custom_len = val.via.str.size;
-            memcpy(logstash_index, es_index_custom, es_index_custom_len);
-            logstash_index[es_index_custom_len] = '\0';
-            break;
+            es_index_custom = v;
+            es_index_custom_len = len;
+            flb_sds_destroy(v);
         }
     }
@@ -691,6 +683,9 @@
     }
 #endif
 
+    /* Map debug callbacks */
+    flb_http_client_debug(c, ctx->ins->callback);
+
     ret = flb_http_do(c, &b_sent);
     if (ret != 0) {
         flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
@@ -725,7 +720,7 @@
          * input/output to Elasticsearch that caused the problem.
          */
         flb_plg_debug(ctx->ins, "error caused by: Input\n%s\n",
-                     pack);
+                      pack);
         flb_plg_error(ctx->ins, "error: Output\n%s", c->resp.payload);
     }
@@ -771,24 +766,24 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
-     NULL
+     "Set an index name"
     },
     {
      FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
-     NULL
+     "Set the document type property"
     },
 
     /* HTTP Authentication */
     {
      FLB_CONFIG_MAP_STR, "http_user", NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
-     NULL
+     "Optional username credential for Elastic X-Pack access"
     },
     {
      FLB_CONFIG_MAP_STR, "http_passwd", "",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
-     NULL
+     "Password for user defined in HTTP_User"
     },
 
     /* Cloud Authentication */
@@ -836,94 +831,115 @@
     {
      FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
-     NULL
+     "Enable Logstash format compatibility"
     },
     {
      FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
-     NULL
+     "When Logstash_Format is enabled, the Index name is composed using a prefix "
+     "and the date, e.g: if Logstash_Prefix equals 'mydata' your index will "
+     "become 'mydata-YYYY.MM.DD'. The last string appended is the date "
+     "when the data is being generated"
     },
     {
      FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
-     NULL
+     "When included, the value in the record that belongs to the key will be looked "
+     "up and will overwrite the Logstash_Prefix for index generation. If the key/value "
+     "is not found in the record then the Logstash_Prefix option will act as a "
+     "fallback. Nested keys are supported through the record accessor pattern"
     },
     {
      FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
-     NULL
+     "Time format (based on strftime) to generate the second part of the Index name"
     },
 
     /* Custom Time and Tag keys */
     {
      FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
-     NULL
+     "When Logstash_Format is enabled, each record will get a new timestamp field. "
+     "The Time_Key property defines the name of that field"
     },
" + "The Time_Key property defines the name of that field" }, { FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF, 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format), - NULL + "When Logstash_Format is enabled, this property defines the format of the " + "timestamp" }, { FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos), - NULL + "When Logstash_Format is enabled, enabling this property sends nanosecond " + "precision timestamps" }, { FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key), - NULL + "When enabled, it append the Tag name to the record" }, { FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY, 0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key), - NULL + "When Include_Tag_Key is enabled, this property defines the key name for the tag" }, { FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX, 0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size), - NULL + "Specify the buffer size used to read the response from the Elasticsearch HTTP " + "service. This option is useful for debugging purposes where is required to read " + "full responses, note that response size grows depending of the number of records " + "inserted. To set an unlimited amount of memory set this value to 'false', " + "otherwise the value must be according to the Unit Size specification" }, /* Elasticsearch specifics */ { FLB_CONFIG_MAP_STR, "path", NULL, 0, FLB_FALSE, 0, - NULL + "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also " + "possible to serve Elasticsearch behind a reverse proxy on a subpath. This " + "option defines such path on the fluent-bit side. It simply adds a path " + "prefix in the indexing HTTP POST URI" }, { FLB_CONFIG_MAP_STR, "pipeline", NULL, 0, FLB_FALSE, 0, - NULL + "Newer versions of Elasticsearch allows to setup filters called pipelines. " + "This option allows to define which pipeline the database should use. For " + "performance reasons is strongly suggested to do parsing and filtering on " + "Fluent Bit side, avoid pipelines" }, { FLB_CONFIG_MAP_BOOL, "generate_id", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id), - NULL + "When enabled, generate _id for outgoing records. This prevents duplicate " + "records when retrying ES" }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), - NULL + "When enabled, replace field name dots with underscore, required by Elasticsearch " + "2.0-2.3." 
     {
      FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index),
-     NULL
+     "Use current time for index generation instead of the message record"
     },
 
     /* Trace */
     {
      FLB_CONFIG_MAP_BOOL, "trace_output", "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output),
-     NULL
+     "When enabled, print the Elasticsearch API calls to stdout (for diag only)"
     },
     {
      FLB_CONFIG_MAP_BOOL, "trace_error", "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error),
-     NULL
+     "When enabled, print the Elasticsearch exception to stderr (for diag only)"
     },
 
     /* EOF */
diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h
index a433c25e6bb..168888d2273 100644
--- a/plugins/out_es/es.h
+++ b/plugins/out_es/es.h
@@ -106,6 +106,8 @@ struct flb_elasticsearch {
     /* Elasticsearch HTTP API */
     char uri[256];
 
+    struct flb_record_accessor *ra_prefix_key;
+
     /* Upstream connection to the backend server */
     struct flb_upstream *u;
diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c
index 43d9eb7ba11..0cacd5df5ac 100644
--- a/plugins/out_es/es_conf.c
+++ b/plugins/out_es/es_conf.c
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include <fluent-bit/flb_record_accessor.h>
 #include
 #include
 #include
@@ -111,9 +112,10 @@ static void set_cloud_credentials(struct flb_elasticsearch *ctx,
 struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
                                              struct flb_config *config)
 {
-
+    int len;
     int io_flags = 0;
     ssize_t ret;
+    char *buf;
     const char *tmp;
     const char *path;
 #ifdef FLB_HAVE_AWS
@@ -230,6 +232,34 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
         snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
     }
 
+
+    if (ctx->logstash_prefix_key) {
+        if (ctx->logstash_prefix_key[0] != '$') {
+            len = flb_sds_len(ctx->logstash_prefix_key);
+            buf = flb_malloc(len + 2);
+            if (!buf) {
+                flb_errno();
+                flb_es_conf_destroy(ctx);
+                return NULL;
+            }
+            buf[0] = '$';
+            memcpy(buf + 1, ctx->logstash_prefix_key, len);
+            buf[len + 1] = '\0';
+
+            ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
+            flb_free(buf);
+        }
+        else {
+            ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE);
+        }
+
+        if (!ctx->ra_prefix_key) {
+            flb_plg_error(ins, "invalid logstash_prefix_key pattern '%s'", tmp);
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+    }
+
 #ifdef FLB_HAVE_AWS
     /* AWS Auth */
     ctx->has_aws_auth = FLB_FALSE;
@@ -375,6 +405,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
     }
 #endif
 
+    if (ctx->ra_prefix_key) {
+        flb_ra_destroy(ctx->ra_prefix_key);
+    }
+
     flb_free(ctx->cloud_passwd);
     flb_free(ctx->cloud_user);
     flb_free(ctx);
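The es_conf.c change above swaps the old per-record map scan for a record accessor built once at configuration time. A rough sketch of that lifecycle, using only the calls visible in this diff (flb_ra_create, flb_ra_translate, flb_ra_destroy); it compiles only inside the Fluent Bit source tree, and the helper names here are hypothetical:

#include <string.h>
#include <msgpack.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_record_accessor.h>

/* Accept both "mykey" and "$mykey"; bare keys get the '$' prefix the
 * record accessor syntax expects (same normalization as es_conf.c) */
static struct flb_record_accessor *make_prefix_ra(const char *key)
{
    int len;
    char *buf;
    struct flb_record_accessor *ra;

    if (key[0] == '$') {
        return flb_ra_create((char *) key, FLB_TRUE);
    }

    len = strlen(key);
    buf = flb_malloc(len + 2);
    if (!buf) {
        return NULL;
    }
    buf[0] = '$';
    memcpy(buf + 1, key, len);
    buf[len + 1] = '\0';

    ra = flb_ra_create(buf, FLB_TRUE);
    flb_free(buf);
    return ra;
}

/* At format time: render the matched value (nested keys included) as a
 * new sds buffer owned by the caller; NULL means the key was absent */
static void use_prefix_ra(struct flb_record_accessor *ra,
                          const char *tag, int tag_len, msgpack_object map)
{
    flb_sds_t v = flb_ra_translate(ra, (char *) tag, tag_len, map, NULL);
    if (v) {
        /* ... copy at most 128 bytes into the index name ... */
        flb_sds_destroy(v);
    }
}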
diff --git a/plugins/out_forward/forward.c b/plugins/out_forward/forward.c
index 70a21b74c36..24f50bc9e1f 100644
--- a/plugins/out_forward/forward.c
+++ b/plugins/out_forward/forward.c
@@ -746,6 +746,8 @@ static int forward_config_ha(const char *upstream_file,
         flb_upstream_node_set_data(fc, node);
     }
 
+    flb_output_upstream_ha_set(ctx->ha, ctx->ins);
+
     return 0;
 }
diff --git a/plugins/out_gelf/gelf.c b/plugins/out_gelf/gelf.c
index d1030ea612f..21a628e6573 100644
--- a/plugins/out_gelf/gelf.c
+++ b/plugins/out_gelf/gelf.c
@@ -446,6 +446,7 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf
             flb_free(ctx);
             return -1;
         }
+        flb_output_upstream_set(ctx->u, ins);
     }
 
     /* Set the plugin context */
diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c
index d84ee892f50..6b8efd48541 100644
--- a/plugins/out_influxdb/influxdb.c
+++ b/plugins/out_influxdb/influxdb.c
@@ -426,6 +426,7 @@ static int cb_influxdb_init(struct flb_output_instance *ins, struct flb_config *
     }
     ctx->u = upstream;
     ctx->seq = 0;
+    flb_output_upstream_set(ctx->u, ins);
 
     flb_time_zero(&ctx->ts_dupe);
     flb_time_zero(&ctx->ts_last);
diff --git a/plugins/out_kafka_rest/kafka_conf.c b/plugins/out_kafka_rest/kafka_conf.c
index 3b8e29c4470..e0b160c7c12 100644
--- a/plugins/out_kafka_rest/kafka_conf.c
+++ b/plugins/out_kafka_rest/kafka_conf.c
@@ -71,6 +71,7 @@ struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
         return NULL;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
 
     /* HTTP Auth */
     tmp = flb_output_get_property("http_user", ins);
diff --git a/plugins/out_logdna/logdna.c b/plugins/out_logdna/logdna.c
index f3b9dbbf6ce..0d77efe2daf 100644
--- a/plugins/out_logdna/logdna.c
+++ b/plugins/out_logdna/logdna.c
@@ -324,6 +324,7 @@ static struct flb_logdna *logdna_config_create(struct flb_output_instance *ins,
         return NULL;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
 
     /* Set networking defaults */
     flb_output_net_default(FLB_LOGDNA_HOST, atoi(FLB_LOGDNA_PORT), ins);
diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c
index ee13febf4e1..8d9ac15e438 100644
--- a/plugins/out_loki/loki.c
+++ b/plugins/out_loki/loki.c
@@ -568,6 +568,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
         return NULL;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
 
     ctx->tcp_port = ins->host.port;
     ctx->tcp_host = ins->host.name;
diff --git a/plugins/out_nats/nats.c b/plugins/out_nats/nats.c
index 7d09bf1037a..27b3bd244c2 100644
--- a/plugins/out_nats/nats.c
+++ b/plugins/out_nats/nats.c
@@ -62,6 +62,7 @@ static int cb_nats_init(struct flb_output_instance *ins, struct flb_config *conf
     }
     ctx->u = upstream;
     ctx->ins = ins;
+    flb_output_upstream_set(ctx->u, ins);
     flb_output_set_context(ins, ctx);
 
     return 0;
diff --git a/plugins/out_nrlogs/newrelic.c b/plugins/out_nrlogs/newrelic.c
index 1f99eee68e9..8165a5cb879 100644
--- a/plugins/out_nrlogs/newrelic.c
+++ b/plugins/out_nrlogs/newrelic.c
@@ -313,6 +313,7 @@ static struct flb_newrelic *newrelic_config_create(struct flb_output_instance *i
         return NULL;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
 
     return ctx;
 }
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index ee7fb94c721..2d8106843f0 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -1200,17 +1200,18 @@ static void cb_s3_flush(const void *data, size_t bytes,
                         void *out_context, struct flb_config *config)
 {
-    struct flb_s3 *ctx = out_context;
-    flb_sds_t json = NULL;
-    struct s3_file *chunk = NULL;
-    struct multipart_upload *m_upload = NULL;
+    int ret;
+    int len;
+    int timeout_check = FLB_FALSE;
     char *buffer = NULL;
     size_t buffer_size;
-    int timeout_check = FLB_FALSE;
     size_t chunk_size = 0;
     size_t upload_size = 0;
-    int ret;
-    int len;
+    flb_sds_t json = NULL;
+    struct s3_file *chunk = NULL;
+    struct flb_s3 *ctx = out_context;
+    struct multipart_upload *m_upload = NULL;
+    struct flb_sched *sched;
     (void) i_ins;
     (void) config;
@@ -1242,7 +1243,8 @@ static void cb_s3_flush(const void *data, size_t bytes,
                       "Creating upload timer with frequency %ds",
                       ctx->timer_ms / 1000);
 
-        ret = flb_sched_timer_cb_create(config, FLB_SCHED_TIMER_CB_PERM,
+        sched = flb_sched_ctx_get();
+        ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
                                         ctx->timer_ms, cb_s3_upload, ctx);
diff --git a/plugins/out_syslog/syslog.c b/plugins/out_syslog/syslog.c
index e2ad3375b1e..c56f05c39e1 100644
--- a/plugins/out_syslog/syslog.c
+++ b/plugins/out_syslog/syslog.c
@@ -885,6 +885,7 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co
             flb_syslog_config_destroy(ctx);
             return -1;
         }
+        flb_output_upstream_set(ctx->u, ins);
     }
 
     /* Set the plugin context */
diff --git a/plugins/out_td/td.c b/plugins/out_td/td.c
index 6a8a9b51e11..bc4dcccd4eb 100644
--- a/plugins/out_td/td.c
+++ b/plugins/out_td/td.c
@@ -174,6 +174,7 @@ static int cb_td_init(struct flb_output_instance *ins, struct flb_config *config
         return -1;
     }
     ctx->u = upstream;
+    flb_output_upstream_set(ctx->u, ins);
     flb_output_set_context(ins, ctx);
 
     return 0;
diff --git a/plugins/out_websocket/websocket_conf.c b/plugins/out_websocket/websocket_conf.c
index e554291f642..7437c0ed1b3 100644
--- a/plugins/out_websocket/websocket_conf.c
+++ b/plugins/out_websocket/websocket_conf.c
@@ -142,7 +142,10 @@ struct flb_out_ws *flb_ws_conf_create(struct flb_output_instance *ins,
     ctx->host = ins->host.name;
     ctx->port = ins->host.port;
     ctx->idle_interval = idle_interval;
-
+
+    /* Set instance flags into upstream */
+    flb_output_upstream_set(ctx->u, ins);
+
     flb_info("[out_ws] we have following parameter %s, %s, %d, %d", ctx->uri, ctx->host, ctx->port, ctx->idle_interval);
     return ctx;
 }
diff --git a/src/flb_fstore.c b/src/flb_fstore.c
index fff56b8c694..0e96bf31ee9 100644
--- a/src/flb_fstore.c
+++ b/src/flb_fstore.c
@@ -143,6 +143,8 @@ struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs,
         flb_errno();
         return NULL;
     }
+    fsf->stream = fs_stream->stream;
+
     fsf->name = flb_sds_create(name);
     if (!fsf->name) {
         flb_error("[fstore] could not create file: %s:%s",
@@ -162,7 +164,6 @@ struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs,
     }
 
     fsf->chunk = chunk;
-    fsf->stream = fs_stream->stream;
     mk_list_add(&fsf->_head, &fs_stream->files);
 
     return fsf;
diff --git a/src/flb_output.c b/src/flb_output.c
index 24ccba796f0..f84fe03afa5 100644
--- a/src/flb_output.c
+++ b/src/flb_output.c
@@ -1037,6 +1037,20 @@ int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *
     return 0;
 }
 
+int flb_output_upstream_ha_set(struct flb_upstream_ha *ha,
+                               struct flb_output_instance *ins)
+{
+    struct mk_list *head;
+    struct flb_upstream_node *node;
+
+    mk_list_foreach(head, &ha->nodes) {
+        node = mk_list_entry(head, struct flb_upstream_node, _head);
+        flb_output_upstream_set(node->u, ins);
+    }
+
+    return 0;
+}
+
 /*
  * Helper function to set HTTP callbacks using the output instance 'callback'
  * context.
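In flb_output_upstream_ha_set() above, mk_list_entry() recovers the containing struct from the embedded list node — the classic container_of idiom. A self-contained illustration of the same mechanics (standalone, not fluent-bit code; the struct and names are made up for the demo):

#include <stdio.h>
#include <stddef.h>

/* Minimal intrusive list node, same shape as mk_list */
struct list_node {
    struct list_node *prev;
    struct list_node *next;
};

struct upstream_node {
    char name[32];
    struct list_node _head;   /* embedded link, like flb_upstream_node */
};

/* container_of: subtract the member offset from the member address */
#define list_entry(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

int main(void)
{
    struct upstream_node a = { .name = "node-a" };
    struct list_node *head = &a._head;

    /* Recover the owning struct from the embedded node */
    struct upstream_node *owner = list_entry(head, struct upstream_node, _head);
    printf("%s\n", owner->name);   /* prints: node-a */
    return 0;
}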
diff --git a/src/flb_time.c b/src/flb_time.c
index a8275cac4c7..c398a12b0a7 100644
--- a/src/flb_time.c
+++ b/src/flb_time.c
@@ -237,3 +237,25 @@ int flb_time_pop_from_msgpack(struct flb_time *time, msgpack_unpacked *upk,
     ret = flb_time_msgpack_to_time(time, &obj);
     return ret;
 }
+
+long flb_time_tz_offset_to_second()
+{
+    time_t t = time(NULL);
+    struct tm local = *localtime(&t);
+    struct tm utc = *gmtime(&t);
+
+    long diff = ((local.tm_hour - utc.tm_hour) \
+                 * 60 + (local.tm_min - utc.tm_min)) \
+                * 60L + (local.tm_sec - utc.tm_sec);
+
+    int delta_day = local.tm_mday - utc.tm_mday;
+
+    if ((delta_day == 1) || (delta_day < -1)) {
+        diff += 24L * 60 * 60;
+    }
+    else if ((delta_day == -1) || (delta_day > 1)) {
+        diff -= 24L * 60 * 60;
+    }
+
+    return diff;
+}
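The day-delta correction in flb_time_tz_offset_to_second() handles the midnight wraparound: when local and UTC sit on different calendar days, the raw hour/minute/second difference is off by 24 hours, and a tm_mday gap larger than one in magnitude signals a month boundary (e.g. local is the 1st while UTC is still the 31st). A standalone program using the same localtime/gmtime technique, handy for experimenting with the logic outside the tree (the function name here is the demo's own):

#include <stdio.h>
#include <time.h>

/* Diff the broken-down local and UTC views of the same instant, then
 * correct for the case where the two calendars are on different days */
static long tz_offset_seconds(void)
{
    time_t t = time(NULL);
    struct tm local = *localtime(&t);
    struct tm utc = *gmtime(&t);

    long diff = ((local.tm_hour - utc.tm_hour) * 60
                 + (local.tm_min - utc.tm_min)) * 60L
                + (local.tm_sec - utc.tm_sec);

    int delta_day = local.tm_mday - utc.tm_mday;

    /* |delta_day| > 1 only happens across a month boundary */
    if (delta_day == 1 || delta_day < -1) {
        diff += 24L * 60 * 60;
    }
    else if (delta_day == -1 || delta_day > 1) {
        diff -= 24L * 60 * 60;
    }
    return diff;
}

int main(void)
{
    printf("UTC offset: %+ld seconds\n", tz_offset_seconds());
    return 0;
}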
diff --git a/src/flb_upstream.c b/src/flb_upstream.c
index 28a437b7605..b48ddce984b 100644
--- a/src/flb_upstream.c
+++ b/src/flb_upstream.c
@@ -212,7 +212,6 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config,
     }
 
     u->flags = flags;
-    u->n_connections = 0;
     u->flags |= FLB_IO_ASYNC;
     u->thread_safe = FLB_FALSE;
 
@@ -321,8 +320,6 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn)
     /* Add node to destroy queue */
     mk_list_add(&u_conn->_head, &uq->destroy_queue);
 
-    u->n_connections--;
-
     /*
      * note: the connection context is destroyed by the engine once all events
      * have been processed.
@@ -403,9 +400,6 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
     uq = flb_upstream_queue_get(u);
     mk_list_add(&conn->_head, &uq->busy_queue);
 
-    /* Increase counter */
-    u->n_connections++;
-
     if (u->thread_safe == FLB_TRUE) {
         pthread_mutex_unlock(&u->mutex_lists);
     }
diff --git a/tests/internal/fstore.c b/tests/internal/fstore.c
index 0cf6a29c920..1f4de61b567 100644
--- a/tests/internal/fstore.c
+++ b/tests/internal/fstore.c
@@ -57,6 +57,9 @@ void cb_all()
 
     fsf = flb_fstore_file_create(fs, st, "example.txt", 100);
     TEST_CHECK(fsf != NULL);
+    if (!fsf) {
+        return;
+    }
 
     ret = stat(FSF_STORE_PATH "/abc/example.txt", &st_data);
     TEST_CHECK(ret == 0);
diff --git a/tests/internal/parser.c b/tests/internal/parser.c
index 25b55f314b3..1a69d30d047 100644
--- a/tests/internal/parser.c
+++ b/tests/internal/parser.c
@@ -19,6 +19,9 @@
 #define JSON_FMT_01 "{\"key001\": 12345, \"key002\": 0.99, \"time\": \"%s\"}"
 #define REGEX_FMT_01 "12345 0.99 %s"
 
+#define isleap(y)   ((y) % 4 == 0 && ((y) % 400 == 0 || (y) % 100 != 0))
+#define year2sec(y) (isleap(y) ? 31622400 : 31536000)
+
 /* Timezone */
 struct tz_check {
     char *val;
@@ -167,6 +170,7 @@ static void load_regex_parsers(struct flb_config *config)
 void test_parser_time_lookup()
 {
     int i;
+    int j;
     int len;
     int ret;
     int toff;
@@ -210,8 +214,9 @@ void test_parser_time_lookup()
             gmtime_r(&now, &tm_now);
             gmtime_r(&time_test, &tm_test);
 
-            if (tm_now.tm_year != tm_test.tm_year) {
-                year_diff = ((tm_now.tm_year - tm_test.tm_year) * 31536000);
+            year_diff = 0;
+            for (j = tm_test.tm_year; j < tm_now.tm_year; j++) {
+                year_diff += year2sec(tm_test.tm_mon < 2 ? j : j + 1);
             }
         }
         else {
@@ -244,6 +249,7 @@ void test_parser_time_lookup()
 void test_json_parser_time_lookup()
 {
     int i;
+    int j;
     int ret;
     int len;
     int toff;
@@ -289,8 +295,9 @@ void test_json_parser_time_lookup()
             gmtime_r(&time_now, &tm_now);
             gmtime_r(&time_test, &tm_test);
 
-            if (tm_now.tm_year != tm_test.tm_year) {
-                year_diff = ((tm_now.tm_year - tm_test.tm_year) * 31536000);
+            year_diff = 0;
+            for (j = tm_test.tm_year; j < tm_now.tm_year; j++) {
+                year_diff += year2sec(tm_test.tm_mon < 2 ? j : j + 1);
             }
         }
         else {
@@ -327,6 +334,7 @@ void test_json_parser_time_lookup()
 void test_regex_parser_time_lookup()
 {
     int i;
+    int j;
     int ret;
     int len;
     int toff;
@@ -372,8 +380,9 @@ void test_regex_parser_time_lookup()
             gmtime_r(&time_now, &tm_now);
             gmtime_r(&time_test, &tm_test);
 
-            if (tm_now.tm_year != tm_test.tm_year) {
-                year_diff = ((tm_now.tm_year - tm_test.tm_year) * 31536000);
+            year_diff = 0;
+            for (j = tm_test.tm_year; j < tm_now.tm_year; j++) {
+                year_diff += year2sec(tm_test.tm_mon < 2 ? j : j + 1);
             }
         }
         else {
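A note on the test fix above: the old code multiplied the year gap by a fixed 31536000 seconds (365 days), drifting one day per leap year crossed; the new loop adds 31622400 for leap years. The macros take tm_year-style offsets from 1900 (which agree with the Gregorian rule for the years these tests exercise), and the `tm_mon < 2` ternary decides whether the leap day of year j or j + 1 falls inside the interval. A standalone check of the arithmetic, with test values chosen arbitrarily:

#include <stdio.h>

#define isleap(y)   ((y) % 4 == 0 && ((y) % 400 == 0 || (y) % 100 != 0))
#define year2sec(y) (isleap(y) ? 31622400L : 31536000L)

int main(void)
{
    /* Seconds between the same calendar date in tm_year 119 (2019) and
     * tm_year 121 (2021), for a date after February (tm_mon >= 2): the
     * leap day of 2020 (j + 1 == 120) is crossed exactly once. */
    int from = 119, to = 121, mon = 5;
    long diff = 0;
    int j;

    for (j = from; j < to; j++) {
        diff += year2sec(mon < 2 ? j : j + 1);
    }

    /* 31536000 + 31622400 = 63158400; the old fixed math gave
     * 2 * 31536000 = 63072000, one day (86400 s) short */
    printf("span: %ld seconds\n", diff);
    return 0;
}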