From 26a4833b4aa97bfe25a6f46ff7bb17bd8acc8e3e Mon Sep 17 00:00:00 2001
From: Matthew Fala <34408404+matthewfala@users.noreply.github.com>
Date: Tue, 17 Oct 2023 04:36:23 -1000
Subject: [PATCH] out_cloudwatch_logs: remove sequence tokens from API calls
 (#7973)

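PutLogEvents requests no longer carry a sequenceToken, and the handling of
InvalidSequenceTokenException and DataAlreadyAcceptedException responses is
removed. The flush buffer is now allocated per flush call rather than being
stored on the plugin context, and the FLB_OUTPUT_SYNCHRONOUS flag is dropped.
Error responses that cannot be parsed are logged at warn level.

Signed-off-by: Matthew Fala <falamatt@amazon.com>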
---
 plugins/out_cloudwatch_logs/cloudwatch_api.c  | 114 ++----------------
 plugins/out_cloudwatch_logs/cloudwatch_logs.c |  68 +++++------
 plugins/out_cloudwatch_logs/cloudwatch_logs.h |   8 +-
 src/aws/flb_aws_util.c                        |   2 +
 4 files changed, 48 insertions(+), 144 deletions(-)

diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c
index 8043968cfa8..e6d5e8a766f 100644
--- a/plugins/out_cloudwatch_logs/cloudwatch_api.c
+++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c
@@ -50,9 +50,7 @@
 #include "cloudwatch_api.h"
 
 #define ERR_CODE_ALREADY_EXISTS         "ResourceAlreadyExistsException"
-#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
 #define ERR_CODE_NOT_FOUND              "ResourceNotFoundException"
-#define ERR_CODE_DATA_ALREADY_ACCEPTED  "DataAlreadyAcceptedException"
 
 #define AMZN_REQUEST_ID_HEADER          "x-amzn-RequestId"
 
@@ -229,23 +227,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
         goto error;
     }
 
-    if (stream->sequence_token) {
-        if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
-                          "\"sequenceToken\":\"", 17)) {
-            goto error;
-        }
-
-        if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
-                          stream->sequence_token, 0)) {
-            goto error;
-        }
-
-        if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
-                          "\",", 2)) {
-            goto error;
-        }
-    }
-
     if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
                       "\"logEvents\":[", 13)) {
         goto error;
@@ -493,9 +474,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
     if (buf->current_stream != NULL) {
         buf->data_size += strlen(buf->current_stream->name);
         buf->data_size += strlen(buf->current_stream->group);
-        if (buf->current_stream->sequence_token) {
-            buf->data_size += strlen(buf->current_stream->sequence_token);
-        }
     }
 }
 
@@ -1153,7 +1131,6 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
     struct flb_aws_client *cw_client;
     flb_sds_t body;
     flb_sds_t tmp;
-    flb_sds_t error;
 
     flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days);
 
@@ -1196,17 +1173,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
 
         /* Check error */
         if (c->resp.payload_size > 0) {
-            error = flb_aws_error(c->resp.payload, c->resp.payload_size);
-            if (error != NULL) {
-                /* some other error occurred; notify user */
-                flb_aws_print_error(c->resp.payload, c->resp.payload_size,
-                                        "PutRetentionPolicy", ctx->ins);
-                flb_sds_destroy(error);
-            }
-            else {
-                /* error can not be parsed, print raw response to debug */
-                flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
-            }
+            /* some error occurred; notify user */
+            flb_aws_print_error(c->resp.payload, c->resp.payload_size,
+                                               "PutRetentionPolicy", ctx->ins);
         }
     }
 
@@ -1287,8 +1256,8 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream)
                 flb_sds_destroy(error);
             }
             else {
-                /* error can not be parsed, print raw response to debug */
-                flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
+                /* error can not be parsed, print raw response */
+                flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
             }
         }
     }
@@ -1402,8 +1371,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
                 flb_sds_destroy(error);
             }
             else {
-                /* error can not be parsed, print raw response to debug */
-                flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
+                /* error can not be parsed, print raw response */
+                flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
             }
         }
     }
@@ -1417,8 +1386,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
 }
 
 /*
- * Returns -1 on failure, 0 on success, and 1 for a sequence token error,
- * which means the caller can retry.
+ * Returns -1 on failure, 0 on success
  */
 int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
                    struct log_stream *stream, size_t payload_size)
@@ -1427,7 +1395,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
     struct flb_http_client *c = NULL;
     struct flb_aws_client *cw_client;
     flb_sds_t tmp;
-    flb_sds_t error;
     int num_headers = 1;
     int retry = FLB_TRUE;
 
@@ -1460,8 +1427,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
             if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
                 /* code was 200, but response is invalid, treat as failure */
                 if (c->resp.data != NULL) {
-                    flb_plg_debug(ctx->ins, "Could not find sequence token in "
-                                  "response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data);
+                    flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data);
                 }
                 flb_http_client_destroy(c);
 
@@ -1474,27 +1440,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
                                   AMZN_REQUEST_ID_HEADER);
                 return -1;
             }
-
-
-            /* success */
-            if (c->resp.payload_size > 0) {
-                flb_plg_debug(ctx->ins, "Sent events to %s", stream->name);
-                tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
-                                       "nextSequenceToken");
-                if (tmp) {
-                    if (stream->sequence_token != NULL) {
-                        flb_sds_destroy(stream->sequence_token);
-                    }
-                    stream->sequence_token = tmp;
-
-                    flb_http_client_destroy(c);
-                    return 0;
-                }
-                else {
-                    flb_plg_error(ctx->ins, "Could not find sequence token in "
-                                  "response: %s", c->resp.payload);
-                }
-            }
         
             flb_http_client_destroy(c);
             return 0;
@@ -1502,45 +1447,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
 
         /* Check error */
         if (c->resp.payload_size > 0) {
-            error = flb_aws_error(c->resp.payload, c->resp.payload_size);
-            if (error != NULL) {
-                if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) {
-                    /*
-                     * This case will happen when we do not know the correct
-                     * sequence token; we can find it in the error response
-                     * and retry.
-                     */
-                    flb_plg_debug(ctx->ins, "Sequence token was invalid, "
-                                  "will retry");
-                    tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
-                                           "expectedSequenceToken");
-                    if (tmp) {
-                        if (stream->sequence_token != NULL) {
-                            flb_sds_destroy(stream->sequence_token);
-                        }
-                        stream->sequence_token = tmp;
-                        flb_sds_destroy(error);
-                        flb_http_client_destroy(c);
-                        /* tell the caller to retry */
-                        return 1;
-                    }
-                } else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) {
-                    /* not sure what causes this but it counts as success */
-                    flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED);
-                    flb_sds_destroy(error);
-                    flb_http_client_destroy(c);
-                    /* success */
-                    return 0;
-                }
-                /* some other error occurred; notify user */
-                flb_aws_print_error(c->resp.payload, c->resp.payload_size,
-                                    "PutLogEvents", ctx->ins);
-                flb_sds_destroy(error);
-            }
-            else {
-                /* error could not be parsed, print raw response to debug */
-                flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
-            }
+            flb_aws_print_error(c->resp.payload, c->resp.payload_size,
+                                                  "PutLogEvents", ctx->ins);
         }
     }
 
diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c
index f6aef224088..87c31949a55 100644
--- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c
+++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c
@@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
     const char *tmp;
     char *session_name = NULL;
     struct flb_cloudwatch *ctx = NULL;
-    struct cw_flush *buf = NULL;
     int ret;
     flb_sds_t tmp_sds = NULL;
     (void) config;
@@ -348,18 +347,33 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
     flb_output_upstream_set(upstream, ctx->ins);
     ctx->cw_client->host = ctx->endpoint;
 
-    /* alloc the payload/processing buffer */
+    /* Export context */
+    flb_output_set_context(ins, ctx);
+
+    return 0;
+
+error:
+    flb_free(session_name);
+    flb_plg_error(ctx->ins, "Initialization failed");
+    flb_cloudwatch_ctx_destroy(ctx);
+    return -1;
+}
+
+struct cw_flush *new_buffer(void)
+{
+    struct cw_flush *buf;   /* returned to the caller, or NULL on allocation failure */
+
     buf = flb_calloc(1, sizeof(struct cw_flush));
     if (!buf) {
         flb_errno();
-        goto error;
+        return NULL;
     }
 
     buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE);
     if (!buf->out_buf) {
         flb_errno();
         cw_flush_destroy(buf);
-        goto error;
+        return NULL;
     }
     buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;
 
@@ -367,7 +381,7 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
     if (!buf->tmp_buf) {
         flb_errno();
         cw_flush_destroy(buf);
-        goto error;
+        return NULL;
     }
     buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;
 
@@ -375,23 +389,11 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
     if (!buf->events) {
         flb_errno();
         cw_flush_destroy(buf);
-        goto error;
+        return NULL;
     }
     buf->events_capacity = MAX_EVENTS_PER_PUT;
 
-    ctx->buf = buf;
-
-
-    /* Export context */
-    flb_output_set_context(ins, ctx);
-
-    return 0;
-
-error:
-    flb_free(session_name);
-    flb_plg_error(ctx->ins, "Initialization failed");
-    flb_cloudwatch_ctx_destroy(ctx);
-    return -1;
+    return buf;
 }
 
 static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
@@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
     (void) i_ins;
     (void) config;
 
-    event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag,
-                                   event_chunk->data, event_chunk->size);
+    struct cw_flush *buf;
+
+    buf = new_buffer();
+    if (!buf) {
+        FLB_OUTPUT_RETURN(FLB_RETRY);
+    }
+
+    event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
     if (event_count < 0) {
         flb_plg_error(ctx->ins, "Failed to send events");
+        cw_flush_destroy(buf);
         FLB_OUTPUT_RETURN(FLB_RETRY);
     }
 
-    // TODO: this msg is innaccurate if events are skipped
-    flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count);
+    cw_flush_destroy(buf);
 
     FLB_OUTPUT_RETURN(FLB_OK);
 }
@@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
             flb_aws_provider_destroy(ctx->base_aws_provider);
         }
 
-        if (ctx->buf) {
-            cw_flush_destroy(ctx->buf);
-        }
-
         if (ctx->aws_provider) {
             flb_aws_provider_destroy(ctx->aws_provider);
         }
@@ -496,9 +500,6 @@ void log_stream_destroy(struct log_stream *stream)
         if (stream->name) {
             flb_sds_destroy(stream->name);
         }
-        if (stream->sequence_token) {
-            flb_sds_destroy(stream->sequence_token);
-        }
         if (stream->group) {
             flb_sds_destroy(stream->group);
         }
@@ -657,12 +658,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
     .cb_init      = cb_cloudwatch_init,
     .cb_flush     = cb_cloudwatch_flush,
     .cb_exit      = cb_cloudwatch_exit,
-
-    /*
-     * Allow cloudwatch to use async network stack synchronously by opting into
-     * FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
-     */
-    .flags        = FLB_OUTPUT_SYNCHRONOUS,
+    .flags        = 0,
     .workers      = 1,
 
     /* Configuration */
diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h
index 7fe8bf0b764..b08ef9df0cf 100644
--- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h
+++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h
@@ -70,7 +70,7 @@ struct cw_event {
 struct log_stream {
     flb_sds_t name;
     flb_sds_t group;
-    flb_sds_t sequence_token;
+
     /*
      * log streams in CloudWatch do not expire; but our internal representations
      * of them are periodically cleaned up if they have been unused for too long
@@ -87,8 +87,6 @@ struct log_stream {
     struct mk_list _head;
 };
 
-void log_stream_destroy(struct log_stream *stream);
-
 struct flb_cloudwatch {
     /*
      * TLS instances can not be re-used. So we have one for:
@@ -138,8 +136,6 @@ struct flb_cloudwatch {
     /* stores log streams we're putting to */
     struct mk_list streams;
 
-    /* buffers for data processing and request payload */
-    struct cw_flush *buf;
     /* The namespace to use for the metric */
     flb_sds_t metric_namespace;
 
@@ -155,4 +151,6 @@ struct flb_cloudwatch {
 
 void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);
 
+void log_stream_destroy(struct log_stream *stream);
+
 #endif
diff --git a/src/aws/flb_aws_util.c b/src/aws/flb_aws_util.c
index 533bba7eb4d..231e9aeb7ff 100644
--- a/src/aws/flb_aws_util.c
+++ b/src/aws/flb_aws_util.c
@@ -581,6 +581,8 @@ void flb_aws_print_error(char *response, size_t response_len,
 
     error = flb_json_get_val(response, response_len, "__type");
     if (!error) {
+        /* no "__type" value found: print the raw response, bounded by its length */
+        flb_plg_warn(ins, "Raw response: %.*s", (int) response_len, response);
         return;
     }