Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Separate http req per task #29697

Merged
merged 5 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Remove accidentally included cups library in docker images. {pull}28853[pull]
- Fix broken monitors with newer versions of image relying on dup3. {pull}28938[pull]
- Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull]

*Metricbeat*

Expand Down
47 changes: 29 additions & 18 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

type requestFactory func() (*http.Request, error)

func newHTTPMonitorHostJob(
addr string,
config *Config,
Expand All @@ -54,10 +56,7 @@ func newHTTPMonitorHostJob(
validator multiValidator,
) (jobs.Job, error) {

request, err := buildRequest(addr, config, enc)
if err != nil {
return nil, err
}
var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) }

return jobs.MakeSimpleJob(func(event *beat.Event) error {
var redirects []string
Expand All @@ -67,7 +66,13 @@ func newHTTPMonitorHostJob(
Transport: transport,
Timeout: config.Transport.Timeout,
}
_, _, err := execPing(event, client, request, body, config.Transport.Timeout, validator, config.Response)

req, err := reqFactory()
if err != nil {
return fmt.Errorf("could not make http request: %w", err)
}

_, _, err = execPing(event, client, req, body, config.Transport.Timeout, validator, config.Response)
if len(redirects) > 0 {
event.PutValue("http.response.redirects", redirects)
}
Expand All @@ -84,17 +89,14 @@ func newHTTPMonitorIPsJob(
validator multiValidator,
) (jobs.Job, error) {

req, err := buildRequest(addr, config, enc)
if err != nil {
return nil, err
}
var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) }

hostname, port, err := splitHostnamePort(req)
hostname, port, err := splitHostnamePort(addr)
if err != nil {
return nil, err
}

pingFactory := createPingFactory(config, port, tls, req, body, validator)
pingFactory := createPingFactory(config, port, tls, reqFactory, body, validator)
job, err := monitors.MakeByHostJob(hostname, config.Mode, monitors.NewStdResolver(), pingFactory)

return job, err
Expand All @@ -104,14 +106,19 @@ func createPingFactory(
config *Config,
port uint16,
tls *tlscommon.TLSConfig,
request *http.Request,
reqFactory requestFactory,
body []byte,
validator multiValidator,
) func(*net.IPAddr) jobs.Job {
timeout := config.Transport.Timeout
isTLS := request.URL.Scheme == "https"

return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error {
req, err := reqFactory()
if err != nil {
return fmt.Errorf("could not create http request: %w", err)
}
isTLS := req.URL.Scheme == "https"

addr := net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))
d := &dialchain.DialerChain{
Net: dialchain.MakeConstAddrDialer(addr, dialchain.TCPDialer(timeout)),
Expand Down Expand Up @@ -163,7 +170,7 @@ func createPingFactory(
Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}),
}

_, end, err := execPing(event, client, request, body, timeout, validator, config.Response)
_, end, err := execPing(event, client, req, body, timeout, validator, config.Response)
cbMutex.Lock()
defer cbMutex.Unlock()

Expand Down Expand Up @@ -313,11 +320,15 @@ func execRequest(client *http.Client, req *http.Request) (start time.Time, resp
return start, resp, nil
}

func splitHostnamePort(requ *http.Request) (string, uint16, error) {
host := requ.URL.Host
func splitHostnamePort(addr string) (string, uint16, error) {
u, err := url.Parse(addr)
if err != nil {
return "", 0, err
}
host := u.Host
// Try to add a default port if needed
if strings.LastIndex(host, ":") == -1 {
switch requ.URL.Scheme {
switch u.Scheme {
case urlSchemaHTTP:
host += ":80"
case urlSchemaHTTPS:
Expand All @@ -330,7 +341,7 @@ func splitHostnamePort(requ *http.Request) (string, uint16, error) {
}
p, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, requ.URL.Host)
return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, u.Host)
}
return host, uint16(p), nil
}
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSplitHostnamePort(t *testing.T) {
request := &http.Request{
URL: url,
}
host, port, err := splitHostnamePort(request)
host, port, err := splitHostnamePort(request.URL.String())

if err != nil {
if test.expectedError == nil {
Expand Down