Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAULT-11510 Vault Agent can start listeners without caching #18137

Merged
merged 11 commits into from
Dec 5, 2022
3 changes: 3 additions & 0 deletions changelog/18137.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: Configured Vault Agent listeners now listen without the need for caching to be configured.
```
253 changes: 148 additions & 105 deletions command/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"sync"
"time"

"github.com/hashicorp/vault/command/agent/sink/inmem"

systemd "github.com/coreos/go-systemd/daemon"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/gatedwriter"
Expand All @@ -39,7 +41,6 @@ import (
agentConfig "github.com/hashicorp/vault/command/agent/config"
"github.com/hashicorp/vault/command/agent/sink"
"github.com/hashicorp/vault/command/agent/sink/file"
"github.com/hashicorp/vault/command/agent/sink/inmem"
"github.com/hashicorp/vault/command/agent/template"
"github.com/hashicorp/vault/command/agent/winsvc"
"github.com/hashicorp/vault/helper/logging"
Expand Down Expand Up @@ -421,10 +422,37 @@ func (c *AgentCommand) Run(args []string) int {

enforceConsistency := cache.EnforceConsistencyNever
whenInconsistent := cache.WhenInconsistentFail
if config.APIProxy != nil {
switch config.APIProxy.EnforceConsistency {
case "always":
enforceConsistency = cache.EnforceConsistencyAlways
case "never", "":
default:
c.UI.Error(fmt.Sprintf("Unknown api_proxy setting for enforce_consistency: %q", config.APIProxy.EnforceConsistency))
return 1
}

switch config.APIProxy.WhenInconsistent {
case "retry":
whenInconsistent = cache.WhenInconsistentRetry
case "forward":
whenInconsistent = cache.WhenInconsistentForward
case "fail", "":
default:
c.UI.Error(fmt.Sprintf("Unknown api_proxy setting for when_inconsistent: %q", config.APIProxy.WhenInconsistent))
return 1
}
}
// Keep Cache configuration for legacy reasons, but error if defined alongside API Proxy
if config.Cache != nil {
switch config.Cache.EnforceConsistency {
case "always":
enforceConsistency = cache.EnforceConsistencyAlways
if enforceConsistency != cache.EnforceConsistencyNever {
c.UI.Error("enforce_consistency configured in both api_proxy and cache blocks. Please remove this configuration from the cache block.")
return 1
} else {
enforceConsistency = cache.EnforceConsistencyAlways
}
case "never", "":
default:
c.UI.Error(fmt.Sprintf("Unknown cache setting for enforce_consistency: %q", config.Cache.EnforceConsistency))
Expand All @@ -433,9 +461,19 @@ func (c *AgentCommand) Run(args []string) int {

switch config.Cache.WhenInconsistent {
case "retry":
whenInconsistent = cache.WhenInconsistentRetry
if whenInconsistent != cache.WhenInconsistentFail {
c.UI.Error("enforce_consistency configured in both api_proxy and cache blocks. Please remove this configuration from the cache block.")
VioletHynes marked this conversation as resolved.
Show resolved Hide resolved
return 1
} else {
whenInconsistent = cache.WhenInconsistentRetry
}
case "forward":
whenInconsistent = cache.WhenInconsistentForward
if whenInconsistent != cache.WhenInconsistentFail {
c.UI.Error("enforce_consistency configured in both api_proxy and cache blocks. Please remove this configuration from the cache block.")
VioletHynes marked this conversation as resolved.
Show resolved Hide resolved
return 1
} else {
whenInconsistent = cache.WhenInconsistentForward
}
case "fail", "":
default:
c.UI.Error(fmt.Sprintf("Unknown cache setting for when_inconsistent: %q", config.Cache.WhenInconsistent))
Expand Down Expand Up @@ -466,35 +504,38 @@ func (c *AgentCommand) Run(args []string) int {

var leaseCache *cache.LeaseCache
var previousToken string
// Parse agent listener configurations
if config.Cache != nil {
cacheLogger := c.logger.Named("cache")

proxyClient, err := client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for caching: %v", err))
return 1
}
proxyClient, err := client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for proxying: %v", err))
return 1
}

if config.DisableIdleConnsCaching {
proxyClient.SetMaxIdleConnections(-1)
}
if config.DisableIdleConnsAPIProxy {
proxyClient.SetMaxIdleConnections(-1)
}

if config.DisableKeepAlivesCaching {
proxyClient.SetDisableKeepAlives(true)
}
if config.DisableKeepAlivesAPIProxy {
proxyClient.SetDisableKeepAlives(true)
}

// Create the API proxier
apiProxy, err := cache.NewAPIProxy(&cache.APIProxyConfig{
Client: proxyClient,
Logger: cacheLogger.Named("apiproxy"),
EnforceConsistency: enforceConsistency,
WhenInconsistentAction: whenInconsistent,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating API proxy: %v", err))
return 1
}
apiProxyLogger := c.logger.Named("apiproxy")

// The API proxy to be used, if listeners are configured
apiProxy, err := cache.NewAPIProxy(&cache.APIProxyConfig{
Client: proxyClient,
Logger: apiProxyLogger,
EnforceConsistency: enforceConsistency,
WhenInconsistentAction: whenInconsistent,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating API proxy: %v", err))
return 1
}

// Parse agent cache configurations
if config.Cache != nil {
cacheLogger := c.logger.Named("cache")

// Create the lease cache proxier and set its underlying proxier to
// the API proxier.
Expand Down Expand Up @@ -654,103 +695,105 @@ func (c *AgentCommand) Run(args []string) int {
leaseCache.SetPersistentStorage(ps)
}
}
}

var inmemSink sink.Sink
if config.Cache.UseAutoAuthToken {
cacheLogger.Debug("auto-auth token is allowed to be used; configuring inmem sink")
inmemSink, err = inmem.New(&sink.SinkConfig{
Logger: cacheLogger,
}, leaseCache)
var listeners []net.Listener

// If there are templates, add an in-process listener
if len(config.Templates) > 0 {
config.Listeners = append(config.Listeners, &configutil.Listener{Type: listenerutil.BufConnType})
}
for i, lnConfig := range config.Listeners {
var ln net.Listener
var tlsConf *tls.Config

if lnConfig.Type == listenerutil.BufConnType {
inProcListener := bufconn.Listen(1024 * 1024)
if config.Cache != nil {
config.Cache.InProcDialer = listenerutil.NewBufConnWrapper(inProcListener)
}
ln = inProcListener
} else {
ln, tlsConf, err = cache.StartListener(lnConfig)
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating inmem sink for cache: %v", err))
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
return 1
}
sinks = append(sinks, &sink.SinkConfig{
Logger: cacheLogger,
Sink: inmemSink,
})
}

proxyVaultToken := !config.Cache.ForceAutoAuthToken

// Create the request handler
cacheHandler := cache.Handler(ctx, cacheLogger, leaseCache, inmemSink, proxyVaultToken)
listeners = append(listeners, ln)

var listeners []net.Listener

// If there are templates, add an in-process listener
if len(config.Templates) > 0 {
config.Listeners = append(config.Listeners, &configutil.Listener{Type: listenerutil.BufConnType})
}
for i, lnConfig := range config.Listeners {
var ln net.Listener
var tlsConf *tls.Config

if lnConfig.Type == listenerutil.BufConnType {
inProcListener := bufconn.Listen(1024 * 1024)
config.Cache.InProcDialer = listenerutil.NewBufConnWrapper(inProcListener)
ln = inProcListener
} else {
ln, tlsConf, err = cache.StartListener(lnConfig)
proxyVaultToken := true
var inmemSink sink.Sink
if config.APIProxy != nil {
if config.APIProxy.UseAutoAuthToken {
apiProxyLogger.Debug("auto-auth token is allowed to be used; configuring inmem sink")
inmemSink, err = inmem.New(&sink.SinkConfig{
Logger: apiProxyLogger,
}, leaseCache)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
c.UI.Error(fmt.Sprintf("Error creating inmem sink for cache: %v", err))
return 1
}
sinks = append(sinks, &sink.SinkConfig{
Logger: apiProxyLogger,
Sink: inmemSink,
})
}
proxyVaultToken = !config.APIProxy.ForceAutoAuthToken
}

listeners = append(listeners, ln)
muxHandler := cache.ProxyHandler(ctx, apiProxyLogger, apiProxy, inmemSink, proxyVaultToken)

// Parse 'require_request_header' listener config option, and wrap
// the request handler if necessary
muxHandler := cacheHandler
if lnConfig.RequireRequestHeader && ("metrics_only" != lnConfig.Role) {
muxHandler = verifyRequestHeader(muxHandler)
}
// Parse 'require_request_header' listener config option, and wrap
// the request handler if necessary
if lnConfig.RequireRequestHeader && ("metrics_only" != lnConfig.Role) {
muxHandler = verifyRequestHeader(muxHandler)
}

// Create a muxer and add paths relevant for the lease cache layer
mux := http.NewServeMux()
quitEnabled := lnConfig.AgentAPI != nil && lnConfig.AgentAPI.EnableQuit
// Create a muxer and add paths relevant for the lease cache layer
mux := http.NewServeMux()
quitEnabled := lnConfig.AgentAPI != nil && lnConfig.AgentAPI.EnableQuit

mux.Handle(consts.AgentPathMetrics, c.handleMetrics())
if "metrics_only" != lnConfig.Role {
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))
mux.Handle(consts.AgentPathQuit, c.handleQuit(quitEnabled))
mux.Handle("/", muxHandler)
}

scheme := "https://"
if tlsConf == nil {
scheme = "http://"
}
if ln.Addr().Network() == "unix" {
scheme = "unix://"
}
mux.Handle(consts.AgentPathMetrics, c.handleMetrics())
if "metrics_only" != lnConfig.Role {
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))
mux.Handle(consts.AgentPathQuit, c.handleQuit(quitEnabled))
mux.Handle("/", muxHandler)
}

infoKey := fmt.Sprintf("api address %d", i+1)
info[infoKey] = scheme + ln.Addr().String()
infoKeys = append(infoKeys, infoKey)

server := &http.Server{
Addr: ln.Addr().String(),
TLSConfig: tlsConf,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
ErrorLog: cacheLogger.StandardLogger(nil),
}
scheme := "https://"
if tlsConf == nil {
scheme = "http://"
}
if ln.Addr().Network() == "unix" {
scheme = "unix://"
}

go server.Serve(ln)
infoKey := fmt.Sprintf("api address %d", i+1)
info[infoKey] = scheme + ln.Addr().String()
infoKeys = append(infoKeys, infoKey)

server := &http.Server{
Addr: ln.Addr().String(),
TLSConfig: tlsConf,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
ErrorLog: apiProxyLogger.StandardLogger(nil),
}

// Ensure that listeners are closed at all the exits
listenerCloseFunc := func() {
for _, ln := range listeners {
ln.Close()
}
go server.Serve(ln)
}

// Ensure that listeners are closed at all the exits
listenerCloseFunc := func() {
for _, ln := range listeners {
ln.Close()
}
defer c.cleanupGuard.Do(listenerCloseFunc)
}
defer c.cleanupGuard.Do(listenerCloseFunc)

// Inform any tests that the server is ready
if c.startedCh != nil {
Expand Down
Loading