Skip to content

Commit

Permalink
Remove enrichment_tables from Graph and use them as sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Dec 16, 2024
1 parent 5c6f7ac commit 5032181
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 26 deletions.
13 changes: 11 additions & 2 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
OutputId,
};

use indexmap::IndexSet;
use indexmap::{IndexMap, IndexSet};
use vector_lib::id::Inputs;

pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
Expand Down Expand Up @@ -52,8 +52,17 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
graceful_shutdown_duration,
allow_empty: _,
} = builder;
let sinks_and_table_sinks = sinks
.clone()
.into_iter()
.chain(
enrichment_tables
.iter()
.filter_map(|(key, table)| table.as_sink().map(|s| (key.clone(), s))),
)
.collect::<IndexMap<_, _>>();

let graph = match Graph::new(&sources, &transforms, &sinks, &enrichment_tables, schema) {
let graph = match Graph::new(&sources, &transforms, &sinks_and_table_sinks, schema) {
Ok(graph) => graph,
Err(graph_errors) => {
errors.extend(graph_errors);
Expand Down
27 changes: 6 additions & 21 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use indexmap::{set::IndexSet, IndexMap};
use std::collections::{HashMap, HashSet, VecDeque};

use super::{
schema, ComponentKey, DataType, EnrichmentTableOuter, OutputId, SinkOuter, SourceOuter,
SourceOutput, TransformOuter, TransformOutput,
schema, ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
TransformOutput,
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -37,28 +37,24 @@ impl Graph {
sources: &IndexMap<ComponentKey, SourceOuter>,
transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
enrichment_tables: &IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
schema: schema::Options,
) -> Result<Self, Vec<String>> {
Self::new_inner(sources, transforms, sinks, enrichment_tables, false, schema)
Self::new_inner(sources, transforms, sinks, false, schema)
}

pub fn new_unchecked(
sources: &IndexMap<ComponentKey, SourceOuter>,
transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
enrichment_tables: &IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
schema: schema::Options,
) -> Self {
Self::new_inner(sources, transforms, sinks, enrichment_tables, true, schema)
.expect("errors ignored")
Self::new_inner(sources, transforms, sinks, true, schema).expect("errors ignored")
}

fn new_inner(
sources: &IndexMap<ComponentKey, SourceOuter>,
transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
enrichment_tables: &IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
ignore_errors: bool,
schema: schema::Options,
) -> Result<Self, Vec<String>> {
Expand Down Expand Up @@ -89,15 +85,7 @@ impl Graph {
);
}

let table_sinks = enrichment_tables
.iter()
.filter_map(|(key, table)| table.as_sink().map(|s| (key, s)))
.collect::<Vec<_>>();

for (id, config) in sinks
.iter()
.chain(table_sinks.iter().map(|(key, sink)| (*key, sink)))
{
for (id, config) in sinks {
graph.nodes.insert(
id.clone(),
Node::Sink {
Expand All @@ -118,10 +106,7 @@ impl Graph {
}
}

for (id, config) in sinks
.iter()
.chain(table_sinks.iter().map(|(key, sink)| (*key, sink)))
{
for (id, config) in sinks {
for input in config.inputs.iter() {
if let Err(e) = graph.add_input(input, id, &available_inputs) {
errors.push(e);
Expand Down
3 changes: 0 additions & 3 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ async fn build_unit_test(
&transform_only_config.sources,
&transform_only_config.transforms,
&transform_only_config.sinks,
&transform_only_config.enrichment_tables,
transform_only_config.schema,
);
let test = test.resolve_outputs(&transform_only_graph)?;
Expand All @@ -402,7 +401,6 @@ async fn build_unit_test(
&config_builder.sources,
&config_builder.transforms,
&config_builder.sinks,
&config_builder.enrichment_tables,
config_builder.schema,
);

Expand Down Expand Up @@ -434,7 +432,6 @@ async fn build_unit_test(
&config_builder.sources,
&config_builder.transforms,
&config_builder.sinks,
&config_builder.enrichment_tables,
config_builder.schema,
);
let valid_inputs = graph.input_map()?;
Expand Down

0 comments on commit 5032181

Please sign in to comment.