Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/read metrics from unix socket npipe #14558

Merged
merged 14 commits into from
Nov 22, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- By default, all Beats-created files and folders will have a umask of 0027 (on POSIX systems). {pull}14119[14119]
- Adding new `Enterprise` license type to the licenser. {issue}14246[14246]
- Change wording when we fail to load a CA file to the cert pool. {issue}14309[14309]
- Allow Metricbeat's beat module to read monitoring information over a named pipe or unix domain socket. {pull}14558[14558]

*Auditbeat*

Expand Down
7 changes: 7 additions & 0 deletions libbeat/api/npipe/listener_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func DialContext(npipe string) func(context.Context, string, string) (net.Conn,
}
}

// Dial create a Dial to be use with an http.Client to connect to a pipe.
func Dial(npipe string) func(string, string) (net.Conn, error) {
return func(_, _ string) (net.Conn, error) {
return winio.DialPipe(npipe, nil)
}
}
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

// DefaultSD returns a default SecurityDescriptor which is the minimal required permissions to be
// able to write to the named pipe. The security descriptor is returned in SDDL format.
//
Expand Down
7 changes: 7 additions & 0 deletions libbeat/api/npipe/listerner_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ func DialContext(npipe string) func(context.Context, string, string) (net.Conn,
return nil, errors.New("named pipe doesn't work on linux")
}
}

// Dial create a Dial to be use with an http.Client to connect to a pipe.
func Dial(npipe string) func(string, string) (net.Conn, error) {
return func(_, _ string) (net.Conn, error) {
return nil, errors.New("named pipe doesn't work on linux")
}
}
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion libbeat/docs/http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ You can query a unix socket using the `CURL` command and the `--unix-socket` fla

[source,js]
----
curl -XGET --unix-socket '/var/run/{beatname_lc}.sock' 'http://unix/stats/?pretty'
curl -XGET --unix-socket '/var/run/{beatname_lc}.sock' 'http:/stats/?pretty'
----


Expand Down
14 changes: 13 additions & 1 deletion libbeat/outputs/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
if err != nil {
return nil, err
}

addresses, err := net.LookupHost(host)
d.Fatal("dns lookup", err)
d.Info("addresses", strings.Join(addresses, ", "))
Expand All @@ -59,3 +58,16 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
return DialWith(dialer, network, host, addresses, port)
})
}

// UnixDialer creates a Unix Dialer when using unix domain socket.
func UnixDialer(timeout time.Duration, sockFile string) Dialer {
return TestUnixDialer(testing.NullDriver, timeout, sockFile)
}

// TestUnixDialer creates a Test Unix Dialer when using domain socket.
func TestUnixDialer(d testing.Driver, timeout time.Duration, sockFile string) Dialer {
return DialerFunc(func(network, address string) (net.Conn, error) {
d.Info("connecting using unix domain socket", sockFile)
return net.DialTimeout("unix", sockFile, timeout)
})
}
43 changes: 43 additions & 0 deletions metricbeat/helper/dialer_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build !windows

package helper

import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/metricbeat/mb"
)

func makeDialer(t time.Duration, hostData mb.HostData) (transport.Dialer, string, error) {
switch hostData.Transport {
case mb.TransportNpipe:
return nil, "", fmt.Errorf(
"cannot use %s as the URI, named pipes are only supported on Windows",
hostData.SanitizedURI,
)
case mb.TransportUnix:
return transport.UnixDialer(t, strings.TrimSuffix(hostData.TransportPath, "/")), hostData.SanitizedURI, nil
default:
return transport.NetDialer(t), hostData.SanitizedURI, nil
}
}
46 changes: 46 additions & 0 deletions metricbeat/helper/dialer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build windows

package helper

import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/api/npipe"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/metricbeat/mb"
)

func makeDialer(t time.Duration, hostData mb.HostData) (transport.Dialer, string, error) {
switch hostData.Transport {
case mb.TransportUnix:
return nil, "", fmt.Errorf(
"cannot use %s as the URI, unix sockets are not supported on Windows, use npipe instead",
hostData.SanitizedURI,
)
case mb.TransportNpipe:
return npipe.DialContext(
strings.TrimSuffix(npipe.TransformString(p), "/"),
ph marked this conversation as resolved.
Show resolved Hide resolved
), hostData.SanitizedURI, nil
default:
return transport.NetDialer(t), hostData.SanitizedURI, nil
}
}
11 changes: 8 additions & 3 deletions metricbeat/helper/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

// HTTP is a custom HTTP Client that handle the complexity of connection and retrieving information
// from HTTP endpoint.
type HTTP struct {
hostData mb.HostData
client *http.Client // HTTP client that is reused across requests.
Expand Down Expand Up @@ -72,9 +74,12 @@ func newHTTPFromConfig(config Config, name string, hostData mb.HostData) (*HTTP,
return nil, err
}

var dialer, tlsDialer transport.Dialer
var tlsDialer transport.Dialer
ph marked this conversation as resolved.
Show resolved Hide resolved
dialer, uri, err := makeDialer(config.ConnectTimeout, hostData)
if err != nil {
return nil, err
}

dialer = transport.NetDialer(config.ConnectTimeout)
tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.ConnectTimeout)
if err != nil {
return nil, err
Expand All @@ -92,7 +97,7 @@ func newHTTPFromConfig(config Config, name string, hostData mb.HostData) (*HTTP,
},
headers: config.Headers,
method: "GET",
uri: hostData.SanitizedURI,
uri: uri,
body: nil,
}, nil
}
Expand Down
87 changes: 87 additions & 0 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package helper

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -158,6 +161,90 @@ func TestAuthentication(t *testing.T) {
assert.Equal(t, http.StatusOK, response.StatusCode, "response status code")
}

func TestOverUnixSocket(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skipf("unix domain socket aren't supported under Windows")
return
}

t.Run("at root", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

sockFile := tmpDir + "/test.sock"

l, err := net.Listen("unix", sockFile)
require.NoError(t, err)

defer l.Close()

mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)

cfg := defaultConfig()
hostData := mb.HostData{
Transport: mb.TransportUnix,
TransportPath: sockFile,
URI: "http://unix/",
SanitizedURI: "http://unix",
}

h, err := newHTTPFromConfig(cfg, "test", hostData)
require.NoError(t, err)

r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})

t.Run("at specific path", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

sockFile := tmpDir + "/test.sock"
uri := "http://unix/ok"

l, err := net.Listen("unix", sockFile)
require.NoError(t, err)

defer l.Close()

mux := http.NewServeMux()
mux.HandleFunc("/ok", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)

cfg := defaultConfig()
hostData := mb.HostData{
Transport: mb.TransportUnix,
TransportPath: sockFile,
URI: uri,
SanitizedURI: uri,
}

h, err := newHTTPFromConfig(cfg, "test", hostData)
require.NoError(t, err)

r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})
}

func checkTimeout(t *testing.T, h *HTTP) {
t.Helper()

Expand Down
Loading