Skip to content

Commit

Permalink
Make test output honor output.elasticsearch.proxy_url
Browse files Browse the repository at this point in the history
  • Loading branch information
sakurai-youhei committed Oct 2, 2023
1 parent ee55f1d commit 268a6b7
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 5 deletions.
1 change: 1 addition & 0 deletions dev-tools/mage/integtest_docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func WithGoIntegTestHostEnv(env map[string]string) map[string]string {
env["KIBANA_USER"] = "beats"
env["KIBANA_PASS"] = "testing"

env["PROXY_HOST"] = dockerServiceHostname
env["REDIS_HOST"] = dockerServiceHostname
env["SREDIS_HOST"] = dockerServiceHostname
env["LS_HOST"] = dockerServiceHostname
Expand Down
24 changes: 24 additions & 0 deletions libbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
elasticsearchssl: { condition: service_healthy }
logstash: { condition: service_healthy }
kafka: { condition: service_healthy }
proxy: { condition: service_healthy }
redis: { condition: service_healthy }
sredis: { condition: service_healthy }
kibana: { condition: service_healthy }
Expand All @@ -26,6 +27,7 @@ services:
interval: 1s
ports:
- 9200:9200
- 3128:3128 # Squid listens in the proxy service container.

elasticsearchssl:
extends:
Expand Down Expand Up @@ -67,6 +69,28 @@ services:
elasticsearch:
condition: service_healthy

proxy:
image: ubuntu/squid:latest
network_mode: service:elasticsearch
healthcheck:
test: ["CMD", "bash", "-c", "echo > /dev/tcp/localhost/3128 || exit 1"]
retries: 60
interval: 1s
entrypoint:
- /bin/sh
- -c
- |-
cat << EOF >> /etc/squid/conf.d/00_proxy.conf
auth_param basic program /usr/lib/squid/basic_fake_auth
acl auth proxy_auth REQUIRED
http_access deny !auth
http_access allow auth
http_access deny all
acl SSL_ports port 9200
dns_timeout 3 seconds
EOF
exec /usr/local/bin/entrypoint.sh -f /etc/squid/squid.conf -NYC
redis:
build: ${ES_BEATS}/testing/environments/docker/redis
ports:
Expand Down
59 changes: 54 additions & 5 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"go.elastic.co/apm/module/apmelasticsearch/v2"
"golang.org/x/net/proxy"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/productorigin"
Expand Down Expand Up @@ -312,15 +313,55 @@ func (conn *Connection) Close() error {
}

func (conn *Connection) Test(d testing.Driver) {
testProxyDialer := func(d testing.Driver, forward transport.Dialer, settings *httpcommon.HTTPClientProxySettings) transport.Dialer {
switch scheme := settings.URL.Scheme; scheme {
case "http", "https":
proxy.RegisterDialerType(scheme, settings.ProxyDialer)
}
return transport.TestProxyDialer(d, forward, &transport.ProxyConfig{URL: settings.URL.String()})
}

d.Run("elasticsearch: "+conn.URL, func(d testing.Driver) {
u, err := url.Parse(conn.URL)
d.Fatal("parse url", err)

address := u.Host

if proxyURL := conn.Transport.Proxy.URL; proxyURL != nil && !conn.Transport.Proxy.Disable {
d.Run("proxy", func(d testing.Driver) {
dialer := transport.TestNetDialer(d, conn.Transport.Timeout)

if proxyURL.Scheme == "https" {
tls, err := tlscommon.LoadTLSConfig(conn.Transport.TLS)
if err != nil {
d.Fatal("load tls config", err)
}
dialer = transport.TestTLSDialer(d, dialer, tls, conn.Transport.Timeout)
}

_, err := dialer.Dial("tcp", proxyURL.Host)
d.Fatal("dial up", err)
})
}

d.Run("connection", func(d testing.Driver) {
netDialer := transport.TestNetDialer(d, conn.Transport.Timeout)
_, err = netDialer.Dial("tcp", address)
var dialer transport.Dialer
if proxyURL := conn.Transport.Proxy.URL; proxyURL == nil || conn.Transport.Proxy.Disable {
dialer = transport.TestNetDialer(d, conn.Transport.Timeout)
} else {
dialer = transport.NetDialer(conn.Transport.Timeout)

if proxyURL.Scheme == "https" {
tls, err := tlscommon.LoadTLSConfig(conn.Transport.TLS)
if err != nil {
d.Fatal("load tls config", err)
}
dialer = transport.TLSDialer(dialer, tls, conn.Transport.Timeout)
}

dialer = testProxyDialer(d, dialer, &conn.Transport.Proxy)
}
_, err := dialer.Dial("tcp", address)
d.Fatal("dial up", err)
})

Expand All @@ -333,9 +374,17 @@ func (conn *Connection) Test(d testing.Driver) {
d.Fatal("load tls config", err)
}

netDialer := transport.NetDialer(conn.Transport.Timeout)
tlsDialer := transport.TestTLSDialer(d, netDialer, tls, conn.Transport.Timeout)
_, err = tlsDialer.Dial("tcp", address)
dialer := transport.NetDialer(conn.Transport.Timeout)

if proxyURL := conn.Transport.Proxy.URL; proxyURL != nil && !conn.Transport.Proxy.Disable {
if proxyURL.Scheme == "https" {
dialer = transport.TLSDialer(dialer, tls, conn.Transport.Timeout)
}
dialer = testProxyDialer(d, dialer, &conn.Transport.Proxy)
}

dialer = transport.TestTLSDialer(d, dialer, tls, conn.Transport.Timeout)
_, err = dialer.Dial("tcp", address)
d.Fatal("dial up", err)
})
}
Expand Down
53 changes: 53 additions & 0 deletions libbeat/tests/integration/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,56 @@ func TestCmdTestOutputBadHost(t *testing.T) {
mockbeat.WaitStdOutContains("parse url... OK", 10*time.Second)
mockbeat.WaitStdOutContains("dns lookup... ERROR", 10*time.Second)
}

func TestCmdTestOutputProxy(t *testing.T) {
esURL := GetESURL(t, "http")
proxyURL := GetProxyURL(t)
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(CmdTestCfg, esURL.String()))
mockbeat.Start("test", "output", "-E", "output.elasticsearch.proxy_url="+proxyURL.String())
procState, err := mockbeat.Process.Wait()
require.NoError(t, err)
require.Equal(t, 0, procState.ExitCode(), "incorrect exit code")
mockbeat.WaitStdOutContains("parse url... OK", 10*time.Second)
mockbeat.WaitStdOutContains("proxy... OK", 10*time.Second)
mockbeat.WaitStdOutContains("TLS... WARN secure connection disabled", 10*time.Second)
mockbeat.WaitStdOutContains("talk to server... OK", 10*time.Second)
}

