From cdb9b709715caeaa20ea511eb83d4c4caedef8e4 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 26 Feb 2021 16:22:16 -0500 Subject: [PATCH] Address rename errors (#310) * address ername errors * even more renaming --- src/operators/arrange/agent.rs | 38 ++++++++--------- src/trace/wrappers/rc.rs | 75 +++++++++++++++------------------- 2 files changed, 53 insertions(+), 60 deletions(-) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index c27a04e45..289ed4c56 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -34,8 +34,8 @@ where { trace: Rc>>, queues: Weak>>>, - advance: Antichain, - through: Antichain, + logical_compaction: Antichain, + physical_compaction: Antichain, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>, @@ -55,21 +55,21 @@ where type Cursor = Tr::Cursor; fn set_logical_compaction(&mut self, frontier: AntichainRef) { - self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier); - self.advance.clear(); - self.advance.extend(frontier.iter().cloned()); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier); + self.logical_compaction.clear(); + self.logical_compaction.extend(frontier.iter().cloned()); } fn get_logical_compaction(&mut self) -> AntichainRef { - self.advance.borrow() + self.logical_compaction.borrow() } fn set_physical_compaction(&mut self, frontier: AntichainRef) { - debug_assert!(timely::PartialOrder::less_equal(&self.through.borrow(), &frontier)); - self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier); - self.through.clear(); - self.through.extend(frontier.iter().cloned()); + debug_assert!(timely::PartialOrder::less_equal(&self.physical_compaction.borrow(), &frontier)); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier); + self.physical_compaction.clear(); + self.physical_compaction.extend(frontier.iter().cloned()); } fn get_physical_compaction(&mut self) -> AntichainRef { - self.through.borrow() + self.physical_compaction.borrow() } fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { self.trace.borrow_mut().trace.cursor_through(frontier) @@ -100,8 +100,8 @@ where let reader = TraceAgent { trace: trace.clone(), queues: Rc::downgrade(&queues), - advance: trace.borrow().get_logical_compactions.frontier().to_owned(), - through: trace.borrow().through_frontiers.frontier().to_owned(), + logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), + physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), operator, logging, }; @@ -534,14 +534,14 @@ where // increase counts for wrapped `TraceBox`. let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow()); - self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow()); + self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); TraceAgent { trace: self.trace.clone(), queues: self.queues.clone(), - advance: self.advance.clone(), - through: self.through.clone(), + logical_compaction: self.logical_compaction.clone(), + physical_compaction: self.physical_compaction.clone(), operator: self.operator.clone(), logging: self.logging.clone(), } @@ -563,7 +563,7 @@ where // decrement borrow counts to remove all holds let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow()); - self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); } } diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index fd270d71f..a0298f83d 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -31,9 +31,9 @@ where Tr: TraceReader { /// accumulated holds on times for advancement. - pub get_logical_compactions: MutableAntichain, + pub logical_compaction: MutableAntichain, /// accumulated holds on times for distinction. - pub through_frontiers: MutableAntichain, + pub physical_compaction: MutableAntichain, /// The wrapped trace. pub trace: Tr, } @@ -49,35 +49,28 @@ where /// process will fish these out and make sure that they are used for the initial read capabilities. pub fn new(mut trace: Tr) -> Self { - let mut advance = MutableAntichain::new(); - advance.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1))); - // for time in trace.get_logical_compaction() { - // advance.update(time, 1); - // } - - let mut through = MutableAntichain::new(); - through.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1))); - // for time in trace.get_physical_compaction() { - // through.update(time, 1); - // } + let mut logical_compaction = MutableAntichain::new(); + logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1))); + let mut physical_compaction = MutableAntichain::new(); + physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1))); TraceBox { - get_logical_compactions: advance, - through_frontiers: through, + logical_compaction, + physical_compaction, trace: trace, } } /// Replaces elements of `lower` with those of `upper`. - pub fn adjust_get_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { - self.get_logical_compactions.update_iter(upper.iter().cloned().map(|t| (t,1))); - self.get_logical_compactions.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.set_logical_compaction(self.get_logical_compactions.frontier()); + pub fn adjust_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { + self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); + self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); + self.trace.set_logical_compaction(self.logical_compaction.frontier()); } /// Replaces elements of `lower` with those of `upper`. - pub fn adjust_through_frontier(&mut self, lower: AntichainRef, upper: AntichainRef) { - self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1))); - self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.set_physical_compaction(self.through_frontiers.frontier()); + pub fn adjust_physical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { + self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); + self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); + self.trace.set_physical_compaction(self.physical_compaction.frontier()); } } @@ -91,8 +84,8 @@ where Tr::Time: Lattice+Ord+Clone+'static, Tr: TraceReader, { - get_logical_compaction: Antichain, - through_frontier: Antichain, + logical_compaction: Antichain, + physical_compaction: Antichain, /// Wrapped trace. Please be gentle when using. pub wrapper: Rc>>, } @@ -116,16 +109,16 @@ where /// handle no longer requires access to times other than those in the future of `frontier`, but if /// there are other handles to the same trace, it may not yet be able to compact. fn set_logical_compaction(&mut self, frontier: AntichainRef) { - self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), frontier); - self.get_logical_compaction = frontier.to_owned(); + self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier); + self.logical_compaction = frontier.to_owned(); } - fn get_logical_compaction(&mut self) -> AntichainRef { self.get_logical_compaction.borrow() } + fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_compaction.borrow() } /// Allows the trace to compact batches of times before `frontier`. fn set_physical_compaction(&mut self, frontier: AntichainRef) { - self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), frontier); - self.through_frontier = frontier.to_owned(); + self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier); + self.physical_compaction = frontier.to_owned(); } - fn get_physical_compaction(&mut self) -> AntichainRef { self.through_frontier.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } /// Creates a new cursor over the wrapped trace. fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) @@ -147,8 +140,8 @@ where let wrapped = Rc::new(RefCell::new(TraceBox::new(trace))); let handle = TraceRc { - get_logical_compaction: wrapped.borrow().get_logical_compactions.frontier().to_owned(), - through_frontier: wrapped.borrow().through_frontiers.frontier().to_owned(), + logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(), + physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(), wrapper: wrapped.clone(), }; @@ -163,11 +156,11 @@ where { fn clone(&self) -> Self { // increase ref counts for this frontier - self.wrapper.borrow_mut().adjust_get_logical_compaction(Antichain::new().borrow(), self.get_logical_compaction.borrow()); - self.wrapper.borrow_mut().adjust_through_frontier(Antichain::new().borrow(), self.through_frontier.borrow()); + self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow()); + self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow()); TraceRc { - get_logical_compaction: self.get_logical_compaction.clone(), - through_frontier: self.through_frontier.clone(), + logical_compaction: self.logical_compaction.clone(), + physical_compaction: self.physical_compaction.clone(), wrapper: self.wrapper.clone(), } } @@ -179,9 +172,9 @@ where Tr: TraceReader, { fn drop(&mut self) { - self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), Antichain::new().borrow()); - self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), Antichain::new().borrow()); - self.get_logical_compaction = Antichain::new(); - self.through_frontier = Antichain::new(); + self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow()); + self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow()); + self.logical_compaction = Antichain::new(); + self.physical_compaction = Antichain::new(); } } \ No newline at end of file