Skip to content

Commit

Permalink
Feature/read metrics from unix socket npipe (elastic#14558)
Browse files Browse the repository at this point in the history
Allow monitoring information from libbeat to be retrieved over unix domain socket and named pipes.

(cherry picked from commit baf8c1d)
  • Loading branch information
ph committed Nov 22, 2019
1 parent 0902147 commit 7dd9485
Show file tree
Hide file tree
Showing 14 changed files with 535 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- By default, all Beats-created files and folders will have a umask of 0027 (on POSIX systems). {pull}14119[14119]
- Adding new `Enterprise` license type to the licenser. {issue}14246[14246]
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]
- Change wording when we fail to load a CA file to the cert pool. {issue}14309[14309]
- Allow Metricbeat's beat module to read monitoring information over a named pipe or unix domain socket. {pull}14558[14558]

*Auditbeat*

Expand Down
7 changes: 7 additions & 0 deletions libbeat/api/npipe/listener_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func DialContext(npipe string) func(context.Context, string, string) (net.Conn,
}
}

// Dial create a Dial to be use with an http.Client to connect to a pipe.
func Dial(npipe string) func(string, string) (net.Conn, error) {
return func(_, _ string) (net.Conn, error) {
return winio.DialPipe(npipe, nil)
}
}

// DefaultSD returns a default SecurityDescriptor which is the minimal required permissions to be
// able to write to the named pipe. The security descriptor is returned in SDDL format.
//
Expand Down
7 changes: 7 additions & 0 deletions libbeat/api/npipe/listerner_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ func DialContext(npipe string) func(context.Context, string, string) (net.Conn,
return nil, errors.New("named pipe doesn't work on linux")
}
}

// Dial create a Dial to be use with an http.Client to connect to a pipe.
func Dial(npipe string) func(string, string) (net.Conn, error) {
return func(_, _ string) (net.Conn, error) {
return nil, errors.New("named pipe doesn't work on linux")
}
}
2 changes: 1 addition & 1 deletion libbeat/docs/http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ You can query a unix socket using the `CURL` command and the `--unix-socket` fla

[source,js]
----
curl -XGET --unix-socket '/var/run/{beatname_lc}.sock' 'http://unix/stats/?pretty'
curl -XGET --unix-socket '/var/run/{beatname_lc}.sock' 'http:/stats/?pretty'
----


Expand Down
14 changes: 13 additions & 1 deletion libbeat/outputs/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
if err != nil {
return nil, err
}

addresses, err := net.LookupHost(host)
d.Fatal("dns lookup", err)
d.Info("addresses", strings.Join(addresses, ", "))
Expand All @@ -59,3 +58,16 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
return DialWith(dialer, network, host, addresses, port)
})
}

// UnixDialer creates a Unix Dialer when using unix domain socket.
func UnixDialer(timeout time.Duration, sockFile string) Dialer {
return TestUnixDialer(testing.NullDriver, timeout, sockFile)
}

// TestUnixDialer creates a Test Unix Dialer when using domain socket.
func TestUnixDialer(d testing.Driver, timeout time.Duration, sockFile string) Dialer {
return DialerFunc(func(network, address string) (net.Conn, error) {
d.Info("connecting using unix domain socket", sockFile)
return net.DialTimeout("unix", sockFile, timeout)
})
}
59 changes: 59 additions & 0 deletions metricbeat/helper/dialer/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package dialer

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/outputs/transport"
)

// Builder is a dialer builder.
type Builder interface {
fmt.Stringer
Make(time.Duration) (transport.Dialer, error)
}

// DefaultDialerBuilder create a builder to dialer over TCP and UDP.
type DefaultDialerBuilder struct{}

// Make creates a dialer.
func (t *DefaultDialerBuilder) Make(timeout time.Duration) (transport.Dialer, error) {
return transport.NetDialer(timeout), nil
}

func (t *DefaultDialerBuilder) String() string {
return "TCP/UDP"
}

// NewDefaultDialerBuilder creates a DefaultDialerBuilder.
func NewDefaultDialerBuilder() *DefaultDialerBuilder {
return &DefaultDialerBuilder{}
}

