Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_calyptia_fleet: improve configuration reloading. #7925

Merged
merged 7 commits into from
Sep 18, 2023
169 changes: 139 additions & 30 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#define CALYPTIA_H_CTYPE "Content-Type"
#define CALYPTIA_H_CTYPE_JSON "application/json"

#define DEFAULT_INTERVAL_SEC "3"
#define DEFAULT_INTERVAL_SEC "15"
#define DEFAULT_INTERVAL_NSEC "0"

#define CALYPTIA_HOST "cloud-api.calyptia.com"
Expand All @@ -61,6 +61,11 @@ struct flb_in_calyptia_fleet_config {
int interval_sec;
int interval_nsec;

/* Grabbed from the cfg_path, used to check if configuration has
* has been updated.
*/
long config_timestamp;

flb_sds_t api_key;
flb_sds_t fleet_id;
flb_sds_t fleet_name;
Expand Down Expand Up @@ -103,12 +108,12 @@ static char *find_case_header(struct flb_http_client *cli, const char *header)

ptr+=2;

// no space left for header
/* no space left for header */
if (ptr + strlen(header)+2 >= cli->resp.payload) {
return NULL;
}

// matched header and the delimiter
/* matched header and the delimiter */
if (strncasecmp(ptr, header, strlen(header)) == 0) {

if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') {
Expand Down Expand Up @@ -159,6 +164,11 @@ static int case_header_lookup(struct flb_http_client *cli,
return -1;
}

/* sanity check that the header_len does not exceed the headers. */
if (ptr + header_len + 2 > end) {
return -1;
}

ptr += header_len + 2;

*out_val = ptr;
Expand Down Expand Up @@ -257,13 +267,49 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
return ret;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
char *fname;
char *end;
long val;

if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);

if (fname == NULL) {
return FLB_FALSE;
}

fname++;

errno = 0;
val = strtol(fname, &end, 10);

if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) ||
(errno != 0 && val == 0)) {
flb_errno();
return FLB_FALSE;
}

if (strcmp(end, ".ini") == 0) {
return FLB_TRUE;
}

return FLB_FALSE;
}

static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

return is_new_fleet_config(ctx, cfg) || is_cur_fleet_config(ctx, cfg);
return is_new_fleet_config(ctx, cfg) ||
is_cur_fleet_config(ctx, cfg) ||
is_timestamped_fleet_config(ctx, cfg);
}

static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
Expand Down Expand Up @@ -296,7 +342,7 @@ static void *do_reload(void *data)
{
struct reload_ctx *reload = (struct reload_ctx *)data;

// avoid reloading the current configuration... just use our new one!
/* avoid reloading the current configuration... just use our new one! */
flb_context_set(reload->flb);
reload->flb->config->enable_hot_reload = FLB_TRUE;
reload->flb->config->conf_path_file = reload->cfg_path;
Expand Down Expand Up @@ -359,19 +405,33 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf
pthread_attr_t ptha;
flb_ctx_t *flb = flb_context_get();

if (ctx->collect_fd > 0) {
flb_input_collector_pause(ctx->collect_fd, ctx->ins);
}

if (flb == NULL) {
flb_plg_error(ctx->ins, "unable to get fluent-bit context.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

// fix execution in valgrind...
// otherwise flb_reload errors out with:
// [error] [reload] given flb context is NULL
/* fix execution in valgrind...
* otherwise flb_reload errors out with:
* [error] [reload] given flb context is NULL
*/
flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath);

if (test_config_is_valid(cfgpath) == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to load configuration.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

Expand Down Expand Up @@ -447,8 +507,7 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx,
}

project_id = flb_sds_create_len(cur->val.via.str.ptr,
cur->val.via.str.size);

cur->val.via.str.size);
msgpack_unpacked_destroy(&result);
flb_free(pack);

Expand Down Expand Up @@ -724,7 +783,6 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
goto http_error;
}

