Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(socket sink): support unix datagram mode #21762

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:
- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)

This option only applies when `mode = "unix"`.

authors: jpovixwm
74 changes: 60 additions & 14 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {

#[cfg(test)]
mod test {
#[cfg(unix)]
use std::path::PathBuf;
use std::{
future::ready,
net::{SocketAddr, UdpSocket},
};
#[cfg(unix)]
use std::{os::unix::net::UnixDatagram, path::PathBuf};

use futures::stream::StreamExt;
use futures_util::stream;
Expand Down Expand Up @@ -196,14 +196,42 @@ mod test {
crate::test_util::test_generate_config::<SocketSinkConfig>();
}

async fn test_udp(addr: SocketAddr) {
let receiver = UdpSocket::bind(addr).unwrap();
enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram),
}

enum DatagramSocketAddr {
Udp(SocketAddr),
#[cfg(unix)]
Unix(PathBuf),
}

async fn test_datagram(datagram_addr: DatagramSocketAddr) {
let receiver = match &datagram_addr {
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => {
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
}
};

let config = SocketSinkConfig {
mode: Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
mode: match &datagram_addr {
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
acknowledgements: Default::default(),
};

Expand All @@ -218,9 +246,13 @@ mod test {
.expect("Running sink failed");

let mut buf = [0; 256];
let (size, _src_addr) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let size = match &receiver {
DatagramSocket::Udp(sock) => {
sock.recv_from(&mut buf).expect("Did not receive message").0
}
#[cfg(unix)]
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
};

let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
Expand All @@ -234,14 +266,25 @@ mod test {
async fn udp_ipv4() {
trace_init();

test_udp(next_addr()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
}

#[tokio::test]
async fn udp_ipv6() {
trace_init();

test_udp(next_addr_v6()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
}

#[cfg(unix)]
#[tokio::test]
async fn unix_datagram() {
trace_init();

test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
"unix_datagram_socket_test",
)))
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -292,7 +335,10 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(out_path),
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
Loading
Loading