Skip to content

Commit

Permalink
aws/csm: Add support for AWS_CSM_HOST env option (#2677)
Browse files Browse the repository at this point in the history
Adds support for a host to be configured for the SDK's metric reporting
Client Side Metrics (CSM) client via the AWS_CSM_HOST environment
variable.
  • Loading branch information
jasdel authored Jul 10, 2019
1 parent 9d7c93d commit 7503c91
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 45 deletions.
40 changes: 40 additions & 0 deletions aws/csm/address_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// +build go1.7

package csm

import "testing"

func TestAddressWithDefaults(t *testing.T) {
cases := map[string]struct {
Host, Port string
Expect string
}{
"ip": {
Host: "127.0.0.2", Port: "", Expect: "127.0.0.2:31000",
},
"localhost": {
Host: "localhost", Port: "", Expect: "127.0.0.1:31000",
},
"uppercase localhost": {
Host: "LOCALHOST", Port: "", Expect: "127.0.0.1:31000",
},
"port": {
Host: "localhost", Port: "32000", Expect: "127.0.0.1:32000",
},
"ip6": {
Host: "::1", Port: "", Expect: "[::1]:31000",
},
"unset": {
Host: "", Port: "", Expect: "127.0.0.1:31000",
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
actual := AddressWithDefaults(c.Host, c.Port)
if e, a := c.Expect, actual; e != a {
t.Errorf("expect %v, got %v", e, a)
}
})
}
}
65 changes: 44 additions & 21 deletions aws/csm/doc.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,61 @@
// Package csm provides Client Side Monitoring (CSM) which enables sending metrics
// via UDP connection. Using the Start function will enable the reporting of
// metrics on a given port. If Start is called, with different parameters, again,
// a panic will occur.
// Package csm provides the Client Side Monitoring (CSM) client which enables
// sending metrics via UDP connection to the CSM agent. This package provides
// control options, and configuration for the CSM client. The client can be
// controlled manually, or automatically via the SDK's Session configuration.
//
// Pause can be called to pause any metrics publishing on a given port. Sessions
// that have had their handlers modified via InjectHandlers may still be used.
// However, the handlers will act as a no-op meaning no metrics will be published.
// Enabling CSM client via SDK's Session configuration
//
// The CSM client can be enabled automatically via SDK's Session configuration.
// The SDK's session configuration enables the CSM client if the AWS_CSM_PORT
// environment variable is set to a non-empty value.
//
// The configuration options for the CSM client via the SDK's session
// configuration are:
//
// * AWS_CSM_PORT=<port number>
// The port number the CSM agent will receive metrics on.
//
// * AWS_CSM_HOST=<hostname or ip>
// The hostname, or IP address the CSM agent will receive metrics on.
// Without port number.
//
// Manually enabling the CSM client
//
// The CSM client can be started, paused, and resumed manually. The Start
// function will enable the CSM client to publish metrics to the CSM agent. It
// is safe to call Start concurrently, but if Start is called additional times
// with different ClientID or address it will panic.
//
// Example:
// r, err := csm.Start("clientID", ":31000")
// if err != nil {
// panic(fmt.Errorf("failed starting CSM: %v", err))
// }
//
// When controlling the CSM client manually, you must also inject its request
// handlers into the SDK's Session configuration for the SDK's API clients to
// publish metrics.
//
// sess, err := session.NewSession(&aws.Config{})
// if err != nil {
// panic(fmt.Errorf("failed loading session: %v", err))
// }
//
// // Add CSM client's metric publishing request handlers to the SDK's
// // Session Configuration.
// r.InjectHandlers(&sess.Handlers)
//
// client := s3.New(sess)
// resp, err := client.GetObject(&s3.GetObjectInput{
// Bucket: aws.String("bucket"),
// Key: aws.String("key"),
// })
// Controlling CSM client
//
// Once the CSM client has been enabled the Get function will return a Reporter
// value that you can use to pause and resume the metrics published to the CSM
// agent. If Get function is called before the reporter is enabled with the
// Start function or via SDK's Session configuration nil will be returned.
//
// The Pause method can be called to stop the CSM client publishing metrics to
// the CSM agent. The Continue method will resume metric publishing.
//
// // Get the CSM client Reporter.
// r := csm.Get()
//
// // Will pause monitoring
// r.Pause()
Expand All @@ -35,12 +66,4 @@
//
// // Resume monitoring
// r.Continue()
//
// Start returns a Reporter that is used to enable or disable monitoring. If
// access to the Reporter is required later, calling Get will return the Reporter
// singleton.
//
// Example:
// r := csm.Get()
// r.Continue()
package csm
34 changes: 28 additions & 6 deletions aws/csm/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,48 @@ package csm

import (
"fmt"
"strings"
"sync"
)

var (
lock sync.Mutex
)

// Client side metric handler names
const (
APICallMetricHandlerName = "awscsm.SendAPICallMetric"
APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric"
// DefaultPort is used when no port is specified.
DefaultPort = "31000"

// DefaultHost is the host that will be used when none is specified.
DefaultHost = "127.0.0.1"
)

// Start will start the a long running go routine to capture
// AddressWithDefaults returns a CSM address built from the host and port
// values. If the host or port is not set, default values will be used
// instead. If host is "localhost" it will be replaced with "127.0.0.1".
func AddressWithDefaults(host, port string) string {
if len(host) == 0 || strings.EqualFold(host, "localhost") {
host = DefaultHost
}

if len(port) == 0 {
port = DefaultPort
}

// Only IP6 host can contain a colon
if strings.Contains(host, ":") {
return "[" + host + "]:" + port
}

return host + ":" + port
}