// Wed, 21 Oct 2015 07:28:00 GMT
flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
time_last_modified = mktime(&tm_last_modified.tm);

Expand Down Expand Up @@ -807,22 +865,24 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
flb_sds_destroy(cfgoldname);
}

link(cfgname, cfgnewname);
symlink(cfgname, cfgnewname);
}

// FORCE THE RELOAD!!!
if (ctx->config_timestamp < time_last_modified) {
flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld",
ctx->config_timestamp, time_last_modified);
flb_plg_info(ctx->ins, "force the reloading of the configuration file=%d.", ctx->event_fd);
flb_sds_destroy(cfgname);
flb_sds_destroy(data);

if (execute_reload(ctx, cfgnewname) == FLB_FALSE) {
if (execute_reload(ctx, cfgname) == FLB_FALSE) {
cfgoldname = old_fleet_config_filename(ctx);
cfgcurname = cur_fleet_config_filename(ctx);
rename(cfgoldname, cfgcurname);
flb_sds_destroy(cfgcurname);
flb_sds_destroy(cfgoldname);
goto reload_error;
}
else {
else {
FLB_INPUT_RETURN(0);
}
}
Expand All @@ -838,10 +898,11 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
FLB_INPUT_RETURN(ret);
}

// recursively create directories, based on:
// https://stackoverflow.com/a/2336245
// who found it at:
// http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
/* recursively create directories, based on:
* https://stackoverflow.com/a/2336245
* who found it at:
* http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
*/
static int _mkdir(const char *dir, int perms) {
char tmp[255];
char *ptr = NULL;
Expand Down Expand Up @@ -907,23 +968,62 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_ctx_t *flb_ctx = flb_context_get();

char *fname;
char *ext;
long timestamp;
char realname[4096];
ssize_t len;

if (create_fleet_directory(ctx) != 0) {
return -1;
}

// check if we are already using the fleet configuration file.
/* check if we are already using the fleet configuration file. */
if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
// check which one and load it
/* check which one and load it */
if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, cur_fleet_config_filename(ctx));
return execute_reload(ctx, cur_fleet_config_filename(ctx));
}
else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, new_fleet_config_filename(ctx));
else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
return execute_reload(ctx, new_fleet_config_filename(ctx));
}
}
return 0;
else {
if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) {
len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname));

if (len > sizeof(realname)) {
return FLB_FALSE;
}

fname = basename(realname);
}
else {
fname = basename(flb_ctx->config->conf_path_file);
}

if (fname == NULL) {
return FLB_FALSE;
}

errno = 0;
timestamp = strtol(fname, &ext, 10);
pwhelan marked this conversation as resolved.
Show resolved Hide resolved

if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) ||
(errno != 0 && timestamp == 0)) {
flb_errno();
return FLB_FALSE;
}

/* unable to parse the timstamp */
if (errno == ERANGE) {
return FLB_FALSE;
}

ctx->config_timestamp = timestamp;
}

return FLB_FALSE;
}

static int in_calyptia_fleet_init(struct flb_input_instance *in,
Expand Down Expand Up @@ -953,6 +1053,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
return -1;
}
ctx->ins = in;
ctx->collect_fd = -1;


/* Load the config map */
Expand Down Expand Up @@ -1004,9 +1105,18 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
}

if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
}

/* Set the context */
flb_input_set_context(in, ctx);


/* if we load a new configuration then we will be reloaded anyways */
if (load_fleet_config(ctx) == FLB_TRUE) {
return 0;
}

/* Set our collector based on time */
ret = flb_input_set_collector_time(in,
in_calyptia_fleet_collect,
Expand All @@ -1015,14 +1125,13 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
config);

if (ret == -1) {
flb_plg_error(ctx->ins, "could not set collector for Health input plugin");
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
flb_free(ctx);
return -1;
}

ctx->collect_fd = ret;

load_fleet_config(ctx);
return 0;
}

Expand Down
Loading