Skip to content

Commit

Permalink
AMLII-2207 - alter dogstatsd packet buffers to automatically flush wh…
Browse files Browse the repository at this point in the history
…en stream sockets disconnect (DataDog#31768)
  • Loading branch information
ddrthall authored Dec 5, 2024
1 parent 945f15c commit 0db869d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
1 change: 1 addition & 0 deletions comp/dogstatsd/listeners/uds_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (l *UDSListener) handleConnection(conn netUnixConn, closeFunc CloseFunction
l.telemetryStore.tlmUDSConnections.Inc(tlmListenerID, l.transport)
defer func() {
_ = closeFunc(conn)
packetsBuffer.Flush()
packetsBuffer.Close()
if telemetryWithFullListenerID {
l.clearTelemetry(tlmListenerID)
Expand Down
7 changes: 7 additions & 0 deletions comp/dogstatsd/packets/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (pb *Buffer) Append(packet *Packet) {
}
}

// Flush offers a thread-safe method to force a flush of the appended packets
func (pb *Buffer) Flush() {
pb.m.Lock()
pb.flush()
pb.m.Unlock()
}

func (pb *Buffer) flush() {
if len(pb.packets) > 0 {
t1 := time.Now()
Expand Down
21 changes: 20 additions & 1 deletion comp/dogstatsd/packets/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBufferTelemetry(t *testing.T) {
// We need a high enough duration to avoid the buffer to flush
// And cause the program to deadlock on the packetChannel
duration := 10 * time.Second
packetChannel := make(chan Packets)
packetChannel := make(chan Packets, 1)
buffer := NewBuffer(3, duration, packetChannel, "test_buffer", telemetryStore)
defer buffer.Close()

Expand Down Expand Up @@ -127,3 +127,22 @@ func TestBufferTelemetryFull(t *testing.T) {

assert.Equal(t, float64(1), channelSizeMetrics[0].Value())
}

func TestBufferFlush(t *testing.T) {
telemetryComponent := fxutil.Test[telemetry.Component](t, telemetryimpl.MockModule())
telemetryStore := NewTelemetryStore(nil, telemetryComponent)
duration := 10 * time.Hour
packetChannel := make(chan Packets, 1)
buffer := NewBuffer(0, duration, packetChannel, "test_buffer", telemetryStore)
packet := &Packet{
Contents: []byte("test"),
Buffer: []byte("test read"),
Origin: "test origin",
ListenerID: "1",
Source: 0,
}

buffer.Append(packet)
buffer.Flush()
assert.Equal(t, 1, len(packetChannel))
}
13 changes: 13 additions & 0 deletions releasenotes/notes/stream-socket-race-fix-255b6e7a10f7fde0.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
Fixed race condition in stream UDS clients of Dogstatsd that
allowed for the loss of received data.

0 comments on commit 0db869d

Please sign in to comment.