Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 31, 2023
1 parent c7303e3 commit 40efe38
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 94 deletions.
11 changes: 10 additions & 1 deletion quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ use serde::Serialize;
/// each indexer, identified by its node ID, should run.
/// TODO(fmassot): a metastore version number will be attached to the plan
/// to identify if the plan is up to date with the metastore.
#[derive(Debug, PartialEq, Clone, Serialize, Default)]
#[derive(Debug, PartialEq, Clone, Serialize)]
pub struct PhysicalIndexingPlan {
indexing_tasks_per_indexer_id: FnvHashMap<String, Vec<IndexingTask>>,
}

impl PhysicalIndexingPlan {
/// Builds a plan holding one entry per given indexer ID, each entry
/// starting out with an empty list of indexing tasks.
pub fn with_indexer_ids(indexer_ids: &[String]) -> PhysicalIndexingPlan {
    // Pair every (cloned) indexer ID with a freshly created, empty task list.
    let indexing_tasks_per_indexer_id = indexer_ids
        .iter()
        .cloned()
        .zip(std::iter::repeat_with(Vec::new))
        .collect();
    PhysicalIndexingPlan {
        indexing_tasks_per_indexer_id,
    }
}

pub fn add_indexing_task(&mut self, indexer_id: &str, indexing_task: IndexingTask) {
self.indexing_tasks_per_indexer_id
.entry(indexer_id.to_string())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ fn populate_problem(
load_per_shard,
} => {
let num_shards = shards.len() as u32;
let source_id = problem.add_source(num_shards, *load_per_shard);
Some(source_id)
let source_ord = problem.add_source(num_shards, *load_per_shard);
Some(source_ord)
}
SourceToScheduleType::NonSharded {
num_pipelines,
load_per_pipeline,
} => {
let source_id = problem.add_source(*num_pipelines, *load_per_pipeline);
Some(source_id)
let source_ord = problem.add_source(*num_pipelines, *load_per_pipeline);
Some(source_ord)
}
}
}
Expand Down Expand Up @@ -151,7 +151,7 @@ fn convert_physical_plan_to_solution(
source_id: indexing_task.source_id.clone(),
};
if let Some(source_ord) = id_to_ord_map.source_ord(&source_uid) {
indexer_assignment.add_shard(source_ord, indexing_task.shard_ids.len() as u32);
indexer_assignment.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
}
}
}
Expand Down Expand Up @@ -249,7 +249,8 @@ fn convert_scheduling_solution_to_physical_plan(
.map(|previous_plan| create_shard_to_indexer_map(previous_plan, id_to_ord_map))
.unwrap_or_default();

let mut physical_indexing_plan = PhysicalIndexingPlan::default();
let mut physical_indexing_plan =
PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids);

