Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_calyptia_fleet: improve configuration reloading. #7925

Merged
merged 7 commits into from
Sep 18, 2023
117 changes: 102 additions & 15 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#define CALYPTIA_H_CTYPE "Content-Type"
#define CALYPTIA_H_CTYPE_JSON "application/json"

#define DEFAULT_INTERVAL_SEC "3"
#define DEFAULT_INTERVAL_SEC "15"
#define DEFAULT_INTERVAL_NSEC "0"

#define CALYPTIA_HOST "cloud-api.calyptia.com"
Expand All @@ -61,6 +61,11 @@ struct flb_in_calyptia_fleet_config {
int interval_sec;
int interval_nsec;

/* Grabbed from the cfg_path, used to check if configuration has
* has been updated.
*/
long config_timestamp;

flb_sds_t api_key;
flb_sds_t fleet_id;
flb_sds_t fleet_name;
Expand Down Expand Up @@ -257,13 +262,40 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
return ret;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
char *fname;
char *end;

if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);

if (fname == NULL) {
return FLB_FALSE;
}

fname++;
(void)strtol(fname, &end, 10);
pwhelan marked this conversation as resolved.
Show resolved Hide resolved

if (strcmp(end, ".ini") == 0) {
return FLB_TRUE;
}

return FLB_FALSE;
}

static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

return is_new_fleet_config(ctx, cfg) || is_cur_fleet_config(ctx, cfg);
return is_new_fleet_config(ctx, cfg) ||
is_cur_fleet_config(ctx, cfg) ||
is_timestamped_fleet_config(ctx, cfg);
}

static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
Expand Down Expand Up @@ -359,9 +391,17 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf
pthread_attr_t ptha;
flb_ctx_t *flb = flb_context_get();

if (ctx->collect_fd > 0) {
flb_input_collector_pause(ctx->collect_fd, ctx->ins);
}

if (flb == NULL) {
flb_plg_error(ctx->ins, "unable to get fluent-bit context.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

Expand All @@ -372,6 +412,11 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf

if (test_config_is_valid(cfgpath) == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to load configuration.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

Expand Down Expand Up @@ -447,8 +492,7 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx,
}

project_id = flb_sds_create_len(cur->val.via.str.ptr,
cur->val.via.str.size);

cur->val.via.str.size);
msgpack_unpacked_destroy(&result);
flb_free(pack);

Expand Down Expand Up @@ -807,14 +851,17 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
flb_sds_destroy(cfgoldname);
}

link(cfgname, cfgnewname);
symlink(cfgname, cfgnewname);
}

if (ctx->config_timestamp < time_last_modified) {
flb_plg_debug(ctx->ins, "new configuration is newer than current: %d < %d",
ctx->config_timestamp, time_last_modified);
// FORCE THE RELOAD!!!
flb_plg_info(ctx->ins, "force the reloading of the configuration file=%d.", ctx->event_fd);
flb_sds_destroy(cfgname);
flb_sds_destroy(data);

if (execute_reload(ctx, cfgnewname) == FLB_FALSE) {
if (execute_reload(ctx, cfgname) == FLB_FALSE) {
cfgoldname = old_fleet_config_filename(ctx);
cfgcurname = cur_fleet_config_filename(ctx);
rename(cfgoldname, cfgcurname);
Expand Down Expand Up @@ -907,7 +954,11 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_ctx_t *flb_ctx = flb_context_get();

char *fname;
char *ext;
long timestamp;
char realname[4096];
ssize_t len;

if (create_fleet_directory(ctx) != 0) {
return -1;
Expand All @@ -917,13 +968,40 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
// check which one and load it
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, cur_fleet_config_filename(ctx));
return execute_reload(ctx, cur_fleet_config_filename(ctx));
} else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
return execute_reload(ctx, new_fleet_config_filename(ctx));
}
}
else {
if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) {
len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname));

if (len > sizeof(realname)) {
return FLB_FALSE;
}

fname = basename(realname);
}
else {
fname = basename(flb_ctx->config->conf_path_file);
}

if (fname == NULL) {
return FLB_FALSE;
}
else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, new_fleet_config_filename(ctx));

timestamp = strtol(fname, &ext, 10);
pwhelan marked this conversation as resolved.
Show resolved Hide resolved

/* unable to parse the timstamp */
if (errno == ERANGE) {
return FLB_FALSE;
}

ctx->config_timestamp = timestamp;
}
return 0;

return FLB_FALSE;
}

static int in_calyptia_fleet_init(struct flb_input_instance *in,
Expand Down Expand Up @@ -953,6 +1031,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
return -1;
}
ctx->ins = in;
ctx->collect_fd = -1;


/* Load the config map */
Expand Down Expand Up @@ -1004,9 +1083,18 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
}

if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
}

/* Set the context */
flb_input_set_context(in, ctx);


/* if we load a new configuration then we will be reloaded anyways */
if (load_fleet_config(ctx) == FLB_TRUE) {
return 0;
}

/* Set our collector based on time */
ret = flb_input_set_collector_time(in,
in_calyptia_fleet_collect,
Expand All @@ -1015,14 +1103,13 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
config);

if (ret == -1) {
flb_plg_error(ctx->ins, "could not set collector for Health input plugin");
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
flb_free(ctx);
return -1;
}

ctx->collect_fd = ret;

load_fleet_config(ctx);
return 0;
}

Expand Down
Loading