From 15ae3513ad4ac0f3cb0f262d98825b77e518f516 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 30 Jul 2018 16:45:37 +0800 Subject: [PATCH] Add ELASTIC_APM_SERVER_TIMEOUT config Defaults to 30s, but can be overridden. If set to a non-positive value, timeout is disabled. --- CHANGELOG.md | 1 + docs/configuration.asciidoc | 13 +++++++++++++ env.go | 30 +++++------------------------- internal/apmconfig/duration.go | 25 +++++++++++++++++++++++++ internal/apmconfig/env.go | 24 ++++++++++++++++++++++++ internal/apmconfig/env_test.go | 34 ++++++++++++++++++++++++++++++++++ transport/http.go | 17 +++++++++++++++-- transport/http_test.go | 25 +++++++++++++++++++++++++ 8 files changed, 142 insertions(+), 27 deletions(-) create mode 100644 internal/apmconfig/duration.go create mode 100644 internal/apmconfig/env.go create mode 100644 internal/apmconfig/env_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 92474a9af..2c267c25b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - module/apmhttprouter: added a wrapper type for httprouter.Router to simplify adding routes (#140) - Add Transaction.Context methods for setting user IDs (#144) - module/apmgocql: new instrumentation module, providing an observer for gocql (#148) + - Add ELASTIC\_APM\_SERVER\_TIMEOUT config (#157) ## [v0.4.0](https://github.com/elastic/apm-agent-go/releases/tag/v0.4.0) diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 38dbbedf8..ee1b8501f 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -27,6 +27,19 @@ If you use HTTPS, then you may need to configure your client machines so that the server certificate can be verified. You can also disable certificate verification with <>. +[float] +[[config-server-timeout]] +=== `ELASTIC_APM_SERVER_TIMEOUT` + +[options="header"] +|============ +| Environment | Default | Example +| `ELASTIC_APM_SERVER_TIMEOUT` | `30s` | `30s` +|============ + +The timeout for requests made to your Elastic APM server. When set to zero +or a negative value, timeouts will be disabled. + [float] [[config-secret-token]] === `ELASTIC_APM_SECRET_TOKEN` diff --git a/env.go b/env.go index a13377c45..d7ea10ced 100644 --- a/env.go +++ b/env.go @@ -12,6 +12,8 @@ import ( "time" "github.com/pkg/errors" + + "github.com/elastic/apm-agent-go/internal/apmconfig" ) const ( @@ -51,11 +53,11 @@ var ( ) func initialFlushInterval() (time.Duration, error) { - return parseEnvDuration(envFlushInterval, "s", defaultFlushInterval) + return apmconfig.ParseDurationEnv(envFlushInterval, "s", defaultFlushInterval) } func initialMetricsInterval() (time.Duration, error) { - return parseEnvDuration(envMetricsInterval, "s", defaultMetricsInterval) + return apmconfig.ParseDurationEnv(envMetricsInterval, "s", defaultMetricsInterval) } func initialMaxTransactionQueueSize() (int, error) { @@ -148,7 +150,7 @@ func initialService() (name, version, environment string) { } func initialSpanFramesMinDuration() (time.Duration, error) { - return parseEnvDuration(envSpanFramesMinDuration, "", defaultSpanFramesMinDuration) + return apmconfig.ParseDurationEnv(envSpanFramesMinDuration, "", defaultSpanFramesMinDuration) } func initialActive() (bool, error) { @@ -162,25 +164,3 @@ func initialActive() (bool, error) { } return active, nil } - -func parseEnvDuration(envKey, defaultSuffix string, defaultDuration time.Duration) (time.Duration, error) { - value := os.Getenv(envKey) - if value == "" { - return defaultDuration, nil - } - d, err := time.ParseDuration(value) - if err != nil && defaultSuffix != "" { - // We allow the value to have no suffix, in which case we append - // defaultSuffix ("s" for flush interval) for compatibility with - // configuration for other Elastic APM agents. - var err2 error - d, err2 = time.ParseDuration(value + defaultSuffix) - if err2 == nil { - err = nil - } - } - if err != nil { - return 0, errors.Wrapf(err, "failed to parse %s", envKey) - } - return d, nil -} diff --git a/internal/apmconfig/duration.go b/internal/apmconfig/duration.go new file mode 100644 index 000000000..28f57598c --- /dev/null +++ b/internal/apmconfig/duration.go @@ -0,0 +1,25 @@ +package apmconfig + +import ( + "time" +) + +// ParseDuration parses value as a duration, appending defaultSuffix if value +// does not have one. +func ParseDuration(value, defaultSuffix string) (time.Duration, error) { + d, err := time.ParseDuration(value) + if err != nil && defaultSuffix != "" { + // We allow the value to have no suffix, in which case we append + // defaultSuffix ("s" for flush interval) for compatibility with + // configuration for other Elastic APM agents. + var err2 error + d, err2 = time.ParseDuration(value + defaultSuffix) + if err2 == nil { + err = nil + } + } + if err != nil { + return 0, err + } + return d, nil +} diff --git a/internal/apmconfig/env.go b/internal/apmconfig/env.go new file mode 100644 index 000000000..3552d3c69 --- /dev/null +++ b/internal/apmconfig/env.go @@ -0,0 +1,24 @@ +package apmconfig + +import ( + "os" + "time" + + "github.com/pkg/errors" +) + +// ParseDurationEnv gets the value of the environment variable envKey +// and, if set, parses it as a duration. If the value has no suffix, +// defaultSuffix is appended. If the environment variable is unset, +// defaultDuration is returned. +func ParseDurationEnv(envKey, defaultSuffix string, defaultDuration time.Duration) (time.Duration, error) { + value := os.Getenv(envKey) + if value == "" { + return defaultDuration, nil + } + d, err := ParseDuration(value, defaultSuffix) + if err != nil { + return 0, errors.Wrapf(err, "failed to parse %s", envKey) + } + return d, nil +} diff --git a/internal/apmconfig/env_test.go b/internal/apmconfig/env_test.go new file mode 100644 index 000000000..537cf11cb --- /dev/null +++ b/internal/apmconfig/env_test.go @@ -0,0 +1,34 @@ +package apmconfig_test + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-agent-go/internal/apmconfig" +) + +func TestParseDurationEnv(t *testing.T) { + const envKey = "ELASTIC_APM_TEST_DURATION" + os.Setenv(envKey, "") + + d, err := apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second) + assert.NoError(t, err) + assert.Equal(t, 42*time.Second, d) + + os.Setenv(envKey, "5") + d, err = apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second) + assert.NoError(t, err) + assert.Equal(t, 5*time.Second, d) + + os.Setenv(envKey, "5ms") + d, err = apmconfig.ParseDurationEnv(envKey, "s", 42*time.Second) + assert.NoError(t, err) + assert.Equal(t, 5*time.Millisecond, d) + + os.Setenv(envKey, "5") + _, err = apmconfig.ParseDurationEnv(envKey, "", 42*time.Second) + assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: time: missing unit in duration 5") +} diff --git a/transport/http.go b/transport/http.go index c6e265c7a..7806f9827 100644 --- a/transport/http.go +++ b/transport/http.go @@ -12,9 +12,11 @@ import ( "net/url" "os" "strings" + "time" "github.com/pkg/errors" + "github.com/elastic/apm-agent-go/internal/apmconfig" "github.com/elastic/apm-agent-go/internal/fastjson" "github.com/elastic/apm-agent-go/model" ) @@ -26,6 +28,7 @@ const ( envSecretToken = "ELASTIC_APM_SECRET_TOKEN" envServerURL = "ELASTIC_APM_SERVER_URL" + envServerTimeout = "ELASTIC_APM_SERVER_TIMEOUT" envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" // gzipThresholdBytes is the minimum size of the uncompressed @@ -38,7 +41,8 @@ var ( // in case another package replaces the value later. defaultHTTPTransport = http.DefaultTransport.(*http.Transport) - defaultServerURL = "http://localhost:8200" + defaultServerURL = "http://localhost:8200" + defaultServerTimeout = 30 * time.Second ) // HTTPTransport is an implementation of Transport, sending payloads via @@ -105,6 +109,14 @@ func NewHTTPTransport(serverURL, secretToken string) (*HTTPTransport, error) { } } + timeout, err := apmconfig.ParseDurationEnv(envServerTimeout, "s", defaultServerTimeout) + if err != nil { + return nil, err + } + if timeout > 0 { + client.Timeout = timeout + } + headers := make(http.Header) headers.Set("Content-Type", "application/json") if secretToken == "" { @@ -187,11 +199,12 @@ func (t *HTTPTransport) sendPayload(req *http.Request, op string) error { if err != nil { return errors.Wrapf(err, "sending request for %s failed", op) } - defer resp.Body.Close() switch resp.StatusCode { case http.StatusOK, http.StatusAccepted: + resp.Body.Close() return nil } + defer resp.Body.Close() // apm-server will return 503 Service Unavailable // if the data cannot be published to Elasticsearch, diff --git a/transport/http_test.go b/transport/http_test.go index 53802b5ad..1c0f05efb 100644 --- a/transport/http_test.go +++ b/transport/http_test.go @@ -15,7 +15,9 @@ import ( "os" "sync" "testing" + "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,6 +28,7 @@ import ( func init() { // Don't let the environment influence tests. + os.Setenv("ELASTIC_APM_SERVER_TIMEOUT", "") os.Setenv("ELASTIC_APM_SERVER_URL", "") os.Setenv("ELASTIC_APM_SECRET_TOKEN", "") os.Setenv("ELASTIC_APM_VERIFY_SERVER_CERT", "") @@ -229,6 +232,28 @@ func TestHTTPTransportLargeCompressed(t *testing.T) { assert.Equal(t, string(jw.Bytes()), decoded.String()) } +func TestHTTPTransportServerTimeout(t *testing.T) { + done := make(chan struct{}) + blockingHandler := func(w http.ResponseWriter, req *http.Request) { <-done } + server := httptest.NewServer(http.HandlerFunc(blockingHandler)) + defer server.Close() + defer close(done) + defer patchEnv("ELASTIC_APM_SERVER_TIMEOUT", "50ms")() + + before := time.Now() + transport, err := transport.NewHTTPTransport(server.URL, "") + assert.NoError(t, err) + err = transport.SendTransactions(context.Background(), &model.TransactionsPayload{}) + taken := time.Since(before) + assert.Error(t, err) + err = errors.Cause(err) + assert.Implements(t, new(net.Error), err) + assert.True(t, err.(net.Error).Timeout()) + assert.Condition(t, func() bool { + return taken >= 50*time.Millisecond + }) +} + func newHTTPTransport(t *testing.T, handler http.Handler) (*transport.HTTPTransport, *httptest.Server) { server := httptest.NewServer(handler) transport, err := transport.NewHTTPTransport(server.URL, "")