Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8278 to 6.x: Allow TCP helper to support delimiters #8302

Merged
merged 2 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Add fields for memory fragmentation, memory allocator stats, copy on write, master-slave status, and active defragmentation to `info` metricset of Redis module. {pull}7695[7695]
- Increase ignore_above for system.process.cmdline to 2048. {pull}8101[8100]
- Add support to renamed fields planned for redis 5.0. {pull}8167[8167]
- Allow TCP helper to support delimiters and graphite module to accept multiple metrics in a single payload. {pull}8278[8278]


- Add `metrics` metricset to MongoDB module. {pull}7611[7611]
Expand Down
15 changes: 14 additions & 1 deletion metricbeat/helper/server/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@

package tcp

import "fmt"

type TcpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
Delimiter string `config:"delimiter"`
}

func defaultTcpConfig() TcpConfig {
return TcpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
ReceiveBufferSize: 4096,
Delimiter: "\n",
}
}

// Validate ensures that the configured delimiter has only one character
func (t *TcpConfig) Validate() error {
if len(t.Delimiter) != 1 {
return fmt.Errorf("length of delimiter is expected to be 1 but is %v", len(t.Delimiter))
}

return nil
}
51 changes: 38 additions & 13 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package tcp

import (
"bufio"
"fmt"
"net"

Expand All @@ -35,6 +36,7 @@ type TcpServer struct {
receiveBufferSize int
done chan struct{}
eventQueue chan server.Event
delimiter byte
}

type TcpEvent struct {
Expand Down Expand Up @@ -67,6 +69,7 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: byte(config.Delimiter[0]),
}, nil
}

Expand All @@ -83,7 +86,6 @@ func (g *TcpServer) Start() error {
}

func (g *TcpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
case <-g.done:
Expand All @@ -96,22 +98,45 @@ func (g *TcpServer) watchMetrics() {
logp.Err("Unable to accept connection due to error: %v", err)
continue
}
defer func() {
if conn != nil {
conn.Close()
}
}()

length, err := conn.Read(buffer)
go g.handle(conn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original PR, this statement is wrapped in an if conn != nil. Is that needed here too?

}
}

func (g *TcpServer) handle(conn net.Conn) {
if conn == nil {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if block is not in the original PR. Any specific reason for the difference in the backport?

logp.Debug("tcp", "Handling new connection...")

// Close connection when this function ends
defer conn.Close()

// Get a new reader with buffer size as the same as receiveBufferSize
bufReader := bufio.NewReaderSize(conn, g.receiveBufferSize)

for {
// Read tokens delimited by delimiter
bytes, err := bufReader.ReadBytes(g.delimiter)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
logp.Debug("tcp", "unable to read bytes due to error: %v", err)
return
}

// Truncate to max buffer size if too big of a payload
if len(bytes) > g.receiveBufferSize {
bytes = bytes[:g.receiveBufferSize]
}
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: buffer[:length],
},

// Drop the delimiter and send the data
if len(bytes) > 0 {
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: bytes[:len(bytes)-1],
},
}
}

}
}

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func GetTestTcpServer(host string, port int) (server.Server, error) {
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: '\n',
}, nil
}

Expand All @@ -62,7 +63,7 @@ func TestTcpServer(t *testing.T) {
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
writeToServer(t, "test1\n", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
Expand Down