VAULT-11510 Vault Agent can start listeners without caching #18137

Merged 11 commits on Dec 5, 2022
3 changes: 3 additions & 0 deletions changelog/18137.txt
@@ -0,0 +1,3 @@
```release-note:improvement
agent: Configured Vault Agent listeners now listen without the need for caching to be configured.
```
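As a usage illustration of this release note (not part of the PR itself), here is a minimal Go sketch of a client talking to an agent listener when no `cache` stanza is configured; the listener address `127.0.0.1:8100` is an assumed example value.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// Assumption: a Vault Agent is running with only a listener configured
	// (no cache stanza), e.g. listening on 127.0.0.1:8100.
	cfg := api.DefaultConfig()
	cfg.Address = "http://127.0.0.1:8100"

	client, err := api.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Requests sent to the agent listener are proxied through to Vault,
	// even though no caching is configured.
	health, err := client.Sys().Health()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("initialized=%v sealed=%v\n", health.Initialized, health.Sealed)
}
```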
169 changes: 95 additions & 74 deletions command/agent.go
@@ -466,23 +466,25 @@ func (c *AgentCommand) Run(args []string) int {

var leaseCache *cache.LeaseCache
var previousToken string
// Parse agent listener configurations
if config.Cache != nil {
cacheLogger := c.logger.Named("cache")
var cacheHandler http.Handler

proxyClient, err := client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for caching: %v", err))
return 1
}
proxyClient, err := client.CloneWithHeaders()
if err != nil {
c.UI.Error(fmt.Sprintf("Error cloning client for proxying: %v", err))
return 1
}

if config.DisableIdleConnsCaching {
proxyClient.SetMaxIdleConnections(-1)
}
if config.DisableIdleConnsAPIProxy {
proxyClient.SetMaxIdleConnections(-1)
}

if config.DisableKeepAlivesCaching {
proxyClient.SetDisableKeepAlives(true)
}
if config.DisableKeepAlivesAPIProxy {
proxyClient.SetDisableKeepAlives(true)
}

// Parse agent cache configurations
if config.Cache != nil {
cacheLogger := c.logger.Named("cache")

// Create the API proxier
apiProxy, err := cache.NewAPIProxy(&cache.APIProxyConfig{
@@ -674,83 +676,102 @@ func (c *AgentCommand) Run(args []string) int {
proxyVaultToken := !config.Cache.ForceAutoAuthToken

// Create the request handler
cacheHandler := cache.Handler(ctx, cacheLogger, leaseCache, inmemSink, proxyVaultToken)
cacheHandler = cache.CachingProxyHandler(ctx, cacheLogger, leaseCache, inmemSink, proxyVaultToken)
}

var listeners []net.Listener
var listeners []net.Listener

// If there are templates, add an in-process listener
if len(config.Templates) > 0 {
config.Listeners = append(config.Listeners, &configutil.Listener{Type: listenerutil.BufConnType})
}
for i, lnConfig := range config.Listeners {
var ln net.Listener
var tlsConf *tls.Config
// If there are templates, add an in-process listener
if len(config.Templates) > 0 {
config.Listeners = append(config.Listeners, &configutil.Listener{Type: listenerutil.BufConnType})
}
for i, lnConfig := range config.Listeners {
var ln net.Listener
var tlsConf *tls.Config

if lnConfig.Type == listenerutil.BufConnType {
inProcListener := bufconn.Listen(1024 * 1024)
if lnConfig.Type == listenerutil.BufConnType {
inProcListener := bufconn.Listen(1024 * 1024)
if config.Cache != nil {
config.Cache.InProcDialer = listenerutil.NewBufConnWrapper(inProcListener)
ln = inProcListener
} else {
ln, tlsConf, err = cache.StartListener(lnConfig)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
return 1
}
}
ln = inProcListener
} else {
ln, tlsConf, err = cache.StartListener(lnConfig)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
return 1
}
}

listeners = append(listeners, ln)
listeners = append(listeners, ln)

// Parse 'require_request_header' listener config option, and wrap
// the request handler if necessary
muxHandler := cacheHandler
if lnConfig.RequireRequestHeader && ("metrics_only" != lnConfig.Role) {
muxHandler = verifyRequestHeader(muxHandler)
}
listenerLogger := c.logger.Named("listener")

// Create a muxer and add paths relevant for the lease cache layer
mux := http.NewServeMux()
quitEnabled := lnConfig.AgentAPI != nil && lnConfig.AgentAPI.EnableQuit
var muxHandler http.Handler

mux.Handle(consts.AgentPathMetrics, c.handleMetrics())
if "metrics_only" != lnConfig.Role {
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))
mux.Handle(consts.AgentPathQuit, c.handleQuit(quitEnabled))
mux.Handle("/", muxHandler)
if cacheHandler != nil {
muxHandler = cacheHandler
} else {
apiProxy, err := cache.NewAPIProxy(&cache.APIProxyConfig{
Client: proxyClient,
Logger: listenerLogger.Named("apiproxy"),
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating API proxy: %v", err))
return 1
}
muxHandler = cache.ProxyHandler(ctx, listenerLogger, apiProxy)
}

scheme := "https://"
if tlsConf == nil {
scheme = "http://"
}
if ln.Addr().Network() == "unix" {
scheme = "unix://"
}
// Parse 'require_request_header' listener config option, and wrap
// the request handler if necessary
if lnConfig.RequireRequestHeader && ("metrics_only" != lnConfig.Role) {
muxHandler = verifyRequestHeader(muxHandler)
}

infoKey := fmt.Sprintf("api address %d", i+1)
info[infoKey] = scheme + ln.Addr().String()
infoKeys = append(infoKeys, infoKey)

server := &http.Server{
Addr: ln.Addr().String(),
TLSConfig: tlsConf,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
ErrorLog: cacheLogger.StandardLogger(nil),
}
// Create a muxer and add paths relevant for the lease cache layer
mux := http.NewServeMux()
quitEnabled := lnConfig.AgentAPI != nil && lnConfig.AgentAPI.EnableQuit

go server.Serve(ln)
mux.Handle(consts.AgentPathMetrics, c.handleMetrics())
if "metrics_only" != lnConfig.Role {
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))
mux.Handle(consts.AgentPathQuit, c.handleQuit(quitEnabled))
mux.Handle("/", muxHandler)
}

// Ensure that listeners are closed at all the exits
listenerCloseFunc := func() {
for _, ln := range listeners {
ln.Close()
}
scheme := "https://"
if tlsConf == nil {
scheme = "http://"
}
if ln.Addr().Network() == "unix" {
scheme = "unix://"
}

infoKey := fmt.Sprintf("api address %d", i+1)
info[infoKey] = scheme + ln.Addr().String()
infoKeys = append(infoKeys, infoKey)

server := &http.Server{
Addr: ln.Addr().String(),
TLSConfig: tlsConf,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
ErrorLog: listenerLogger.StandardLogger(nil),
}

go server.Serve(ln)
}

// Ensure that listeners are closed at all the exits
listenerCloseFunc := func() {
for _, ln := range listeners {
ln.Close()
}
defer c.cleanupGuard.Do(listenerCloseFunc)
}
defer c.cleanupGuard.Do(listenerCloseFunc)

// Inform any tests that the server is ready
if c.startedCh != nil {
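The heart of the `command/agent.go` change above is that each listener now gets a request handler even when no cache is configured. Below is a hedged sketch (a hypothetical helper, not the PR's literal code) of that selection, built from the `cache.NewAPIProxy`, `cache.ProxyHandler`, and caching-handler pieces shown in the diff.

```go
package example

import (
	"context"
	"net/http"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/api"
	"github.com/hashicorp/vault/command/agent/cache"
)

// listenerHandler is a hypothetical distillation of the handler selection in
// the diff above: reuse the caching proxy handler when a cache is configured,
// otherwise build a standalone API proxy so the listener can still forward
// requests to Vault.
func listenerHandler(ctx context.Context, logger hclog.Logger, proxyClient *api.Client,
	cacheHandler http.Handler) (http.Handler, error) {
	if cacheHandler != nil {
		// Cache stanza present: the shared caching handler serves this listener.
		return cacheHandler, nil
	}
	// No cache stanza: create an API proxier just for this listener.
	apiProxy, err := cache.NewAPIProxy(&cache.APIProxyConfig{
		Client: proxyClient,
		Logger: logger.Named("apiproxy"),
	})
	if err != nil {
		return nil, err
	}
	return cache.ProxyHandler(ctx, logger, apiProxy), nil
}
```

When caching is enabled the shared caching handler is reused; otherwise a standalone API proxy is created, which is what lets listeners start without a cache stanza.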
36 changes: 36 additions & 0 deletions command/agent/cache/api_proxy_test.go
@@ -47,6 +47,42 @@ func TestAPIProxy(t *testing.T) {
}
}

func TestAPIProxyNoCache(t *testing.T) {
cleanup, client, _, _ := setupClusterAndAgentNoCache(namespace.RootContext(nil), t, nil)
defer cleanup()

proxier, err := NewAPIProxy(&APIProxyConfig{
Client: client,
Logger: logging.NewVaultLogger(hclog.Trace),
})
if err != nil {
t.Fatal(err)
}

r := client.NewRequest("GET", "/v1/sys/health")
req, err := r.ToHTTP()
if err != nil {
t.Fatal(err)
}

resp, err := proxier.Send(namespace.RootContext(nil), &SendRequest{
Request: req,
})
if err != nil {
t.Fatal(err)
}

var result api.HealthResponse
err = jsonutil.DecodeJSONFromReader(resp.Response.Body, &result)
if err != nil {
t.Fatal(err)
}

if !result.Initialized || result.Sealed || result.Standby {
t.Fatalf("bad sys/health response: %#v", result)
}
}

func TestAPIProxy_queryParams(t *testing.T) {
// Set up an agent that points to a standby node for this particular test
// since it needs to proxy a /sys/health?standbyok=true request to a standby
65 changes: 41 additions & 24 deletions command/agent/cache/cache_test.go
@@ -39,18 +39,26 @@ path "*" {
// be deferred immediately along with two clients, one for direct cluster
// communication and another to talk to the caching agent.
func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig) (func(), *api.Client, *api.Client, *LeaseCache) {
return setupClusterAndAgentCommon(ctx, t, coreConfig, false)
return setupClusterAndAgentCommon(ctx, t, coreConfig, false, true)
}

// setupClusterAndAgentNoCache is a helper func used to set up a test cluster and
// proxying agent against the active node. It returns a cleanup func that should
// be deferred immediately along with two clients, one for direct cluster
// communication and another to talk to the caching agent.
func setupClusterAndAgentNoCache(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig) (func(), *api.Client, *api.Client, *LeaseCache) {
return setupClusterAndAgentCommon(ctx, t, coreConfig, false, false)
}

// setupClusterAndAgentOnStandby is a helper func used to set up a test cluster
// and caching agent against a standby node. It returns a cleanup func that
// should be deferred immediately along with two clients, one for direct cluster
// communication and another to talk to the caching agent.
func setupClusterAndAgentOnStandby(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig) (func(), *api.Client, *api.Client, *LeaseCache) {
return setupClusterAndAgentCommon(ctx, t, coreConfig, true)
return setupClusterAndAgentCommon(ctx, t, coreConfig, true, true)
}

func setupClusterAndAgentCommon(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig, onStandby bool) (func(), *api.Client, *api.Client, *LeaseCache) {
func setupClusterAndAgentCommon(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig, onStandby bool, useCache bool) (func(), *api.Client, *api.Client, *LeaseCache) {
t.Helper()

if ctx == nil {
@@ -121,45 +129,54 @@ func setupClusterAndAgentCommon(ctx context.Context, t *testing.T, coreConfig *v
origEnvVaultCACert := os.Getenv(api.EnvVaultCACert)
os.Setenv(api.EnvVaultCACert, fmt.Sprintf("%s/ca_cert.pem", cluster.TempDir))

cacheLogger := logging.NewVaultLogger(hclog.Trace).Named("cache")

listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}

apiProxyLogger := logging.NewVaultLogger(hclog.Trace).Named("apiproxy")

// Create the API proxier
apiProxy, err := NewAPIProxy(&APIProxyConfig{
Client: clienToUse,
Logger: cacheLogger.Named("apiproxy"),
})
if err != nil {
t.Fatal(err)
}

// Create the lease cache proxier and set its underlying proxier to
// the API proxier.
leaseCache, err := NewLeaseCache(&LeaseCacheConfig{
Client: clienToUse,
BaseContext: ctx,
Proxier: apiProxy,
Logger: cacheLogger.Named("leasecache"),
Logger: apiProxyLogger,
})
if err != nil {
t.Fatal(err)
}

// Create a muxer and add paths relevant for the lease cache layer
// Create a muxer and add paths relevant for the lease cache layer and API proxy layer
mux := http.NewServeMux()
mux.Handle("/agent/v1/cache-clear", leaseCache.HandleCacheClear(ctx))

mux.Handle("/", Handler(ctx, cacheLogger, leaseCache, nil, true))
var leaseCache *LeaseCache
if useCache {
cacheLogger := logging.NewVaultLogger(hclog.Trace).Named("cache")

// Create the lease cache proxier and set its underlying proxier to
// the API proxier.
leaseCache, err = NewLeaseCache(&LeaseCacheConfig{
Client: clienToUse,
BaseContext: ctx,
Proxier: apiProxy,
Logger: cacheLogger.Named("leasecache"),
})
if err != nil {
t.Fatal(err)
}

mux.Handle("/agent/v1/cache-clear", leaseCache.HandleCacheClear(ctx))

mux.Handle("/", CachingProxyHandler(ctx, cacheLogger, leaseCache, nil, true))
} else {
mux.Handle("/", ProxyHandler(ctx, apiProxyLogger, apiProxy))
}

server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
ErrorLog: cacheLogger.StandardLogger(nil),
ErrorLog: apiProxyLogger.StandardLogger(nil),
}
go server.Serve(listener)

@@ -248,7 +265,7 @@ func TestCache_AutoAuthTokenStripping(t *testing.T) {
mux := http.NewServeMux()
mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))

mux.Handle("/", Handler(ctx, cacheLogger, leaseCache, mock.NewSink("testid"), true))
mux.Handle("/", CachingProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink("testid"), true))
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
@@ -337,7 +354,7 @@ func TestCache_AutoAuthClientTokenProxyStripping(t *testing.T) {
mux := http.NewServeMux()
// mux.Handle(consts.AgentPathCacheClear, leaseCache.HandleCacheClear(ctx))

mux.Handle("/", Handler(ctx, cacheLogger, leaseCache, mock.NewSink(realToken), false))
mux.Handle("/", CachingProxyHandler(ctx, cacheLogger, leaseCache, mock.NewSink(realToken), false))
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,