diff --git a/comp/dogstatsd/listeners/uds_common.go b/comp/dogstatsd/listeners/uds_common.go index 62a39421dcc12..ff165228b3bf1 100644 --- a/comp/dogstatsd/listeners/uds_common.go +++ b/comp/dogstatsd/listeners/uds_common.go @@ -204,6 +204,7 @@ func (l *UDSListener) handleConnection(conn netUnixConn, closeFunc CloseFunction l.telemetryStore.tlmUDSConnections.Inc(tlmListenerID, l.transport) defer func() { _ = closeFunc(conn) + packetsBuffer.Flush() packetsBuffer.Close() if telemetryWithFullListenerID { l.clearTelemetry(tlmListenerID) diff --git a/comp/dogstatsd/packets/buffer.go b/comp/dogstatsd/packets/buffer.go index 3a8a232f804db..f0ca9ab1d6659 100644 --- a/comp/dogstatsd/packets/buffer.go +++ b/comp/dogstatsd/packets/buffer.go @@ -70,6 +70,13 @@ func (pb *Buffer) Append(packet *Packet) { } } +// Flush offers a thread-safe method to force a flush of the appended packets +func (pb *Buffer) Flush() { + pb.m.Lock() + pb.flush() + pb.m.Unlock() +} + func (pb *Buffer) flush() { if len(pb.packets) > 0 { t1 := time.Now() diff --git a/comp/dogstatsd/packets/buffer_test.go b/comp/dogstatsd/packets/buffer_test.go index bc90d5c5f46b2..117b26d35ef86 100644 --- a/comp/dogstatsd/packets/buffer_test.go +++ b/comp/dogstatsd/packets/buffer_test.go @@ -24,7 +24,7 @@ func TestBufferTelemetry(t *testing.T) { // We need a high enough duration to avoid the buffer to flush // And cause the program to deadlock on the packetChannel duration := 10 * time.Second - packetChannel := make(chan Packets) + packetChannel := make(chan Packets, 1) buffer := NewBuffer(3, duration, packetChannel, "test_buffer", telemetryStore) defer buffer.Close() @@ -127,3 +127,22 @@ func TestBufferTelemetryFull(t *testing.T) { assert.Equal(t, float64(1), channelSizeMetrics[0].Value()) } + +func TestBufferFlush(t *testing.T) { + telemetryComponent := fxutil.Test[telemetry.Component](t, telemetryimpl.MockModule()) + telemetryStore := NewTelemetryStore(nil, telemetryComponent) + duration := 10 * time.Hour + packetChannel := make(chan Packets, 1) + buffer := NewBuffer(0, duration, packetChannel, "test_buffer", telemetryStore) + packet := &Packet{ + Contents: []byte("test"), + Buffer: []byte("test read"), + Origin: "test origin", + ListenerID: "1", + Source: 0, + } + + buffer.Append(packet) + buffer.Flush() + assert.Equal(t, 1, len(packetChannel)) +} diff --git a/releasenotes/notes/stream-socket-race-fix-255b6e7a10f7fde0.yaml b/releasenotes/notes/stream-socket-race-fix-255b6e7a10f7fde0.yaml new file mode 100644 index 0000000000000..2a0b8bf07e975 --- /dev/null +++ b/releasenotes/notes/stream-socket-race-fix-255b6e7a10f7fde0.yaml @@ -0,0 +1,13 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + Fixed race condition in stream UDS clients of Dogstatsd that + allowed for the loss of received data. +