Skip to content

Commit

Permalink
fix(file source): make FinalizerSet optionally handle shutdown signals
Browse files Browse the repository at this point in the history
  • Loading branch information
jches committed Mar 23, 2023
1 parent 532e9da commit 988a760
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 15 deletions.
3 changes: 3 additions & 0 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ where
futures::pin_mut!(sleep);
match self.handle.block_on(select(shutdown_data, sleep)) {
Either::Left((_, _)) => {
self.handle
.block_on(chans.close())
.expect("error closing file_server data channel.");
let checkpointer = self
.handle
.block_on(checkpoint_task_handle)
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::StreamExt;
use rkyv::{with::Atomic, Archive, Serialize};
use snafu::{ResultExt, Snafu};
use tokio::{fs, io::AsyncWriteExt, sync::Notify};
use vector_common::{finalizer::OrderedFinalizer, shutdown::ShutdownSignal};
use vector_common::finalizer::OrderedFinalizer;

use super::{
backed_archive::BackedArchive,
Expand Down Expand Up @@ -705,7 +705,7 @@ where

#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
let (finalizer, mut stream) = OrderedFinalizer::new(ShutdownSignal::noop());
let (finalizer, mut stream) = OrderedFinalizer::new(None);
tokio::spawn(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
Expand Down
23 changes: 17 additions & 6 deletions lib/vector-common/src/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::marker::{PhantomData, Unpin};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};

use futures::stream::{BoxStream, FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, Stream, StreamExt};
use futures::{future::OptionFuture, FutureExt, Stream, StreamExt};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;

Expand Down Expand Up @@ -45,8 +45,16 @@ where
{
/// Produce a finalizer set along with the output stream of
/// received acknowledged batch identifiers.
///
/// The output stream will end when the source closes the producer side of the channel, and
/// acknowledgements in the channel are drained.
///
/// If the optional shutdown signal is provided, the output stream will end immediately when a
/// shutdown signal is received. This is not recommended, and can cause some acknowledgements
/// to go unprocessed. Sources may process the message(s) that correspond to those
/// acknowledgements again.
#[must_use]
pub fn new(shutdown: ShutdownSignal) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
pub fn new(shutdown: Option<ShutdownSignal>) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
let flush1 = Arc::new(Notify::new());
let flush2 = Arc::clone(&flush1);
Expand All @@ -67,7 +75,7 @@ where
#[must_use]
pub fn maybe_new(
maybe: bool,
shutdown: ShutdownSignal,
shutdown: Option<ShutdownSignal>,
) -> (Option<Self>, BoxStream<'static, (BatchStatus, T)>) {
if maybe {
let (finalizer, stream) = Self::new(shutdown);
Expand All @@ -91,19 +99,22 @@ where
}

fn finalizer_stream<T, S>(
mut shutdown: ShutdownSignal,
shutdown: Option<ShutdownSignal>,
mut new_entries: UnboundedReceiver<(BatchStatusReceiver, T)>,
mut status_receivers: S,
flush: Arc<Notify>,
) -> impl Stream<Item = (BatchStatus, T)>
where
S: Default + FuturesSet<FinalizerFuture<T>> + Unpin,
{
let handle_shutdown = shutdown.is_some();
let mut shutdown = OptionFuture::from(shutdown);

async_stream::stream! {
loop {
tokio::select! {
biased;
_ = &mut shutdown => break,
_ = &mut shutdown, if handle_shutdown => break,
_ = flush.notified() => {
// Drop all the existing status receivers and start over.
status_receivers = S::default();
Expand All @@ -116,7 +127,7 @@ where
entry: Some(entry),
});
}
// The new entry sender went away before shutdown, count it as a shutdown too.
// The end of the new entry channel signals shutdown
None => break,
},
finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ async fn run_amqp_source(
acknowledgements: bool,
) -> Result<(), ()> {
let (finalizer, mut ack_stream) =
UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, shutdown.clone());
UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));

debug!("Starting amqp source, listening to queue {}.", config.queue);
let mut consumer = channel
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl SqsSource {
pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let mut task_handles = vec![];
let finalizer = self.acknowledgements.then(|| {
let (finalizer, mut ack_stream) = Finalizer::new(shutdown.clone());
let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
let client = self.client.clone();
let queue_url = self.queue_url.clone();
tokio::spawn(
Expand Down
2 changes: 1 addition & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ pub fn file_source(
// The shutdown sent in to the finalizer is the global
// shutdown handle used to tell it to stop accepting new batch
// statuses and just wait for the remaining acks to come in.
let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(shutdown.clone());
let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
// We set up a separate shutdown signal to tie together the
// finalizer and the checkpoint writer task in the file
// server, to make it continue to write out updated
Expand Down
2 changes: 1 addition & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl PubsubSource {
let mut stream = stream.into_inner();

let (finalizer, mut ack_stream) =
Finalizer::maybe_new(self.acknowledgements, self.shutdown.clone());
Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
let mut pending_acks = 0;

loop {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ impl Finalizer {
shutdown: ShutdownSignal,
) -> Self {
if acknowledgements {
let (finalizer, mut ack_stream) = OrderedFinalizer::new(shutdown);
let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
tokio::spawn(async move {
while let Some((status, cursor)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ async fn kafka_source(
) -> Result<(), ()> {
let consumer = Arc::new(consumer);
let (finalizer, mut ack_stream) =
OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, shutdown.clone());
OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
let finalizer = finalizer.map(Arc::new);
if let Some(finalizer) = &finalizer {
consumer
Expand Down
2 changes: 1 addition & 1 deletion src/sources/splunk_hec/acknowledgements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Channel {
fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(shutdown);
let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
tokio::spawn(async move {
while let Some((status, ack_id)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
Expand Down

0 comments on commit 988a760

Please sign in to comment.