From 59ddf18c6872597c3693aa8bd7e51efe2e2b9f09 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 16 Aug 2021 17:18:49 -0600 Subject: [PATCH] out_calyptia: add support for agent patching and session store Signed-off-by: Eduardo Silva --- plugins/out_calyptia/calyptia.c | 233 +++++++++++++++++++++++++++++--- plugins/out_calyptia/calyptia.h | 28 ++-- 2 files changed, 232 insertions(+), 29 deletions(-) diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index eed99014fa3..7fd51b74866 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -26,6 +26,7 @@ #include #include #include +#include #include "calyptia.h" @@ -250,6 +251,16 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1, ctx->api_key, flb_sds_len(ctx->api_key)); } + else if (type == CALYPTIA_ACTION_PATCH) { + flb_http_add_header(c, + CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, + CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); + + flb_http_add_header(c, + CALYPTIA_H_AGENT_TOKEN, + sizeof(CALYPTIA_H_AGENT_TOKEN) - 1, + ctx->agent_token, flb_sds_len(ctx->agent_token)); + } else if (type == CALYPTIA_ACTION_METRICS) { flb_http_add_header(c, CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, @@ -272,7 +283,7 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, return FLB_RETRY; } - if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) { if (c->resp.payload_size > 0) { flb_plg_warn(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload); @@ -351,11 +362,146 @@ static flb_sds_t get_agent_info(char *buf, size_t size, char *k) return v; } +/* Set the session content */ +static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size) +{ + int ret; + int type; + char *mp_buf; + size_t mp_size; + + /* remove any previous session file */ + if (ctx->fs_file) { + flb_fstore_file_delete(ctx->fs, ctx->fs_file); + } + + /* create session file */ + ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream, + CALYPTIA_SESSION_FILE, 1024); + if (!ctx->fs_file) { + flb_plg_error(ctx->ins, "could not create new session file"); + return -1; + } + + /* store meta */ + flb_fstore_file_meta_set(ctx->fs, ctx->fs_file, + FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1); + + /* encode */ + ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not encode session information"); + return -1; + } + + /* store content */ + ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not store session information"); + flb_free(mp_buf); + return -1; + } + + flb_free(mp_buf); + return 0; +} + +static int store_session_get(struct flb_calyptia *ctx, + void **out_buf, size_t *out_size) +{ + int ret; + void *buf; + size_t size; + flb_sds_t json; + + ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file, + &buf, &size); + + if (size == 0) { + return -1; + } + + /* decode */ + json = flb_msgpack_raw_to_json_sds(buf, size); + flb_free(buf); + if (!json) { + return -1; + } + + *out_buf = json; + *out_size = flb_sds_len(json); + + return ret; +} + +static int store_init(struct flb_calyptia *ctx) +{ + int ret; + struct flb_fstore *fs; + struct flb_fstore_file *fsf; + void *buf; + size_t size; + + /* store context */ + fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS); + if (!fs) { + flb_plg_error(ctx->ins, + "could not initialize 'store_path': %s", + ctx->store_path); + return -1; + } + ctx->fs = fs; + + /* stream */ + ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia"); + if (!ctx->fs_stream) { + flb_plg_error(ctx->ins, "could not create storage stream"); + return -1; + } + + /* lookup any previous file */ + fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE, + sizeof(CALYPTIA_SESSION_FILE) - 1); + if (!fsf) { + flb_plg_debug(ctx->ins, "no session file was found"); + return 0; + } + ctx->fs_file = fsf; + + /* retrieve session info */ + ret = store_session_get(ctx, &buf, &size); + if (ret == 0) { + /* agent id */ + ctx->agent_id = get_agent_info(buf, size, "id"); + + /* agent token */ + ctx->agent_token = get_agent_info(buf, size, "token"); + + if (ctx->agent_id && ctx->agent_token) { + flb_plg_info(ctx->ins, "session setup OK"); + } + else { + if (ctx->agent_id) { + flb_sds_destroy(ctx->agent_id); + } + if (ctx->agent_token) { + flb_sds_destroy(ctx->agent_token); + } + } + flb_sds_destroy(buf); + } + + return 0; +} + /* Agent creation is perform on initialization using a sync upstream connection */ static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx) { int ret; + int flb_ret; int flags; + int action = CALYPTIA_ACTION_REGISTER; + char uri[1024]; flb_sds_t meta; struct flb_upstream *u; struct flb_upstream_conn *u_conn; @@ -391,31 +537,61 @@ static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx) return -1; } - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE, - meta, flb_sds_len(meta), NULL, 0, NULL, 0); + if (ctx->agent_id && ctx->agent_token) { + /* Patch */ + action = CALYPTIA_ACTION_PATCH; + snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id); + c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri, + meta, flb_sds_len(meta), NULL, 0, NULL, 0); + } + else { + /* Create */ + action = CALYPTIA_ACTION_REGISTER; + c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE, + meta, flb_sds_len(meta), NULL, 0, NULL, 0); + } + if (!c) { flb_upstream_conn_release(u_conn); flb_upstream_destroy(u); return -1; } - /* perform requst */ - ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_REGISTER); - if (ret == FLB_OK && (c->resp.status == 200 || c->resp.status == 201)) { + /* perform request */ + flb_ret = calyptia_http_do(ctx, c, action); + if (flb_ret == FLB_OK && + (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) { if (c->resp.payload_size > 0) { - /* agent id */ - ctx->agent_id = get_agent_info(c->resp.payload, - c->resp.payload_size, - "id"); - - /* agent token */ - ctx->agent_token = get_agent_info(c->resp.payload, - c->resp.payload_size, - "token"); - - flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'", - ctx->agent_id); + if (action == CALYPTIA_ACTION_REGISTER) { + /* agent id */ + ctx->agent_id = get_agent_info(c->resp.payload, + c->resp.payload_size, + "id"); + + /* agent token */ + ctx->agent_token = get_agent_info(c->resp.payload, + c->resp.payload_size, + "token"); + + if (ctx->agent_id && ctx->agent_token) { + flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'", + ctx->agent_id); + + if (ctx->store_path && ctx->fs) { + ret = store_session_set(ctx, + c->resp.payload, + c->resp.payload_size); + if (ret == -1) { + flb_plg_warn(ctx->ins, + "could not store Calyptia session"); + } + } + } + } + } + + if (action == CALYPTIA_ACTION_PATCH) { + flb_plg_info(ctx->ins, "known agent registration successful"); } } @@ -425,7 +601,7 @@ static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx) flb_upstream_conn_release(u_conn); flb_upstream_destroy(u); - return ret; + return flb_ret; } static struct flb_calyptia *config_init(struct flb_output_instance *ins, @@ -473,6 +649,14 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, /* Set context */ flb_output_set_context(ins, ctx); + /* Initialize optional storage */ + if (ctx->store_path) { + ret = store_init(ctx); + if (ret == -1) { + return NULL; + } + } + /* machine id */ ret = get_machine_id(ctx, &machine_id, &size); if (ret == -1) { @@ -658,6 +842,9 @@ static int cb_calyptia_exit(void *data, struct flb_config *config) flb_sds_destroy(ctx->metrics_endpoint); } + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } flb_free(ctx); return 0; @@ -683,6 +870,12 @@ static struct flb_config_map config_map[] = { "Calyptia Cloud API Key." }, + { + FLB_CONFIG_MAP_STR, "store_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_calyptia, store_path), + "" + }, + /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.h b/plugins/out_calyptia/calyptia.h index 3bc9cf5c8af..3cc7a826c8a 100644 --- a/plugins/out_calyptia/calyptia.h +++ b/plugins/out_calyptia/calyptia.h @@ -24,6 +24,7 @@ #include #include #include +#include #include /* End point */ @@ -32,12 +33,17 @@ /* HTTP action types */ #define CALYPTIA_ACTION_REGISTER 0 -#define CALYPTIA_ACTION_METRICS 1 +#define CALYPTIA_ACTION_PATCH 1 +#define CALYPTIA_ACTION_METRICS 2 /* Endpoints */ #define CALYPTIA_ENDPOINT_CREATE "/v1/agents" +#define CALYPTIA_ENDPOINT_PATCH "/v1/agents/%s" #define CALYPTIA_ENDPOINT_METRICS "/v1/agents/%s/metrics" +/* Storage */ +#define CALYPTIA_SESSION_FILE "session.CALYPTIA" + /* Headers */ #define CALYPTIA_H_PROJECT "X-Project-Token" #define CALYPTIA_H_AGENT_TOKEN "X-Agent-Token" @@ -48,20 +54,24 @@ struct flb_calyptia { /* config map */ int cloud_port; - flb_sds_t cloud_host; flb_sds_t api_key; + flb_sds_t cloud_host; + flb_sds_t store_path; struct mk_list *add_labels; /* internal */ flb_sds_t agent_id; flb_sds_t agent_token; - flb_sds_t machine_id; /* machine-id */ - flb_sds_t metrics_endpoint; /* metrics endpoint */ - struct flb_env *env; /* environment */ - struct flb_upstream *u; /* upstream connection */ - struct mk_list kv_labels; /* parsed add_labels */ - struct flb_output_instance *ins; /* plugin instance */ - struct flb_config *config; /* Fluent Bit context */ + flb_sds_t machine_id; /* machine-id */ + flb_sds_t metrics_endpoint; /* metrics endpoint */ + struct flb_fstore *fs; /* fstore ctx */ + struct flb_fstore_stream *fs_stream; /* fstore stream */ + struct flb_fstore_file *fs_file; /* fstore session file */ + struct flb_env *env; /* environment */ + struct flb_upstream *u; /* upstream connection */ + struct mk_list kv_labels; /* parsed add_labels */ + struct flb_output_instance *ins; /* plugin instance */ + struct flb_config *config; /* Fluent Bit context */ }; #endif \ No newline at end of file