Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate more system tests to Go #4469

Merged
merged 6 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions systemtest/apikeycmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (

func apiKeyCommand(subcommand string, args ...string) *apmservertest.ServerCmd {
cfg := apmservertest.DefaultConfig()
return apiKeyCommandConfig(cfg, subcommand, args...)
}

func apiKeyCommandConfig(cfg apmservertest.Config, subcommand string, args ...string) *apmservertest.ServerCmd {
cfgargs, err := cfg.Args()
if err != nil {
panic(err)
Expand Down Expand Up @@ -86,6 +90,22 @@ func TestAPIKeyCreateExpiration(t *testing.T) {
assert.Contains(t, attrs, "expiration")
}

func TestAPIKeyCreateInvalidUser(t *testing.T) {
// heartbeat_user lacks cluster privileges, and cannot create keys
// beats_user has cluster privileges, but not APM application privileges
for _, username := range []string{"heartbeat_user", "beats_user"} {
cfg := apmservertest.DefaultConfig()
cfg.Output.Elasticsearch.Username = username
cfg.Output.Elasticsearch.Password = "changeme"

cmd := apiKeyCommandConfig(cfg, "create", "--name", t.Name(), "--json")
out, err := cmd.CombinedOutput()
require.Error(t, err)
attrs := decodeJSONMap(t, bytes.NewReader(out))
assert.Regexp(t, username+` is missing the following requested privilege\(s\): .*`, attrs["error"])
}
}

func TestAPIKeyInvalidateName(t *testing.T) {
systemtest.InvalidateAPIKeys(t)
defer systemtest.InvalidateAPIKeys(t)
Expand Down
101 changes: 80 additions & 21 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`
RUM *RUMConfig `json:"apm-server.rum,omitempty"`
DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"`
APIKey *APIKeyConfig `json:"apm-server.api_key,omitempty"`

// ResponseHeaders holds headers to add to all APM Server HTTP responses.
ResponseHeaders http.Header `json:"apm-server.response_headers,omitempty"`
Expand Down Expand Up @@ -125,12 +126,12 @@ func (t *TailSamplingConfig) MarshalJSON() ([]byte, error) {
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval"`
Interval string `json:"interval"`
Policies []TailSamplingPolicy `json:"policies,omitempty"`
}
return json.Marshal(config{
Enabled: t.Enabled,
Interval: duration(t.Interval),
Interval: durationString(t.Interval),
Policies: t.Policies,
})
}
Expand All @@ -157,9 +158,66 @@ type DataStreamsConfig struct {
Enabled bool `json:"enabled"`
}

// APIKeyConfig holds APM Server API Key auth configuration.
type APIKeyConfig struct {
Enabled bool `json:"enabled"`
}

// InstrumentationConfig holds APM Server instrumentation configuration.
type InstrumentationConfig struct {
Enabled bool `json:"enabled"`
Enabled bool `json:"enabled"`
Profiling *ProfilingConfig `json:"profiling,omitempty"`

Hosts []string `json:"hosts,omitempty"`
APIKey string `json:"api_key,omitempty"`
SecretToken string `json:"secret_token,omitempty"`
}

// ProfilingConfig holds APM Server profiling configuration.
type ProfilingConfig struct {
CPU *CPUProfilingConfig `json:"cpu,omitempty"`
Heap *HeapProfilingConfig `json:"heap,omitempty"`
}

// CPUProfilingConfig holds APM Server profiling configuration.
type CPUProfilingConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
}

func (c *CPUProfilingConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
Duration string `json:"duration,omitempty"`
}
return json.Marshal(config{
Enabled: c.Enabled,
Interval: durationString(c.Interval),
Duration: durationString(c.Duration),
})
}

// HeapProfilingConfig holds APM Server profiling configuration.
type HeapProfilingConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval,omitempty"`
}

func (c *HeapProfilingConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: c.Enabled,
Interval: durationString(c.Interval),
})
}

// OutputConfig holds APM Server libbeat output configuration.
Expand Down Expand Up @@ -203,14 +261,14 @@ func (m *MemoryQueueConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Events int `json:"events"`
FlushMinEvents int `json:"flush.min_events"`
FlushTimeout duration `json:"flush.timeout"`
Events int `json:"events"`
FlushMinEvents int `json:"flush.min_events"`
FlushTimeout string `json:"flush.timeout,omitempty"`
}
return json.Marshal(config{
Events: m.Events,
FlushMinEvents: m.FlushMinEvents,
FlushTimeout: duration(m.FlushTimeout),
FlushTimeout: durationString(m.FlushTimeout),
})
}

Expand All @@ -228,14 +286,14 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) {
type config struct {
Enabled bool `json:"enabled"`
Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"`
MetricsPeriod duration `json:"elasticsearch.metrics.period,omitempty"`
StatePeriod duration `json:"elasticsearch.state.period,omitempty"`
MetricsPeriod string `json:"elasticsearch.metrics.period,omitempty"`
StatePeriod string `json:"elasticsearch.state.period,omitempty"`
}
return json.Marshal(config{
Enabled: m.Enabled,
Elasticsearch: m.Elasticsearch,
MetricsPeriod: duration(m.MetricsPeriod),
StatePeriod: duration(m.StatePeriod),
MetricsPeriod: durationString(m.MetricsPeriod),
StatePeriod: durationString(m.StatePeriod),
})
}

