diff --git a/changelog.d/21128_log_aws_s3_objects_consumed.md b/changelog.d/21128_log_aws_s3_objects_consumed.md new file mode 100644 index 0000000000000..f869d8d43ae3f --- /dev/null +++ b/changelog.d/21128_log_aws_s3_objects_consumed.md @@ -0,0 +1,3 @@ +`aws_s3` source now logs s3 objects fetched (and completed if acks are enabled on the sink) + +authors: fdamstra diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index c0c3e597be113..e45e014442e9c 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -393,6 +393,11 @@ impl IngestorProcess { message_id: &message_id }); if self.state.delete_message { + trace!( + message = "Queued SQS message for deletion", + id = message_id, + receipt_handle = receipt_handle, + ); delete_entries.push( DeleteMessageBatchRequestEntry::builder() .id(message_id) @@ -517,6 +522,12 @@ impl IngestorProcess { let object = object_result?; + info!( + message = "Got S3 object from SQS notification.", + bucket = s3_event.s3.bucket.name, + key = s3_event.s3.object.key + ); + let metadata = object.metadata; let timestamp = object.last_modified.map(|ts| { @@ -641,12 +652,36 @@ impl IngestorProcess { Some(receiver) => { let result = receiver.await; match result { - BatchStatus::Delivered => Ok(()), - BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement), + BatchStatus::Delivered => { + info!( + message = "S3 object from SQS notification delivered.", + bucket = s3_event.s3.bucket.name, + key = s3_event.s3.object.key, + ); + Ok(()) + } + BatchStatus::Errored => { + warn!( + message = "S3 object from SQS notification had an error.", + bucket = s3_event.s3.bucket.name, + key = s3_event.s3.object.key, + ); + Err(ProcessingError::ErrorAcknowledgement) + }, BatchStatus::Rejected => { if self.state.delete_failed_message { + warn!( + message = "S3 object from SQS notification was rejected. Deleting failed message.", + bucket = s3_event.s3.bucket.name, + key = s3_event.s3.object.key, + ); Ok(()) } else { + error!( + message = "S3 object from SQS notification was rejected.", + bucket = s3_event.s3.bucket.name, + key = s3_event.s3.object.key, + ); Err(ProcessingError::ErrorAcknowledgement) } }