Skip to content

Commit

Permalink
[Heartbeat] Separate http req per task (#29697)
Browse files Browse the repository at this point in the history
This is an attempt to fix the race reported in #29580 by instantiating a separate http request per HTTP task. The theory being that the HTTP library modifies the headers and that the req object is not safe to share.

This has passed manual testing using mode: all against endpoints with multiple A records.

Tests are not included here due to the tricky nature of testing here, but we will do so in a follow-up

(cherry picked from commit 27f2d00)
  • Loading branch information
andrewvc authored and mergify-bot committed Jan 6, 2022
1 parent d420ccd commit cb3466c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*

- Fix broken monitors with newer versions of image relying on dup3. {pull}28938[pull]
- Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull]

*Metricbeat*

Expand Down
47 changes: 29 additions & 18 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

type requestFactory func() (*http.Request, error)

func newHTTPMonitorHostJob(
addr string,
config *Config,
Expand All @@ -54,10 +56,7 @@ func newHTTPMonitorHostJob(
validator multiValidator,
) (jobs.Job, error) {

request, err := buildRequest(addr, config, enc)
if err != nil {
return nil, err
}
var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) }

return jobs.MakeSimpleJob(func(event *beat.Event) error {
var redirects []string
Expand All @@ -67,7 +66,13 @@ func newHTTPMonitorHostJob(
Transport: transport,
Timeout: config.Transport.Timeout,
}
_, _, err := execPing(event, client, request, body, config.Transport.Timeout, validator, config.Response)

req, err := reqFactory()
if err != nil {
return fmt.Errorf("could not make http request: %w", err)
}

_, _, err = execPing(event, client, req, body, config.Transport.Timeout, validator, config.Response)
if len(redirects) > 0 {
event.PutValue("http.response.redirects", redirects)
}
Expand All @@ -84,17 +89,14 @@ func newHTTPMonitorIPsJob(
validator multiValidator,
) (jobs.Job, error) {

req, err := buildRequest(addr, config, enc)
if err != nil {
return nil, err
}
var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) }

hostname, port, err := splitHostnamePort(req)
hostname, port, err := splitHostnamePort(addr)
if err != nil {
return nil, err
}

pingFactory := createPingFactory(config, port, tls, req, body, validator)
pingFactory := createPingFactory(config, port, tls, reqFactory, body, validator)
job, err := monitors.MakeByHostJob(hostname, config.Mode, monitors.NewStdResolver(), pingFactory)

return job, err
Expand All @@ -104,14 +106,19 @@ func createPingFactory(
config *Config,
port uint16,
tls *tlscommon.TLSConfig,
request *http.Request,
reqFactory requestFactory,
body []byte,
validator multiValidator,
) func(*net.IPAddr) jobs.Job {
timeout := config.Transport.Timeout
isTLS := request.URL.Scheme == "https"

return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error {
req, err := reqFactory()
if err != nil {
return fmt.Errorf("could not create http request: %w", err)
}
isTLS := req.URL.Scheme == "https"

addr := net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))
d := &dialchain.DialerChain{
Net: dialchain.MakeConstAddrDialer(addr, dialchain.TCPDialer(timeout)),
Expand Down Expand Up @@ -163,7 +170,7 @@ func createPingFactory(
Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}),
}

_, end, err := execPing(event, client, request, body, timeout, validator, config.Response)
_, end, err := execPing(event, client, req, body, timeout, validator, config.Response)
cbMutex.Lock()
defer cbMutex.Unlock()

Expand Down Expand Up @@ -313,11 +320,15 @@ func execRequest(client *http.Client, req *http.Request) (start time.Time, resp
return start, resp, nil
}

func splitHostnamePort(requ *http.Request) (string, uint16, error) {
host := requ.URL.Host
func splitHostnamePort(addr string) (string, uint16, error) {
u, err := url.Parse(addr)
if err != nil {
return "", 0, err
}
host := u.Host
// Try to add a default port if needed
if strings.LastIndex(host, ":") == -1 {
switch requ.URL.Scheme {
switch u.Scheme {
case urlSchemaHTTP:
host += ":80"
case urlSchemaHTTPS:
Expand All @@ -330,7 +341,7 @@ func splitHostnamePort(requ *http.Request) (string, uint16, error) {
}
p, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, requ.URL.Host)
return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, u.Host)
}
return host, uint16(p), nil
}
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSplitHostnamePort(t *testing.T) {
request := &http.Request{
URL: url,
}
host, port, err := splitHostnamePort(request)
host, port, err := splitHostnamePort(request.URL.String())

if err != nil {
if test.expectedError == nil {
Expand Down

0 comments on commit cb3466c

Please sign in to comment.