for source in sources {
match &source.source_type {
Expand Down Expand Up @@ -353,9 +354,9 @@ pub fn build_physical_indexing_plan(
let mut problem = SchedulingProblem::with_indexer_maximum_load(indexer_max_loads);

for source in sources {
if let Some(source_id) = populate_problem(source, &mut problem) {
let source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
assert_eq!(source_ord, source_id);
if let Some(source_ord) = populate_problem(source, &mut problem) {
let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
assert_eq!(source_ord, registered_source_ord);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ fn check_contract_conditions(problem: &SchedulingProblem, solution: &SchedulingS
for (node_id, indexer_assignment) in solution.indexer_assignments.iter().enumerate() {
assert_eq!(indexer_assignment.indexer_ord, node_id);
}
for (source_id, source) in problem.sources().enumerate() {
assert_eq!(source_id as SourceOrd, source.source_id);
for (source_ord, source) in problem.sources().enumerate() {
assert_eq!(source_ord as SourceOrd, source.source_ord);
}
}

Expand All @@ -55,7 +55,7 @@ pub fn solve(
// Remove shards in solution that are not needed anymore

fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
let mut num_shards_per_source: Box<[u32]> = vec![0; problem.num_sources()].into_boxed_slice();
let mut num_shards_per_source: Vec<u32> = vec![0; problem.num_sources()];
for indexer_assignment in &mut solution.indexer_assignments {
if let Some((&source_ord, _)) = indexer_assignment.num_shards_per_source.last_key_value() {
assert!(source_ord < problem.num_sources() as SourceOrd);
Expand All @@ -66,12 +66,11 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut Scheduli
}
let num_shards_per_source_to_remove: Vec<(SourceOrd, u32)> = num_shards_per_source
.into_iter()
.copied()
.zip(problem.sources())
.flat_map(|(num_shards, source)| {
let target_num_shards = source.num_shards;
if target_num_shards < num_shards {
Some((source.source_id, num_shards - target_num_shards))
Some((source.source_ord, num_shards - target_num_shards))
} else {
None
}
Expand All @@ -91,20 +90,20 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut Scheduli
.map(|indexer_assignment| indexer_assignment.indexer_available_capacity(problem))
.collect();

for (source_id, mut num_shards_to_remove) in num_shards_per_source_to_remove {
for (source_ord, mut num_shards_to_remove) in num_shards_per_source_to_remove {
let nodes_with_source = nodes_with_source
.get_mut(&source_id)
.get_mut(&source_ord)
// Unwrap is safe here. By construction, if we need to decrease the number of shards of a
// given source, at least one node has it.
.unwrap();
nodes_with_source.sort_by_key(|&node_id| indexer_available_capacity[node_id]);
for node_id in nodes_with_source.iter().copied() {
let indexer_assignment = &mut solution.indexer_assignments[node_id];
let previous_num_shards = indexer_assignment.num_shards(source_id);
let previous_num_shards = indexer_assignment.num_shards(source_ord);
assert!(previous_num_shards > 0);
assert!(num_shards_to_remove > 0);
let num_shards_removed = previous_num_shards.min(num_shards_to_remove);
indexer_assignment.remove_shards(source_id, num_shards_removed);
indexer_assignment.remove_shards(source_ord, num_shards_removed);
num_shards_to_remove -= num_shards_removed;
// We update the node capacity since its load has changed.
indexer_available_capacity[node_id] =
Expand All @@ -129,7 +128,7 @@ fn assert_remove_extraneous_shards_post_condition(
}
}
for source in problem.sources() {
assert!(num_shards_per_source[source.source_id as usize] <= source.num_shards);
assert!(num_shards_per_source[source.source_ord as usize] <= source.num_shards);
}
}

Expand Down Expand Up @@ -157,14 +156,14 @@ fn enforce_node_max_load(
let mut load_sources: Vec<(Load, SourceOrd)> = indexer_assignment
.num_shards_per_source
.iter()
.map(|(source_id, num_shards)| {
let load_for_source = problem.source_load_per_shard(*source_id).get() * num_shards;
(load_for_source, *source_id)
.map(|(source_ord, num_shards)| {
let load_for_source = problem.source_load_per_shard(*source_ord).get() * num_shards;
(load_for_source, *source_ord)
})
.collect();
load_sources.sort();
for (load, source_id) in load_sources {
indexer_assignment.num_shards_per_source.remove(&source_id);
for (load, source_ord) in load_sources {
indexer_assignment.num_shards_per_source.remove(&source_ord);
load_to_remove = load_to_remove.saturating_sub(load);
if load_to_remove == 0 {
break;
Expand Down Expand Up @@ -208,7 +207,7 @@ fn place_unassigned_shards(
place_unassigned_shards_single_source(&source, &mut node_with_least_loads, solution);
// We haven't been able to place this source entirely.
if num_shards_unassigned != 0 {
unassignable_shards.insert(source.source_id, num_shards_unassigned);
unassignable_shards.insert(source.source_ord, num_shards_unassigned);
}
}
assert_place_unassigned_shards_post_condition(problem, solution, &unassignable_shards);
Expand All @@ -225,12 +224,12 @@ fn assert_place_unassigned_shards_post_condition(
let num_assigned_shards: u32 = solution
.indexer_assignments
.iter()
.map(|indexer_assignment| indexer_assignment.num_shards(source.source_id))
.map(|indexer_assignment| indexer_assignment.num_shards(source.source_ord))
.sum();
assert_eq!(
num_assigned_shards
+ unassigned_shards
.get(&source.source_id)
.get(&source.source_ord)
.copied()
.unwrap_or(0),
source.num_shards
Expand All @@ -239,9 +238,9 @@ fn assert_place_unassigned_shards_post_condition(
// We make sure that none of the unassigned shards can be placed.
for indexer_assignment in &solution.indexer_assignments {
let available_capacity: Load = indexer_assignment.indexer_available_capacity(problem);
for (&source_id, &num_shards) in unassigned_shards {
for (&source_ord, &num_shards) in unassigned_shards {
assert!(num_shards > 0);
let source = problem.source(source_id);
let source = problem.source(source_ord);
assert!(source.load_per_shard.get() > available_capacity);
}
}
Expand All @@ -267,7 +266,7 @@ fn place_unassigned_shards_single_source(
}
// TODO take colocation into account.
// Update the solution, the shard load, and the number of shards to place.
solution.indexer_assignments[node_id].add_shard(source.source_id, num_shards_to_place);
solution.indexer_assignments[node_id].add_shards(source.source_ord, num_shards_to_place);
*available_capacity -= num_shards_to_place * source.load_per_shard.get();
num_shards -= num_shards_to_place;
}
Expand All @@ -280,11 +279,11 @@ fn compute_unassigned_sources(
) -> Vec<Source> {
let mut unassigned_sources: BTreeMap<SourceOrd, Source> = problem
.sources()
.map(|source| (source.source_id as SourceOrd, source))
.map(|source| (source.source_ord as SourceOrd, source))
.collect();
for indexer_assignment in &solution.indexer_assignments {
for (&source_id, &num_shards) in &indexer_assignment.num_shards_per_source {
let Entry::Occupied(mut entry) = unassigned_sources.entry(source_id) else {
for (&source_ord, &num_shards) in &indexer_assignment.num_shards_per_source {
let Entry::Occupied(mut entry) = unassigned_sources.entry(source_ord) else {
panic!("The solution contains more shards than the actual problem.");
};
entry.get_mut().num_shards -= num_shards;
Expand Down Expand Up @@ -321,8 +320,8 @@ mod tests {
let mut problem = SchedulingProblem::with_indexer_maximum_load(vec![4_000, 5_000]);
problem.add_source(1, NonZeroU32::new(1_000u32).unwrap());
let mut solution = problem.new_solution();
solution.indexer_assignments[0].add_shard(0, 3);
solution.indexer_assignments[1].add_shard(0, 3);
solution.indexer_assignments[0].add_shards(0, 3);
solution.indexer_assignments[1].add_shards(0, 3);
remove_extraneous_shards(&problem, &mut solution);
assert_eq!(solution.indexer_assignments[0].num_shards(0), 0);
assert_eq!(solution.indexer_assignments[1].num_shards(0), 1);
Expand All @@ -333,8 +332,8 @@ mod tests {
let mut problem = SchedulingProblem::with_indexer_maximum_load(vec![5_000, 4_000]);
problem.add_source(2, NonZeroU32::new(1_000).unwrap());
let mut solution = problem.new_solution();
solution.indexer_assignments[0].add_shard(0, 3);
solution.indexer_assignments[1].add_shard(0, 3);
solution.indexer_assignments[0].add_shards(0, 3);
solution.indexer_assignments[1].add_shards(0, 3);
remove_extraneous_shards(&problem, &mut solution);
assert_eq!(solution.indexer_assignments[0].num_shards(0), 2);
assert_eq!(solution.indexer_assignments[1].num_shards(0), 0);
Expand All @@ -348,29 +347,16 @@ mod tests {
// Source 1
problem.add_source(2, NonZeroU32::new(1_000).unwrap());
let mut solution = problem.new_solution();
solution.indexer_assignments[0].add_shard(0, 1);
solution.indexer_assignments[0].add_shard(1, 1);
solution.indexer_assignments[1].add_shard(1, 2);
solution.indexer_assignments[0].add_shards(0, 1);
solution.indexer_assignments[0].add_shards(1, 1);
solution.indexer_assignments[1].add_shards(1, 2);
remove_extraneous_shards(&problem, &mut solution);
assert_eq!(solution.indexer_assignments[0].num_shards(0), 0);
assert_eq!(solution.indexer_assignments[0].num_shards(1), 1);
assert_eq!(solution.indexer_assignments[1].num_shards(0), 0);
assert_eq!(solution.indexer_assignments[1].num_shards(1), 1);
}

// Runs `remove_extraneous_shards` on a solution that references a source ord (1)
// for which the problem has no matching `add_source` call: only source 0 is declared.
#[test]
fn test_truncate_sources() {
let mut problem = SchedulingProblem::with_indexer_maximum_load(vec![5_000, 4_000]);
// Source 0
problem.add_source(1, NonZeroU32::new(1_000).unwrap());
let mut solution = problem.new_solution();
solution.indexer_assignments[0].add_shard(0, 1);
// Source ord 1 is not declared in the problem above.
solution.indexer_assignments[0].add_shard(1, 1);
remove_extraneous_shards(&problem, &mut solution);
// Expected outcome: source 0 keeps its single shard, source 1 ends up with none.
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
assert_eq!(solution.indexer_assignments[0].num_shards(1), 0);
}

#[test]
fn test_enforce_nodes_max_load() {
let mut problem =
Expand All @@ -382,25 +368,25 @@ mod tests {
let mut solution = problem.new_solution();

// node 0 does not exceed its capacity
solution.indexer_assignments[0].add_shard(0, 1);
solution.indexer_assignments[0].add_shards(0, 1);

// node 1 exceeds its capacity with a single source
solution.indexer_assignments[1].add_shard(0, 2);
solution.indexer_assignments[1].add_shards(0, 2);

// node 2 is precisely at capacity
solution.indexer_assignments[2].add_shard(0, 1);
solution.indexer_assignments[2].add_shard(1, 1);
solution.indexer_assignments[2].add_shards(0, 1);
solution.indexer_assignments[2].add_shards(1, 1);

// node 3 is exceeding its capacity due to several sources
// We choose to remove sources entirely (as opposed to removing only shards that do not fit)
solution.indexer_assignments[3].add_shard(0, 1);
solution.indexer_assignments[3].add_shard(2, 2);
solution.indexer_assignments[3].add_shards(0, 1);
solution.indexer_assignments[3].add_shards(2, 2);

// node 4 is exceeding its capacity due to several sources
// We choose to remove sources entirely (as opposed to removing only shards that do not fit)
solution.indexer_assignments[4].add_shard(0, 1);
solution.indexer_assignments[4].add_shard(1, 1);
solution.indexer_assignments[4].add_shard(2, 2);
solution.indexer_assignments[4].add_shards(0, 1);
solution.indexer_assignments[4].add_shards(1, 1);
solution.indexer_assignments[4].add_shards(2, 2);

enforce_nodes_max_load(&problem, &mut solution);

Expand Down Expand Up @@ -436,7 +422,7 @@ mod tests {
assert_eq!(
unassigned_shards[0],
Source {
source_id: 0,
source_ord: 0,
load_per_shard: NonZeroU32::new(1_000).unwrap(),
num_shards: 4
}
Expand All @@ -450,23 +436,23 @@ mod tests {
problem.add_source(15, NonZeroU32::new(2_000).unwrap());
let mut solution = problem.new_solution();

solution.indexer_assignments[0].add_shard(0, 1);
solution.indexer_assignments[0].add_shard(1, 3);
solution.indexer_assignments[1].add_shard(0, 2);
solution.indexer_assignments[1].add_shard(1, 3);
solution.indexer_assignments[0].add_shards(0, 1);
solution.indexer_assignments[0].add_shards(1, 3);
solution.indexer_assignments[1].add_shards(0, 2);
solution.indexer_assignments[1].add_shards(1, 3);
let unassigned_shards = compute_unassigned_sources(&problem, &solution);
assert_eq!(
unassigned_shards[0],
Source {
source_id: 0,
source_ord: 0,
load_per_shard: NonZeroU32::new(1_000).unwrap(),
num_shards: 5 - (1 + 2)
}
);
assert_eq!(
unassigned_shards[1],
Source {
source_id: 1,
source_ord: 1,
load_per_shard: NonZeroU32::new(2_000).unwrap(),
num_shards: 15 - (3 + 3)
}
Expand All @@ -489,10 +475,10 @@ mod tests {
problem.add_source(5, NonZeroU32::new(1_000).unwrap());
problem.add_source(15, NonZeroU32::new(2_000).unwrap());
let mut solution = problem.new_solution();
solution.indexer_assignments[0].add_shard(0, 1);
solution.indexer_assignments[0].add_shard(1, 3);
solution.indexer_assignments[1].add_shard(0, 2);
solution.indexer_assignments[1].add_shard(1, 3);
solution.indexer_assignments[0].add_shards(0, 1);
solution.indexer_assignments[0].add_shards(1, 3);
solution.indexer_assignments[1].add_shards(0, 2);
solution.indexer_assignments[1].add_shards(1, 3);
let unassigned_shards = compute_unassigned_sources(&problem, &solution);
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
Expand All @@ -501,15 +487,15 @@ mod tests {
assert_eq!(
unassigned_shards[0],
Source {
source_id: 0,
source_ord: 0,
load_per_shard: NonZeroU32::new(1_000).unwrap(),
num_shards: 5 - (1 + 2)
}
);
assert_eq!(
unassigned_shards[1],
Source {
source_id: 1,
source_ord: 1,
load_per_shard: NonZeroU32::new(2_000).unwrap(),
num_shards: 15 - (3 + 3)
}
Expand Down Expand Up @@ -593,10 +579,10 @@ mod tests {
move |indexer_assignments: Vec<Vec<u32>>| {
let mut solution = SchedulingSolution::with_num_indexers(num_nodes);
for (node_id, indexer_assignment) in indexer_assignments.iter().enumerate() {
for (source_id, num_shards) in indexer_assignment.iter().copied().enumerate() {
for (source_ord, num_shards) in indexer_assignment.iter().copied().enumerate() {
if num_shards > 0 {
solution.indexer_assignments[node_id]
.add_shard(source_id as u32, num_shards);
.add_shards(source_ord as u32, num_shards);
}
}
}
Expand Down
Loading

0 comments on commit 40efe38

Please sign in to comment.