Skip to content

Commit

Permalink
Cleanup byHostname jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Apr 7, 2020
1 parent d7a2cf8 commit 5dc587e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 64 deletions.
4 changes: 1 addition & 3 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ func newHTTPMonitorIPsJob(
return nil, err
}

settings := monitors.MakeHostJobSettings(hostname, config.Mode)

pingFactory := createPingFactory(config, port, tls, req, body, validator)
job, err := monitors.MakeByHostJob(settings, pingFactory)
job, err := monitors.MakeByHostJob(hostname, config.Mode, pingFactory)

return job, err
}
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func create(
pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config))

for _, host := range config.Hosts {
settings := monitors.MakeHostJobSettings(host, config.Mode)
job, err := monitors.MakeByHostJob(settings, pingFactory)
job, err := monitors.MakeByHostJob(host, config.Mode, pingFactory)

if err != nil {
return nil, 0, err
Expand Down
27 changes: 18 additions & 9 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ func (jf *jobFactory) makeEndpointJob(endpointURL *url.URL) (jobs.Job, error) {
// in resolving the actual IP.
// Create one job for every port number configured.
if jf.config.Socks5.URL != "" && !jf.config.Socks5.LocalResolve {
return wrappers.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return jf.dial(event, hostPort, endpointURL)
})), nil
jf.makeSocksLookupEndpointJob(endpointURL)
}

// Create job that first resolves one or multiple IP (depending on
// config.Mode) in order to create one continuation Task per IP.
settings := monitors.MakeHostJobSettings(endpointURL.Hostname(), jf.config.Mode)
return jf.makeDirectEndpointJob(endpointURL)
}

job, err := monitors.MakeByHostJob(settings,
// makeDirectEndpointJob makes jobs that directly lookup the IP of the endpoints, as opposed to using
// a Socks5 proxy.
func (jf *jobFactory) makeDirectEndpointJob(endpointURL *url.URL) (jobs.Job, error) {
// Create job that first resolves one or multiple IPs (depending on
// config.Mode) in order to create one continuation Task per IP.
job, err := monitors.MakeByHostJob(endpointURL.Hostname(), jf.config.Mode,
monitors.MakePingIPFactory(
func(event *beat.Event, ip *net.IPAddr) error {
// use address from resolved IP
Expand All @@ -154,6 +154,15 @@ func (jf *jobFactory) makeEndpointJob(endpointURL *url.URL) (jobs.Job, error) {
return job, nil
}

// makeDirectEndpointJob makes jobs that use a Socks5 proxy to perform DNS lookups
func (jf *jobFactory) makeSocksLookupEndpointJob(endpointURL *url.URL) (jobs.Job, error) {
return wrappers.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return jf.dial(event, hostPort, endpointURL)
})), nil
}

// dial builds a dialer and executes the network request.
// dialAddr is the host:port that the dialer will connect to, and where an explicit IP should go to.
// canonicalURL is the URL used to determine if TLS is used via the scheme of the URL, and
Expand Down
60 changes: 10 additions & 50 deletions heartbeat/monitors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ type IPSettings struct {
Mode PingMode `config:"mode"`
}

// HostJobSettings configures a Job including Host lookups and global fields to be added
// to every event.
type HostJobSettings struct {
Host string
IP IPSettings
Fields common.MapStr
}

// PingMode enumeration for configuring `any` or `all` IPs pinging.
type PingMode uint8

Expand Down Expand Up @@ -130,35 +122,34 @@ func MakeByIPJob(
// A pingFactory instance is normally build with MakePingIPFactory,
// MakePingAllIPFactory or MakePingAllIPPortFactory.
func MakeByHostJob(
settings HostJobSettings,
host string,
ipSettings IPSettings,
pingFactory func(ip *net.IPAddr) jobs.Job,
) (jobs.Job, error) {
host := settings.Host

if ip := net.ParseIP(host); ip != nil {
return MakeByIPJob(ip, pingFactory)
}

network := settings.IP.Network()
network := ipSettings.Network()
if network == "" {
return nil, errors.New("pinging hosts requires ipv4 or ipv6 mode enabled")
}

mode := settings.IP.Mode
mode := ipSettings.Mode

if mode == PingAny {
return makeByHostAnyIPJob(settings, host, pingFactory), nil
return makeByHostAnyIPJob(host, ipSettings, pingFactory), nil
}

return makeByHostAllIPJob(settings, host, pingFactory), nil
return makeByHostAllIPJob(host, ipSettings, pingFactory), nil
}

func makeByHostAnyIPJob(
settings HostJobSettings,
host string,
ipSettings IPSettings,
pingFactory func(ip *net.IPAddr) jobs.Job,
) jobs.Job {
network := settings.IP.Network()
network := ipSettings.Network()

return func(event *beat.Event) ([]jobs.Job, error) {
resolveStart := time.Now()
Expand All @@ -176,11 +167,11 @@ func makeByHostAnyIPJob(
}

func makeByHostAllIPJob(
settings HostJobSettings,
host string,
ipSettings IPSettings,
pingFactory func(ip *net.IPAddr) jobs.Job,
) jobs.Job {
network := settings.IP.Network()
network := ipSettings.Network()
filter := makeIPFilter(network)

return func(event *beat.Event) ([]jobs.Job, error) {
Expand Down Expand Up @@ -265,34 +256,3 @@ func filterIPs(ips []net.IP, filt func(net.IP) bool) []net.IP {
}
return out
}

// MakeHostJobSettings creates a new HostJobSettings structure without any global
// event fields.
func MakeHostJobSettings(host string, ip IPSettings) HostJobSettings {
return HostJobSettings{Host: host, IP: ip}
}

// WithFields adds new event fields to a Job. Existing fields will be
// overwritten.
// The fields map will be updated (no copy).
func (s HostJobSettings) WithFields(m common.MapStr) HostJobSettings {
s.AddFields(m)
return s
}

// AddFields adds new event fields to a Job. Existing fields will be
// overwritten.
func (s *HostJobSettings) AddFields(m common.MapStr) { addFields(&s.Fields, m) }

func addFields(to *common.MapStr, m common.MapStr) {
if m == nil {
return
}

fields := *to
if fields == nil {
fields = common.MapStr{}
*to = fields
}
fields.DeepUpdate(m)
}

0 comments on commit 5dc587e

Please sign in to comment.