From 988a76075824746191ad7adb35126d3879cf09a9 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Thu, 23 Mar 2023 11:53:32 -0400 Subject: [PATCH 1/6] fix(file source): make FinalizerSet optionally handle shutdown signals --- lib/file-source/src/file_server.rs | 3 +++ .../src/variants/disk_v2/ledger.rs | 4 ++-- lib/vector-common/src/finalizer.rs | 23 ++++++++++++++----- src/sources/amqp.rs | 2 +- src/sources/aws_sqs/source.rs | 2 +- src/sources/file.rs | 2 +- src/sources/gcp_pubsub.rs | 2 +- src/sources/journald.rs | 2 +- src/sources/kafka.rs | 2 +- src/sources/splunk_hec/acknowledgements.rs | 2 +- 10 files changed, 29 insertions(+), 15 deletions(-) diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index a3de6b26fbfc3..59604037797c6 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -342,6 +342,9 @@ where futures::pin_mut!(sleep); match self.handle.block_on(select(shutdown_data, sleep)) { Either::Left((_, _)) => { + self.handle + .block_on(chans.close()) + .expect("error closing file_server data channel."); let checkpointer = self .handle .block_on(checkpoint_task_handle) diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index 6f436d8aa0b47..f3209f3ca9c45 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -14,7 +14,7 @@ use futures::StreamExt; use rkyv::{with::Atomic, Archive, Serialize}; use snafu::{ResultExt, Snafu}; use tokio::{fs, io::AsyncWriteExt, sync::Notify}; -use vector_common::{finalizer::OrderedFinalizer, shutdown::ShutdownSignal}; +use vector_common::finalizer::OrderedFinalizer; use super::{ backed_archive::BackedArchive, @@ -705,7 +705,7 @@ where #[must_use] pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { - let (finalizer, mut stream) = OrderedFinalizer::new(ShutdownSignal::noop()); + let (finalizer, mut stream) = OrderedFinalizer::new(None); tokio::spawn(async move { while let Some((_status, amount)) = stream.next().await { self.increment_pending_acks(amount); diff --git a/lib/vector-common/src/finalizer.rs b/lib/vector-common/src/finalizer.rs index e7de8625fdb95..dbf8497e7a3ef 100644 --- a/lib/vector-common/src/finalizer.rs +++ b/lib/vector-common/src/finalizer.rs @@ -4,7 +4,7 @@ use std::marker::{PhantomData, Unpin}; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; use futures::stream::{BoxStream, FuturesOrdered, FuturesUnordered}; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{future::OptionFuture, FutureExt, Stream, StreamExt}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::Notify; @@ -45,8 +45,16 @@ where { /// Produce a finalizer set along with the output stream of /// received acknowledged batch identifiers. + /// + /// The output stream will end when the source closes the producer side of the channel, and + /// acknowledgements in the channel are drained. + /// + /// If the optional shutdown signal is provided, the output stream will end immediately when a + /// shutdown signal is received. This is not recommended, and can cause some acknowledgements + /// to go unprocessed. Sources may process the message(s) that correspond to those + /// acknowledgements again. #[must_use] - pub fn new(shutdown: ShutdownSignal) -> (Self, BoxStream<'static, (BatchStatus, T)>) { + pub fn new(shutdown: Option) -> (Self, BoxStream<'static, (BatchStatus, T)>) { let (todo_tx, todo_rx) = mpsc::unbounded_channel(); let flush1 = Arc::new(Notify::new()); let flush2 = Arc::clone(&flush1); @@ -67,7 +75,7 @@ where #[must_use] pub fn maybe_new( maybe: bool, - shutdown: ShutdownSignal, + shutdown: Option, ) -> (Option, BoxStream<'static, (BatchStatus, T)>) { if maybe { let (finalizer, stream) = Self::new(shutdown); @@ -91,7 +99,7 @@ where } fn finalizer_stream( - mut shutdown: ShutdownSignal, + shutdown: Option, mut new_entries: UnboundedReceiver<(BatchStatusReceiver, T)>, mut status_receivers: S, flush: Arc, @@ -99,11 +107,14 @@ fn finalizer_stream( where S: Default + FuturesSet> + Unpin, { + let handle_shutdown = shutdown.is_some(); + let mut shutdown = OptionFuture::from(shutdown); + async_stream::stream! { loop { tokio::select! { biased; - _ = &mut shutdown => break, + _ = &mut shutdown, if handle_shutdown => break, _ = flush.notified() => { // Drop all the existing status receivers and start over. status_receivers = S::default(); @@ -116,7 +127,7 @@ where entry: Some(entry), }); } - // The new entry sender went away before shutdown, count it as a shutdown too. + // The end of the new entry channel signals shutdown None => break, }, finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index eec36c0751a21..4ca5bcc1d9183 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -417,7 +417,7 @@ async fn run_amqp_source( acknowledgements: bool, ) -> Result<(), ()> { let (finalizer, mut ack_stream) = - UnorderedFinalizer::::maybe_new(acknowledgements, shutdown.clone()); + UnorderedFinalizer::::maybe_new(acknowledgements, Some(shutdown.clone())); debug!("Starting amqp source, listening to queue {}.", config.queue); let mut consumer = channel diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 64314391bd1c5..8f02fd3028e0f 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -45,7 +45,7 @@ impl SqsSource { pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> { let mut task_handles = vec![]; let finalizer = self.acknowledgements.then(|| { - let (finalizer, mut ack_stream) = Finalizer::new(shutdown.clone()); + let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone())); let client = self.client.clone(); let queue_url = self.queue_url.clone(); tokio::spawn( diff --git a/src/sources/file.rs b/src/sources/file.rs index 7fe178915e367..9eb417660bb70 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -551,7 +551,7 @@ pub fn file_source( // The shutdown sent in to the finalizer is the global // shutdown handle used to tell it to stop accepting new batch // statuses and just wait for the remaining acks to come in. - let (finalizer, mut ack_stream) = OrderedFinalizer::::new(shutdown.clone()); + let (finalizer, mut ack_stream) = OrderedFinalizer::::new(None); // We set up a separate shutdown signal to tie together the // finalizer and the checkpoint writer task in the file // server, to make it continue to write out updated diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 7c96eb9d3b129..c15d4008f66aa 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -497,7 +497,7 @@ impl PubsubSource { let mut stream = stream.into_inner(); let (finalizer, mut ack_stream) = - Finalizer::maybe_new(self.acknowledgements, self.shutdown.clone()); + Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone())); let mut pending_acks = 0; loop { diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 73bed775705a2..4eb0a4c178d8e 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -887,7 +887,7 @@ impl Finalizer { shutdown: ShutdownSignal, ) -> Self { if acknowledgements { - let (finalizer, mut ack_stream) = OrderedFinalizer::new(shutdown); + let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown)); tokio::spawn(async move { while let Some((status, cursor)) = ack_stream.next().await { if status == BatchStatus::Delivered { diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 35396487c6cba..5eced9a031dda 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -373,7 +373,7 @@ async fn kafka_source( ) -> Result<(), ()> { let consumer = Arc::new(consumer); let (finalizer, mut ack_stream) = - OrderedFinalizer::::maybe_new(acknowledgements, shutdown.clone()); + OrderedFinalizer::::maybe_new(acknowledgements, Some(shutdown.clone())); let finalizer = finalizer.map(Arc::new); if let Some(finalizer) = &finalizer { consumer diff --git a/src/sources/splunk_hec/acknowledgements.rs b/src/sources/splunk_hec/acknowledgements.rs index 795f4644bbb40..2eee1ed78ac8e 100644 --- a/src/sources/splunk_hec/acknowledgements.rs +++ b/src/sources/splunk_hec/acknowledgements.rs @@ -197,7 +197,7 @@ impl Channel { fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self { let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new())); let finalizer_ack_ids_status = Arc::clone(&ack_ids_status); - let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(shutdown); + let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown)); tokio::spawn(async move { while let Some((status, ack_id)) = ack_stream.next().await { if status == BatchStatus::Delivered { From 3756619f7f1dc095bed25d14b8b375ba12fce5c9 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Wed, 29 Mar 2023 09:06:24 -0400 Subject: [PATCH 2/6] tests(file source): add test case to exercise FinalizerSet shutdown handling race --- src/sources/file.rs | 54 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/sources/file.rs b/src/sources/file.rs index 9eb417660bb70..194c788b610fa 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1565,6 +1565,60 @@ mod tests { assert_eq!(lines, vec!["the line"]); } + #[tokio::test] + async fn file_duplicate_processing_after_restart() { + let dir = tempdir().unwrap(); + let config = file::FileConfig { + include: vec![dir.path().join("*")], + ..test_default_file_config(&dir) + }; + + let path = dir.path().join("file"); + let mut file = File::create(&path).unwrap(); + + let line_count = 4000; + for i in 0..line_count { + writeln!(&mut file, "Here's a line for you: {}", i).unwrap(); + } + sleep_500_millis().await; + + // First time server runs it should pick up a bunch of lines + let received = run_file_source( + &config, + true, + Acks, + LogNamespace::Legacy, + // shutdown signal is sent after this duration + sleep_500_millis(), + ) + .await; + let lines = extract_messages_string(received); + + // ...but not all the lines; if the first run processed the entire file, we may not hit the + // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks + assert!(lines.len() < line_count); + println!( + "Read {} lines (last: '{}').", + lines.len(), + lines[lines.len() - 1] + ); + + // Restart the server, and it should read the rest without duplicating any + let received = run_file_source( + &config, + true, + Acks, + LogNamespace::Legacy, + sleep(Duration::from_secs(5)), + ) + .await; + let lines2 = extract_messages_string(received); + println!("Read {} lines (first: '{}').", lines2.len(), lines2[0]); + + // Between both runs, we should have the expected number of lines + assert_eq!(lines.len() + lines2.len(), line_count); + } + #[tokio::test] async fn file_start_position_server_restart_with_file_rotation_acknowledged() { file_start_position_server_restart_with_file_rotation(Acks).await From d887e4b5cef1a9858addeb5c36694d5169bf9278 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Wed, 29 Mar 2023 10:11:25 -0400 Subject: [PATCH 3/6] clippy fixes --- src/sources/file.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/sources/file.rs b/src/sources/file.rs index 194c788b610fa..76a189d5fddd0 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1597,11 +1597,6 @@ mod tests { // ...but not all the lines; if the first run processed the entire file, we may not hit the // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks assert!(lines.len() < line_count); - println!( - "Read {} lines (last: '{}').", - lines.len(), - lines[lines.len() - 1] - ); // Restart the server, and it should read the rest without duplicating any let received = run_file_source( @@ -1613,7 +1608,6 @@ mod tests { ) .await; let lines2 = extract_messages_string(received); - println!("Read {} lines (first: '{}').", lines2.len(), lines2[0]); // Between both runs, we should have the expected number of lines assert_eq!(lines.len() + lines2.len(), line_count); From b7b9a9028702e8b2dd967c7fbf57fb87afffe551 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Wed, 29 Mar 2023 10:12:12 -0400 Subject: [PATCH 4/6] revert fix to demonstrate failure --- src/sources/file.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sources/file.rs b/src/sources/file.rs index 76a189d5fddd0..7cae9c2bb2978 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -551,7 +551,8 @@ pub fn file_source( // The shutdown sent in to the finalizer is the global // shutdown handle used to tell it to stop accepting new batch // statuses and just wait for the remaining acks to come in. - let (finalizer, mut ack_stream) = OrderedFinalizer::::new(None); + let (finalizer, mut ack_stream) = OrderedFinalizer::::new(Some(shutdown.clone())); + // We set up a separate shutdown signal to tie together the // finalizer and the checkpoint writer task in the file // server, to make it continue to write out updated From e42b7c377ef53034be88bcb5d0498f11ccd47af6 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Wed, 29 Mar 2023 11:27:09 -0400 Subject: [PATCH 5/6] fix(file source): disable ack_stream shutdown handling --- src/sources/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/file.rs b/src/sources/file.rs index 7cae9c2bb2978..6f6622ef6a6c5 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -551,7 +551,7 @@ pub fn file_source( // The shutdown sent in to the finalizer is the global // shutdown handle used to tell it to stop accepting new batch // statuses and just wait for the remaining acks to come in. - let (finalizer, mut ack_stream) = OrderedFinalizer::::new(Some(shutdown.clone())); + let (finalizer, mut ack_stream) = OrderedFinalizer::::new(None); // We set up a separate shutdown signal to tie together the // finalizer and the checkpoint writer task in the file From 54c8c0699fd3e71a4a2524a99ea3b1715e6cfaa7 Mon Sep 17 00:00:00 2001 From: John Chesley Date: Wed, 29 Mar 2023 17:06:47 -0400 Subject: [PATCH 6/6] trigger test suites