Skip to content

Commit

Permalink
Address various scheduler timing issues (#1069)
Browse files Browse the repository at this point in the history
Error seen and addressed:

- Executor stuck in an infinite loop when a scheduler loop processes 2 state changes.
- Reducer function returning more than the expected single output when it a reducer finishes before the next parent output finishes.
- Reducer function returning more than the expected single output when the ingest file for it happens right before a scheduler run loop.


Also adding tracing around the scheduler.
  • Loading branch information
seriousben authored Dec 2, 2024
1 parent 9f19bc5 commit deee6c0
Show file tree
Hide file tree
Showing 10 changed files with 1,709 additions and 194 deletions.
1 change: 1 addition & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/data_model/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ indexify_utils = { workspace = true }
rand = {workspace=true}
uuid = {workspace=true}
sha2 = {workspace=true}
strum = {workspace=true}
216 changes: 214 additions & 2 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use filter::LabelsFilter;
use indexify_utils::{default_creation_time, get_epoch_time_in_ms};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use strum::AsRefStr;

// Invoke graph for all existing payloads
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -118,7 +119,7 @@ impl ImageInformation {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Builder, PartialEq, Eq)]
#[derive(Default, Debug, Clone, Serialize, Deserialize, Builder, PartialEq, Eq)]
pub struct DynamicEdgeRouter {
pub name: String,
pub description: String,
Expand Down Expand Up @@ -391,6 +392,25 @@ impl ComputeGraph {

self
}

pub fn get_compute_parent_nodes(&self, node_name: &str) -> Vec<&str> {
// Find parent of the node
self.edges
.iter()
.filter_map(|(parent, successors)| {
successors
.contains(&node_name.to_string())
.then(|| parent.as_str())
})
// Filter for compute node parent, traversing through routers
.map(|parent_name| match self.nodes.get(parent_name) {
Some(Node::Compute(_)) => vec![parent_name],
Some(Node::Router(_)) => self.get_compute_parent_nodes(parent_name),
None => vec![],
})
.flatten()
.collect()
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand Down Expand Up @@ -596,6 +616,10 @@ impl GraphInvocationCtx {
pub fn key_from(ns: &str, cg: &str, id: &str) -> String {
format!("{}|{}|{}", ns, cg, id)
}

pub fn get_task_analytics(&self, compute_fn: &str) -> Option<&TaskAnalytics> {
self.fn_task_analytics.get(compute_fn)
}
}

impl GraphInvocationCtxBuilder {
Expand Down Expand Up @@ -879,7 +903,7 @@ impl fmt::Display for TaskFinishedEvent {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, AsRefStr)]
pub enum ChangeType {
InvokeComputeGraph(InvokeComputeGraphEvent),
TaskFinished(TaskFinishedEvent),
Expand Down Expand Up @@ -958,6 +982,7 @@ mod tests {
ComputeFn,
ComputeGraph,
ComputeGraphCode,
DynamicEdgeRouter,
ExecutorMetadata,
GraphVersion,
ImageInformation,
Expand Down Expand Up @@ -1097,4 +1122,191 @@ mod tests {

assert_eq!(graph.created_at, 5, "created_at should not change");
}

// Check function pattern
fn check_compute_parent<F>(node: &str, expected_parents: Vec<&str>, configure_graph: F)
where
F: FnOnce(&mut ComputeGraph),
{
fn create_test_graph() -> ComputeGraph {
let fn_a = test_compute_fn("fn_a", Some("some_hash_fn_a".to_string()));
ComputeGraph {
namespace: String::new(),
name: String::new(),
description: String::new(),
version: GraphVersion::default(),
code: ComputeGraphCode {
path: String::new(),
size: 0,
sha256_hash: String::new(),
},
created_at: 0,
start_fn: Node::Compute(fn_a),
nodes: HashMap::new(),
edges: HashMap::new(),
runtime_information: RuntimeInformation {
major_version: 0,
minor_version: 0,
},
replaying: false,
}
}

let mut graph = create_test_graph();
configure_graph(&mut graph);

assert_eq!(
graph.get_compute_parent_nodes(node).sort(),
expected_parents.clone().sort(),
"Failed for node: {}",
node
);
}

#[test]
fn test_get_compute_parent_scenarios() {
check_compute_parent("compute2", vec!["compute1"], |graph| {
graph.edges = HashMap::from([("compute1".to_string(), vec!["compute2".to_string()])]);
graph.nodes = HashMap::from([
(
"compute1".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute2".to_string(),
Node::Compute(test_compute_fn("compute2", Some("image_hash".to_string()))),
),
]);
});
check_compute_parent("router2", vec!["compute4"], |graph| {
graph.edges = HashMap::from([("compute4".to_string(), vec!["router2".to_string()])]);
graph.nodes = HashMap::from([
(
"compute4".to_string(),
Node::Compute(test_compute_fn("compute4", Some("image_hash".to_string()))),
),
(
"router2".to_string(),
Node::Router(DynamicEdgeRouter {
name: "router2".to_string(),
..Default::default()
}),
),
]);
});
check_compute_parent("nonexistent", vec![], |_| {});

// More complex routing scenarios
check_compute_parent("compute2", vec!["compute1"], |graph| {
graph.edges = HashMap::from([
("compute1".to_string(), vec!["router1".to_string()]),
("router1".to_string(), vec!["compute2".to_string()]),
]);
graph.nodes = HashMap::from([
(
"compute1".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"router1".to_string(),
Node::Router(DynamicEdgeRouter {
name: "router1".to_string(),
..Default::default()
}),
),
(
"compute2".to_string(),
Node::Compute(test_compute_fn("compute2", Some("image_hash".to_string()))),
),
]);
});

check_compute_parent("compute2", vec!["compute3"], |graph| {
graph.edges = HashMap::from([
("compute3".to_string(), vec!["router1".to_string()]),
("router1".to_string(), vec!["compute2".to_string()]),
]);
graph.nodes = HashMap::from([
(
"compute3".to_string(),
Node::Compute(test_compute_fn("compute3", Some("image_hash".to_string()))),
),
(
"router1".to_string(),
Node::Router(DynamicEdgeRouter {
name: "router1".to_string(),
..Default::default()
}),
),
(
"compute2".to_string(),
Node::Compute(test_compute_fn("compute2", Some("image_hash".to_string()))),
),
]);
});

check_compute_parent("compute2", vec!["compute3"], |graph| {
graph.edges = HashMap::from([
("compute3".to_string(), vec!["router1".to_string()]),
("router1".to_string(), vec!["compute2".to_string()]),
]);
graph.nodes = HashMap::from([
(
"compute3".to_string(),
Node::Compute(test_compute_fn("compute3", Some("image_hash".to_string()))),
),
(
"router1".to_string(),
Node::Router(DynamicEdgeRouter {
name: "router1".to_string(),
..Default::default()
}),
),
(
"compute2".to_string(),
Node::Compute(test_compute_fn("compute2", Some("image_hash".to_string()))),
),
]);
});

// test multiple parents
check_compute_parent(
"compute5",
vec!["compute1", "compute2", "compute3", "compute4"],
|graph| {
graph.edges = HashMap::from([
("compute1".to_string(), vec!["compute5".to_string()]),
("compute2".to_string(), vec!["compute5".to_string()]),
("compute3".to_string(), vec!["compute5".to_string()]),
("compute4".to_string(), vec!["compute5".to_string()]),
]);
graph.nodes = HashMap::from([
(
"compute1".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute2".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute3".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute4".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute5".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
(
"compute6".to_string(),
Node::Compute(test_compute_fn("compute1", Some("image_hash".to_string()))),
),
]);
},
);
}
}
6 changes: 3 additions & 3 deletions server/data_model/src/test_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ pub mod tests {
}
}

fn reducer_fn(name: &str, reduce: bool) -> ComputeFn {
pub fn reducer_fn(name: &str) -> ComputeFn {
let mut compute_fn = test_compute_fn(name, None);
compute_fn.reducer = reduce;
compute_fn.reducer = true;
compute_fn
}

Expand Down Expand Up @@ -246,7 +246,7 @@ pub mod tests {

pub fn mock_graph_with_reducer() -> ComputeGraph {
let fn_a = test_compute_fn("fn_a", None);
let fn_b = reducer_fn("fn_b", true);
let fn_b = reducer_fn("fn_b");
let fn_c = test_compute_fn("fn_c", None);
ComputeGraph {
namespace: TEST_NAMESPACE.to_string(),
Expand Down
1 change: 1 addition & 0 deletions server/src/routes/internal_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub async fn ingest_files_from_executor(
.map_err(|e| {
IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
})?;

Ok(())
}

Expand Down
Loading

0 comments on commit deee6c0

Please sign in to comment.