Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto Auth Healing for Proxy #26307

Merged
merged 4 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/26307.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:improvement
proxy: Proxy will re-trigger auto auth if the token used for requests has been revoked, has exceeded the number of uses,
or is an otherwise invalid value.
```
176 changes: 93 additions & 83 deletions command/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

systemd "github.com/coreos/go-systemd/daemon"
Expand Down Expand Up @@ -540,6 +541,83 @@ func (c *AgentCommand) Run(args []string) int {
}
}

// Create the AuthHandler, SinkServer, TemplateServer, and ExecServer now so that we can pass AuthHandler struct
// values into the Proxy http.Handler. We will wait to actually start these servers
// once we have configured the handlers for each listener below
authInProgress := &atomic.Bool{}
invalidTokenErrCh := make(chan error)
var ah *auth.AuthHandler
var ss *sink.SinkServer
var ts *template.Server
var es *exec.Server
if method != nil {
enableTemplateTokenCh := len(config.Templates) > 0
enableEnvTemplateTokenCh := len(config.EnvTemplates) > 0

// Auth Handler is going to set its own retry values, so we want to
// work on a copy of the client to not affect other subsystems.
ahClient, err := c.client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for auth handler: %v", err))
return 1
}

// Override the set namespace with the auto-auth specific namespace
if !namespaceSetByEnvironmentVariable && config.AutoAuth.Method.Namespace != "" {
ahClient.SetNamespace(config.AutoAuth.Method.Namespace)
}

if config.DisableIdleConnsAutoAuth {
ahClient.SetMaxIdleConnections(-1)
}

if config.DisableKeepAlivesAutoAuth {
ahClient.SetDisableKeepAlives(true)
}

ah = auth.NewAuthHandler(&auth.AuthHandlerConfig{
Logger: c.logger.Named("auth.handler"),
Client: ahClient,
WrapTTL: config.AutoAuth.Method.WrapTTL,
MinBackoff: config.AutoAuth.Method.MinBackoff,
MaxBackoff: config.AutoAuth.Method.MaxBackoff,
EnableReauthOnNewCredentials: config.AutoAuth.EnableReauthOnNewCredentials,
EnableTemplateTokenCh: enableTemplateTokenCh,
EnableExecTokenCh: enableEnvTemplateTokenCh,
Token: previousToken,
ExitOnError: config.AutoAuth.Method.ExitOnError,
UserAgent: useragent.AgentAutoAuthString(),
MetricsSignifier: "agent",
})

ss = sink.NewSinkServer(&sink.SinkServerConfig{
Logger: c.logger.Named("sink.server"),
Client: ahClient,
ExitAfterAuth: config.ExitAfterAuth,
})

ts = template.NewServer(&template.ServerConfig{
Logger: c.logger.Named("template.server"),
LogLevel: c.logger.GetLevel(),
LogWriter: c.logWriter,
AgentConfig: c.config,
Namespace: templateNamespace,
ExitAfterAuth: config.ExitAfterAuth,
})

es, err = exec.NewServer(&exec.ServerConfig{
AgentConfig: c.config,
Namespace: templateNamespace,
Logger: c.logger.Named("exec.server"),
LogLevel: c.logger.GetLevel(),
LogWriter: c.logWriter,
})
if err != nil {
c.logger.Error("could not create exec server", "error", err)
return 1
}
}

var listeners []net.Listener

// If there are templates, add an in-process listener
Expand Down Expand Up @@ -578,31 +656,28 @@ func (c *AgentCommand) Run(args []string) int {
listeners = append(listeners, ln)

proxyVaultToken := true
var inmemSink sink.Sink
apiProxyLogger.Debug("auto-auth token is allowed to be used; configuring inmem sink")
inmemSink, err := inmem.New(&sink.SinkConfig{
Logger: apiProxyLogger,
}, leaseCache)
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating inmem sink for cache: %v", err))
c.tlsReloadFuncsLock.Unlock()
return 1
}
sinks = append(sinks, &sink.SinkConfig{
Logger: apiProxyLogger,
Sink: inmemSink,
})
if config.APIProxy != nil {
if config.APIProxy.UseAutoAuthToken {
apiProxyLogger.Debug("auto-auth token is allowed to be used; configuring inmem sink")
inmemSink, err = inmem.New(&sink.SinkConfig{
Logger: apiProxyLogger,
}, leaseCache)
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating inmem sink for cache: %v", err))
c.tlsReloadFuncsLock.Unlock()
return 1
}
sinks = append(sinks, &sink.SinkConfig{
Logger: apiProxyLogger,
Sink: inmemSink,
})
}
proxyVaultToken = !config.APIProxy.ForceAutoAuthToken
}

var muxHandler http.Handler
if leaseCache != nil {
muxHandler = cache.ProxyHandler(ctx, apiProxyLogger, leaseCache, inmemSink, proxyVaultToken)
muxHandler = cache.ProxyHandler(ctx, apiProxyLogger, leaseCache, inmemSink, proxyVaultToken, authInProgress, invalidTokenErrCh)
} else {
muxHandler = cache.ProxyHandler(ctx, apiProxyLogger, apiProxy, inmemSink, proxyVaultToken)
muxHandler = cache.ProxyHandler(ctx, apiProxyLogger, apiProxy, inmemSink, proxyVaultToken, authInProgress, invalidTokenErrCh)
}

// Parse 'require_request_header' listener config option, and wrap
Expand Down Expand Up @@ -708,71 +783,6 @@ func (c *AgentCommand) Run(args []string) int {

// Start auto-auth and sink servers
if method != nil {
enableTemplateTokenCh := len(config.Templates) > 0
enableEnvTemplateTokenCh := len(config.EnvTemplates) > 0

// Auth Handler is going to set its own retry values, so we want to
// work on a copy of the client to not affect other subsystems.
ahClient, err := c.client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for auth handler: %v", err))
return 1
}

// Override the set namespace with the auto-auth specific namespace
if !namespaceSetByEnvironmentVariable && config.AutoAuth.Method.Namespace != "" {
ahClient.SetNamespace(config.AutoAuth.Method.Namespace)
}

if config.DisableIdleConnsAutoAuth {
ahClient.SetMaxIdleConnections(-1)
}

if config.DisableKeepAlivesAutoAuth {
ahClient.SetDisableKeepAlives(true)
}

ah := auth.NewAuthHandler(&auth.AuthHandlerConfig{
Logger: c.logger.Named("auth.handler"),
Client: ahClient,
WrapTTL: config.AutoAuth.Method.WrapTTL,
MinBackoff: config.AutoAuth.Method.MinBackoff,
MaxBackoff: config.AutoAuth.Method.MaxBackoff,
EnableReauthOnNewCredentials: config.AutoAuth.EnableReauthOnNewCredentials,
EnableTemplateTokenCh: enableTemplateTokenCh,
EnableExecTokenCh: enableEnvTemplateTokenCh,
Token: previousToken,
ExitOnError: config.AutoAuth.Method.ExitOnError,
UserAgent: useragent.AgentAutoAuthString(),
MetricsSignifier: "agent",
})

ss := sink.NewSinkServer(&sink.SinkServerConfig{
Logger: c.logger.Named("sink.server"),
Client: ahClient,
ExitAfterAuth: config.ExitAfterAuth,
})

ts := template.NewServer(&template.ServerConfig{
Logger: c.logger.Named("template.server"),
LogLevel: c.logger.GetLevel(),
LogWriter: c.logWriter,
AgentConfig: c.config,
Namespace: templateNamespace,
ExitAfterAuth: config.ExitAfterAuth,
})

es, err := exec.NewServer(&exec.ServerConfig{
AgentConfig: c.config,
Namespace: templateNamespace,
Logger: c.logger.Named("exec.server"),
LogLevel: c.logger.GetLevel(),
LogWriter: c.logWriter,
})
if err != nil {
c.logger.Error("could not create exec server", "error", err)
return 1
}

g.Add(func() error {
return ah.Run(ctx, method)
Expand Down
2 changes: 1 addition & 1 deletion command/agent/cache_end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func TestCache_UsingAutoAuthToken(t *testing.T) {
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))

// Passing a non-nil inmemsink tells the agent to use the auto-auth token
mux.Handle("/", cache.ProxyHandler(ctx, cacheLogger, leaseCache, inmemSink, true))
mux.Handle("/", cache.ProxyHandler(ctx, cacheLogger, leaseCache, inmemSink, true, nil, nil))
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion command/agentproxyshared/cache/api_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
gohttp "net/http"
"sync"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/helper/namespace"
Expand Down
4 changes: 2 additions & 2 deletions command/agentproxyshared/cache/api_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ func setupClusterAndAgentCommon(ctx context.Context, t *testing.T, coreConfig *v

mux.Handle("/agent/v1/cache-clear", leaseCache.HandleCacheClear(ctx))

mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, nil, true))
mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, nil, true, nil, nil))
} else {
mux.Handle("/", ProxyHandler(ctx, apiProxyLogger, apiProxy, nil, true))
mux.Handle("/", ProxyHandler(ctx, apiProxyLogger, apiProxy, nil, true, nil, nil))
}

server := &http.Server{
Expand Down
4 changes: 2 additions & 2 deletions command/agentproxyshared/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestCache_AutoAuthTokenStripping(t *testing.T) {
mux := http.NewServeMux()
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))

mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink("testid"), true))
mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink("testid"), true, nil, nil))
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestCache_AutoAuthClientTokenProxyStripping(t *testing.T) {
mux := http.NewServeMux()
// mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))

mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink(realToken), false))
mux.Handle("/", ProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink(realToken), false, nil, nil))
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
Expand Down
26 changes: 22 additions & 4 deletions command/agentproxyshared/cache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/armon/go-metrics"
Expand All @@ -23,7 +25,7 @@ import (
"github.com/hashicorp/vault/sdk/logical"
)

func ProxyHandler(ctx context.Context, logger hclog.Logger, proxier Proxier, inmemSink sink.Sink, proxyVaultToken bool) http.Handler {
func ProxyHandler(ctx context.Context, logger hclog.Logger, proxier Proxier, inmemSink sink.Sink, proxyVaultToken bool, authInProgress *atomic.Bool, invalidTokenErrCh chan error) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Info("received request", "method", r.Method, "path", r.URL.Path)

Expand All @@ -33,9 +35,13 @@ func ProxyHandler(ctx context.Context, logger hclog.Logger, proxier Proxier, inm

token := r.Header.Get(consts.AuthHeaderName)

if token == "" && inmemSink != nil {
logger.Debug("using auto auth token", "method", r.Method, "path", r.URL.Path)
token = inmemSink.(sink.SinkReader).Token()
var autoAuthToken string
if inmemSink != nil {
autoAuthToken = inmemSink.(sink.SinkReader).Token()
if token == "" {
logger.Debug("using auto auth token", "method", r.Method, "path", r.URL.Path)
token = autoAuthToken
}
}

// Parse and reset body.
Expand All @@ -59,10 +65,22 @@ func ProxyHandler(ctx context.Context, logger hclog.Logger, proxier Proxier, inm
if err != nil {
// If this is an api.Response error, don't wrap the response.
if resp != nil && resp.Response.Error() != nil {
responseErrMessage := resp.Response.Error()
copyHeader(w.Header(), resp.Response.Header)
w.WriteHeader(resp.Response.StatusCode)
io.Copy(w, resp.Response.Body)
metrics.IncrCounter([]string{"agent", "proxy", "client_error"}, 1)
// Re-trigger auto auth if the token is the same as the auto auth token
if resp.Response.StatusCode == 403 && strings.Contains(responseErrMessage.Error(), logical.ErrInvalidToken.Error()) &&
autoAuthToken == token && !authInProgress.Load() {
// Drain the error channel first
logger.Info("proxy received an invalid token error")
select {
case <-invalidTokenErrCh:
default:
}
invalidTokenErrCh <- resp.Response.Error()
}
} else {
metrics.IncrCounter([]string{"agent", "proxy", "error"}, 1)
logical.RespondError(w, http.StatusInternalServerError, fmt.Errorf("failed to get the response: %w", err))
Expand Down
27 changes: 25 additions & 2 deletions command/agentproxyshared/cache/static_secret_cache_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"

"github.com/hashicorp/go-hclog"
Expand All @@ -20,6 +23,7 @@ import (
"github.com/hashicorp/vault/command/agentproxyshared/cache/cachememdb"
"github.com/hashicorp/vault/command/agentproxyshared/sink"
"github.com/hashicorp/vault/helper/useragent"
"github.com/hashicorp/vault/sdk/logical"
"golang.org/x/exp/maps"
"nhooyr.io/websocket"
)
Expand Down Expand Up @@ -359,13 +363,23 @@ func (updater *StaticSecretCacheUpdater) openWebSocketConnection(ctx context.Con
}

if err != nil {
errMessage := err.Error()
if resp != nil {
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("received 404 when opening web socket to %s, ensure Vault is Enterprise version 1.16 or above", wsURL)
}
if resp.StatusCode == http.StatusForbidden {
var errBytes []byte
errBytes, err = io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("error occured when attempting to read error response from Vault server")
}
errMessage = string(errBytes)
}
}
return nil, fmt.Errorf("error returned when opening event stream web socket to %s, ensure auto-auth token"+
" has correct permissions and Vault is Enterprise version 1.16 or above: %w", wsURL, err)
" has correct permissions and Vault is Enterprise version 1.16 or above: %s", wsURL, errMessage)
}

if conn == nil {
Expand All @@ -379,7 +393,7 @@ func (updater *StaticSecretCacheUpdater) openWebSocketConnection(ctx context.Con
// Once a token is provided to the sink, we will start the websocket and start consuming
// events and updating secrets.
// Run will shut down gracefully when the context is cancelled.
func (updater *StaticSecretCacheUpdater) Run(ctx context.Context) error {
func (updater *StaticSecretCacheUpdater) Run(ctx context.Context, authRenewalInProgress *atomic.Bool, invalidTokenErrCh chan error) error {
updater.logger.Info("starting static secret cache updater subsystem")
defer func() {
updater.logger.Info("static secret cache updater subsystem stopped")
Expand Down Expand Up @@ -415,6 +429,15 @@ tokenLoop:
if err != nil {
updater.logger.Error("error occurred during streaming static secret cache update events", "err", err)
shouldBackoff = true
if strings.Contains(err.Error(), logical.ErrInvalidToken.Error()) && !authRenewalInProgress.Load() {
// Drain the channel in case there is an error that has already been sent but not received
select {
case <-invalidTokenErrCh:
default:
}
updater.logger.Error("received invalid token error while opening websocket")
invalidTokenErrCh <- err
}
continue
}
}
Expand Down
Loading
Loading