// Start will start a long running go routine to capture
// client side metrics. Calling start multiple time will only
// start the metric listener once and will panic if a different
// client ID or port is passed in.
//
// Example:
// r, err := csm.Start("clientID", "127.0.0.1:8094")
// r, err := csm.Start("clientID", "127.0.0.1:31000")
// if err != nil {
// panic(fmt.Errorf("expected no error, but received %v", err))
// }
Expand Down
25 changes: 15 additions & 10 deletions aws/csm/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
)

const (
// DefaultPort is used when no port is specified
DefaultPort = "31000"
)

// Reporter will gather metrics of API requests made and
// send those metrics to the CSM endpoint.
type Reporter struct {
Expand Down Expand Up @@ -190,8 +185,9 @@ func (rep *Reporter) start() {
}
}

// Pause will pause the metric channel preventing any new metrics from
// being added.
// Pause will pause the metric channel preventing any new metrics from being
// added. It is safe to call concurrently with other calls to Pause, but if
// called concurently with Continue can lead to unexpected state.
func (rep *Reporter) Pause() {
lock.Lock()
defer lock.Unlock()
Expand All @@ -203,8 +199,9 @@ func (rep *Reporter) Pause() {
rep.close()
}

// Continue will reopen the metric channel and allow for monitoring
// to be resumed.
// Continue will reopen the metric channel and allow for monitoring to be
// resumed. It is safe to call concurrently with other calls to Continue, but
// if called concurently with Pause can lead to unexpected state.
func (rep *Reporter) Continue() {
lock.Lock()
defer lock.Unlock()
Expand All @@ -219,10 +216,18 @@ func (rep *Reporter) Continue() {
rep.metricsCh.Continue()
}

// Client side metric handler names
const (
APICallMetricHandlerName = "awscsm.SendAPICallMetric"
APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric"
)

// InjectHandlers will will enable client side metrics and inject the proper
// handlers to handle how metrics are sent.
//
// Example:
// InjectHandlers is NOT safe to call concurrently. Calling InjectHandlers
// multiple times may lead to unexpected behavior, (e.g. duplicate metrics).
//
// // Start must be called in order to inject the correct handlers
// r, err := csm.Start("clientID", "127.0.0.1:8094")
// if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions aws/session/env_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type envConfig struct {
CSMEnabled bool
CSMPort string
CSMClientID string
CSMHost string

enableEndpointDiscovery string
// Enables endpoint discovery via environment variables.
Expand All @@ -114,6 +115,9 @@ var (
csmEnabledEnvKey = []string{
"AWS_CSM_ENABLED",
}
csmHostEnvKey = []string{
"AWS_CSM_HOST",
}
csmPortEnvKey = []string{
"AWS_CSM_PORT",
}
Expand Down Expand Up @@ -184,6 +188,7 @@ func envConfigLoad(enableSharedConfig bool) envConfig {

// CSM environment variables
setFromEnvVal(&cfg.csmEnabled, csmEnabledEnvKey)
setFromEnvVal(&cfg.CSMHost, csmHostEnvKey)
setFromEnvVal(&cfg.CSMPort, csmPortEnvKey)
setFromEnvVal(&cfg.CSMClientID, csmClientIDEnvKey)
cfg.CSMEnabled = len(cfg.csmEnabled) > 0
Expand Down
33 changes: 25 additions & 8 deletions aws/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package session
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -104,7 +105,15 @@ func New(cfgs ...*aws.Config) *Session {

s := deprecatedNewSession(cfgs...)
if envCfg.CSMEnabled {
enableCSM(&s.Handlers, envCfg.CSMClientID, envCfg.CSMPort, s.Config.Logger)
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if err != nil {
err = fmt.Errorf("failed to enable CSM, %v", err)
s.Config.Logger.Log("ERROR:", err.Error())
s.Handlers.Validate.PushBack(func(r *request.Request) {
r.Error = err
})
}
}

return s
Expand Down Expand Up @@ -338,17 +347,21 @@ func deprecatedNewSession(cfgs ...*aws.Config) *Session {
return s
}

func enableCSM(handlers *request.Handlers, clientID string, port string, logger aws.Logger) {
logger.Log("Enabling CSM")
if len(port) == 0 {
port = csm.DefaultPort
func enableCSM(handlers *request.Handlers,
clientID, host, port string,
logger aws.Logger,
) error {
if logger != nil {
logger.Log("Enabling CSM")
}

r, err := csm.Start(clientID, "127.0.0.1:"+port)
r, err := csm.Start(clientID, csm.AddressWithDefaults(host, port))
if err != nil {
return
return err
}
r.InjectHandlers(handlers)

return nil
}

func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session, error) {
Expand Down Expand Up @@ -395,7 +408,11 @@ func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session,

initHandlers(s)
if envCfg.CSMEnabled {
enableCSM(&s.Handlers, envCfg.CSMClientID, envCfg.CSMPort, s.Config.Logger)
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if err != nil {
return nil, err
}
}

// Setup HTTP client with custom cert bundle if enabled
Expand Down

0 comments on commit 7503c91

Please sign in to comment.