Skip to content

Commit

Permalink
[Heartbeat] Refactor TCP Monitor (elastic#17549) (elastic#17849)
Browse files Browse the repository at this point in the history
Refactors the TCP monitor to make the code easier to follow, more testable, and fixes elastic#17123 where TLS server name was not correctly sent. This is important because the code had accrued a lot of cruft and become very hard to follow. There were many wrappers and intermediate variable names that often subtly changed names as they crossed various functions. When debugging elastic#17123 I frequently found myself lost tracing the execution.

This new code should be simpler to understand for a few reasons:

    Less code (almost a 2x reduction) and fewer, simpler, better organized functions
    Less variable passing/renaming due to use of struct for key config variables
    More consistent and descriptive variable names
    Creation of the dialer as late as possible, to remove the confusing partial states, and clarity as to when which dialer layers are used.
    Adds (frustratingly tricky) integration tests for elastic#17123 using mismatched TLS certs, and also against a real SOCKS5 proxy
    Adds, for testing only, the ability to override the real network resolver for TCP checks which is invaluable in debugging TLS checks that depend on setting hostnames correctly. In the future if we decide to let users use a custom DNS resolver this will be nice.
    Reorganized giant TCP test file into multiple files

(cherry picked from commit dfe8c4b)
  • Loading branch information
andrewvc authored Apr 21, 2020
1 parent db4f563 commit b54f503
Show file tree
Hide file tree
Showing 19 changed files with 996 additions and 579 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix recording of SSL cert metadata for Expired/Unvalidated x509 certs. {pull}13687[13687]
- Fixed excessive memory usage introduced in 7.5 due to over-allocating memory for HTTP checks. {pull}15639[15639]
- Fixed scheduler shutdown issues which would in rare situations cause a panic due to semaphore misuse. {pull}16397[16397]
- Fixed TCP TLS checks to properly validate hostnames, this broke in 7.x and only worked for IP SANs. {pull}17549[17549]

*Journalbeat*

Expand Down
16 changes: 15 additions & 1 deletion heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,19 @@ func SummaryChecks(up int, down int) validator.Validator {
})
}

// ResolveChecks returns a lookslike matcher for the 'resolve' fields.
func ResolveChecks(ip string) validator.Validator {
return lookslike.MustCompile(map[string]interface{}{
"resolve": map[string]interface{}{
"ip": ip,
"rtt.us": isdef.IsDuration,
},
})
}

// SimpleURLChecks returns a check for a simple URL
// with only a scheme, host, and port
func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) validator.Validator {

hostPort := host
if port != 0 {
hostPort = fmt.Sprintf("%s:%d", host, port)
Expand All @@ -165,6 +174,11 @@ func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) vali
u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, hostPort))
require.NoError(t, err)

return URLChecks(t, u)
}

// URLChecks returns a validator for the given URL's fields
func URLChecks(t *testing.T, u *url.URL) validator.Validator {
return lookslike.MustCompile(map[string]interface{}{
"url": wrappers.URLFields(u),
})
Expand Down
198 changes: 0 additions & 198 deletions heartbeat/monitors/active/dialchain/builder.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
// }
// }
func TCPDialer(to time.Duration) NetDialer {
return netDialer(to)
return CreateNetDialer(to)
}

// UDPDialer creates a new NetDialer with constant event fields and default
Expand All @@ -62,10 +62,11 @@ func TCPDialer(to time.Duration) NetDialer {
// }
// }
func UDPDialer(to time.Duration) NetDialer {
return netDialer(to)
return CreateNetDialer(to)
}

func netDialer(timeout time.Duration) NetDialer {
// CreateNetDialer returns a NetDialer with the given timeout.
func CreateNetDialer(timeout time.Duration) NetDialer {
return func(event *beat.Event) (transport.Dialer, error) {
return makeDialer(func(network, address string) (net.Conn, error) {
namespace := ""
Expand Down
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/dialchain/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ type timer struct {
s, e time.Time
}

// IDLayer creates an empty placeholder layer.
func IDLayer() Layer {
return _idLayer
}

var _idLayer = Layer(func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
return next, nil
})

// ConstAddrLayer introduces a network layer always passing a constant address
// to the underlying layer.
func ConstAddrLayer(address string) Layer {
Expand Down
4 changes: 1 addition & 3 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ func newHTTPMonitorIPsJob(
return nil, err
}

settings := monitors.MakeHostJobSettings(hostname, config.Mode)

pingFactory := createPingFactory(config, port, tls, req, body, validator)
job, err := monitors.MakeByHostJob(settings, pingFactory)
job, err := monitors.MakeByHostJob(hostname, config.Mode, monitors.NewStdResolver(), pingFactory)

return job, err
}
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func create(
pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config))

for _, host := range config.Hosts {
settings := monitors.MakeHostJobSettings(host, config.Mode)
job, err := monitors.MakeByHostJob(settings, pingFactory)
job, err := monitors.MakeByHostJob(host, config.Mode, monitors.NewStdResolver(), pingFactory)

if err != nil {
return nil, 0, err
Expand Down
12 changes: 7 additions & 5 deletions heartbeat/monitors/active/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

type Config struct {
type config struct {
// check all ports if host does not contain port
Hosts []string `config:"hosts" validate:"required"`
Ports []uint16 `config:"ports"`
Expand All @@ -45,12 +45,14 @@ type Config struct {
ReceiveString string `config:"check.receive"`
}

var DefaultConfig = Config{
Timeout: 16 * time.Second,
Mode: monitors.DefaultIPSettings,
func defaultConfig() config {
return config{
Timeout: 16 * time.Second,
Mode: monitors.DefaultIPSettings,
}
}

func (c *Config) Validate() error {
func (c *config) Validate() error {
if c.Socks5.URL != "" {
if c.Mode.Mode != monitors.PingAny && !c.Socks5.LocalResolve {
return errors.New("ping all ips only supported if proxy_use_local_resolver is enabled`")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ import (
"net"
)

type ConnCheck func(net.Conn) error
// dataCheck executes over an open TCP connection using the send / receive
// parameters the user has defined.
type dataCheck func(net.Conn) error

var (
errNoDataReceived = errors.New("no data")
errRecvMismatch = errors.New("received string mismatch")
)

func (c ConnCheck) Validate(conn net.Conn) error {
func (c dataCheck) Check(conn net.Conn) error {
return c(conn)
}

func makeValidateConn(config *Config) ConnCheck {
func makeDataCheck(config *config) dataCheck {
send := config.SendString
recv := config.ReceiveString

Expand All @@ -52,7 +54,7 @@ func makeValidateConn(config *Config) ConnCheck {

func checkOk(_ net.Conn) error { return nil }

func checkAll(checks ...ConnCheck) ConnCheck {
func checkAll(checks ...dataCheck) dataCheck {
return func(conn net.Conn) error {
for _, check := range checks {
if err := check(conn); err != nil {
Expand All @@ -63,13 +65,13 @@ func checkAll(checks ...ConnCheck) ConnCheck {
}
}

func checkSend(buf []byte) ConnCheck {
func checkSend(buf []byte) dataCheck {
return func(conn net.Conn) error {
return sendBuffer(conn, buf)
}
}

func checkRecv(expected []byte) ConnCheck {
func checkRecv(expected []byte) dataCheck {
return func(conn net.Conn) error {
buf := make([]byte, len(expected))
if err := recvBuffer(conn, buf); err != nil {
Expand Down
Loading

0 comments on commit b54f503

Please sign in to comment.