Skip to content

Commit

Permalink
Add ELASTIC_APM_SERVER_TIMEOUT config
Browse files Browse the repository at this point in the history
Defaults to 30s, but can be overridden. If set to a non-positive
value, timeout is disabled.
  • Loading branch information
axw committed Jul 30, 2018
1 parent 26f3905 commit 2e6ab00
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- module/apmhttprouter: added a wrapper type for httprouter.Router to simplify adding routes (#140)
- Add Transaction.Context methods for setting user IDs (#144)
- module/apmgocql: new instrumentation module, providing an observer for gocql (#148)
- Add ELASTIC\_APM\_SERVER\_TIMEOUT config (#TODO)

## [v0.4.0](https://github.com/elastic/apm-agent-go/releases/tag/v0.4.0)

Expand Down
13 changes: 13 additions & 0 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ If you use HTTPS, then you may need to configure your client machines so
that the server certificate can be verified. You can also disable certificate
verification with <<config-verify-server-cert>>.

[float]
[[config-server-timeout]]
=== `ELASTIC_APM_SERVER_TIMEOUT`

[options="header"]
|============
| Environment | Default | Example
| `ELASTIC_APM_SERVER_TIMEOUT` | `30s` | `30s`
|============

The timeout for requests made to your Elastic APM server. When set to zero
or a negative value, timeouts will be disabled.

[float]
[[config-secret-token]]
=== `ELASTIC_APM_SECRET_TOKEN`
Expand Down
30 changes: 5 additions & 25 deletions env.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"

"github.com/pkg/errors"

"github.com/elastic/apm-agent-go/internal/apmconfig"
)

const (
Expand Down Expand Up @@ -51,11 +53,11 @@ var (
)

func initialFlushInterval() (time.Duration, error) {
return parseEnvDuration(envFlushInterval, "s", defaultFlushInterval)
return apmconfig.ParseDurationEnv(envFlushInterval, "s", defaultFlushInterval)
}

func initialMetricsInterval() (time.Duration, error) {
return parseEnvDuration(envMetricsInterval, "s", defaultMetricsInterval)
return apmconfig.ParseDurationEnv(envMetricsInterval, "s", defaultMetricsInterval)
}

func initialMaxTransactionQueueSize() (int, error) {
Expand Down Expand Up @@ -148,7 +150,7 @@ func initialService() (name, version, environment string) {
}

func initialSpanFramesMinDuration() (time.Duration, error) {
return parseEnvDuration(envSpanFramesMinDuration, "", defaultSpanFramesMinDuration)
return apmconfig.ParseDurationEnv(envSpanFramesMinDuration, "", defaultSpanFramesMinDuration)
}

func initialActive() (bool, error) {
Expand All @@ -162,25 +164,3 @@ func initialActive() (bool, error) {
}
return active, nil
}

func parseEnvDuration(envKey, defaultSuffix string, defaultDuration time.Duration) (time.Duration, error) {
value := os.Getenv(envKey)
if value == "" {
return defaultDuration, nil
}
d, err := time.ParseDuration(value)
if err != nil && defaultSuffix != "" {
// We allow the value to have no suffix, in which case we append
// defaultSuffix ("s" for flush interval) for compatibility with
// configuration for other Elastic APM agents.
var err2 error
d, err2 = time.ParseDuration(value + defaultSuffix)
if err2 == nil {
err = nil
}
}
if err != nil {
return 0, errors.Wrapf(err, "failed to parse %s", envKey)
}
return d, nil
}
25 changes: 25 additions & 0 deletions internal/apmconfig/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package apmconfig

import (
"time"
)

// ParseDuration parses value as a duration, appending defaultSuffix if value
// does not have one.
func ParseDuration(value, defaultSuffix string) (time.Duration, error) {
d, err := time.ParseDuration(value)
if err != nil && defaultSuffix != "" {
// We allow the value to have no suffix, in which case we append
// defaultSuffix ("s" for flush interval) for compatibility with
// configuration for other Elastic APM agents.
var err2 error
d, err2 = time.ParseDuration(value + defaultSuffix)
if err2 == nil {
err = nil
}
}
if err != nil {
return 0, err
}
return d, nil
}
24 changes: 24 additions & 0 deletions internal/apmconfig/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package apmconfig

import (
"os"
"time"

"github.com/pkg/errors"
)

// ParseDurationEnv gets the value of the environment variable envKey
// and, if set, parses it as a duration. If the value has no suffix,
// defaultSuffix is appended. If the environment variable is unset,
// defaultDuration is returned.
func ParseDurationEnv(envKey, defaultSuffix string, defaultDuration time.Duration) (time.Duration, error) {
value := os.Getenv(envKey)
if value == "" {
return defaultDuration, nil
}
d, err := ParseDuration(value, defaultSuffix)
if err != nil {
return 0, errors.Wrapf(err, "failed to parse %s", envKey)
}
return d, nil
}
33 changes: 33 additions & 0 deletions internal/apmconfig/env_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package apmconfig_test

import (
"os"
"testing"
"time"

"github.com/elastic/apm-agent-go/internal/apmconfig"
"github.com/stretchr/testify/assert"
)

func TestParseDurationEnv(t *testing.T) {
const envKey = "ELASTIC_APM_TEST_DURATION"
os.Setenv(envKey, "")

d, err := apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second)
assert.NoError(t, err)
assert.Equal(t, 42*time.Second, d)

os.Setenv(envKey, "5")
d, err = apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second)
assert.NoError(t, err)
assert.Equal(t, 5*time.Second, d)

os.Setenv(envKey, "5ms")
d, err = apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second)
assert.NoError(t, err)
assert.Equal(t, 5*time.Millisecond, d)

os.Setenv(envKey, "5")
_, err = apmconfig.ParseDurationEnv(envKey, "", 42*time.Second)
assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: time: missing unit in duration 5")
}
17 changes: 15 additions & 2 deletions transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/apm-agent-go/internal/apmconfig"
"github.com/elastic/apm-agent-go/internal/fastjson"
"github.com/elastic/apm-agent-go/model"
)
Expand All @@ -26,6 +28,7 @@ const (

envSecretToken = "ELASTIC_APM_SECRET_TOKEN"
envServerURL = "ELASTIC_APM_SERVER_URL"
envServerTimeout = "ELASTIC_APM_SERVER_TIMEOUT"
envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT"

// gzipThresholdBytes is the minimum size of the uncompressed
Expand All @@ -38,7 +41,8 @@ var (
// in case another package replaces the value later.
defaultHTTPTransport = http.DefaultTransport.(*http.Transport)

defaultServerURL = "http://localhost:8200"
defaultServerURL = "http://localhost:8200"
defaultServerTimeout = 30 * time.Second
)

// HTTPTransport is an implementation of Transport, sending payloads via
Expand Down Expand Up @@ -105,6 +109,14 @@ func NewHTTPTransport(serverURL, secretToken string) (*HTTPTransport, error) {
}
}

timeout, err := apmconfig.ParseDurationEnv(envServerTimeout, "s", defaultServerTimeout)
if err != nil {
return nil, err
}
if timeout > 0 {
client.Timeout = timeout
}

headers := make(http.Header)
headers.Set("Content-Type", "application/json")
if secretToken == "" {
Expand Down Expand Up @@ -187,11 +199,12 @@ func (t *HTTPTransport) sendPayload(req *http.Request, op string) error {
if err != nil {
return errors.Wrapf(err, "sending request for %s failed", op)
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted:
resp.Body.Close()
return nil
}
defer resp.Body.Close()

// apm-server will return 503 Service Unavailable
// if the data cannot be published to Elasticsearch,
Expand Down
25 changes: 25 additions & 0 deletions transport/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"os"
"sync"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -26,6 +28,7 @@ import (

func init() {
// Don't let the environment influence tests.
os.Setenv("ELASTIC_APM_SERVER_TIMEOUT", "")
os.Setenv("ELASTIC_APM_SERVER_URL", "")
os.Setenv("ELASTIC_APM_SECRET_TOKEN", "")
os.Setenv("ELASTIC_APM_VERIFY_SERVER_CERT", "")
Expand Down Expand Up @@ -229,6 +232,28 @@ func TestHTTPTransportLargeCompressed(t *testing.T) {
assert.Equal(t, string(jw.Bytes()), decoded.String())
}

func TestHTTPTransportServerTimeout(t *testing.T) {
done := make(chan struct{})
blockingHandler := func(w http.ResponseWriter, req *http.Request) { <-done }
server := httptest.NewServer(http.HandlerFunc(blockingHandler))
defer server.Close()
defer close(done)
defer patchEnv("ELASTIC_APM_SERVER_TIMEOUT", "50ms")()

before := time.Now()
transport, err := transport.NewHTTPTransport(server.URL, "")
assert.NoError(t, err)
err = transport.SendTransactions(context.Background(), &model.TransactionsPayload{})
taken := time.Since(before)
assert.Error(t, err)
err = errors.Cause(err)
assert.Implements(t, new(net.Error), err)
assert.True(t, err.(net.Error).Timeout())
assert.Condition(t, func() bool {
return taken >= 50*time.Millisecond
})
}

func newHTTPTransport(t *testing.T, handler http.Handler) (*transport.HTTPTransport, *httptest.Server) {
server := httptest.NewServer(handler)
transport, err := transport.NewHTTPTransport(server.URL, "")
Expand Down

0 comments on commit 2e6ab00

Please sign in to comment.