Skip to content

Commit

Permalink
Address rename errors (#310)
Browse files Browse the repository at this point in the history
* address ername errors

* even more renaming
  • Loading branch information
frankmcsherry authored Feb 26, 2021
1 parent 91ec2ba commit cdb9b70
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 60 deletions.
38 changes: 19 additions & 19 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ where
{
trace: Rc<RefCell<TraceBox<Tr>>>,
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
advance: Antichain<Tr::Time>,
through: Antichain<Tr::Time>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,

operator: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
Expand All @@ -55,21 +55,21 @@ where
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier);
self.advance.clear();
self.advance.extend(frontier.iter().cloned());
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction.clear();
self.logical_compaction.extend(frontier.iter().cloned());
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.advance.borrow()
self.logical_compaction.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
debug_assert!(timely::PartialOrder::less_equal(&self.through.borrow(), &frontier));
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier);
self.through.clear();
self.through.extend(frontier.iter().cloned());
debug_assert!(timely::PartialOrder::less_equal(&self.physical_compaction.borrow(), &frontier));
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction.clear();
self.physical_compaction.extend(frontier.iter().cloned());
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.through.borrow()
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
Expand Down Expand Up @@ -100,8 +100,8 @@ where
let reader = TraceAgent {
trace: trace.clone(),
queues: Rc::downgrade(&queues),
advance: trace.borrow().get_logical_compactions.frontier().to_owned(),
through: trace.borrow().through_frontiers.frontier().to_owned(),
logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
operator,
logging,
};
Expand Down Expand Up @@ -534,14 +534,14 @@ where

// increase counts for wrapped `TraceBox`.
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow());
self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow());
self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());

TraceAgent {
trace: self.trace.clone(),
queues: self.queues.clone(),
advance: self.advance.clone(),
through: self.through.clone(),
logical_compaction: self.logical_compaction.clone(),
physical_compaction: self.physical_compaction.clone(),
operator: self.operator.clone(),
logging: self.logging.clone(),
}
Expand All @@ -563,7 +563,7 @@ where

// decrement borrow counts to remove all holds
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
}
}
75 changes: 34 additions & 41 deletions src/trace/wrappers/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ where
Tr: TraceReader
{
/// accumulated holds on times for advancement.
pub get_logical_compactions: MutableAntichain<Tr::Time>,
pub logical_compaction: MutableAntichain<Tr::Time>,
/// accumulated holds on times for distinction.
pub through_frontiers: MutableAntichain<Tr::Time>,
pub physical_compaction: MutableAntichain<Tr::Time>,
/// The wrapped trace.
pub trace: Tr,
}
Expand All @@ -49,35 +49,28 @@ where
/// process will fish these out and make sure that they are used for the initial read capabilities.
pub fn new(mut trace: Tr) -> Self {

let mut advance = MutableAntichain::new();
advance.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
// for time in trace.get_logical_compaction() {
// advance.update(time, 1);
// }

let mut through = MutableAntichain::new();
through.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
// for time in trace.get_physical_compaction() {
// through.update(time, 1);
// }
let mut logical_compaction = MutableAntichain::new();
logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
let mut physical_compaction = MutableAntichain::new();
physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));

TraceBox {
get_logical_compactions: advance,
through_frontiers: through,
logical_compaction,
physical_compaction,
trace: trace,
}
}
/// Replaces elements of `lower` with those of `upper`.
pub fn adjust_get_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.get_logical_compactions.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.get_logical_compactions.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_logical_compaction(self.get_logical_compactions.frontier());
pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_logical_compaction(self.logical_compaction.frontier());
}
/// Replaces elements of `lower` with those of `upper`.
pub fn adjust_through_frontier(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_physical_compaction(self.through_frontiers.frontier());
pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_physical_compaction(self.physical_compaction.frontier());
}
}

Expand All @@ -91,8 +84,8 @@ where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
get_logical_compaction: Antichain<Tr::Time>,
through_frontier: Antichain<Tr::Time>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,
/// Wrapped trace. Please be gentle when using.
pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}
Expand All @@ -116,16 +109,16 @@ where
/// handle no longer requires access to times other than those in the future of `frontier`, but if
/// there are other handles to the same trace, it may not yet be able to compact.
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), frontier);
self.get_logical_compaction = frontier.to_owned();
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction = frontier.to_owned();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.get_logical_compaction.borrow() }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
/// Allows the trace to compact batches of times before `frontier`.
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), frontier);
self.through_frontier = frontier.to_owned();
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction = frontier.to_owned();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.through_frontier.borrow() }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
/// Creates a new cursor over the wrapped trace.
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
Expand All @@ -147,8 +140,8 @@ where
let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));

let handle = TraceRc {
get_logical_compaction: wrapped.borrow().get_logical_compactions.frontier().to_owned(),
through_frontier: wrapped.borrow().through_frontiers.frontier().to_owned(),
logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
wrapper: wrapped.clone(),
};

Expand All @@ -163,11 +156,11 @@ where
{
fn clone(&self) -> Self {
// increase ref counts for this frontier
self.wrapper.borrow_mut().adjust_get_logical_compaction(Antichain::new().borrow(), self.get_logical_compaction.borrow());
self.wrapper.borrow_mut().adjust_through_frontier(Antichain::new().borrow(), self.through_frontier.borrow());
self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
TraceRc {
get_logical_compaction: self.get_logical_compaction.clone(),
through_frontier: self.through_frontier.clone(),
logical_compaction: self.logical_compaction.clone(),
physical_compaction: self.physical_compaction.clone(),
wrapper: self.wrapper.clone(),
}
}
Expand All @@ -179,9 +172,9 @@ where
Tr: TraceReader,
{
fn drop(&mut self) {
self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), Antichain::new().borrow());
self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), Antichain::new().borrow());
self.get_logical_compaction = Antichain::new();
self.through_frontier = Antichain::new();
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
self.logical_compaction = Antichain::new();
self.physical_compaction = Antichain::new();
}
}

0 comments on commit cdb9b70

Please sign in to comment.