Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws/csm: Add support for AWS_CSM_HOST env option #2677

Merged
merged 3 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions aws/csm/address_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// +build go1.7

package csm

import "testing"

func TestAddressWithDefaults(t *testing.T) {
cases := map[string]struct {
Host, Port string
Expect string
}{
"ip": {
Host: "127.0.0.2", Port: "", Expect: "127.0.0.2:31000",
},
"localhost": {
Host: "localhost", Port: "", Expect: "127.0.0.1:31000",
},
"uppercase localhost": {
Host: "LOCALHOST", Port: "", Expect: "127.0.0.1:31000",
},
"port": {
Host: "localhost", Port: "32000", Expect: "127.0.0.1:32000",
},
"ip6": {
Host: "::1", Port: "", Expect: "[::1]:31000",
},
"unset": {
Host: "", Port: "", Expect: "127.0.0.1:31000",
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
actual := AddressWithDefaults(c.Host, c.Port)
if e, a := c.Expect, actual; e != a {
t.Errorf("expect %v, got %v", e, a)
}
})
}
}
65 changes: 44 additions & 21 deletions aws/csm/doc.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,61 @@
// Package csm provides Client Side Monitoring (CSM) which enables sending metrics
// via UDP connection. Using the Start function will enable the reporting of
// metrics on a given port. If Start is called, with different parameters, again,
// a panic will occur.
// Package csm provides the Client Side Monitoring (CSM) client which enables
// sending metrics via UDP connection to the CSM agent. This package provides
// control options, and configuration for the CSM client. The client can be
// controlled manually, or automatically via the SDK's Session configuration.
//
// Pause can be called to pause any metrics publishing on a given port. Sessions
// that have had their handlers modified via InjectHandlers may still be used.
// However, the handlers will act as a no-op meaning no metrics will be published.
// Enabling CSM client via SDK's Session configuration
//
// The CSM client can be enabled automatically via SDK's Session configuration.
// The SDK's session configuration enables the CSM client if the AWS_CSM_PORT
// environment variable is set to a non-empty value.
//
// The configuration options for the CSM client via the SDK's session
// configuration are:
//
// * AWS_CSM_PORT=<port number>
// The port number the CSM agent will receive metrics on.
//
// * AWS_CSM_HOST=<hostname or ip>
// The hostname, or IP address the CSM agent will receive metrics on.
// Without port number.
//
// Manually enabling the CSM client
//
// The CSM client can be started, paused, and resumed manually. The Start
// function will enable the CSM client to publish metrics to the CSM agent. It
// is safe to call Start concurrently, but if Start is called additional times
// with different ClientID or address it will panic.
//
// Example:
// r, err := csm.Start("clientID", ":31000")
// if err != nil {
// panic(fmt.Errorf("failed starting CSM: %v", err))
// }
//
// When controlling the CSM client manually, you must also inject its request
// handlers into the SDK's Session configuration for the SDK's API clients to
// publish metrics.
//
// sess, err := session.NewSession(&aws.Config{})
// if err != nil {
// panic(fmt.Errorf("failed loading session: %v", err))
// }
//
// // Add CSM client's metric publishing request handlers to the SDK's
// // Session Configuration.
// r.InjectHandlers(&sess.Handlers)
//
// client := s3.New(sess)
// resp, err := client.GetObject(&s3.GetObjectInput{
// Bucket: aws.String("bucket"),
// Key: aws.String("key"),
// })
// Controlling CSM client
//
// Once the CSM client has been enabled the Get function will return a Reporter
// value that you can use to pause and resume the metrics published to the CSM
// agent. If Get function is called before the reporter is enabled with the
// Start function or via SDK's Session configuration nil will be returned.
//
// The Pause method can be called to stop the CSM client publishing metrics to
// the CSM agent. The Continue method will resume metric publishing.
//
// // Get the CSM client Reporter.
// r := csm.Get()
//
// // Will pause monitoring
// r.Pause()
Expand All @@ -35,12 +66,4 @@
//
// // Resume monitoring
// r.Continue()
//
// Start returns a Reporter that is used to enable or disable monitoring. If
// access to the Reporter is required later, calling Get will return the Reporter
// singleton.
//
// Example:
// r := csm.Get()
// r.Continue()
package csm
34 changes: 28 additions & 6 deletions aws/csm/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,48 @@ package csm

import (
"fmt"
"strings"
"sync"
)

var (
lock sync.Mutex
)

// Client side metric handler names
const (
APICallMetricHandlerName = "awscsm.SendAPICallMetric"
APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric"
// DefaultPort is used when no port is specified.
DefaultPort = "31000"

// DefaultHost is the host that will be used when none is specified.
DefaultHost = "127.0.0.1"
)

// Start will start the a long running go routine to capture
// AddressWithDefaults returns a CSM address built from the host and port
// values. If the host or port is not set, default values will be used
// instead. If host is "localhost" it will be replaced with "127.0.0.1".
func AddressWithDefaults(host, port string) string {
if len(host) == 0 || strings.EqualFold(host, "localhost") {
host = DefaultHost
}

if len(port) == 0 {
port = DefaultPort
}

// Only IP6 host can contain a colon
if strings.Contains(host, ":") {
return "[" + host + "]:" + port
}

return host + ":" + port
}

