diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index d9a49943d32d8..1c84093bd743e 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -50,10 +50,9 @@ impl VectorService for EventForwardService { .map(Event::from) .collect(); - self.tx - .send(events) - .await - .expect("event forward rx should not close first"); + if let Err(e) = self.tx.send(events).await { + return Err(Status::internal(format!("Failed to forward events: {}", e))); + } Ok(tonic::Response::new(PushEventsResponse {})) } @@ -70,10 +69,7 @@ impl VectorService for EventForwardService { } } -pub struct InputEdge { - #[allow(dead_code)] - client: VectorClient, -} +pub struct InputEdge {} pub struct OutputEdge { listen_addr: GrpcAddress, @@ -109,6 +105,7 @@ impl InputEdge { if let Err(e) = client.push_events(request).await { error!(error = ?e, "Failed to send input event to controlled input edge."); + continue; // Retry the next event or decide how to handle failure } }