Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make capture::source's token thread-safe #364

Merged
merged 3 commits into from
Apr 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 74 additions & 55 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,24 @@ pub mod source {
use std::cell::RefCell;
use std::hash::Hash;
use std::rc::Rc;
use std::marker::{Send, Sync};
use std::sync::Arc;
use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
use timely::progress::Timestamp;
use timely::scheduling::SyncActivator;
use timely::scheduling::{SyncActivator, activate::SyncActivateOnDrop};

// TODO(guswynn): implement this generally in timely
struct DropActivator {
activator: Arc<SyncActivator>,
}

impl Drop for DropActivator {
fn drop(&mut self) {
// Best effort: failure to activate
// is ignored
let _ = self.activator.activate();
}
}

/// Constructs a stream of updates from a source of messages.
///
Expand All @@ -277,7 +292,7 @@ pub mod source {
pub fn build<G, B, I, D, T, R>(
scope: G,
source_builder: B,
) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
where
G: Scope<Timestamp = T>,
B: FnOnce(SyncActivator) -> I,
Expand Down Expand Up @@ -324,8 +339,7 @@ pub mod source {
// Some message distribution logic depends on the number of workers.
let workers = scope.peers();

// Vector of strong references to capabilities, which can be dropped to terminate the sources.
let mut tokens = Vec::new();
let mut token = None;
// Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
let mut antichain = MutableAntichain::new();
antichain.update_iter(Some((T::minimum(), workers as i64)));
Expand All @@ -337,66 +351,71 @@ pub mod source {
let address = messages_op.operator_info().address;
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let activations = scope.activations();
let drop_activator = Arc::new(SyncActivateOnDrop::new((), scope.sync_activator_for(&address)));
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
let tokens_mut = &mut tokens;
messages_op.build(move |capabilities| {
messages_op.build(|capabilities| {

// A Weak that communicates whether the returned token has been dropped.
let drop_activator_weak = Arc::downgrade(&drop_activator);

token = Some(drop_activator);

// Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
// First, wrap capabilities in a rc refcell so that they can be downgraded to weak references.
use timely::scheduling::activate::ActivateOnDrop;
let capability_sets = (CapabilitySet::from_elem(capabilities[0].clone()), CapabilitySet::from_elem(capabilities[1].clone()));
let capability_sets = ActivateOnDrop::new(capability_sets, Rc::new(address), activations);
let strong_capabilities = Rc::new(RefCell::new(capability_sets));
let local_capabilities = Rc::downgrade(&strong_capabilities);
tokens_mut.push(strong_capabilities);
let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
// Capture the shared frontier to read out frontier updates to apply.
let local_frontier = shared_frontier.clone();
//
move |_frontiers| {
// First check to ensure that we haven't been terminated by someone dropping our tokens.
if let Some(capabilities) = local_capabilities.upgrade() {
let (updates_caps, progress_caps) = &mut **capabilities.borrow_mut();
// Consult our shared frontier, and ensure capabilities are downgraded to it.
let shared_frontier = local_frontier.borrow();
updates_caps.downgrade(&shared_frontier.frontier());
progress_caps.downgrade(&shared_frontier.frontier());

// Next check to see if we have been terminated by the source being complete.
if !updates_caps.is_empty() && !progress_caps.is_empty() {
let mut updates = updates_out.activate();
let mut progress = progress_out.activate();

// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
// Specifically, there may not be one capability valid for all updates.
let mut updates_session = updates.session(&updates_caps[0]);
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
if drop_activator_weak.upgrade().is_none() {
// Give up our capabilities
updates_caps.downgrade(&[]);
progress_caps.downgrade(&[]);
// never continue, even if we are (erroneously) activated again.
return;
}

// Consult our shared frontier, and ensure capabilities are downgraded to it.
let shared_frontier = local_frontier.borrow();
updates_caps.downgrade(&shared_frontier.frontier());
progress_caps.downgrade(&shared_frontier.frontier());

// Next check to see if we have been terminated by the source being complete.
if !updates_caps.is_empty() && !progress_caps.is_empty() {
let mut updates = updates_out.activate();
let mut progress = progress_out.activate();

// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
// Specifically, there may not be one capability valid for all updates.
let mut updates_session = updates.session(&updates_caps[0]);
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
}
Message::Progress(progress) => {
// We must send a copy of each progress message to all workers,
// but we can partition the counts across workers by timestamp.
let mut to_worker = vec![Vec::new(); workers];
for (time, count) in progress.counts {
to_worker[(time.hashed() as usize) % workers]
.push((time, count));
}
Message::Progress(progress) => {
// We must send a copy of each progress message to all workers,
// but we can partition the counts across workers by timestamp.
let mut to_worker = vec![Vec::new(); workers];
for (time, count) in progress.counts {
to_worker[(time.hashed() as usize) % workers]
.push((time, count));
}
for (worker, counts) in to_worker.into_iter().enumerate() {
progress_session.give((
worker,
Progress {
lower: progress.lower.clone(),
upper: progress.upper.clone(),
counts,
},
));
}
for (worker, counts) in to_worker.into_iter().enumerate() {
progress_session.give((
worker,
Progress {
lower: progress.lower.clone(),
upper: progress.upper.clone(),
counts,
},
));
}
}
}
Expand Down Expand Up @@ -558,7 +577,7 @@ pub mod source {
}
});

(Box::new(tokens), changes)
(Box::new(token.unwrap()), changes)
}
}

Expand Down