Skip to content

Commit

Permalink
case insensitive hostname comparison in kafka broker matching
Browse files Browse the repository at this point in the history
- re-use common.LocalIPAddrs in partition module for resolving IPs
- add missing net.IPAddr type switch to common.LocalIPAddrs
- update matching to extract addresses early on using strings.ToLower
  => ensure case insensitive matching by lowercasing
  • Loading branch information
urso committed Dec 14, 2016
1 parent 253faab commit d38d62c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
*Metricbeat*

- Fix service times-out at startup. {pull}3056[3056]
- Kafka module case sensitive host name matching. {pull}3193[3193]

*Packetbeat*

Expand Down
24 changes: 19 additions & 5 deletions libbeat/common/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,29 @@ import (
// LocalIPAddrs finds the IP addresses of the hosts on which
// the shipper currently runs on.
func LocalIPAddrs() ([]net.IP, error) {
var localIPAddrs = []net.IP{}
var localIPAddrs []net.IP
ipaddrs, err := net.InterfaceAddrs()
if err != nil {
return []net.IP{}, err
return nil, err
}
for _, ipaddr := range ipaddrs {
if ipnet, ok := ipaddr.(*net.IPNet); ok {
localIPAddrs = append(localIPAddrs, ipnet.IP)
for _, addr := range ipaddrs {
var ip net.IP
ok := true

switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
ok = false
}

if !ok {
continue
}

localIPAddrs = append(localIPAddrs, ip)
}
return localIPAddrs, nil
}
Expand Down
117 changes: 50 additions & 67 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (m *MetricSet) connect() (*sarama.Broker, error) {
return nil, err
}

other := findMatchingBroker(b.Addr(), meta.Brokers)
other := findMatchingBroker(brokerAddress(b), meta.Brokers)
if other == nil { // no broker found
closeBroker(b)
return nil, fmt.Errorf("No advertised broker with address %v found", b.Addr())
Expand Down Expand Up @@ -328,13 +328,22 @@ func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
lst := brokerAddresses(brokers)
if idx, found := findMatchingAddress(addr, lst); found {
return brokers[idx]
}
return nil
}

func findMatchingAddress(
addr string,
brokers []string,
) (int, bool) {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
for _, b := range brokers {
if b.Addr() == addr {
return b
}
if i, found := indexOf(addr, brokers); found {
return i, true
}

// get connection 'port'
Expand All @@ -344,9 +353,9 @@ func findMatchingBroker(
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := interfaceIPs()
localIPs, err := common.LocalIPAddrs()
if err != nil || len(localIPs) == 0 {
return nil
return -1, false
}
debugf("local machine ips: %v", localIPs)

Expand All @@ -356,9 +365,8 @@ func findMatchingBroker(
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}
}

Expand All @@ -367,16 +375,17 @@ func findMatchingBroker(
if host, err := os.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
tmp := net.JoinHostPort(strings.ToLower(host), port)
if i, found := indexOf(tmp, brokers); found {
return i, true
}
}

// lookup ips for all brokers
debugf("match by ips")
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())
bh, bp, err := net.SplitHostPort(b.Addr())
for i, b := range brokers {
debugf("test broker address: %v", b)
bh, bp, err := net.SplitHostPort(b)
if err != nil {
continue
}
Expand All @@ -396,59 +405,12 @@ func findMatchingBroker(
debugf("broker (%v) ips: %v", bh, ips)

// check if ip is known
if ipsMatch(ips, localIPs) {
return b
}
}

return nil
}

func findBroker(host, port string, brokers []*sarama.Broker) *sarama.Broker {
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())

bh, bp, err := net.SplitHostPort(b.Addr())
if err != nil {
debugf("failed to parse broker address: %v", err)
continue
}

if bh == host && port == bp {
return b
if anyIPsMatch(ips, localIPs) {
return i, true
}
}

return nil
}

func interfaceIPs() ([]net.IP, error) {
var ips []net.IP
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
var ip net.IP
ok := true

switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
debugf("non ip address: %v", addr)
ok = false
}

if !ok {
continue
}

ips = append(ips, ip)
}
return ips, nil
return -1, false
}

func lookupHosts(ips []net.IP) []string {
Expand All @@ -466,7 +428,7 @@ func lookupHosts(ips []net.IP) []string {
}

for _, host := range hosts {
h := strings.TrimSuffix(host, ".")
h := strings.ToLower(strings.TrimSuffix(host, "."))
set[h] = struct{}{}
}
}
Expand All @@ -478,7 +440,7 @@ func lookupHosts(ips []net.IP) []string {
return hosts
}

func ipsMatch(as, bs []net.IP) bool {
func anyIPsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
Expand All @@ -488,3 +450,24 @@ func ipsMatch(as, bs []net.IP) bool {
}
return false
}

func brokerAddresses(brokers []*sarama.Broker) []string {
addresses := make([]string, len(brokers))
for i, b := range brokers {
addresses[i] = brokerAddress(b)
}
return addresses
}

func brokerAddress(b *sarama.Broker) string {
return strings.ToLower(b.Addr())
}

func indexOf(s string, lst []string) (int, bool) {
for i, v := range lst {
if s == v {
return i, true
}
}
return -1, false
}

0 comments on commit d38d62c

Please sign in to comment.