// Start will start a long running go routine to capture
// client side metrics. Calling start multiple time will only
// start the metric listener once and will panic if a different
// client ID or port is passed in.
//
// Example:
// r, err := csm.Start("clientID", "127.0.0.1:8094")
// r, err := csm.Start("clientID", "127.0.0.1:31000")
// if err != nil {
// panic(fmt.Errorf("expected no error, but received %v", err))
// }
Expand Down
25 changes: 15 additions & 10 deletions aws/csm/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
)

const (
// DefaultPort is used when no port is specified
DefaultPort = "31000"
)

// Reporter will gather metrics of API requests made and
// send those metrics to the CSM endpoint.
type Reporter struct {
Expand Down Expand Up @@ -190,8 +185,9 @@ func (rep *Reporter) start() {
}
}

// Pause will pause the metric channel preventing any new metrics from
// being added.
// Pause will pause the metric channel preventing any new metrics from being
// added. It is safe to call concurrently with other calls to Pause, but if
// called concurently with Continue can lead to unexpected state.
func (rep *Reporter) Pause() {
lock.Lock()
defer lock.Unlock()
Expand All @@ -203,8 +199,9 @@ func (rep *Reporter) Pause() {
rep.close()
}

// Continue will reopen the metric channel and allow for monitoring
// to be resumed.
// Continue will reopen the metric channel and allow for monitoring to be
// resumed. It is safe to call concurrently with other calls to Continue, but
// if called concurently with Pause can lead to unexpected state.
func (rep *Reporter) Continue() {
lock.Lock()
defer lock.Unlock()
Expand All @@ -219,10 +216,18 @@ func (rep *Reporter) Continue() {
rep.metricsCh.Continue()
}

// Client side metric handler names
const (
APICallMetricHandlerName = "awscsm.SendAPICallMetric"
APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric"
)

// InjectHandlers will will enable client side metrics and inject the proper
// handlers to handle how metrics are sent.
//
// Example:
// InjectHandlers is NOT safe to call concurrently. Calling InjectHandlers
// multiple times may lead to unexpected behavior, (e.g. duplicate metrics).
//
// // Start must be called in order to inject the correct handlers
// r, err := csm.Start("clientID", "127.0.0.1:8094")
// if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions aws/session/env_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type envConfig struct {
CSMEnabled bool
CSMPort string
CSMClientID string
CSMHost string

enableEndpointDiscovery string
// Enables endpoint discovery via environment variables.
Expand All @@ -114,6 +115,9 @@ var (
csmEnabledEnvKey = []string{
"AWS_CSM_ENABLED",
}
csmHostEnvKey = []string{
"AWS_CSM_HOST",
}
csmPortEnvKey = []string{
"AWS_CSM_PORT",
}
Expand Down Expand Up @@ -184,6 +188,7 @@ func envConfigLoad(enableSharedConfig bool) envConfig {

// CSM environment variables
setFromEnvVal(&cfg.csmEnabled, csmEnabledEnvKey)
setFromEnvVal(&cfg.CSMHost, csmHostEnvKey)
setFromEnvVal(&cfg.CSMPort, csmPortEnvKey)
setFromEnvVal(&cfg.CSMClientID, csmClientIDEnvKey)
cfg.CSMEnabled = len(cfg.csmEnabled) > 0
Expand Down
33 changes: 25 additions & 8 deletions aws/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package session
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -104,7 +105,15 @@ func New(cfgs ...*aws.Config) *Session {

s := deprecatedNewSession(cfgs...)
if envCfg.CSMEnabled {
enableCSM(&s.Handlers, envCfg.CSMClientID, envCfg.CSMPort, s.Config.Logger)
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if err != nil {
err = fmt.Errorf("failed to enable CSM, %v", err)
s.Config.Logger.Log("ERROR:", err.Error())
s.Handlers.Validate.PushBack(func(r *request.Request) {
r.Error = err
})
}
}

return s
Expand Down Expand Up @@ -338,17 +347,21 @@ func deprecatedNewSession(cfgs ...*aws.Config) *Session {
return s
}

func enableCSM(handlers *request.Handlers, clientID string, port string, logger aws.Logger) {
logger.Log("Enabling CSM")
if len(port) == 0 {
port = csm.DefaultPort
func enableCSM(handlers *request.Handlers,
clientID, host, port string,
logger aws.Logger,
) error {
if logger != nil {
logger.Log("Enabling CSM")
}

r, err := csm.Start(clientID, "127.0.0.1:"+port)
r, err := csm.Start(clientID, csm.AddressWithDefaults(host, port))
if err != nil {
return
return err
}
r.InjectHandlers(handlers)

return nil
}

func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session, error) {
Expand Down Expand Up @@ -395,7 +408,11 @@ func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session,

initHandlers(s)
if envCfg.CSMEnabled {
enableCSM(&s.Handlers, envCfg.CSMClientID, envCfg.CSMPort, s.Config.Logger)
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if err != nil {
return nil, err
}
}

// Setup HTTP client with custom cert bundle if enabled
Expand Down