diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index d99377689c2..0b5a4e5a9ac 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -63,6 +63,13 @@ static struct flb_aws_header create_group_header = { .val_len = 28, }; +static struct flb_aws_header put_retention_policy_header = { + .key = "X-Amz-Target", + .key_len = 12, + .val = "Logs_20140328.PutRetentionPolicy", + .val_len = 32, +}; + static struct flb_aws_header create_stream_header = { .key = "X-Amz-Target", .key_len = 12, @@ -1013,6 +1020,83 @@ struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, return get_dynamic_log_stream(ctx, tag, tag_len); } +static int set_log_group_retention(struct flb_cloudwatch *ctx) +{ + if (ctx->log_retention_days <= 0) { + /* no need to set */ + return 0; + } + + struct flb_http_client *c = NULL; + struct flb_aws_client *cw_client; + flb_sds_t body; + flb_sds_t tmp; + flb_sds_t error; + + flb_plg_info(ctx->ins, "Set retention policy on log group %s to %dd", ctx->log_group, ctx->log_retention_days); + + body = flb_sds_create_size(68 + strlen(ctx->log_group)); + if (!body) { + flb_sds_destroy(body); + flb_errno(); + return -1; + } + + /* construct CreateLogGroup request body */ + tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\",\"retentionInDays\":%d}", ctx->log_group, ctx->log_retention_days); + if (!tmp) { + flb_sds_destroy(body); + flb_errno(); + return -1; + } + body = tmp; + + if (plugin_under_test() == FLB_TRUE) { + c = mock_http_call("TEST_PUT_RETENTION_POLICY_ERROR", "PutRetentionPolicy"); + } + else { + cw_client = ctx->cw_client; + c = cw_client->client_vtable->request(cw_client, FLB_HTTP_POST, + "/", body, strlen(body), + &put_retention_policy_header, 1); + } + + if (c) { + flb_plg_debug(ctx->ins, "PutRetentionPolicy http status=%d", c->resp.status); + + if (c->resp.status == 200) { + /* success */ + flb_plg_info(ctx->ins, "Set retention policy to %d", ctx->log_retention_days); + flb_sds_destroy(body); + flb_http_client_destroy(c); + return 0; + } + + /* Check error */ + if (c->resp.payload_size > 0) { + error = flb_aws_error(c->resp.payload, c->resp.payload_size); + if (error != NULL) { + /* some other error occurred; notify user */ + flb_aws_print_error(c->resp.payload, c->resp.payload_size, + "PutRetentionPolicy", ctx->ins); + flb_sds_destroy(error); + } + else { + /* error can not be parsed, print raw response to debug */ + flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); + } + } + } + + flb_plg_error(ctx->ins, "Failed to putRetentionPolicy"); + if (c) { + flb_http_client_destroy(c); + } + flb_sds_destroy(body); + + return 0; +} + int create_log_group(struct flb_cloudwatch *ctx) { struct flb_http_client *c = NULL; @@ -1058,7 +1142,7 @@ int create_log_group(struct flb_cloudwatch *ctx) ctx->group_created = FLB_TRUE; flb_sds_destroy(body); flb_http_client_destroy(c); - return 0; + return set_log_group_retention(ctx); } /* Check error */ diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index 71c521acc0b..477252cb462 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -157,6 +157,12 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, ctx->create_group = FLB_TRUE; } + ctx->log_retention_days = 0; + tmp = flb_output_get_property("log_retention_days", ins); + if (tmp) { + ctx->log_retention_days = atoi(tmp); + } + tmp = flb_output_get_property("role_arn", ins); if (tmp) { ctx->role_arn = tmp; @@ -528,6 +534,13 @@ static struct flb_config_map config_map[] = { " be created)" }, + { + FLB_CONFIG_MAP_INT, "log_retention_days", "0", + 0, FLB_FALSE, 0, + "If set to a number greater than zero, and newly create log group's " + "retention policy is set to this many days." + }, + { FLB_CONFIG_MAP_STR, "endpoint", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index aa68177d388..a21740a74a9 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -121,6 +121,9 @@ struct flb_cloudwatch { /* Should the plugin create the log group */ int create_group; + /* If set to a number greater than zero, and newly create log group's retention policy is set to this many days. */ + int log_retention_days; + /* has the log group successfully been created */ int group_created; diff --git a/tests/runtime/out_cloudwatch.c b/tests/runtime/out_cloudwatch.c index 861e729721d..fc54e4c380b 100644 --- a/tests/runtime/out_cloudwatch.c +++ b/tests/runtime/out_cloudwatch.c @@ -231,6 +231,122 @@ void flb_test_cloudwatch_error_put_log_events(void) flb_destroy(ctx); } +void flb_test_cloudwatch_put_retention_policy_success(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "test", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL); + flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL); + flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL); + flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL); + flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_cloudwatch_already_exists_create_group_put_retention_policy(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1); + + /* PutRetentionPolicy is not called if the group already exists */ + setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "test", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL); + flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL); + flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL); + flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL); + flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_cloudwatch_error_put_retention_policy(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "test", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL); + flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL); + flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL); + flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL); + flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"success", flb_test_cloudwatch_success }, @@ -239,5 +355,8 @@ TEST_LIST = { {"create_group_error", flb_test_cloudwatch_error_create_group }, {"create_stream_error", flb_test_cloudwatch_error_create_stream }, {"put_log_events_error", flb_test_cloudwatch_error_put_log_events }, + {"put_retention_policy_success", flb_test_cloudwatch_put_retention_policy_success }, + {"already_exists_create_group_put_retention_policy", flb_test_cloudwatch_already_exists_create_group_put_retention_policy }, + {"error_put_retention_policy", flb_test_cloudwatch_error_put_retention_policy }, {NULL, NULL} };