Expand All @@ -255,12 +313,12 @@ func (m *TransactionAggregationConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval,omitempty"`
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: m.Enabled,
Interval: duration(m.Interval),
Interval: durationString(m.Interval),
})
}

Expand All @@ -274,19 +332,20 @@ func (s *ServiceDestinationAggregationConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval,omitempty"`
Enabled bool `json:"enabled"`
Interval string `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: s.Enabled,
Interval: duration(s.Interval),
Interval: durationString(s.Interval),
})
}

type duration time.Duration

func (d duration) MarshalText() (text []byte, err error) {
return []byte(time.Duration(d).String()), nil
func durationString(d time.Duration) string {
if d == 0 {
return ""
}
return d.String()
}

func configArgs(cfg Config, extra map[string]interface{}) ([]string, error) {
Expand Down
166 changes: 166 additions & 0 deletions systemtest/instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
package systemtest_test

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -83,3 +91,161 @@ func TestAPMServerInstrumentation(t *testing.T) {
}
t.Fatal("failed to identify log message with matching trace IDs")
}

func TestAPMServerInstrumentationAuth(t *testing.T) {
test := func(t *testing.T, external, useSecretToken, useAPIKey bool) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.SecretToken = "hunter2"
srv.Config.APIKey = &apmservertest.APIKeyConfig{Enabled: true}
srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{Enabled: true}

serverURLChan := make(chan string, 1)
if external {
// The server URL is not known ahead of time, so we run
// a reverse proxy which waits for the server URL.
var serverURL string
var serverURLOnce sync.Once
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
serverURLOnce.Do(func() {
select {
case <-r.Context().Done():
case serverURL = <-serverURLChan:
}
})
u, err := url.Parse(serverURL)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rp := httputil.NewSingleHostReverseProxy(u)
rp.ServeHTTP(w, r)
}))
defer proxy.Close()
srv.Config.Instrumentation.Hosts = []string{proxy.URL}
}
if useSecretToken {
srv.Config.Instrumentation.SecretToken = srv.Config.SecretToken
}
if useAPIKey {
systemtest.InvalidateAPIKeys(t)
defer systemtest.InvalidateAPIKeys(t)

cmd := apiKeyCommand("create", "--name", t.Name(), "--json")
out, err := cmd.CombinedOutput()
require.NoError(t, err)
attrs := decodeJSONMap(t, bytes.NewReader(out))
srv.Config.Instrumentation.APIKey = attrs["credentials"].(string)
}

err := srv.Start()
require.NoError(t, err)
serverURLChan <- srv.URL

// Send a transaction to the server, causing the server to
// trace the request from the agent.
tracer := srv.Tracer()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)

systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{
Filter: []interface{}{
estest.TermQuery{
Field: "processor.event",
Value: "transaction",
},
estest.TermQuery{
Field: "service.name",
Value: "apm-server",
},
estest.TermQuery{
Field: "transaction.type",
Value: "request",
},
},
})
}
t.Run("self_no_auth", func(t *testing.T) {
// sending data to self, no auth specified
test(t, false, false, false)
})
t.Run("external_secret_token", func(t *testing.T) {
// sending data to external server, secret token specified
test(t, true, true, false)
})
t.Run("external_api_key", func(t *testing.T) {
// sending data to external server, API Key specified
test(t, true, false, true)
})
}

func TestAPMServerProfiling(t *testing.T) {
test := func(t *testing.T, profilingConfig *apmservertest.ProfilingConfig, expectedMetrics []string) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Instrumentation = &apmservertest.InstrumentationConfig{
Enabled: true,
Profiling: profilingConfig,
}
err := srv.Start()
require.NoError(t, err)

// Generate some load to cause the server to consume resources.
tracer := srv.Tracer()
for i := 0; i < 1000; i++ {
tracer.StartTransaction("name", "type").End()
}
tracer.Flush(nil)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{
Field: "processor.event",
Value: "profile",
})
assert.Equal(t, expectedMetrics, profileMetricNames(result))
}
t.Run("cpu", func(t *testing.T) {
test(t, &apmservertest.ProfilingConfig{
CPU: &apmservertest.CPUProfilingConfig{
Enabled: true,
Interval: time.Second,
Duration: time.Second,
},
}, []string{"cpu.ns", "duration", "samples.count"})
})
t.Run("heap", func(t *testing.T) {
test(t, &apmservertest.ProfilingConfig{
Heap: &apmservertest.HeapProfilingConfig{
Enabled: true,
Interval: time.Second,
},
}, []string{
"alloc_objects.count",
"alloc_space.bytes",
"inuse_objects.count",
"inuse_space.bytes",
})
})
}

func profileMetricNames(result estest.SearchResult) []string {
unique := make(map[string]struct{})
var metricNames []string
for _, hit := range result.Hits.Hits {
profileField, ok := hit.Source["profile"].(map[string]interface{})
if !ok {
continue
}
for k, v := range profileField {
if _, ok := v.(float64); !ok {
continue
}
if _, ok := unique[k]; ok {
continue
}
unique[k] = struct{}{}
metricNames = append(metricNames, k)
}
}
sort.Strings(metricNames)
return metricNames
}
Loading