diff --git a/upstairs/src/control.rs b/upstairs/src/control.rs index 043f29d59..e9d83182a 100644 --- a/upstairs/src/control.rs +++ b/upstairs/src/control.rs @@ -122,16 +122,11 @@ async fn upstairs_fill_info( let ds_jobs = ds.ds_active.len(); let repair_done = ds.reconcile_repaired; let repair_needed = ds.reconcile_repair_needed; - let extents_repaired = ds.extents_repaired; - let extents_confirmed = ds.extents_confirmed; - let extent_limit = ds.extent_limit; - let live_repair_completed = ds.live_repair_completed; - let live_repair_aborted = ds.live_repair_aborted; - - // Convert from a map of extent limits to a Vec> - let extent_limit = ClientId::iter() - .map(|i| extent_limit.get(&i).cloned()) - .collect(); + let extents_repaired = ds.collect_stats(|c| c.extents_repaired); + let extents_confirmed = ds.collect_stats(|c| c.extents_confirmed); + let extent_limit = ds.collect_stats(|c| c.extent_limit); + let live_repair_completed = ds.collect_stats(|c| c.live_repair_completed); + let live_repair_aborted = ds.collect_stats(|c| c.live_repair_aborted); Ok(HttpResponseOk(UpstairsStats { state: act, @@ -140,11 +135,11 @@ async fn upstairs_fill_info( ds_jobs, repair_done, repair_needed, - extents_repaired: extents_repaired.0.to_vec(), - extents_confirmed: extents_confirmed.0.to_vec(), - extent_limit, - live_repair_completed: live_repair_completed.0.to_vec(), - live_repair_aborted: live_repair_aborted.0.to_vec(), + extents_repaired: extents_repaired.to_vec(), + extents_confirmed: extents_confirmed.to_vec(), + extent_limit: extent_limit.to_vec(), + live_repair_completed: live_repair_completed.to_vec(), + live_repair_aborted: live_repair_aborted.to_vec(), })) } diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 9dc125476..179a42174 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -411,13 +411,6 @@ impl ClientMap { fn new() -> Self { Self(ClientData([None, None, None])) } - fn is_empty(&self) -> bool { - self.0.iter().all(Option::is_none) - } - /// Removes a value, returning the old value (or `None`) - fn remove(&mut self, c: &ClientId) -> Option { - self.0.insert(*c, None) - } /// Inserts a new value, returning the old value (or `None`) pub fn insert(&mut self, c: ClientId, v: T) -> Option { self.0.insert(c, Some(v)) @@ -736,14 +729,11 @@ where let mut ds = u.downstairs.lock().await; let active_count = ds.submitted_work(client_id); if active_count > MAX_ACTIVE_COUNT { - ds.flow_control[client_id] += 1; + ds.clients[client_id].flow_control += 1; return Ok(true); } else { let n = MAX_ACTIVE_COUNT - active_count; - let (new_work, flow_control) = ds.new_work(client_id, n); - if flow_control { - ds.flow_control[client_id] += 1; - } + let (new_work, flow_control) = ds.clients[client_id].new_work(n); (new_work, flow_control) } }; @@ -1000,7 +990,7 @@ where // re_new. { let mut ds = up.downstairs.lock().await; - let my_state = ds.ds_state[up_coms.client_id]; + let my_state = ds.clients[up_coms.client_id].state; info!( up.log, @@ -1031,7 +1021,7 @@ where * any work that we were holding that we did not flush. 
*/ ds.re_new(up_coms.client_id); - assert!(ds.extent_limit.get(&up_coms.client_id).is_none()); + assert!(ds.clients[up_coms.client_id].extent_limit.is_none()); } _ => { panic!( @@ -1530,10 +1520,11 @@ where up.add_ds_region(up_coms.client_id, region_def).await?; // Match on the current state of this downstairs - let my_state = { - let state = &up.downstairs.lock().await.ds_state; - state[up_coms.client_id] - }; + let my_state = up.downstairs + .lock() + .await + .clients[up_coms.client_id] + .state; match my_state { DsState::Offline => { /* @@ -1600,7 +1591,7 @@ where let mut ds = up.downstairs.lock().await; drop(active); - let my_state = ds.ds_state[up_coms.client_id]; + let my_state = ds.clients[up_coms.client_id].state; if my_state == DsState::Replacing { bail!( "[{}] exits negotiation, replacing", @@ -1618,7 +1609,7 @@ where // Assert now, but this should eventually be an // error and move the downstairs to failed. XXX assert_eq!( - ds.ds_last_flush[up_coms.client_id], + ds.clients[up_coms.client_id].last_flush, last_flush_number ); up.ds_transition_with_lock( @@ -1644,7 +1635,7 @@ where let mut ds = up.downstairs.lock().await; drop(active); - let my_state = ds.ds_state[up_coms.client_id]; + let my_state = ds.clients[up_coms.client_id].state; match my_state { DsState::WaitActive => { up.ds_transition_with_lock( @@ -1693,8 +1684,9 @@ where }; let old_rm = ds - .region_metadata - .insert(up_coms.client_id, dsr); + .clients[up_coms.client_id] + .region_metadata + .replace(dsr); warn!( up.log, @@ -1844,7 +1836,7 @@ where }; { let mut ds = up.downstairs.lock().await; - let state = ds.ds_state[up_coms.client_id]; + let state = ds.clients[up_coms.client_id].state; match state { DsState::Replay => { info!( @@ -1950,7 +1942,8 @@ where * tear down this connection and require the downstairs to * reconnect and eventually go into LiveRepair mode. */ - let my_state = up_c.downstairs.lock().await.ds_state[client_id]; + let my_state = + up_c.downstairs.lock().await.clients[client_id].state; if my_state == DsState::Faulted || my_state == DsState::Replacing { @@ -2639,7 +2632,7 @@ async fn looper( } // Get the specific information for the downstairs we will operate on. let ds = up.downstairs.lock().await; - let target: SocketAddr = ds.ds_target[up_coms.client_id]; + let target: SocketAddr = ds.clients[up_coms.client_id].target.unwrap(); drop(ds); /* @@ -2723,7 +2716,7 @@ async fn looper( } }; - up.downstairs.lock().await.connected[up_coms.client_id] += 1; + up.downstairs.lock().await.clients[up_coms.client_id].connected += 1; /* * Once we have a connected downstairs, the proc task takes over and @@ -2808,55 +2801,14 @@ impl WorkCounts { */ #[derive(Debug)] struct Downstairs { - /** - * UUID for each downstairs, index by client ID - */ - ds_uuid: ClientMap, - - /// The IP:Port of each of the downstairs - /// - /// This is left unpopulated in some unit tests - ds_target: ClientMap, - - /** - * The IP:Port for repair when contacting the downstairs, hashed by - * the client index the upstairs gives it. - */ - ds_repair: ClientMap, - - /** - * The state of a downstairs connection, based on client ID - * Ready here indicates it can receive IO. - */ - ds_state: ClientData, - - /** - * The last flush job ID that each downstairs has acked. - * - * Note that this is a job ID; not a flush ID! - */ - ds_last_flush: ClientData, - - /** - * Errors recorded, indexed by client ID. - */ - downstairs_errors: ClientData, + /// Per-client data + clients: ClientData, /** * The active list of IO for the downstairs. 
*/ ds_active: ActiveJobs, - /** - * Cache of new jobs, indexed by client ID. - */ - ds_new: ClientData>, - - /** - * Jobs that have been skipped, indexed by client ID. - */ - ds_skipped_jobs: ClientData>, - /// The number of write bytes that haven't finished yet /// /// This is used to configure backpressure to the host, because writes @@ -2883,17 +2835,6 @@ struct Downstairs { */ completed_jobs: AllocRingBuffer, - /** - * On Startup, we collect info from each downstairs region. We use that - * info to make sure that all three regions in a region set are the - * same, and if not the same, to decide which data we will consider - * valid and make the other downstairs contain that same data. - * - * We also determine the next flush ID and verify the generation - * number. - */ - region_metadata: ClientMap, - /** * This holds the current piece of repair work that the three * downstairs are working on. It can be New, InProgress, Skipped, @@ -2919,65 +2860,6 @@ struct Downstairs { */ log: Logger, - /** - * Counters for the in flight work for the downstairs - */ - io_state_count: IOStateCount, - - /** - * Live Repair info - * This will contain the extent info for each downstairs as reported - * by those downstairs and is used to decide if an extent requires - * repair or not. - */ - repair_info: ClientMap, - - /** - * Count of extents repaired live. - */ - extents_repaired: ClientData, - - /** - * Count of extents checked but not needing live repair. - */ - extents_confirmed: ClientData, - - /** - * Count of time a downstairs LiveRepair completed. - */ - live_repair_completed: ClientData, - - /** - * Count of time a downstairs LiveRepair was aborted. - */ - live_repair_aborted: ClientData, - - /** - * Times we skipped repairing a downstairs because we are running - * as read_only. - */ - ro_lr_skipped: ClientData, - - /** - * Extent limit, if set, indicates the extent where LiveRepair has already - * submitted, or possibly even already finished the LiveRepair of this - * extent. If you are changing this value, it must happen at the same - * time the repair IOs are enqueued on the work list for the extent under - * repair, don't release the downstairs lock until both are done. - * - * This limit, if used in a flush indicates that extents <= this - * value should be issued a flush, and extents > this value should - * not be flushed. - * - * When deciding to skip an IO on a downstairs in LiveRepair, any - * IO at or below this extent should go ahead and be submitted. Any IO - * above this extent should still be skipped. - * - * This is only used during live repair, and will only ever be - * set on a downstairs that is undergoing live repair. - */ - extent_limit: ClientMap, - /** * Live Repair Job IDs * If, while running live repair, we have an IO that spans repaired @@ -2998,58 +2880,29 @@ struct Downstairs { * prevent more than one repair task from running at the same time. 
*/ repair_min_id: Option, - - /** - * Count of downstairs connections - */ - connected: ClientData, - - /** - * Count of downstairs replacements - */ - replaced: ClientData, - - /** - * Count of times a downstairs has had flow control turned on - */ - flow_control: ClientData, } impl Downstairs { fn new(log: Logger, ds_target: ClientMap) -> Self { + let clients = [ + DownstairsClient::new(ds_target.get(&ClientId::new(0)).copied()), + DownstairsClient::new(ds_target.get(&ClientId::new(1)).copied()), + DownstairsClient::new(ds_target.get(&ClientId::new(2)).copied()), + ]; Self { - ds_uuid: ClientMap::new(), - ds_target, - ds_repair: ClientMap::new(), - ds_state: ClientData::new(DsState::New), - ds_last_flush: ClientData::new(JobId(0)), - downstairs_errors: ClientData::new(0), + clients: ClientData(clients), ds_active: ActiveJobs::new(), - ds_new: ClientData::new(BTreeSet::new()), - ds_skipped_jobs: ClientData::new(HashSet::new()), write_bytes_outstanding: 0, completed: AllocRingBuffer::new(2048), completed_jobs: AllocRingBuffer::new(8), next_id: JobId(1000), - region_metadata: ClientMap::new(), reconcile_current_work: None, reconcile_task_list: VecDeque::new(), reconcile_repaired: 0, reconcile_repair_needed: 0, log: log.new(o!("" => "downstairs".to_string())), - io_state_count: IOStateCount::new(), - repair_info: ClientMap::new(), - extents_repaired: ClientData::new(0), - extents_confirmed: ClientData::new(0), - live_repair_completed: ClientData::new(0), - live_repair_aborted: ClientData::new(0), - ro_lr_skipped: ClientData::new(0), - extent_limit: ClientMap::new(), repair_job_ids: BTreeMap::new(), repair_min_id: None, - connected: ClientData::new(0), - replaced: ClientData::new(0), - flow_control: ClientData::new(0), } } @@ -3057,8 +2910,9 @@ impl Downstairs { * Live repair is over, Clean up any repair related settings. */ fn end_live_repair(&mut self) { - self.repair_info = ClientMap::new(); - self.extent_limit = ClientMap::new(); + for c in self.clients.iter_mut() { + c.end_live_repair(); + } self.repair_job_ids = BTreeMap::new(); self.repair_min_id = None; } @@ -3108,12 +2962,12 @@ impl Downstairs { let new_state = IOState::InProgress; let old_state = job.state.insert(client_id, new_state.clone()); assert_eq!(old_state, IOState::New); - self.io_state_count.decr(&old_state, client_id); - self.io_state_count.incr(&new_state, client_id); + self.clients[client_id].io_state_count.decr(&old_state); + self.clients[client_id].io_state_count.incr(&new_state); let mut out = job.work.clone(); drop(handle); - if self.dependencies_need_cleanup(client_id) { + if self.clients[client_id].dependencies_need_cleanup() { match &mut out { IOop::Write { dependencies, .. } | IOop::WriteUnwritten { dependencies, .. } @@ -3135,24 +2989,13 @@ impl Downstairs { // If our downstairs is under repair, then include any extent limit sent // in the IOop; otherwise, clear it out if let IOop::Flush { extent_limit, .. } = &mut out { - if self.ds_state[client_id] != DsState::LiveRepair { + if self.clients[client_id].state != DsState::LiveRepair { *extent_limit = None; } } Some(out) } - /* - * Determine if the conditions exist where we need to remove dependencies - * for an IOop during live repair. We only need to do this if the - * downstairs in question is in LiveRepair, and there are skipped - * jobs for this downstairs. 
- */ - fn dependencies_need_cleanup(&mut self, client_id: ClientId) -> bool { - self.ds_state[client_id] == DsState::LiveRepair - && !self.ds_skipped_jobs[client_id].is_empty() - } - /// Given a client ID that is undergoing LiveRepair, go through the list /// of dependencies and remove any jobs that this downstairs has already /// skipped, as the downstairs on the other side will not have received @@ -3174,13 +3017,13 @@ impl Downstairs { "[{}] {} Remove check skipped:{:?} from deps:{:?}", client_id, ds_id, - self.ds_skipped_jobs[client_id], + self.clients[client_id].skipped_jobs, deps ); - assert_eq!(self.ds_state[client_id], DsState::LiveRepair); + assert_eq!(self.clients[client_id].state, DsState::LiveRepair); assert!(self.repair_min_id.is_some()); - deps.retain(|x| !self.ds_skipped_jobs[client_id].contains(x)); + deps.retain(|x| !self.clients[client_id].skipped_jobs.contains(x)); // If we are repairing, then there must be a repair_min_id set so we // know where to stop with dependency inclusion. @@ -3215,9 +3058,9 @@ impl Downstairs { */ fn repair_or_abort(&mut self) -> Result<()> { let not_ready = self - .ds_state + .clients .iter() - .filter(|state| **state != DsState::Repair) + .filter(|client| client.state != DsState::Repair) .count(); if not_ready > 0 { @@ -3226,7 +3069,9 @@ impl Downstairs { * Mark any downstairs that have not changed as failed * and return error. */ - for (i, s) in self.ds_state.iter_mut().enumerate() { + for (i, s) in + self.clients.iter_mut().map(|c| &mut c.state).enumerate() + { if *s == DsState::Repair { *s = DsState::FailedRepair; error!(self.log, "Mark {} as FAILED REPAIR", i); @@ -3266,7 +3111,7 @@ impl Downstairs { * should just continue waiting for work to show up. */ fn rep_in_progress(&mut self, client_id: ClientId) -> Option { - if self.ds_state[client_id] != DsState::Repair { + if self.clients[client_id].state != DsState::Repair { return None; } if let Some(job) = &mut self.reconcile_current_work { @@ -3325,7 +3170,7 @@ impl Downstairs { * Given a client ID, return the SocketAddr for repair to use. */ fn repair_addr(&mut self, client_id: ClientId) -> SocketAddr { - *self.ds_repair.get(&client_id).unwrap() + self.clients[client_id].repair_addr.unwrap() } /** @@ -3425,15 +3270,17 @@ impl Downstairs { if matches!(state, IOState::InProgress | IOState::New) { info!(self.log, "{} change {} to skipped", client_id, ds_id); let old_state = job.state.insert(client_id, IOState::Skipped); - self.io_state_count.decr(&old_state, client_id); - self.io_state_count.incr(&IOState::Skipped, client_id); - self.ds_skipped_jobs[client_id].insert(*ds_id); + self.clients[client_id].io_state_count.decr(&old_state); + self.clients[client_id] + .io_state_count + .incr(&IOState::Skipped); + self.clients[client_id].skipped_jobs.insert(*ds_id); } }); // All of IOState::New jobs are now IOState::Skipped, so clear our // cache of new jobs for this downstairs. - self.ds_new[client_id].clear(); + self.clients[client_id].new_jobs.clear(); } /** @@ -3453,7 +3300,7 @@ impl Downstairs { * happen. 
*/ fn re_new(&mut self, client_id: ClientId) { - let lf = self.ds_last_flush[client_id]; + let lf = self.clients[client_id].last_flush; info!( self.log, @@ -3521,9 +3368,9 @@ impl Downstairs { let old_state = job.state.insert(client_id, IOState::New); job.replay = true; if old_state != IOState::New { - self.io_state_count.decr(&old_state, client_id); - self.io_state_count.incr(&IOState::New, client_id); - self.ds_new[client_id].insert(*ds_id); + self.clients[client_id].io_state_count.decr(&old_state); + self.clients[client_id].io_state_count.incr(&IOState::New); + self.clients[client_id].new_jobs.insert(*ds_id); } }); } @@ -3551,9 +3398,11 @@ impl Downstairs { if matches!(state, IOState::InProgress | IOState::New) { let old_state = job.state.insert(client_id, IOState::Skipped); - self.io_state_count.decr(&old_state, client_id); - self.io_state_count.incr(&IOState::Skipped, client_id); - self.ds_skipped_jobs[client_id].insert(*ds_id); + self.clients[client_id].io_state_count.decr(&old_state); + self.clients[client_id] + .io_state_count + .incr(&IOState::Skipped); + self.clients[client_id].skipped_jobs.insert(*ds_id); number_jobs_skipped += 1; // Check to see if this being skipped means we can ACK @@ -3597,42 +3446,19 @@ impl Downstairs { // We have eliminated all of our jobs in IOState::New above; flush // our cache to reflect that. - self.ds_new[client_id].clear(); + self.clients[client_id].new_jobs.clear(); // As this downstairs is now faulted, we clear the extent_limit. - self.extent_limit.remove(&client_id); + self.clients[client_id].extent_limit = None; notify_guest } - /// Return a list of downstairs request IDs that represent unissued - /// requests for this client. - /// - /// Returns a tuple of `(jobs, flow control)` where flow control is true if - /// the jobs list has been clamped to `max_count`. - fn new_work( - &mut self, - client_id: ClientId, - max_count: usize, - ) -> (BTreeSet, bool) { - if max_count >= self.ds_new[client_id].len() { - // Happy path: we can grab everything - (std::mem::take(&mut self.ds_new[client_id]), false) - } else { - // Otherwise, pop elements from the queue - let mut out = BTreeSet::new(); - for _ in 0..max_count { - out.insert(self.ds_new[client_id].pop_first().unwrap()); - } - (out, true) - } - } - /** * Called to requeue a single job that was previously found by calling * [`new_work`], presumably due to flow control. */ fn requeue_one(&mut self, client_id: ClientId, work: JobId) { - self.ds_new[client_id].insert(work); + self.clients[client_id].new_jobs.insert(work); } /** @@ -3640,7 +3466,7 @@ impl Downstairs { * for this client, but don't yet have a response. */ fn submitted_work(&self, client_id: ClientId) -> usize { - self.io_state_count.in_progress[client_id] as usize + self.clients[client_id].io_state_count.in_progress as usize } /** @@ -3648,8 +3474,7 @@ impl Downstairs { * work we have for a downstairs. 
*/ fn total_live_work(&self, client_id: ClientId) -> usize { - (self.io_state_count.new[client_id] - + self.io_state_count.in_progress[client_id]) as usize + self.clients[client_id].total_live_work() } /** @@ -3672,7 +3497,7 @@ impl Downstairs { for cid in ClientId::iter() { assert_eq!(io.state[cid], IOState::New); - let current = self.ds_state[cid]; + let current = self.clients[cid].state; // If a downstairs is faulted or ready for repair, we can move // that job directly to IOState::Skipped // If a downstairs is in repair, then we need to see if this @@ -3686,38 +3511,41 @@ impl Downstairs { | DsState::Replacing | DsState::LiveRepairReady => { io.state.insert(cid, IOState::Skipped); - self.io_state_count.incr(&IOState::Skipped, cid); + self.clients[cid].io_state_count.incr(&IOState::Skipped); skipped += 1; - self.ds_skipped_jobs[cid].insert(io.ds_id); + self.clients[cid].skipped_jobs.insert(io.ds_id); } DsState::LiveRepair => { // Pick the latest repair limit that's relevant for this // downstairs. This is either the extent under repair (if // there are no reserved repair jobs), or the last extent // for which we have reserved a repair job ID. - let my_limit = self.extent_limit.get(&cid).map(|first| { - self.repair_job_ids - .last_key_value() - .map(|(k, _)| *k) - .unwrap_or(*first as u64) - }); + let my_limit = + self.clients[cid].extent_limit.map(|first| { + self.repair_job_ids + .last_key_value() + .map(|(k, _)| *k) + .unwrap_or(first as u64) + }); assert!(self.repair_min_id.is_some()); if io.work.send_io_live_repair(my_limit) { // Leave this IO as New, the downstairs will receive it. - self.io_state_count.incr(&IOState::New, cid); - self.ds_new[cid].insert(io.ds_id); + self.clients[cid].io_state_count.incr(&IOState::New); + self.clients[cid].new_jobs.insert(io.ds_id); } else { // Move this IO to skipped, we are not ready for // the downstairs to receive it. io.state.insert(cid, IOState::Skipped); - self.io_state_count.incr(&IOState::Skipped, cid); + self.clients[cid] + .io_state_count + .incr(&IOState::Skipped); skipped += 1; - self.ds_skipped_jobs[cid].insert(io.ds_id); + self.clients[cid].skipped_jobs.insert(io.ds_id); } } _ => { - self.io_state_count.incr(&IOState::New, cid); - self.ds_new[cid].insert(io.ds_id); + self.clients[cid].io_state_count.incr(&IOState::New); + self.clients[cid].new_jobs.insert(io.ds_id); } } } @@ -3769,7 +3597,7 @@ impl Downstairs { for cid in ClientId::iter() { assert_eq!(io.state[cid], IOState::New); - let current = self.ds_state[cid]; + let current = self.clients[cid].state; // If a downstairs is faulted, we can move that job directly // to IOState::Skipped. 
match current { @@ -3778,12 +3606,12 @@ impl Downstairs { | DsState::Replacing | DsState::LiveRepairReady => { io.state.insert(cid, IOState::Skipped); - self.io_state_count.incr(&IOState::Skipped, cid); - self.ds_skipped_jobs[cid].insert(io.ds_id); + self.clients[cid].io_state_count.incr(&IOState::Skipped); + self.clients[cid].skipped_jobs.insert(io.ds_id); } _ => { - self.io_state_count.incr(&IOState::New, cid); - self.ds_new[cid].insert(io.ds_id); + self.clients[cid].io_state_count.incr(&IOState::New); + self.clients[cid].new_jobs.insert(io.ds_id); } } } @@ -4231,8 +4059,8 @@ impl Downstairs { }; let old_state = job.state.insert(client_id, new_state.clone()); - self.io_state_count.decr(&old_state, client_id); - self.io_state_count.incr(&new_state, client_id); + self.clients[client_id].io_state_count.decr(&old_state); + self.clients[client_id].io_state_count.incr(&new_state); /* * Verify the job was InProgress @@ -4276,7 +4104,7 @@ impl Downstairs { snapshot_details: _, extent_limit: _, } => { - self.downstairs_errors[client_id] += 1; + self.clients[client_id].downstairs_errors += 1; } // If a repair job errors, mark that downstairs as bad @@ -4310,7 +4138,7 @@ impl Downstairs { // well as throw out the whole repair and start // over as we can no longer trust results from // the downstairs under repair. - self.downstairs_errors[client_id] += 1; + self.clients[client_id].downstairs_errors += 1; } // If a read job fails, we sometimes need to panic. @@ -4366,7 +4194,7 @@ impl Downstairs { snapshot_details: _, extent_limit: _, } => { - self.ds_last_flush[client_id] = ds_id; + self.clients[client_id].last_flush = ds_id; } IOop::Read { dependencies: _dependencies, @@ -4586,7 +4414,7 @@ impl Downstairs { ); } } - self.ds_last_flush[client_id] = ds_id; + self.clients[client_id].last_flush = ds_id; } IOop::ExtentClose { dependencies: _, @@ -4608,9 +4436,9 @@ impl Downstairs { assert!(read_data.is_empty()); assert!(extent_info.is_some()); - let ci = self + let ci = self.clients[client_id] .repair_info - .insert(client_id, extent_info.unwrap()); + .replace(extent_info.unwrap()); if ci.is_some() { panic!( "[{}] Unexpected repair found on insertion: {:?}", @@ -4772,7 +4600,7 @@ impl Downstairs { self.completed_jobs.push(summary); for cid in ClientId::iter() { let old_state = &job.state[cid]; - self.io_state_count.decr(old_state, cid); + self.clients[cid].io_state_count.decr(old_state); } } // Now that we've collected jobs to retire, remove them from the map @@ -4799,7 +4627,7 @@ impl Downstairs { debug!(self.log, "[rc] retire {} clears {:?}", ds_id, retired); // Only keep track of skipped jobs at or above the flush. 
for cid in ClientId::iter() { - self.ds_skipped_jobs[cid].retain(|&x| x >= ds_id); + self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id); } } } @@ -4918,7 +4746,7 @@ impl Downstairs { fn get_extent_under_repair(&self) -> Option> { let mut extent_under_repair = None; for cid in ClientId::iter() { - if let Some(&eur) = self.extent_limit.get(&cid) { + if let Some(eur) = self.clients[cid].extent_limit { if extent_under_repair.is_none() { extent_under_repair = Some(eur as u64); } else { @@ -4960,6 +4788,230 @@ impl Downstairs { fn get_extents_for(&self, job: &DownstairsIO) -> ImpactedBlocks { self.ds_active.get_extents_for(job.ds_id) } + + /// Collects stats from the three `DownstairsClient`s + pub fn collect_stats T>( + &self, + f: F, + ) -> [T; 3] { + [ + f(&self.clients[ClientId::new(0)]), + f(&self.clients[ClientId::new(1)]), + f(&self.clients[ClientId::new(2)]), + ] + } + + pub fn io_state_count(&self) -> IOStateCount { + let d = self.collect_stats(|c| c.io_state_count); + let f = |g: fn(ClientIOStateCount) -> u32| { + ClientData([g(d[0]), g(d[1]), g(d[2])]) + }; + IOStateCount { + new: f(|d| d.new), + in_progress: f(|d| d.in_progress), + done: f(|d| d.done), + skipped: f(|d| d.skipped), + error: f(|d| d.error), + } + } +} + +#[derive(Debug)] +struct DownstairsClient { + /** + * UUID for this downstairs + */ + uuid: Option, + + /// The IP:Port of each of the downstairs + /// + /// This is left unpopulated in some unit tests + target: Option, + + /** + * The IP:Port for repair when contacting the downstairs, hashed by + * the client index the upstairs gives it. + */ + repair_addr: Option, + + /** + * The state of a downstairs connection, based on client ID + * Ready here indicates it can receive IO. + */ + state: DsState, + + /** + * The last flush job ID that each downstairs has acked. + * + * Note that this is a job ID; not a flush ID! + */ + last_flush: JobId, + + /** + * Errors recorded + */ + downstairs_errors: usize, + + /** + * Cache of new jobs + */ + new_jobs: BTreeSet, + + /** + * Jobs that have been skipped + */ + skipped_jobs: BTreeSet, + + /** + * On Startup, we collect info from each downstairs region. We use that + * info to make sure that all three regions in a region set are the + * same, and if not the same, to decide which data we will consider + * valid and make the other downstairs contain that same data. + * + * We also determine the next flush ID and verify the generation + * number. + */ + region_metadata: Option, + + /** + * Live Repair info + * This will contain the extent info for each downstairs as reported + * by those downstairs and is used to decide if an extent requires + * repair or not. + */ + repair_info: Option, + + /** + * Count of extents repaired live. + */ + extents_repaired: usize, + + /** + * Count of extents checked but not needing live repair. + */ + extents_confirmed: usize, + + /** + * Count of time a downstairs LiveRepair completed. + */ + live_repair_completed: usize, + + /** + * Count of time a downstairs LiveRepair was aborted. + */ + live_repair_aborted: usize, + + /** + * Times we skipped repairing a downstairs because we are running + * as read_only. + */ + ro_lr_skipped: usize, + + /** + * Extent limit, if set, indicates the extent where LiveRepair has already + * submitted, or possibly even already finished the LiveRepair of this + * extent. 
If you are changing this value, it must happen at the same + * time the repair IOs are enqueued on the work list for the extent under + * repair, don't release the downstairs lock until both are done. + * + * This limit, if used in a flush indicates that extents <= this + * value should be issued a flush, and extents > this value should + * not be flushed. + * + * When deciding to skip an IO on a downstairs in LiveRepair, any + * IO at or below this extent should go ahead and be submitted. Any IO + * above this extent should still be skipped. + * + * This is only used during live repair, and will only ever be + * set on a downstairs that is undergoing live repair. + */ + extent_limit: Option, + + /** + * Count of downstairs connections + */ + connected: usize, + + /** + * Count of downstairs replacements + */ + replaced: usize, + + /** + * Count of times a downstairs has had flow control turned on + */ + flow_control: usize, + + /** + * Counters for the in flight work for the downstairs + */ + io_state_count: ClientIOStateCount, +} + +impl DownstairsClient { + fn new(target: Option) -> Self { + Self { + uuid: None, + target, + repair_addr: None, + state: DsState::New, + last_flush: JobId(0), + downstairs_errors: 0, + new_jobs: BTreeSet::new(), + skipped_jobs: BTreeSet::new(), + region_metadata: None, + repair_info: None, + extents_repaired: 0, + extents_confirmed: 0, + live_repair_completed: 0, + live_repair_aborted: 0, + ro_lr_skipped: 0, + extent_limit: None, + connected: 0, + replaced: 0, + flow_control: 0, + io_state_count: ClientIOStateCount::new(), + } + } + + fn end_live_repair(&mut self) { + self.repair_info = None; + self.extent_limit = None; + } + + /* + * Determine if the conditions exist where we need to remove dependencies + * for an IOop during live repair. We only need to do this if the + * downstairs in question is in LiveRepair, and there are skipped + * jobs for this downstairs. + */ + fn dependencies_need_cleanup(&self) -> bool { + self.state == DsState::LiveRepair && !self.skipped_jobs.is_empty() + } + + /// Return a list of downstairs request IDs that represent unissued + /// requests for this client. + /// + /// Returns a tuple of `(jobs, flow control)` where flow control is true if + /// the jobs list has been clamped to `max_count`. 
+ fn new_work(&mut self, max_count: usize) -> (BTreeSet, bool) { + if max_count >= self.new_jobs.len() { + // Happy path: we can grab everything + (std::mem::take(&mut self.new_jobs), false) + } else { + // Otherwise, pop elements from the queue + let mut out = BTreeSet::new(); + for _ in 0..max_count { + out.insert(self.new_jobs.pop_first().unwrap()); + } + self.flow_control += 1; + (out, true) + } + } + + fn total_live_work(&self) -> usize { + (self.io_state_count.new + self.io_state_count.in_progress) as usize + } } #[derive(Debug, Copy, Clone)] @@ -5387,17 +5439,19 @@ impl Upstairs { let ds_count = self.ds_work_active().await; let ds_state = self.ds_state_copy().await; let ds = self.downstairs.lock().await; - let ds_io_count = ds.io_state_count; + let ds_io_count = ds.io_state_count(); let ds_reconciled = ds.reconcile_repaired; let ds_reconcile_needed = ds.reconcile_repair_needed; - let ds_live_repair_completed = ds.live_repair_completed; - let ds_live_repair_aborted = ds.live_repair_aborted; - let ds_connected = ds.connected; - let ds_replaced = ds.replaced; - let ds_flow_control = ds.flow_control; - let ds_extents_repaired = ds.extents_repaired; - let ds_extents_confirmed = ds.extents_confirmed; - let ds_ro_lr_skipped = ds.ro_lr_skipped; + let ds_live_repair_completed = + ds.collect_stats(|c| c.live_repair_completed); + let ds_live_repair_aborted = + ds.collect_stats(|c| c.live_repair_aborted); + let ds_connected = ds.collect_stats(|c| c.connected); + let ds_replaced = ds.collect_stats(|c| c.replaced); + let ds_flow_control = ds.collect_stats(|c| c.flow_control); + let ds_extents_repaired = ds.collect_stats(|c| c.extents_repaired); + let ds_extents_confirmed = ds.collect_stats(|c| c.extents_confirmed); + let ds_ro_lr_skipped = ds.collect_stats(|c| c.ro_lr_skipped); let up_backpressure = self.guest.backpressure_us.load(Ordering::SeqCst); let write_bytes_out = ds.write_bytes_outstanding; @@ -5411,14 +5465,14 @@ impl Upstairs { ds_io_count, ds_reconciled, ds_reconcile_needed, - ds_live_repair_completed: ds_live_repair_completed.0, - ds_live_repair_aborted: ds_live_repair_aborted.0, - ds_connected: ds_connected.0, - ds_replaced: ds_replaced.0, - ds_flow_control: ds_flow_control.0, - ds_extents_repaired: ds_extents_repaired.0, - ds_extents_confirmed: ds_extents_confirmed.0, - ds_ro_lr_skipped: ds_ro_lr_skipped.0, + ds_live_repair_completed, + ds_live_repair_aborted, + ds_connected, + ds_replaced, + ds_flow_control, + ds_extents_repaired, + ds_extents_confirmed, + ds_ro_lr_skipped, }; (msg, arg) }); @@ -5549,8 +5603,8 @@ impl Upstairs { * However: TODO: This is not done yet. 
*/ let mut offline_ds = Vec::new(); - for (index, state) in ds.ds_state.iter().enumerate() { - if *state == DsState::Offline { + for (index, state) in ds.clients.iter().map(|c| c.state).enumerate() { + if state == DsState::Offline { offline_ds.push(ClientId::new(index as u8)); } } @@ -5598,21 +5652,28 @@ impl Upstairs { info!(self.log, "deactivate transition checking..."); let mut ds = self.downstairs.lock().await; let mut de_done = true; - ds.ds_state.iter_mut().for_each(|ds_state| { - if *ds_state == DsState::New || *ds_state == DsState::WaitActive - { - info!( - self.log, - "deactivate_transition {} Maybe ", *ds_state - ); - } else if *ds_state == DsState::Offline { - // TODO: support this - panic!("Can't deactivate when a downstairs is offline"); - } else { - info!(self.log, "deactivate_transition {} NO", *ds_state); - de_done = false; - } - }); + ds.clients + .iter_mut() + .map(|c| &mut c.state) + .for_each(|ds_state| { + if *ds_state == DsState::New + || *ds_state == DsState::WaitActive + { + info!( + self.log, + "deactivate_transition {} Maybe ", *ds_state + ); + } else if *ds_state == DsState::Offline { + // TODO: support this + panic!("Can't deactivate when a downstairs is offline"); + } else { + info!( + self.log, + "deactivate_transition {} NO", *ds_state + ); + de_done = false; + } + }); if de_done { info!(self.log, "All DS in the proper state! -> INIT"); active.up_state = UpState::Initializing; @@ -5774,7 +5835,7 @@ impl Upstairs { async fn last_flush_id(&self, client_id: ClientId) -> JobId { let ds = self.downstairs.lock().await; - ds.ds_last_flush[client_id] + ds.clients[client_id].last_flush } fn set_flush_clear(&self) { @@ -6190,7 +6251,7 @@ impl Upstairs { */ async fn ds_missing(&self, client_id: ClientId) { let mut ds = self.downstairs.lock().await; - let current = ds.ds_state[client_id]; + let current = ds.clients[client_id].state; let new_state = match current { DsState::Active => DsState::Offline, DsState::Replay => DsState::Offline, @@ -6224,7 +6285,7 @@ impl Upstairs { // Should we move jobs now? 
When do we move work that has // been submitted over to "skipped" - ds.ds_state[client_id] = new_state; + ds.clients[client_id].state = new_state; } /* @@ -6238,14 +6299,14 @@ impl Upstairs { */ async fn _ds_is_replay(&self, client_id: ClientId) -> bool { let mut ds = self.downstairs.lock().await; - if ds.ds_state[client_id] == DsState::Replay { + if ds.clients[client_id].state == DsState::Replay { info!( self.log, "[{}] {} Transition from Replay to Active", client_id, self.uuid ); - ds.ds_state[client_id] = DsState::Active; + ds.clients[client_id].state = DsState::Active; return true; } false @@ -6280,13 +6341,13 @@ impl Upstairs { client_id, self.uuid, self.session_id, - ds.ds_state[ClientId::new(0)], - ds.ds_state[ClientId::new(1)], - ds.ds_state[ClientId::new(2)], + ds.clients[ClientId::new(0)].state, + ds.clients[ClientId::new(1)].state, + ds.clients[ClientId::new(2)].state, new_state ); - let old_state = ds.ds_state[client_id]; + let old_state = ds.clients[client_id].state; /* * Check that this is a valid transition @@ -6455,10 +6516,10 @@ impl Upstairs { self.log, "[{}] Transition from {} to {}", client_id, - ds.ds_state[client_id], + ds.clients[client_id].state, new_state, ); - ds.ds_state[client_id] = new_state; + ds.clients[client_id].state = new_state; } else { panic!("[{}] transition to same state: {}", client_id, new_state); } @@ -6466,7 +6527,7 @@ impl Upstairs { async fn ds_state(&self, client_id: ClientId) -> DsState { let ds = self.downstairs.lock().await; - ds.ds_state[client_id] + ds.clients[client_id].state } /* @@ -6476,9 +6537,18 @@ impl Upstairs { * at it. */ fn mismatch_list(&self, ds: &Downstairs) -> Option { - let c0_rec = ds.region_metadata.get(&ClientId::new(0)).unwrap(); - let c1_rec = ds.region_metadata.get(&ClientId::new(1)).unwrap(); - let c2_rec = ds.region_metadata.get(&ClientId::new(2)).unwrap(); + let c0_rec = ds.clients[ClientId::new(0)] + .region_metadata + .as_ref() + .unwrap(); + let c1_rec = ds.clients[ClientId::new(1)] + .region_metadata + .as_ref() + .unwrap(); + let c2_rec = ds.clients[ClientId::new(2)] + .region_metadata + .as_ref() + .unwrap(); let log = self.log.new(o!("" => "mend".to_string())); DownstairsMend::new(c0_rec, c1_rec, c2_rec, log) @@ -6577,7 +6647,9 @@ impl Upstairs { */ let mut max_flush = 0; let mut max_gen = 0; - for (cid, rec) in ds.region_metadata.iter() { + for cid in ClientId::iter() { + let Some(rec) = ds.clients[cid].region_metadata.as_ref() + else { continue; }; let mf = rec.flush_numbers.iter().max().unwrap() + 1; if mf > max_flush { max_flush = mf; @@ -6665,16 +6737,19 @@ impl Upstairs { * while we have the downstairs lock. This will insure that * all downstairs enter the repair path. */ - ds.ds_state.iter_mut().for_each(|ds_state| { - info!(self.log, "Transition from {} to Repair", *ds_state); - /* - * This is a panic and not an error because we should - * not call this method without already verifying the - * downstairs are in the proper state. - */ - assert_eq!(*ds_state, DsState::WaitQuorum); - *ds_state = DsState::Repair; - }); + ds.clients + .iter_mut() + .map(|c| &mut c.state) + .for_each(|ds_state| { + info!(self.log, "Transition from {} to Repair", *ds_state); + /* + * This is a panic and not an error because we should + * not call this method without already verifying the + * downstairs are in the proper state. + */ + assert_eq!(*ds_state, DsState::WaitQuorum); + *ds_state = DsState::Repair; + }); info!( self.log, @@ -6866,9 +6941,9 @@ impl Upstairs { * proceed. 
*/ let not_ready = ds - .ds_state + .clients .iter() - .filter(|state| **state != DsState::WaitQuorum) + .filter(|c| c.state != DsState::WaitQuorum) .count(); if not_ready > 0 { info!( @@ -6909,7 +6984,9 @@ impl Upstairs { assert_eq!(ds.ds_active.len(), 0); assert_eq!(ds.reconcile_task_list.len(), 0); - for (i, s) in ds.ds_state.iter_mut().enumerate() { + for (i, s) in + ds.clients.iter_mut().map(|c| &mut c.state).enumerate() + { if *s == DsState::WaitQuorum { *s = DsState::FailedRepair; warn!( @@ -6956,9 +7033,9 @@ impl Upstairs { assert_eq!(ds.reconcile_task_list.len(), 0); let ready = ds - .ds_state + .clients .iter() - .filter(|s| **s == DsState::Repair) + .filter(|c| c.state == DsState::Repair) .count(); if ready != 3 { @@ -6969,7 +7046,9 @@ impl Upstairs { * they were still repairing to FailedRepair which will * trigger a reconnect. */ - for (i, s) in ds.ds_state.iter_mut().enumerate() { + for (i, s) in + ds.clients.iter_mut().map(|c| &mut c.state).enumerate() + { if *s == DsState::Repair { *s = DsState::FailedRepair; warn!( @@ -6993,7 +7072,7 @@ impl Upstairs { bail!("Upstairs in unexpected state while reconciling"); } - for s in ds.ds_state.iter_mut() { + for s in ds.clients.iter_mut().map(|c| &mut c.state) { *s = DsState::Active; } active.set_active()?; @@ -7016,9 +7095,9 @@ impl Upstairs { let mut ds = self.downstairs.lock().await; let ready = ds - .ds_state + .clients .iter() - .filter(|s| **s == DsState::WaitQuorum) + .filter(|c| c.state == DsState::WaitQuorum) .count(); if ready != 3 { @@ -7029,7 +7108,7 @@ impl Upstairs { if active.up_state != UpState::Initializing { bail!("Upstairs in unexpected state while reconciling"); } - for s in ds.ds_state.iter_mut() { + for s in ds.clients.iter_mut().map(|c| &mut c.state) { *s = DsState::Active; } active.set_active()?; @@ -7073,7 +7152,7 @@ impl Upstairs { * DTraces uses this. */ async fn ds_state_copy(&self) -> ClientData { - self.downstairs.lock().await.ds_state + ClientData(self.downstairs.lock().await.collect_stats(|c| c.state)) } /** @@ -7098,7 +7177,7 @@ impl Upstairs { let mut state_line = String::new(); state_line.push_str(&self.uuid.to_string()); - for state in ds.ds_state.iter() { + for state in ds.clients.iter().map(|c| c.state) { state_line.push(' '); state_line.push_str(&state.to_string()); } @@ -7142,11 +7221,11 @@ impl Upstairs { * verified here. */ let mut ds = self.downstairs.lock().await; - if let Some(uuid) = ds.ds_uuid.get(&client_id) { - if *uuid != client_ddef.uuid() { + if let Some(uuid) = ds.clients[client_id].uuid { + if uuid != client_ddef.uuid() { // If we are replacing the downstairs, then a new UUID is // okay. - if ds.ds_state[client_id] == DsState::Replaced { + if ds.clients[client_id].state == DsState::Replaced { warn!( self.log, "[{}] replace downstairs uuid:{} with {}", @@ -7176,7 +7255,7 @@ impl Upstairs { * If it is an existing UUID, we already compared and it is good, * so the insert is unnecessary, but will result in the same UUID. */ - ds.ds_uuid.insert(client_id, client_ddef.uuid()); + ds.clients[client_id].uuid = Some(client_ddef.uuid()); let mut ddef = self.ddef.lock().await; @@ -7333,7 +7412,7 @@ impl Upstairs { * While the downstairs is away, it's OK to act on the result that * we already received, because it may never come back. 
*/ - let ds_state = ds.ds_state[client_id]; + let ds_state = ds.clients[client_id].state; match ds_state { DsState::Active | DsState::Repair | DsState::LiveRepair => {} DsState::Faulted => { @@ -7475,12 +7554,12 @@ impl Upstairs { addr: SocketAddr, ) { let mut ds = self.downstairs.lock().await; - ds.ds_repair.insert(client_id, addr); + ds.clients[client_id].repair_addr = Some(addr); } async fn ds_clear_repair_address(&self, client_id: ClientId) { let mut ds = self.downstairs.lock().await; - ds.ds_repair.remove(&client_id); + ds.clients[client_id].repair_addr = None; } async fn replace_downstairs( @@ -7503,12 +7582,14 @@ impl Upstairs { // for a different downstairs. let mut new_client_id: Option = None; let mut old_client_id: Option = None; - for (client_id, ds_target) in ds.ds_target.iter() { - if *ds_target == new { + for client_id in ClientId::iter() { + let Some(ds_target) = ds.clients[client_id].target + else { continue; }; + if ds_target == new { new_client_id = Some(client_id); info!(self.log, "{id} found new target: {new} at {client_id}"); } - if *ds_target == old { + if ds_target == old { old_client_id = Some(client_id); info!(self.log, "{id} found old target: {old} at {client_id}"); } @@ -7529,7 +7610,7 @@ impl Upstairs { // We don't really know if the "old" matches what was old, // as that info is gone to us now, so assume it was true. - match ds.ds_state[new_client_id.unwrap()] { + match ds.clients[new_client_id.unwrap()].state { DsState::Replacing | DsState::Replaced | DsState::LiveRepairReady @@ -7560,7 +7641,7 @@ impl Upstairs { if client_id == old_client_id { continue; } - match ds.ds_state[client_id] { + match ds.clients[client_id].state { DsState::Replacing | DsState::Replaced | DsState::LiveRepairReady @@ -7568,7 +7649,7 @@ impl Upstairs { crucible_bail!( ReplaceRequestInvalid, "Replace {old} failed, downstairs {client_id} is {:?}", - ds.ds_state[client_id] + ds.clients[client_id].state ); } _ => {} @@ -7579,19 +7660,19 @@ impl Upstairs { // elsewhere, verified no other downstairs are in a bad state, we can // move forward with the replacement. 
info!(self.log, "{id} replacing old: {old} at {old_client_id}"); - ds.ds_target.insert(old_client_id, new); + ds.clients[old_client_id].target = Some(new); if ds.ds_set_faulted(old_client_id) { let _ = ds_done_tx.send(()).await; } - ds.region_metadata.remove(&old_client_id); + ds.clients[old_client_id].region_metadata = None; self.ds_transition_with_lock( &mut ds, up_state, old_client_id, DsState::Replacing, ); - ds.replaced[old_client_id] += 1; + ds.clients[old_client_id].replaced += 1; Ok(ReplaceResult::Started) } @@ -8377,6 +8458,45 @@ impl fmt::Display for IOState { } } +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] +pub struct ClientIOStateCount { + pub new: u32, + pub in_progress: u32, + pub done: u32, + pub skipped: u32, + pub error: u32, +} + +impl ClientIOStateCount { + fn new() -> ClientIOStateCount { + ClientIOStateCount { + new: 0, + in_progress: 0, + done: 0, + skipped: 0, + error: 0, + } + } + + pub fn incr(&mut self, state: &IOState) { + *self.get_mut(state) += 1; + } + + pub fn decr(&mut self, state: &IOState) { + *self.get_mut(state) -= 1; + } + + fn get_mut(&mut self, state: &IOState) -> &mut u32 { + match state { + IOState::New => &mut self.new, + IOState::InProgress => &mut self.in_progress, + IOState::Done => &mut self.done, + IOState::Skipped => &mut self.skipped, + IOState::Error(_) => &mut self.error, + } + } +} + #[derive(Debug, Copy, Clone, Serialize, Deserialize)] pub struct IOStateCount { pub new: ClientData, @@ -8387,16 +8507,6 @@ pub struct IOStateCount { } impl IOStateCount { - fn new() -> IOStateCount { - IOStateCount { - new: ClientData::new(0), - in_progress: ClientData::new(0), - done: ClientData::new(0), - skipped: ClientData::new(0), - error: ClientData::new(0), - } - } - fn show_all(&self) { println!(" STATES DS:0 DS:1 DS:2 TOTAL"); self.show(IOState::New); @@ -8407,16 +8517,6 @@ impl IOStateCount { self.show(IOState::Error(e)); } - fn get_mut(&mut self, state: &IOState) -> &mut ClientData { - match state { - IOState::New => &mut self.new, - IOState::InProgress => &mut self.in_progress, - IOState::Done => &mut self.done, - IOState::Skipped => &mut self.skipped, - IOState::Error(_) => &mut self.error, - } - } - fn get(&self, state: &IOState) -> &ClientData { match state { IOState::New => &self.new, @@ -8453,14 +8553,6 @@ impl IOStateCount { } println!("{:4}", sum); } - - pub fn incr(&mut self, state: &IOState, cid: ClientId) { - self.get_mut(state)[cid] += 1; - } - - pub fn decr(&mut self, state: &IOState, cid: ClientId) { - self.get_mut(state)[cid] -= 1; - } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -9792,7 +9884,7 @@ async fn gone_too_long(up: &Arc, ds_done_tx: mpsc::Sender<()>) { let mut notify_guest = false; for cid in ClientId::iter() { // Only downstairs in these states are checked. - match ds.ds_state[cid] { + match ds.clients[cid].state { DsState::Active | DsState::LiveRepair | DsState::Offline @@ -10053,9 +10145,9 @@ async fn process_new_io( // TODO should this first check if the Upstairs is active? 
let ds = up.downstairs.lock().await; let active_count = ds - .ds_state + .clients .iter() - .filter(|state| **state == DsState::Active) + .filter(|client| client.state == DsState::Active) .count(); drop(ds); *data.lock().await = WQCounts { @@ -10746,9 +10838,9 @@ async fn show_all_work(up: &Arc) -> WQCounts { println!(); } - ds.io_state_count.show_all(); + ds.io_state_count().show_all(); print!("Last Flush: "); - for lf in ds.ds_last_flush.iter() { + for lf in ds.clients.iter().map(|c| c.last_flush) { print!("{} ", lf); } println!(); @@ -10766,9 +10858,9 @@ async fn show_all_work(up: &Arc) -> WQCounts { } println!(); let active_count = ds - .ds_state + .clients .iter() - .filter(|state| **state == DsState::Active) + .filter(|c| c.state == DsState::Active) .count(); drop(ds); diff --git a/upstairs/src/live_repair.rs b/upstairs/src/live_repair.rs index 8387dae6c..b54939f35 100644 --- a/upstairs/src/live_repair.rs +++ b/upstairs/src/live_repair.rs @@ -117,14 +117,14 @@ pub async fn check_for_repair( // There's no state drift to repair anyway, this read-only Upstairs // wouldn't have caused any modifications. for cid in ClientId::iter() { - if ds.ds_state[cid] == DsState::LiveRepairReady { + if ds.clients[cid].state == DsState::LiveRepairReady { up.ds_transition_with_lock( &mut ds, up_state, cid, DsState::Active, ); - ds.ro_lr_skipped[cid] += 1; + ds.clients[cid].ro_lr_skipped += 1; } } @@ -134,15 +134,15 @@ pub async fn check_for_repair( // Verify that all downstairs and the upstairs are in the proper state // before we begin a live repair. let repair = ds - .ds_state + .clients .iter() - .filter(|state| **state == DsState::LiveRepair) + .filter(|c| c.state == DsState::LiveRepair) .count(); let repair_ready = ds - .ds_state + .clients .iter() - .filter(|state| **state == DsState::LiveRepairReady) + .filter(|c| c.state == DsState::LiveRepairReady) .count(); if repair_ready == 0 { @@ -171,7 +171,7 @@ pub async fn check_for_repair( // We do this now while we have the lock to avoid having to do all // these checks again in live_repair_main for cid in ClientId::iter() { - if ds.ds_state[cid] == DsState::LiveRepairReady { + if ds.clients[cid].state == DsState::LiveRepairReady { up.ds_transition_with_lock( &mut ds, up_state, @@ -245,13 +245,13 @@ async fn live_repair_main( // Make sure things are as we expect them to be. assert!(ds.repair_job_ids.is_empty()); // Verify no extent_limits are Some - assert_eq!(ds.extent_limit.iter().count(), 0); + assert!(ds.clients.iter().all(|c| c.extent_limit.is_none())); // When we transitioned this downstairs to LiveRepair, it should have set // the minimum for repair, though we will update it again below. assert!(ds.repair_min_id.is_some()); for cid in ClientId::iter() { - match ds.ds_state[cid] { + match ds.clients[cid].state { DsState::LiveRepair => { repair_downstairs.push(cid); } @@ -329,8 +329,8 @@ async fn live_repair_main( // Verify state has been cleared. for cid in ClientId::iter() { - assert!(ds.extent_limit.get(&cid).is_none()); - assert!(ds.ds_state[cid] != DsState::LiveRepair); + assert!(ds.clients[cid].extent_limit.is_none()); + assert!(ds.clients[cid].state != DsState::LiveRepair); } // This will not be set until the repair task exits. 
assert!(ds.repair_min_id.is_some()); @@ -407,16 +407,16 @@ async fn live_repair_main( if failed_repair { for cid in ClientId::iter() { - assert!(ds.extent_limit.get(&cid).is_none()); - assert!(ds.ds_state[cid] != DsState::LiveRepair); + assert!(ds.clients[cid].extent_limit.is_none()); + assert!(ds.clients[cid].state != DsState::LiveRepair); } for &cid in repair_downstairs.iter() { - ds.live_repair_aborted[cid] += 1; + ds.clients[cid].live_repair_aborted += 1; } } else { for &cid in repair_downstairs.iter() { up.ds_transition_with_lock(&mut ds, up_state, cid, DsState::Active); - ds.live_repair_completed[cid] += 1; + ds.clients[cid].live_repair_completed += 1; } } ds.end_live_repair(); @@ -550,10 +550,10 @@ fn repair_or_noop( let mut need_repair = Vec::new(); debug!(ds.log, "Get repair info for {} source", source); - let good_ei = ds.repair_info.remove(&source).unwrap(); + let good_ei = ds.clients[source].repair_info.take().unwrap(); for broken_extent in repair.iter() { debug!(ds.log, "Get repair info for {} bad", broken_extent); - let repair_ei = ds.repair_info.remove(broken_extent).unwrap(); + let repair_ei = ds.clients[*broken_extent].repair_info.take().unwrap(); let repair = if repair_ei.dirty || repair_ei.generation != good_ei.generation { @@ -569,12 +569,14 @@ fn repair_or_noop( // Now that we have consumed the contents, be sure to clear // out anything we did not look at. There could be something left - ds.repair_info = ClientMap::new(); + for c in ds.clients.iter_mut() { + c.repair_info = None; + } if need_repair.is_empty() { info!(ds.log, "No repair needed for extent {}", extent); for &cid in repair.iter() { - ds.extents_confirmed[cid] += 1; + ds.clients[cid].extents_confirmed += 1; } create_noop_io(repair_id, repair_deps, gw_repair_id) } else { @@ -583,7 +585,7 @@ fn repair_or_noop( "Repair for extent {} s:{} d:{:?}", extent, source, need_repair ); for &cid in repair.iter() { - ds.extents_repaired[cid] += 1; + ds.clients[cid].extents_repaired += 1; } let repair_address = ds.repair_addr(source); @@ -727,11 +729,11 @@ fn repair_ds_state_change( repair: &[ClientId], ) -> bool { for &cid in repair.iter() { - if ds.ds_state[cid] != DsState::LiveRepair { + if ds.clients[cid].state != DsState::LiveRepair { return true; } } - ds.ds_state[source] != DsState::Active + ds.clients[source].state != DsState::Active } impl Upstairs { @@ -746,7 +748,7 @@ impl Upstairs { ) { let mut notify_guest = false; for cid in ClientId::iter() { - if ds.ds_state[cid] == DsState::LiveRepair { + if ds.clients[cid].state == DsState::LiveRepair { if ds.ds_set_faulted(cid) { notify_guest = true; } @@ -793,9 +795,9 @@ impl Upstairs { // anyway, so just give up. There is no way we can make // progress, and no point in creating any more work. if ds - .ds_state + .clients .iter() - .filter(|state| **state == DsState::Faulted) + .filter(|c| c.state == DsState::Faulted) .count() == 3 { @@ -987,20 +989,20 @@ impl Upstairs { bail!("Abort repair due to state change in downstairs"); } assert!(ds.repair_min_id.is_some()); - assert!(ds.repair_info.is_empty()); + assert!(ds.clients.iter().all(|c| c.repair_info.is_none())); // Update our extent limit to this extent. for ds_repair in repair.iter() { // We should be walking up the extents one at a time. 
if eid > 0 { assert_eq!( - ds.extent_limit.get(ds_repair).cloned(), + ds.clients[*ds_repair].extent_limit, Some(eid as usize - 1) ); } else { - assert!(ds.extent_limit.get(ds_repair).is_none()) + assert!(ds.clients[*ds_repair].extent_limit.is_none()); } - ds.extent_limit.insert(*ds_repair, eid as usize); + ds.clients[*ds_repair].extent_limit = Some(eid as usize); } // Upstairs "guest" work IDs. @@ -1269,9 +1271,9 @@ impl Upstairs { warn!(self.log, "RE:{} Bailing with error", eid); let ds = self.downstairs.lock().await; assert_eq!( - ds.ds_state + ds.clients .iter() - .filter(|state| **state == DsState::LiveRepair) + .filter(|c| c.state == DsState::LiveRepair) .count(), 0 ); @@ -1326,7 +1328,8 @@ pub mod repair_test { fn create_test_downstairs() -> Downstairs { let mut ds = Downstairs::new(csl(), ClientMap::new()); for cid in ClientId::iter() { - ds.ds_repair.insert(cid, "127.0.0.1:1234".parse().unwrap()); + ds.clients[cid].repair_addr = + Some("127.0.0.1:1234".parse().unwrap()); } ds } @@ -1492,7 +1495,7 @@ pub mod repair_test { // No downstairs should change state. let ds = up.downstairs.lock().await; for cid in ClientId::iter() { - assert_eq!(ds.ds_state[cid], DsState::Active); + assert_eq!(ds.clients[cid].state, DsState::Active); } assert!(ds.repair_min_id.is_none()) } @@ -1521,7 +1524,7 @@ pub mod repair_test { RepairCheck::RepairStarted ); let ds = up.downstairs.lock().await; - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::LiveRepair); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::LiveRepair); assert!(ds.repair_min_id.is_some()) } @@ -1552,9 +1555,9 @@ pub mod repair_test { RepairCheck::RepairStarted ); let ds = up.downstairs.lock().await; - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::LiveRepair); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::LiveRepair); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::LiveRepair); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::LiveRepair); assert!(ds.repair_min_id.is_some()) } @@ -1927,7 +1930,7 @@ pub mod repair_test { } assert_eq!(job.state_count().done, 3); - assert_eq!(ds.ds_state[or_ds], DsState::LiveRepair); + assert_eq!(ds.clients[or_ds].state, DsState::LiveRepair); } #[tokio::test] @@ -1994,7 +1997,7 @@ pub mod repair_test { // process_ds_operation should force the downstairs to fail assert_eq!( - up.downstairs.lock().await.ds_state[err_ds], + up.downstairs.lock().await.clients[err_ds].state, DsState::Faulted ); @@ -2112,8 +2115,8 @@ pub mod repair_test { assert_eq!(job.state_count().skipped, 2); } - assert_eq!(ds.ds_state[err_ds], DsState::Faulted); - assert_eq!(ds.ds_state[or_ds], DsState::Faulted); + assert_eq!(ds.clients[err_ds].state, DsState::Faulted); + assert_eq!(ds.clients[or_ds].state, DsState::Faulted); } #[tokio::test] @@ -2219,7 +2222,7 @@ pub mod repair_test { // reported the error, and the downstairs that is under repair to // fail. 
assert_eq!( - up.downstairs.lock().await.ds_state[err_ds], + up.downstairs.lock().await.clients[err_ds].state, DsState::Faulted ); // When we completed the repair jobs, the repair_extent should @@ -2302,8 +2305,8 @@ pub mod repair_test { assert_eq!(job.state_count().skipped, 1); } - assert_eq!(ds.ds_state[err_ds], DsState::Faulted); - assert_eq!(ds.ds_state[or_ds], DsState::Faulted); + assert_eq!(ds.clients[err_ds].state, DsState::Faulted); + assert_eq!(ds.clients[or_ds].state, DsState::Faulted); } #[tokio::test] @@ -2455,8 +2458,8 @@ pub mod repair_test { assert_eq!(job.state_count().skipped, 1); } - assert_eq!(ds.ds_state[err_ds], DsState::Faulted); - assert_eq!(ds.ds_state[or_ds], DsState::Faulted); + assert_eq!(ds.clients[err_ds].state, DsState::Faulted); + assert_eq!(ds.clients[or_ds].state, DsState::Faulted); } #[tokio::test] @@ -2584,8 +2587,8 @@ pub mod repair_test { assert_eq!(job.state_count().done, 2); assert_eq!(job.state_count().error, 1); - assert_eq!(ds.ds_state[err_ds], DsState::Faulted); - assert_eq!(ds.ds_state[or_ds], DsState::Faulted); + assert_eq!(ds.clients[err_ds].state, DsState::Faulted); + assert_eq!(ds.clients[or_ds].state, DsState::Faulted); } #[tokio::test] @@ -2672,7 +2675,7 @@ pub mod repair_test { let id = 1000; let mut ds = up.downstairs.lock().await; - assert!(ds.extent_limit.get(&ClientId::new(1)).is_none()); + assert!(ds.clients[ClientId::new(1)].extent_limit.is_none()); // Check all three IOs. for job_id in (id..id + 3).map(JobId) { assert!(ds.in_progress(job_id, ClientId::new(0)).is_some()); @@ -2690,7 +2693,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); // Our default extent size is 3, so block 3 will be on extent 1 @@ -2746,7 +2749,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); up.submit_write( @@ -2798,7 +2801,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); up.submit_write( @@ -2890,7 +2893,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); // Our default extent size is 3, so block 3 will be on extent 1 @@ -2978,7 +2981,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); // Our default extent size is 3, so block 3 will be on extent 1 @@ -3044,7 +3047,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); // Our default extent size is 3, so block 3 will be on extent 1 @@ -3116,7 +3119,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 0); + 
ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); // Our default extent size is 3, so block 3 will be on extent 1 @@ -3154,8 +3157,8 @@ pub mod repair_test { let mut gw = up.guest.guest_work.lock().await; let mut ds = up.downstairs.lock().await; let eid = 1; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); let gw_close_id = 1; let (repair_ids, deps) = ds.get_repair_ids(eid); @@ -3204,12 +3207,12 @@ pub mod repair_test { // Verify the extent information has been added to the repair info // hashmap for client 0 - let new_ei = ds.repair_info.get(&ClientId::new(0)).unwrap(); + let new_ei = ds.clients[ClientId::new(0)].repair_info.unwrap(); assert_eq!(new_ei.generation, 5); assert_eq!(new_ei.flush_number, 3); assert!(!new_ei.dirty); - assert!(ds.repair_info.remove(&ClientId::new(1)).is_none()); - assert!(ds.repair_info.remove(&ClientId::new(2)).is_none()); + assert!(ds.clients[ClientId::new(1)].repair_info.is_none()); + assert!(ds.clients[ClientId::new(2)].repair_info.is_none()); // Verify the extent information has been added to the repair info // hashmap for client 1 @@ -3227,11 +3230,11 @@ pub mod repair_test { Some(ei) ) .unwrap()); - let new_ei = ds.repair_info.get(&ClientId::new(1)).unwrap(); + let new_ei = ds.clients[ClientId::new(1)].repair_info.unwrap(); assert_eq!(new_ei.generation, 2); assert_eq!(new_ei.flush_number, 4); assert!(new_ei.dirty); - assert!(ds.repair_info.remove(&ClientId::new(2)).is_none()); + assert!(ds.clients[ClientId::new(2)].repair_info.is_none()); let ei = ExtentInfo { generation: 29, @@ -3247,7 +3250,7 @@ pub mod repair_test { Some(ei) ) .unwrap()); - let new_ei = ds.repair_info.get(&ClientId::new(2)).unwrap(); + let new_ei = ds.clients[ClientId::new(2)].repair_info.unwrap(); assert_eq!(new_ei.generation, 29); assert_eq!(new_ei.flush_number, 444); assert!(!new_ei.dirty); @@ -3279,9 +3282,18 @@ pub mod repair_test { flush_number: 3, dirty: false, }; - assert!(ds.repair_info.insert(ClientId::new(0), ei).is_none()); - assert!(ds.repair_info.insert(ClientId::new(1), ei).is_none()); - assert!(ds.repair_info.insert(ClientId::new(2), ei).is_none()); + assert!(ds.clients[ClientId::new(0)] + .repair_info + .replace(ei) + .is_none()); + assert!(ds.clients[ClientId::new(1)] + .repair_info + .replace(ei) + .is_none()); + assert!(ds.clients[ClientId::new(2)] + .repair_info + .replace(ei) + .is_none()); let repair_extent = if source == ClientId::new(0) { vec![ClientId::new(1), ClientId::new(2)] @@ -3380,31 +3392,31 @@ pub mod repair_test { // is the source. 
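In the `repair_info` hunks just above, `ds.repair_info.insert(cid, ei).is_none()` becomes `ds.clients[cid].repair_info.replace(ei).is_none()`. The assertions keep their meaning because `Option::replace` from the standard library stores the new value and returns whatever was there before, mirroring the map-style `insert`. A tiny sketch with a plain `Option`:

fn main() {
    // `repair_info` as a plain Option field rather than a map entry.
    let mut repair_info: Option<u32> = None;

    // First store: nothing was there, so the returned "old value" is None,
    // which is what the `.replace(ei).is_none()` assertions rely on.
    assert!(repair_info.replace(7).is_none());

    // Storing again hands back the previous value, just like inserting
    // over an occupied map key did.
    assert_eq!(repair_info.replace(8), Some(7));
    assert_eq!(repair_info, Some(8));
}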
// First try one source, one repair let repair = if source == ClientId::new(0) { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), good_ei) + .replace(good_ei) .is_none()); vec![ClientId::new(1)] } else { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), good_ei) + .replace(good_ei) .is_none()); vec![ClientId::new(0)] }; @@ -3414,31 +3426,31 @@ pub mod repair_test { // Next try the other downstairs to repair. let repair = if source == ClientId::new(2) { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), good_ei) + .replace(good_ei) .is_none()); vec![ClientId::new(1)] } else { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), bad_ei) + .replace(bad_ei) .is_none()); vec![ClientId::new(2)] }; @@ -3466,45 +3478,45 @@ pub mod repair_test { // is the source. 
// One source, two repair let repair = if source == ClientId::new(0) { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), bad_ei) + .replace(bad_ei) .is_none()); vec![ClientId::new(1), ClientId::new(2)] } else if source == ClientId::new(1) { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), good_ei) + .replace(good_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), bad_ei) + .replace(bad_ei) .is_none()); vec![ClientId::new(0), ClientId::new(2)] } else { - assert!(ds + assert!(ds.clients[ClientId::new(0)] .repair_info - .insert(ClientId::new(0), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(1)] .repair_info - .insert(ClientId::new(1), bad_ei) + .replace(bad_ei) .is_none()); - assert!(ds + assert!(ds.clients[ClientId::new(2)] .repair_info - .insert(ClientId::new(2), good_ei) + .replace(good_ei) .is_none()); vec![ClientId::new(0), ClientId::new(1)] }; @@ -3734,8 +3746,8 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; let eid = 1; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); // Upstairs "guest" work IDs. let gw_r_id: u64 = 1; @@ -3792,8 +3804,8 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; let eid = 1; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); // Upstairs "guest" work IDs. let gw_close_id: u64 = 1; @@ -3870,8 +3882,8 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; let eid = 1; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); // Upstairs "guest" work IDs. let gw_repair_id: u64 = gw.next_gw_id(); @@ -3891,7 +3903,7 @@ pub mod repair_test { dirty: false, }; for cid in ClientId::iter() { - ds.repair_info.insert(cid, ei); + ds.clients[cid].repair_info = Some(ei); } let _repair_brw = create_and_enqueue_repair_io( @@ -3938,8 +3950,8 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; let eid = 1; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); // Upstairs "guest" work IDs. 
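The "one source, one repair" and "one source, two repair" cases above seed each client's `repair_info` with either the source's `ExtentInfo` (`good_ei`) or a mismatched one (`bad_ei`) and expect exactly the mismatched clients on the repair list. A hedged sketch of that comparison follows; the helper name `mismatched_clients` is hypothetical and the real selection logic is not shown in these hunks, only the `ExtentInfo` fields and the expected repair sets are.

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct ExtentInfo {
    generation: u64,
    flush_number: u64,
    dirty: bool,
}

// Hypothetical helper: clients whose extent metadata differs from the
// source's are the ones that need repair.
fn mismatched_clients(info: &[ExtentInfo; 3], source: usize) -> Vec<usize> {
    (0..3)
        .filter(|&cid| cid != source && info[cid] != info[source])
        .collect()
}

fn main() {
    let good_ei = ExtentInfo { generation: 5, flush_number: 3, dirty: false };
    let bad_ei = ExtentInfo { generation: 5, flush_number: 2, dirty: false };

    // One source (client 0), one mismatch (client 1): only client 1 repairs.
    let info = [good_ei, bad_ei, good_ei];
    assert_eq!(mismatched_clients(&info, 0), vec![1]);

    // One source (client 1), two mismatches: clients 0 and 2 repair.
    let info = [bad_ei, good_ei, bad_ei];
    assert_eq!(mismatched_clients(&info, 1), vec![0, 2]);
}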
let gw_repair_id: u64 = gw.next_gw_id(); @@ -3957,17 +3969,18 @@ pub mod repair_test { flush_number: 3, dirty: false, }; - ds.repair_info.insert(ClientId::new(0), ei); - ds.repair_info.insert(ClientId::new(1), ei); + ds.clients[ClientId::new(0)].repair_info = Some(ei); + ds.clients[ClientId::new(1)].repair_info = Some(ei); let bad_ei = ExtentInfo { generation: 5, flush_number: 2, dirty: false, }; - ds.repair_info.insert(ClientId::new(2), bad_ei); + ds.clients[ClientId::new(2)].repair_info = Some(bad_ei); // We also need a fake repair address for cid in ClientId::iter() { - ds.ds_repair.insert(cid, "127.0.0.1:1234".parse().unwrap()); + ds.clients[cid].repair_addr = + Some("127.0.0.1:1234".parse().unwrap()); } let _reopen_brw = create_and_enqueue_repair_io( @@ -4036,8 +4049,8 @@ pub mod repair_test { async fn create_and_enqueue_repair_ops(up: &Arc, eid: u64) { let mut gw = up.guest.guest_work.lock().await; let mut ds = up.downstairs.lock().await; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); let (extent_repair_ids, deps) = ds.get_repair_ids(eid); @@ -4122,8 +4135,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); create_and_enqueue_repair_ops(&up, eid).await; @@ -4178,8 +4191,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks already let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); create_and_enqueue_repair_ops(&up, eid).await; @@ -4245,8 +4258,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks already let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); create_and_enqueue_repair_ops(&up, eid).await; @@ -4317,8 +4330,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); @@ -4363,8 +4376,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks already let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); create_and_enqueue_repair_ops(&up, eid).await; @@ -4408,8 +4421,8 @@ pub mod repair_test { // Repair IO functions assume you have the locks already let mut ds = up.downstairs.lock().await; let eid = 0; - ds.extent_limit - .insert(ClientId::new(1), eid.try_into().unwrap()); + ds.clients[ClientId::new(1)].extent_limit = + Some(eid.try_into().unwrap()); drop(ds); create_and_enqueue_repair_ops(&up, eid).await; @@ -5188,7 +5201,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); // A write of blocks 
2,3,4 which spans extents. @@ -5243,7 +5256,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); // A read of blocks 2,3,4 which spans extents. @@ -5295,7 +5308,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); up.submit_flush(None, None, ds_done_tx.clone()) @@ -5317,7 +5330,7 @@ pub mod repair_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); let job_id = JobId(1000); @@ -5391,7 +5404,7 @@ pub mod repair_test { let eid = 0u64; let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), eid as usize); + ds.clients[ClientId::new(1)].extent_limit = Some(eid as usize); drop(ds); submit_three_ios(&up, &ds_done_tx).await; @@ -5402,9 +5415,9 @@ pub mod repair_test { up.abort_repair_ds(&mut ds, UpState::Active, &ds_done_tx); up.abort_repair_extent(&mut gw, &mut ds, eid); - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::Faulted); - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Active); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Active); // Check all three IOs again, downstairs 1 will be skipped.. let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); @@ -5414,7 +5427,7 @@ pub mod repair_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::New); } - assert_eq!(ds.extent_limit.get(&ClientId::new(1)), None); + assert!(ds.clients[ClientId::new(1)].extent_limit.is_none()); } #[tokio::test] @@ -5428,7 +5441,7 @@ pub mod repair_test { let eid = 0u64; let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), eid as usize); + ds.clients[ClientId::new(1)].extent_limit = Some(eid as usize); drop(ds); submit_three_ios(&up, &ds_done_tx).await; @@ -5476,9 +5489,9 @@ pub mod repair_test { let eid = 0u64; let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(0), eid as usize); - ds.extent_limit.insert(ClientId::new(1), eid as usize); - ds.extent_limit.insert(ClientId::new(2), eid as usize); + ds.clients[ClientId::new(0)].extent_limit = Some(eid as usize); + ds.clients[ClientId::new(1)].extent_limit = Some(eid as usize); + ds.clients[ClientId::new(2)].extent_limit = Some(eid as usize); drop(ds); submit_three_ios(&up, &ds_done_tx).await; @@ -5522,7 +5535,7 @@ pub mod repair_test { flush_numbers: vec![2, 2, 2, 2], dirty: vec![false, false, false, false], }; - up.downstairs.lock().await.region_metadata.insert(cid, dsr); + up.downstairs.lock().await.clients[cid].region_metadata = Some(dsr); up.ds_transition(cid, DsState::WaitQuorum).await; up.ds_transition(cid, DsState::Active).await; } @@ -5571,11 +5584,11 @@ pub mod repair_test { show_all_work(&up).await; let mut ds = up.downstairs.lock().await; // Good downstairs don't need changes - assert!(!ds.dependencies_need_cleanup(ClientId::new(0))); - 
assert!(!ds.dependencies_need_cleanup(ClientId::new(2))); + assert!(!ds.clients[ClientId::new(0)].dependencies_need_cleanup()); + assert!(!ds.clients[ClientId::new(2)].dependencies_need_cleanup()); // LiveRepair downstairs might need a change - assert!(ds.dependencies_need_cleanup(ClientId::new(1))); + assert!(ds.clients[ClientId::new(1)].dependencies_need_cleanup()); for job_id in (1003..1006).map(JobId) { let job = ds.ds_active.get(&job_id).unwrap(); // jobs 3,4,5 will be skipped for our LiveRepair downstairs. @@ -5665,7 +5678,7 @@ pub mod repair_test { submit_three_ios(&up, &ds_done_tx).await; let mut ds = up.downstairs.lock().await; - ds.extent_limit.insert(ClientId::new(1), 1); + ds.clients[ClientId::new(1)].extent_limit = Some(1); drop(ds); // New jobs will go -> Skipped for the downstairs in repair. @@ -5673,11 +5686,11 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; // Good downstairs don't need changes - assert!(!ds.dependencies_need_cleanup(ClientId::new(0))); - assert!(!ds.dependencies_need_cleanup(ClientId::new(2))); + assert!(!ds.clients[ClientId::new(0)].dependencies_need_cleanup()); + assert!(!ds.clients[ClientId::new(2)].dependencies_need_cleanup()); // LiveRepair downstairs might need a change - assert!(ds.dependencies_need_cleanup(ClientId::new(1))); + assert!(ds.clients[ClientId::new(1)].dependencies_need_cleanup()); // For the three latest jobs, they should be New as they are IOs that // are on an extent we "already repaired". @@ -5957,7 +5970,7 @@ pub mod repair_test { let mut gw = up.guest.guest_work.lock().await; let mut ds = up.downstairs.lock().await; ds.repair_min_id = Some(ds.peek_next_id()); - ds.extent_limit.insert(ClientId::new(1), eid as usize); + ds.clients[ClientId::new(1)].extent_limit = Some(eid as usize); // Upstairs "guest" work IDs. let gw_close_id: u64 = gw.next_gw_id(); @@ -6120,7 +6133,7 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; // Make extent 0 under repair - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); // A write of blocks 2,3,4 which spans extents 0-1. @@ -6215,7 +6228,7 @@ pub mod repair_test { let mut ds = up.downstairs.lock().await; // Make extent 0 under repair - ds.extent_limit.insert(ClientId::new(1), 0); + ds.clients[ClientId::new(1)].extent_limit = Some(0); drop(ds); // A write of blocks 3,4,5,6 which spans extents 1-2. diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index 7ddbe5b1c..9bc73bb68 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -933,9 +933,9 @@ pub(crate) mod up_test { assert_eq!(ds.completed.len(), 1); // No skipped jobs here. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); } #[tokio::test] @@ -1773,9 +1773,9 @@ pub(crate) mod up_test { // if it's just a write, then it should be false. 
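The `dependencies_need_cleanup` call sites above show the second pattern in this diff: a helper that only examines one client's data moves from the aggregate onto the per-client struct, so `ds.dependencies_need_cleanup(cid)` becomes `ds.clients[cid].dependencies_need_cleanup()`. A hedged sketch of that shape is below; the method body is a placeholder (the real logic is not shown here), and plain `usize` indices stand in for `ClientId` to keep it short.

use std::collections::BTreeSet;

#[derive(Default)]
struct Client {
    skipped_jobs: BTreeSet<u64>,
}

impl Client {
    // Placeholder body: say a client "needs cleanup" if it has skipped jobs.
    fn dependencies_need_cleanup(&self) -> bool {
        !self.skipped_jobs.is_empty()
    }
}

struct Downstairs {
    clients: [Client; 3],
}

fn main() {
    let mut ds = Downstairs {
        clients: std::array::from_fn(|_| Client::default()),
    };
    ds.clients[1].skipped_jobs.insert(1003);

    // Old shape: ds.dependencies_need_cleanup(cid)
    // New shape, as used in the updated tests:
    assert!(!ds.clients[0].dependencies_need_cleanup());
    assert!(ds.clients[1].dependencies_need_cleanup());
    assert!(!ds.clients[2].dependencies_need_cleanup());
}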
assert_eq!(res, is_write_unwritten); - assert!(ds.downstairs_errors[ClientId::new(0)] > 0); - assert!(ds.downstairs_errors[ClientId::new(1)] > 0); - assert_eq!(ds.downstairs_errors[ClientId::new(2)], 0); + assert!(ds.clients[ClientId::new(0)].downstairs_errors > 0); + assert!(ds.clients[ClientId::new(1)].downstairs_errors > 0); + assert_eq!(ds.clients[ClientId::new(2)].downstairs_errors, 0); } #[tokio::test] @@ -2335,9 +2335,9 @@ pub(crate) mod up_test { Some(vec![Bytes::from_static(&[3])]), ); - assert_eq!(ds.downstairs_errors[ClientId::new(0)], 0); - assert_eq!(ds.downstairs_errors[ClientId::new(1)], 0); - assert_eq!(ds.downstairs_errors[ClientId::new(2)], 0); + assert_eq!(ds.clients[ClientId::new(0)].downstairs_errors, 0); + assert_eq!(ds.clients[ClientId::new(1)].downstairs_errors, 0); + assert_eq!(ds.clients[ClientId::new(2)].downstairs_errors, 0); // send another read, and expect all to return something // (reads shouldn't cause a Failed transition) @@ -2742,9 +2742,9 @@ pub(crate) mod up_test { // Make sure downstairs 0 and 1 update their last flush id and // that downstairs 2 does not. - assert_eq!(ds.ds_last_flush[ClientId::new(0)], flush_id); - assert_eq!(ds.ds_last_flush[ClientId::new(1)], flush_id); - assert_eq!(ds.ds_last_flush[ClientId::new(2)], JobId(0)); + assert_eq!(ds.clients[ClientId::new(0)].last_flush, flush_id); + assert_eq!(ds.clients[ClientId::new(1)].last_flush, flush_id); + assert_eq!(ds.clients[ClientId::new(2)].last_flush, JobId(0)); // Should not retire yet. ds.retire_check(flush_id); @@ -2794,7 +2794,7 @@ pub(crate) mod up_test { // All three jobs should now move to completed assert_eq!(ds.completed.len(), 3); // Downstairs 2 should update the last flush it just did. - assert_eq!(ds.ds_last_flush[ClientId::new(2)], flush_id); + assert_eq!(ds.clients[ClientId::new(2)].last_flush, flush_id); } #[tokio::test] @@ -3061,9 +3061,9 @@ pub(crate) mod up_test { assert_eq!(ds.completed.len(), 0); // Verify who has updated their last flush. - assert_eq!(ds.ds_last_flush[ClientId::new(0)], flush_id); - assert_eq!(ds.ds_last_flush[ClientId::new(1)], JobId(0)); - assert_eq!(ds.ds_last_flush[ClientId::new(2)], flush_id); + assert_eq!(ds.clients[ClientId::new(0)].last_flush, flush_id); + assert_eq!(ds.clients[ClientId::new(1)].last_flush, JobId(0)); + assert_eq!(ds.clients[ClientId::new(2)].last_flush, flush_id); // Now, finish sending and completing the writes assert!(ds.in_progress(id1, ClientId::new(2)).is_some()); @@ -3106,7 +3106,7 @@ pub(crate) mod up_test { assert_eq!(ds.completed.len(), 3); // downstairs 1 should now have that flush - assert_eq!(ds.ds_last_flush[ClientId::new(1)], flush_id); + assert_eq!(ds.clients[ClientId::new(1)].last_flush, flush_id); } #[tokio::test] @@ -3835,9 +3835,9 @@ pub(crate) mod up_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); up.set_active().await.unwrap(); let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Active; - ds.ds_state[ClientId::new(1)] = DsState::Active; - ds.ds_state[ClientId::new(2)] = DsState::Active; + ds.clients[ClientId::new(0)].state = DsState::Active; + ds.clients[ClientId::new(1)].state = DsState::Active; + ds.clients[ClientId::new(2)].state = DsState::Active; // Build a write, put it on the work queue. let id1 = ds.next_id(); @@ -3937,9 +3937,9 @@ pub(crate) mod up_test { ds = up.downstairs.lock().await; // Make sure the correct DS have changed state. 
- assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Deactivated); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::Deactivated); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::Active); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Deactivated); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::Deactivated); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::Active); // Send and complete the flush ds.in_progress(flush_id, ClientId::new(1)); @@ -3969,9 +3969,9 @@ pub(crate) mod up_test { // Verify after the ds_missing, all downstairs are New let ds = up.downstairs.lock().await; - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::New); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::New); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::New); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::New); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::New); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::New); } #[tokio::test] @@ -3985,9 +3985,9 @@ pub(crate) mod up_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); up.set_active().await.unwrap(); let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Active; - ds.ds_state[ClientId::new(1)] = DsState::Active; - ds.ds_state[ClientId::new(2)] = DsState::Active; + ds.clients[ClientId::new(0)].state = DsState::Active; + ds.clients[ClientId::new(1)].state = DsState::Active; + ds.clients[ClientId::new(2)].state = DsState::Active; drop(ds); up.set_deactivate(None, ds_done_tx.clone()).await.unwrap(); @@ -3999,9 +3999,9 @@ pub(crate) mod up_test { ds = up.downstairs.lock().await; // Make sure the correct DS have changed state. - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Deactivated); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::Deactivated); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::Deactivated); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Deactivated); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::Deactivated); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::Deactivated); drop(ds); // Mark all three DS as missing, which moves their state to New @@ -4032,9 +4032,9 @@ pub(crate) mod up_test { let (ds_done_tx, _ds_done_rx) = mpsc::channel(500); up.set_active().await.unwrap(); let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Active; - ds.ds_state[ClientId::new(1)] = DsState::Active; - ds.ds_state[ClientId::new(2)] = DsState::Active; + ds.clients[ClientId::new(0)].state = DsState::Active; + ds.clients[ClientId::new(1)].state = DsState::Active; + ds.clients[ClientId::new(2)].state = DsState::Active; // Build a write, put it on the work queue. let id1 = ds.next_id(); @@ -4100,9 +4100,9 @@ pub(crate) mod up_test { ds = up.downstairs.lock().await; // Make sure no DS have changed state. 
- assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::Active); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::Active); } #[tokio::test] @@ -4128,9 +4128,9 @@ pub(crate) mod up_test { let up = Upstairs::test_default(None); up.set_active().await.unwrap(); let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Active; - ds.ds_state[ClientId::new(1)] = DsState::Active; - ds.ds_state[ClientId::new(2)] = DsState::Active; + ds.clients[ClientId::new(0)].state = DsState::Active; + ds.clients[ClientId::new(1)].state = DsState::Active; + ds.clients[ClientId::new(2)].state = DsState::Active; drop(ds); @@ -4141,9 +4141,9 @@ pub(crate) mod up_test { ds = up.downstairs.lock().await; // Make sure no DS have changed state. - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::Active); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::Active); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::Active); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::Active); } #[tokio::test] @@ -4159,9 +4159,9 @@ pub(crate) mod up_test { let ds = up.downstairs.lock().await; // Make sure no DS have changed state. - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::New); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::New); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::New); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::New); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::New); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::New); } #[tokio::test] @@ -4335,9 +4335,9 @@ pub(crate) mod up_test { // No repairs on the queue, should return None let up = Upstairs::test_default(None); let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; let w = ds.rep_in_progress(ClientId::new(0)); assert_eq!(w, None); } @@ -4360,17 +4360,17 @@ pub(crate) mod up_test { }, )); // A downstairs is not in Repair state - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::WaitQuorum; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::WaitQuorum; + ds.clients[ClientId::new(2)].state = DsState::Repair; } // Move that job to next to do. 
let nw = up.new_rec_work().await; assert!(nw.is_err()); let mut ds = up.downstairs.lock().await; - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::FailedRepair); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::WaitQuorum); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::FailedRepair); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::FailedRepair); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::WaitQuorum); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::FailedRepair); // Verify rep_in_progress now returns none for all DS assert!(ds.reconcile_task_list.is_empty()); @@ -4388,9 +4388,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put two jobs on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4410,7 +4410,7 @@ pub(crate) mod up_test { assert!(ds.rep_in_progress(ClientId::new(2)).is_some()); // Now verify we can be done even if a DS is gone - ds.ds_state[ClientId::new(1)] = DsState::New; + ds.clients[ClientId::new(1)].state = DsState::New; // Now, make sure we consider this done only after all three are done assert!(!ds.rep_done(ClientId::new(0), rep_id)); assert!(!ds.rep_done(ClientId::new(1), rep_id)); @@ -4422,9 +4422,9 @@ pub(crate) mod up_test { let nw = up.new_rec_work().await; assert!(nw.is_err()); let mut ds = up.downstairs.lock().await; - assert_eq!(ds.ds_state[ClientId::new(0)], DsState::FailedRepair); - assert_eq!(ds.ds_state[ClientId::new(1)], DsState::New); - assert_eq!(ds.ds_state[ClientId::new(2)], DsState::FailedRepair); + assert_eq!(ds.clients[ClientId::new(0)].state, DsState::FailedRepair); + assert_eq!(ds.clients[ClientId::new(1)].state, DsState::New); + assert_eq!(ds.clients[ClientId::new(2)].state, DsState::FailedRepair); // Verify rep_in_progress now returns none for all DS assert!(ds.reconcile_task_list.is_empty()); @@ -4441,9 +4441,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4460,12 +4460,12 @@ pub(crate) mod up_test { // Mark all three as in progress assert!(ds.rep_in_progress(ClientId::new(0)).is_some()); assert!(ds.rep_in_progress(ClientId::new(1)).is_some()); - ds.ds_state[ClientId::new(2)] = DsState::New; + ds.clients[ClientId::new(2)].state = DsState::New; assert!(ds.rep_in_progress(ClientId::new(2)).is_none()); // Okay, now the DS is back and ready for repair, verify it will // start taking work. 
- ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; assert!(ds.rep_in_progress(ClientId::new(2)).is_some()); } @@ -4477,9 +4477,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4504,9 +4504,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4528,9 +4528,9 @@ pub(crate) mod up_test { let mut rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put two jobs on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4590,9 +4590,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put two jobs on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4636,9 +4636,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4677,9 +4677,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; - ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4713,9 +4713,9 @@ pub(crate) mod up_test { let rep_id = ReconciliationId(0); { let mut ds = up.downstairs.lock().await; - ds.ds_state[ClientId::new(0)] = DsState::Repair; 
- ds.ds_state[ClientId::new(1)] = DsState::Repair; - ds.ds_state[ClientId::new(2)] = DsState::Repair; + ds.clients[ClientId::new(0)].state = DsState::Repair; + ds.clients[ClientId::new(1)].state = DsState::Repair; + ds.clients[ClientId::new(2)].state = DsState::Repair; // Put a job on the todo list ds.reconcile_task_list.push_back(ReconcileIO::new( rep_id, @@ -4744,9 +4744,9 @@ pub(crate) mod up_test { let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); - ds.ds_repair.insert(ClientId::new(0), r0); - ds.ds_repair.insert(ClientId::new(1), r1); - ds.ds_repair.insert(ClientId::new(2), r2); + ds.clients[ClientId::new(0)].repair_addr = Some(r0); + ds.clients[ClientId::new(1)].repair_addr = Some(r1); + ds.clients[ClientId::new(2)].repair_addr = Some(r2); let repair_extent = 9; let mut rec_list = HashMap::new(); @@ -4864,9 +4864,9 @@ pub(crate) mod up_test { let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); - ds.ds_repair.insert(ClientId::new(0), r0); - ds.ds_repair.insert(ClientId::new(1), r1); - ds.ds_repair.insert(ClientId::new(2), r2); + ds.clients[ClientId::new(0)].repair_addr = Some(r0); + ds.clients[ClientId::new(1)].repair_addr = Some(r1); + ds.clients[ClientId::new(2)].repair_addr = Some(r2); let repair_extent = 5; let mut rec_list = HashMap::new(); @@ -5729,9 +5729,9 @@ pub(crate) mod up_test { assert!(ds.in_progress(next_id, ClientId::new(2)).is_some()); // We should have one job on the skipped job list for failed DS - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); next_id }; @@ -5807,10 +5807,12 @@ pub(crate) mod up_test { assert_eq!(ds.completed.len(), 3); // The last skipped flush should still be on the skipped list - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(0)].contains(&JobId(1002))); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(0)] + .skipped_jobs + .contains(&JobId(1002))); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); } #[tokio::test] @@ -5914,9 +5916,9 @@ pub(crate) mod up_test { assert!(ds.in_progress(next_id, ClientId::new(2)).is_some()); // Two downstairs should have a skipped job on their lists. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); next_id }; @@ -6176,9 +6178,9 @@ pub(crate) mod up_test { // A faulted write won't change skipped job count. 
let ds = up.downstairs.lock().await; - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); // Verify we can ack this work @@ -6218,10 +6220,12 @@ pub(crate) mod up_test { // One downstairs should have a skipped job on its list. let ds = up.downstairs.lock().await; - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(1)].contains(&JobId(1001))); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(1)] + .skipped_jobs + .contains(&JobId(1001))); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); // Enqueue the flush. @@ -6273,10 +6277,12 @@ pub(crate) mod up_test { assert_eq!(ds.completed.len(), 3); // Only the skipped flush should remain. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(1)].contains(&JobId(1002))); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(1)] + .skipped_jobs + .contains(&JobId(1002))); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); } #[tokio::test] @@ -6361,9 +6367,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::New); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); } @@ -6437,9 +6443,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::InProgress); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); } @@ -6492,9 +6498,9 @@ pub(crate) mod up_test { let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[cid], IOState::Done); } - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); // New write, this one will have a failure @@ -6544,9 
+6550,9 @@ pub(crate) mod up_test { ); // A failed job does not change the skipped count. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); } @@ -6670,9 +6676,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::InProgress); assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); drop(ds); } @@ -6724,9 +6730,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(2)], IOState::New); // Three skipped jobs for downstairs zero - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 3); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 3); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); } @@ -6779,9 +6785,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(2)], IOState::InProgress); // Three skipped jobs on downstairs client 0 - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 3); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 0); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 3); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); drop(ds); // Do the write @@ -6832,8 +6838,10 @@ pub(crate) mod up_test { ds.retire_check(flush_one); // Skipped jobs just has the flush - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(0)].contains(&JobId(1002))); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(0)] + .skipped_jobs + .contains(&JobId(1002))); assert_eq!(ds.ackable_work().len(), 0); // The writes, the read, and the flush should be completed. @@ -6895,9 +6903,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); // Skipped jobs added on downstairs client 0 - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 3); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 0); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 3); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 3); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 0); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 3); drop(ds); // Do the write @@ -6936,10 +6944,14 @@ pub(crate) mod up_test { ds.retire_check(flush_one); // Skipped jobs now just have the flush. 
- assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(0)].contains(&JobId(1002))); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); - assert!(ds.ds_skipped_jobs[ClientId::new(2)].contains(&JobId(1002))); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(0)] + .skipped_jobs + .contains(&JobId(1002))); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); + assert!(ds.clients[ClientId::new(2)] + .skipped_jobs + .contains(&JobId(1002))); assert_eq!(ds.ackable_work().len(), 0); // The writes, the read, and the flush should be completed. @@ -6985,9 +6997,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); drop(ds); // Verify jobs can be acked. @@ -7002,9 +7014,9 @@ pub(crate) mod up_test { ds.retire_check(read_one); // Our skipped jobs have not yet been cleared. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); assert_eq!(ds.ackable_work().len(), 0); } @@ -7034,9 +7046,9 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); drop(ds); // Verify jobs can be acked. @@ -7050,9 +7062,9 @@ pub(crate) mod up_test { ds.retire_check(write_one); // No flush, no change in skipped jobs. - assert_eq!(ds.ds_skipped_jobs[ClientId::new(0)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(1)].len(), 1); - assert_eq!(ds.ds_skipped_jobs[ClientId::new(2)].len(), 1); + assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); + assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); assert_eq!(ds.ackable_work().len(), 0); } @@ -7083,8 +7095,8 @@ pub(crate) mod up_test { assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); for cid in ClientId::iter() { - assert_eq!(ds.ds_skipped_jobs[cid].len(), 1); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1000))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 1); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); } drop(ds); @@ -7107,8 +7119,8 @@ pub(crate) mod up_test { // Skipped jobs still has the flush. 
for cid in ClientId::iter() { - assert_eq!(ds.ds_skipped_jobs[cid].len(), 1); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1000))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 1); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); } drop(ds); } @@ -7153,10 +7165,10 @@ pub(crate) mod up_test { // Skipped jobs are not yet cleared. for cid in ClientId::iter() { - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1000))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1001))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1002))); - assert_eq!(ds.ds_skipped_jobs[cid].len(), 3); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1001))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 3); } // Verify all IOs are done @@ -7180,8 +7192,8 @@ pub(crate) mod up_test { // Skipped jobs now just has the flush for cid in ClientId::iter() { - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1002))); - assert_eq!(ds.ds_skipped_jobs[cid].len(), 1); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 1); } } @@ -7237,13 +7249,13 @@ pub(crate) mod up_test { // Six jobs have been skipped. for cid in ClientId::iter() { - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1000))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1001))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1002))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1003))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1004))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1005))); - assert_eq!(ds.ds_skipped_jobs[cid].len(), 6); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1001))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1003))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1004))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1005))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 6); } // Ack the first 3 jobs @@ -7258,11 +7270,11 @@ pub(crate) mod up_test { // The first two skipped jobs are now cleared and the non-acked // jobs remain on the list, as well as the last flush. for cid in ClientId::iter() { - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1002))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1003))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1004))); - assert!(ds.ds_skipped_jobs[cid].contains(&JobId(1005))); - assert_eq!(ds.ds_skipped_jobs[cid].len(), 4); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1003))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1004))); + assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1005))); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 4); } }
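The `skipped_jobs` assertions in the test.rs hunks above track a per-client list that only needs `len` and `contains(&JobId(..))`, grows as jobs are skipped, and, per the test comments, shrinks down to just the skipped flush once that flush is acked and retired. A small self-contained sketch of that bookkeeping under stated assumptions: the set type is assumed, and the pruning rule is inferred from the assertions ("skipped jobs now just has the flush"), not taken from the real `retire_check` code, which this diff does not show.

use std::collections::BTreeSet;

// JobId as used in the assertions above.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct JobId(u64);

#[derive(Default)]
struct Client {
    skipped_jobs: BTreeSet<JobId>,
}

impl Client {
    // Sketch of the pruning the test comments describe: once a flush at
    // `flush_id` is retired, skipped entries older than the flush are
    // dropped while the flush itself stays on the list.
    fn prune_skipped_before(&mut self, flush_id: JobId) {
        self.skipped_jobs.retain(|j| *j >= flush_id);
    }
}

fn main() {
    let mut client = Client::default();
    for id in [1000, 1001, 1002] {
        client.skipped_jobs.insert(JobId(id));
    }
    assert_eq!(client.skipped_jobs.len(), 3);

    // Retire everything up to (but keeping) the skipped flush at 1002.
    client.prune_skipped_before(JobId(1002));
    assert_eq!(client.skipped_jobs.len(), 1);
    assert!(client.skipped_jobs.contains(&JobId(1002)));
}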