func TestCmdTestOutputProxyBadHost(t *testing.T) {
proxyURL := GetProxyURL(t)
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(CmdTestCfg, "badhost:9200"))
mockbeat.Start("test", "output", "-E", "output.elasticsearch.proxy_url="+proxyURL.String())
procState, err := mockbeat.Process.Wait()
require.NoError(t, err)
require.Equal(t, 1, procState.ExitCode(), "incorrect exit code")
mockbeat.WaitStdOutContains("parse url... OK", 10*time.Second)
mockbeat.WaitStdOutContains("proxy... OK", 10*time.Second)
mockbeat.WaitStdOutContains("dial up... ERROR proxy server returned status code", 10*time.Second)
}

func TestCmdTestOutputBadProxy(t *testing.T) {
esURL := GetESURL(t, "http")
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(CmdTestCfg, esURL.String()))
mockbeat.Start("test", "output", "-E", "output.elasticsearch.proxy_url=http://badproxy:8080")
procState, err := mockbeat.Process.Wait()
require.NoError(t, err)
require.Equal(t, 1, procState.ExitCode(), "incorrect exit code")
mockbeat.WaitStdOutContains("parse url... OK", 10*time.Second)
mockbeat.WaitStdOutContains("dns lookup... ERROR", 10*time.Second)
}

func TestCmdTestOutputBadProxyDisable(t *testing.T) {
esURL := GetESURL(t, "http")
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(CmdTestCfg, esURL.String()))
mockbeat.Start("test", "output", "-E", "output.elasticsearch.proxy_url=http://badproxy:8080", "-E", "output.elasticsearch.proxy_disable=true")
procState, err := mockbeat.Process.Wait()
require.NoError(t, err)
require.Equal(t, 0, procState.ExitCode(), "incorrect exit code")
mockbeat.WaitStdOutContains("parse url... OK", 10*time.Second)
mockbeat.WaitStdOutContains("TLS... WARN secure connection disabled", 10*time.Second)
mockbeat.WaitStdOutContains("talk to server... OK", 10*time.Second)
}
36 changes: 36 additions & 0 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,42 @@ func GetKibana(t *testing.T) (url.URL, *url.Userinfo) {
return kibanaURL, kibanaUser
}

func GetProxyURL(t *testing.T) url.URL {
t.Helper()

scheme := os.Getenv("PROXY_SCHEME")
if scheme == "" {
scheme = "http"
}

proxyHost := os.Getenv("PROXY_HOST")
if proxyHost == "" {
proxyHost = "localhost"
}

proxyPort := os.Getenv("PROXY_PORT")
if proxyPort == "" {
proxyPort = "3128"
}

user := os.Getenv("PROXY_USER")
if user == "" {
user = "proxy"
}

pass := os.Getenv("PROXY_PASS")
if pass == "" {
pass = "testing"
}

proxyURL := url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%s", proxyHost, proxyPort),
User: url.UserPassword(user, pass),
}
return proxyURL
}

func HttpDo(t *testing.T, method string, targetURL url.URL) (statusCode int, body []byte, err error) {
t.Helper()
client := &http.Client{}
Expand Down

0 comments on commit 268a6b7

Please sign in to comment.