diff --git a/src/operators/join.rs b/src/operators/join.rs index 588298907..59db65e0d 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -364,22 +364,29 @@ impl JoinCore for Arranged // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow())); + // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock + // on both traces at the same time, as they could be the same trace and this would panic. + let mut batch2_cursors = Vec::new(); trace2.map_batches(|batch2| { acknowledged2.clone_from(batch2.upper()); + batch2_cursors.push((batch2.cursor(), batch2.clone())); + }); + // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by + // iterating through batches and capturing the upper bound. This is a great moment to assert that + // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`. + // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. + assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow())); + + // Load up deferred work using trace2 cursors and batches captured just above. + for (batch2_cursor, batch2) in batch2_cursors.into_iter() { // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`. let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); - let batch2_cursor = batch2.cursor(); // We could downgrade the capability here, but doing so is a bit complicated mathematically. // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have // that property. todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); - }); - // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by - // iterating through batches and capturing the upper bound. This is a great moment to assert that - // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`. - // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. - assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow())); + } // Droppable handles to shared trace data structures. let mut trace1_option = Some(trace1);