Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): make FinalizerSet optionally handle shutdown signals #16928

Merged
merged 6 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ where
futures::pin_mut!(sleep);
match self.handle.block_on(select(shutdown_data, sleep)) {
Either::Left((_, _)) => {
self.handle
.block_on(chans.close())
.expect("error closing file_server data channel.");
let checkpointer = self
.handle
.block_on(checkpoint_task_handle)
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::StreamExt;
use rkyv::{with::Atomic, Archive, Serialize};
use snafu::{ResultExt, Snafu};
use tokio::{fs, io::AsyncWriteExt, sync::Notify};
use vector_common::{finalizer::OrderedFinalizer, shutdown::ShutdownSignal};
use vector_common::finalizer::OrderedFinalizer;

use super::{
backed_archive::BackedArchive,
Expand Down Expand Up @@ -705,7 +705,7 @@ where

#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
let (finalizer, mut stream) = OrderedFinalizer::new(ShutdownSignal::noop());
let (finalizer, mut stream) = OrderedFinalizer::new(None);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not be Some here too, at least for now? The description says only the file source is being modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes I should have mentioned this case too. If I understand correctly, ShutdownSignal::noop() gives back a ShutdownSignal that is not attached to the global shutdown trigger, and can't be triggered externally. In this case, I believe this "noop" ShutdownSignal should be functionally equivalent to passing None, since it can't be signaled until the finalizer stream that owns it is dropped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, after adding a unit test for the file source, I ran those tests locally using Some(ShutdownSignal::noop()) in the file source ack stream, and that also fixes the bug - evidence that it is indeed equivalent to passing None.

That said, if there is a concern with including this change here I'm happy to revert this line for now.

tokio::spawn(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
Expand Down
23 changes: 17 additions & 6 deletions lib/vector-common/src/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::marker::{PhantomData, Unpin};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};

use futures::stream::{BoxStream, FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, Stream, StreamExt};
use futures::{future::OptionFuture, FutureExt, Stream, StreamExt};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;

Expand Down Expand Up @@ -45,8 +45,16 @@ where
{
/// Produce a finalizer set along with the output stream of
/// received acknowledged batch identifiers.
///
/// The output stream will end when the source closes the producer side of the channel, and
/// acknowledgements in the channel are drained.
///
/// If the optional shutdown signal is provided, the output stream will end immediately when a
/// shutdown signal is received. This is not recommended, and can cause some acknowledgements
/// to go unprocessed. Sources may process the message(s) that correspond to those
/// acknowledgements again.
#[must_use]
pub fn new(shutdown: ShutdownSignal) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
pub fn new(shutdown: Option<ShutdownSignal>) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
let flush1 = Arc::new(Notify::new());
let flush2 = Arc::clone(&flush1);
Expand All @@ -67,7 +75,7 @@ where
#[must_use]
pub fn maybe_new(
maybe: bool,
shutdown: ShutdownSignal,
shutdown: Option<ShutdownSignal>,
) -> (Option<Self>, BoxStream<'static, (BatchStatus, T)>) {
if maybe {
let (finalizer, stream) = Self::new(shutdown);
Expand All @@ -91,19 +99,22 @@ where
}

fn finalizer_stream<T, S>(
mut shutdown: ShutdownSignal,
shutdown: Option<ShutdownSignal>,
mut new_entries: UnboundedReceiver<(BatchStatusReceiver, T)>,
mut status_receivers: S,
flush: Arc<Notify>,
) -> impl Stream<Item = (BatchStatus, T)>
where
S: Default + FuturesSet<FinalizerFuture<T>> + Unpin,
{
let handle_shutdown = shutdown.is_some();
let mut shutdown = OptionFuture::from(shutdown);

async_stream::stream! {
loop {
tokio::select! {
biased;
_ = &mut shutdown => break,
_ = &mut shutdown, if handle_shutdown => break,
_ = flush.notified() => {
// Drop all the existing status receivers and start over.
status_receivers = S::default();
Expand All @@ -116,7 +127,7 @@ where
entry: Some(entry),
});
}
// The new entry sender went away before shutdown, count it as a shutdown too.
// The end of the new entry channel signals shutdown
None => break,
},
finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ async fn run_amqp_source(
acknowledgements: bool,
) -> Result<(), ()> {
let (finalizer, mut ack_stream) =
UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, shutdown.clone());
UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));

debug!("Starting amqp source, listening to queue {}.", config.queue);
let mut consumer = channel
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl SqsSource {
pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let mut task_handles = vec![];
let finalizer = self.acknowledgements.then(|| {
let (finalizer, mut ack_stream) = Finalizer::new(shutdown.clone());
let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
let client = self.client.clone();
let queue_url = self.queue_url.clone();
tokio::spawn(
Expand Down
51 changes: 50 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ pub fn file_source(
// The shutdown sent in to the finalizer is the global
// shutdown handle used to tell it to stop accepting new batch
// statuses and just wait for the remaining acks to come in.
let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(shutdown.clone());
let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
bruceg marked this conversation as resolved.
Show resolved Hide resolved

// We set up a separate shutdown signal to tie together the
// finalizer and the checkpoint writer task in the file
// server, to make it continue to write out updated
Expand Down Expand Up @@ -1565,6 +1566,54 @@ mod tests {
assert_eq!(lines, vec!["the line"]);
}

#[tokio::test]
async fn file_duplicate_processing_after_restart() {
let dir = tempdir().unwrap();
let config = file::FileConfig {
include: vec![dir.path().join("*")],
..test_default_file_config(&dir)
};

let path = dir.path().join("file");
let mut file = File::create(&path).unwrap();

let line_count = 4000;
for i in 0..line_count {
writeln!(&mut file, "Here's a line for you: {}", i).unwrap();
}
sleep_500_millis().await;

// First time server runs it should pick up a bunch of lines
let received = run_file_source(
&config,
true,
Acks,
LogNamespace::Legacy,
// shutdown signal is sent after this duration
sleep_500_millis(),
)
.await;
let lines = extract_messages_string(received);

// ...but not all the lines; if the first run processed the entire file, we may not hit the
// bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
assert!(lines.len() < line_count);

// Restart the server, and it should read the rest without duplicating any
let received = run_file_source(
&config,
true,
Acks,
LogNamespace::Legacy,
sleep(Duration::from_secs(5)),
)
.await;
let lines2 = extract_messages_string(received);

// Between both runs, we should have the expected number of lines
assert_eq!(lines.len() + lines2.len(), line_count);
}

#[tokio::test]
async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
file_start_position_server_restart_with_file_rotation(Acks).await
Expand Down
2 changes: 1 addition & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl PubsubSource {
let mut stream = stream.into_inner();

let (finalizer, mut ack_stream) =
Finalizer::maybe_new(self.acknowledgements, self.shutdown.clone());
Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
let mut pending_acks = 0;

loop {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ impl Finalizer {
shutdown: ShutdownSignal,
) -> Self {
if acknowledgements {
let (finalizer, mut ack_stream) = OrderedFinalizer::new(shutdown);
let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
tokio::spawn(async move {
while let Some((status, cursor)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ async fn kafka_source(
) -> Result<(), ()> {
let consumer = Arc::new(consumer);
let (finalizer, mut ack_stream) =
OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, shutdown.clone());
OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
let finalizer = finalizer.map(Arc::new);
if let Some(finalizer) = &finalizer {
consumer
Expand Down
2 changes: 1 addition & 1 deletion src/sources/splunk_hec/acknowledgements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Channel {
fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(shutdown);
let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
tokio::spawn(async move {
while let Some((status, ack_id)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
Expand Down