Skip to content

Commit

Permalink
chore(socket sink): Revert addition of unix datagram mode (#22113)
Browse files Browse the repository at this point in the history
Revert "enhancement(socket sink): support unix datagram mode (#21762)"

This reverts commit 7c6d0c9.

Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
pront and jszwedko authored Jan 2, 2025
1 parent ca084cc commit 2ed4cd9
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 426 deletions.

This file was deleted.

2 changes: 2 additions & 0 deletions src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
}
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
#[derive(Debug)]
pub struct UnixSocketSendError<'a, E> {
pub(crate) error: &'a E,
pub path: &'a std::path::Path,
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
fn emit(self) {
let reason = "Unix socket send error.";
Expand Down
74 changes: 14 additions & 60 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {

#[cfg(test)]
mod test {
#[cfg(unix)]
use std::path::PathBuf;
use std::{
future::ready,
net::{SocketAddr, UdpSocket},
};
#[cfg(unix)]
use std::{os::unix::net::UnixDatagram, path::PathBuf};

use futures::stream::StreamExt;
use futures_util::stream;
Expand Down Expand Up @@ -196,42 +196,14 @@ mod test {
crate::test_util::test_generate_config::<SocketSinkConfig>();
}

enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram),
}

enum DatagramSocketAddr {
Udp(SocketAddr),
#[cfg(unix)]
Unix(PathBuf),
}

async fn test_datagram(datagram_addr: DatagramSocketAddr) {
let receiver = match &datagram_addr {
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => {
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
}
};
async fn test_udp(addr: SocketAddr) {
let receiver = UdpSocket::bind(addr).unwrap();

let config = SocketSinkConfig {
mode: match &datagram_addr {
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
mode: Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
acknowledgements: Default::default(),
};

Expand All @@ -246,13 +218,9 @@ mod test {
.expect("Running sink failed");

let mut buf = [0; 256];
let size = match &receiver {
DatagramSocket::Udp(sock) => {
sock.recv_from(&mut buf).expect("Did not receive message").0
}
#[cfg(unix)]
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
};
let (size, _src_addr) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");

let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
Expand All @@ -266,25 +234,14 @@ mod test {
async fn udp_ipv4() {
trace_init();

test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
test_udp(next_addr()).await;
}

#[tokio::test]
async fn udp_ipv6() {
trace_init();

test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
}

#[cfg(unix)]
#[tokio::test]
async fn unix_datagram() {
trace_init();

test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
"unix_datagram_socket_test",
)))
.await;
test_udp(next_addr_v6()).await;
}

#[tokio::test]
Expand Down Expand Up @@ -335,10 +292,7 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
config: UnixSinkConfig::new(out_path),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
106 changes: 0 additions & 106 deletions src/sinks/util/datagram.rs

This file was deleted.

3 changes: 1 addition & 2 deletions src/sinks/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub mod batch;
pub mod buffer;
pub mod builder;
pub mod compressor;
pub mod datagram;
pub mod encoding;
pub mod http;
pub mod metadata;
Expand All @@ -24,7 +23,7 @@ pub mod tcp;
#[cfg(any(test, feature = "test-utils"))]
pub mod test;
pub mod udp;
#[cfg(unix)]
#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
pub mod unix;
pub mod uri;
pub mod zstd;
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/util/service/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

#[cfg(unix)]
use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
use std::path::PathBuf;

use crate::{
internal_events::{
Expand All @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode};
use self::tcp::TcpConnector;
use self::udp::UdpConnector;
#[cfg(unix)]
use self::unix::UnixConnector;
use self::unix::{UnixConnector, UnixEither};

use futures_util::{future::BoxFuture, FutureExt};
use snafu::{ResultExt, Snafu};
Expand Down
36 changes: 33 additions & 3 deletions src/sinks/util/service/net/unix.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::path::{Path, PathBuf};
use std::{
io,
os::fd::{AsFd, BorrowedFd},
path::{Path, PathBuf},
};

use snafu::ResultExt;
use tokio::net::{UnixDatagram, UnixStream};
use tokio::{
io::AsyncWriteExt,
net::{UnixDatagram, UnixStream},
};

use vector_lib::configurable::configurable_component;

use crate::{net, sinks::util::unix::UnixEither};
use crate::net;

use super::{net_error::*, ConnectorType, NetError, NetworkConnector};

Expand Down Expand Up @@ -67,6 +74,29 @@ impl UnixConnectorConfig {
}
}

pub(super) enum UnixEither {
Datagram(UnixDatagram),
Stream(UnixStream),
}

impl UnixEither {
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Datagram(datagram) => datagram.send(buf).await,
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
}
}
}

impl AsFd for UnixEither {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
Self::Datagram(datagram) => datagram.as_fd(),
Self::Stream(stream) => stream.as_fd(),
}
}
}

#[derive(Clone)]
pub(super) struct UnixConnector {
path: PathBuf,
Expand Down
Loading

0 comments on commit 2ed4cd9

Please sign in to comment.