// NewNpipeDialerBuilder creates a NpipeDialerBuilder.
func NewNpipeDialerBuilder(path string) *NpipeDialerBuilder {
return &NpipeDialerBuilder{Path: path}
}

// NewUnixDialerBuilder returns a new TransportUnix instance that will allow the HTTP client to communicate
// over a unix domain socket it require a valid path to the socket on the filesystem.
func NewUnixDialerBuilder(path string) *UnixDialerBuilder {
return &UnixDialerBuilder{Path: path}
}
57 changes: 57 additions & 0 deletions metricbeat/helper/dialer/dialer_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build !windows

package dialer

import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/outputs/transport"
)

// UnixDialerBuilder creates a builder to dial over unix domain socket.
type UnixDialerBuilder struct {
Path string
}

// Make creates a dialer.
func (t *UnixDialerBuilder) Make(timeout time.Duration) (transport.Dialer, error) {
return transport.UnixDialer(timeout, strings.TrimSuffix(t.Path, "/")), nil
}

func (t *UnixDialerBuilder) String() string {
return "Unix: " + t.Path
}

// NpipeDialerBuilder creates a builder to dial over a named pipe.
type NpipeDialerBuilder struct {
Path string
}

// Make creates a dialer.
func (t *NpipeDialerBuilder) Make(_ time.Duration) (transport.Dialer, error) {
return nil, errors.New("cannot the URI, named pipes are only supported on Windows")
}

func (t *NpipeDialerBuilder) String() string {
return "Npipe: " + t.Path
}
71 changes: 71 additions & 0 deletions metricbeat/helper/dialer/dialer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build windows

package dialer

import (
"net"
"strings"
"time"

"github.com/pkg/errors"

winio "github.com/Microsoft/go-winio"

"github.com/elastic/beats/libbeat/api/npipe"
"github.com/elastic/beats/libbeat/outputs/transport"
)

// UnixDialerBuilder creates a builder to dial over a unix domain socket.
type UnixDialerBuilder struct {
Path string
}

// Make creates a dialer.
func (t *UnixDialerBuilder) Make(_ time.Duration) (transport.Dialer, error) {
return nil, errors.New(
"cannot use the URI, unix sockets are not supported on Windows, use npipe instead",
)
}

func (t *UnixDialerBuilder) String() string {
return "Unix: " + t.Path
}

// NpipeDialerBuilder creates a builder to dial over a named pipe.
type NpipeDialerBuilder struct {
Path string
}

func (t *NpipeDialerBuilder) String() string {
return "Npipe: " + t.Path
}

// Make creates a dialer.
func (t *NpipeDialerBuilder) Make(timeout time.Duration) (transport.Dialer, error) {
to := timeout
return transport.DialerFunc(
func(_, _ string) (net.Conn, error) {
return winio.DialPipe(
strings.TrimSuffix(npipe.TransformString(t.Path), "/"),
&to,
)
},
), nil
}
16 changes: 14 additions & 2 deletions metricbeat/helper/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ import (

"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/metricbeat/helper/dialer"
"github.com/elastic/beats/metricbeat/mb"
)

// HTTP is a custom HTTP Client that handle the complexity of connection and retrieving information
// from HTTP endpoint.
type HTTP struct {
hostData mb.HostData
client *http.Client // HTTP client that is reused across requests.
Expand Down Expand Up @@ -72,9 +75,18 @@ func newHTTPFromConfig(config Config, name string, hostData mb.HostData) (*HTTP,
return nil, err
}

var dialer, tlsDialer transport.Dialer
// Ensure backward compatibility
builder := hostData.Transport
if builder == nil {
builder = dialer.NewDefaultDialerBuilder()
}

dialer, err := builder.Make(config.ConnectTimeout)
if err != nil {
return nil, err
}

dialer = transport.NetDialer(config.ConnectTimeout)
var tlsDialer transport.Dialer
tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.ConnectTimeout)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 7dd9485

Please sign in to comment.