diff --git a/apis/v1alpha1/policy_methods.go b/apis/v1alpha1/policy_methods.go index 97e7074a62..ad399c0897 100644 --- a/apis/v1alpha1/policy_methods.go +++ b/apis/v1alpha1/policy_methods.go @@ -31,3 +31,15 @@ func (p *ObservabilityPolicy) GetPolicyStatus() v1alpha2.PolicyStatus { func (p *ObservabilityPolicy) SetPolicyStatus(status v1alpha2.PolicyStatus) { p.Status = status } + +func (p *UpstreamSettingsPolicy) GetTargetRefs() []v1alpha2.LocalPolicyTargetReference { + return p.Spec.TargetRefs +} + +func (p *UpstreamSettingsPolicy) GetPolicyStatus() v1alpha2.PolicyStatus { + return p.Status +} + +func (p *UpstreamSettingsPolicy) SetPolicyStatus(status v1alpha2.PolicyStatus) { + p.Status = status +} diff --git a/config/crd/bases/gateway.nginx.org_upstreamsettingspolicies.yaml b/config/crd/bases/gateway.nginx.org_upstreamsettingspolicies.yaml index dbe0462862..992af79932 100644 --- a/config/crd/bases/gateway.nginx.org_upstreamsettingspolicies.yaml +++ b/config/crd/bases/gateway.nginx.org_upstreamsettingspolicies.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.16.2 + controller-gen.kubebuilder.io/version: v0.16.5 labels: gateway.networking.k8s.io/policy: direct name: upstreamsettingspolicies.gateway.nginx.org @@ -76,13 +76,13 @@ spec: Time defines the maximum time during which requests can be processed through one keep-alive connection. After this time is reached, the connection is closed following the subsequent request processing. Directive: https://nginx.org/en/docs/http/ngx_http_upstream_module.html#keepalive_time - pattern: ^\d{1,4}(ms|s)?$ + pattern: ^[0-9]{1,4}(ms|s|m|h)?$ type: string timeout: description: |- Timeout defines the keep-alive timeout for upstreams. Directive: https://nginx.org/en/docs/http/ngx_http_upstream_module.html#keepalive_timeout - pattern: ^\d{1,4}(ms|s)?$ + pattern: ^[0-9]{1,4}(ms|s|m|h)?$ type: string type: object targetRefs: diff --git a/internal/mode/static/nginx/config/generator.go b/internal/mode/static/nginx/config/generator.go index 9792eb4d9a..24b8ff8996 100644 --- a/internal/mode/static/nginx/config/generator.go +++ b/internal/mode/static/nginx/config/generator.go @@ -6,9 +6,11 @@ import ( "github.com/go-logr/logr" ngfConfig "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/http" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/observability" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/upstreamsettings" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" ) @@ -131,7 +133,10 @@ func (g GeneratorImpl) executeConfigTemplates( ) []file.File { fileBytes := make(map[string][]byte) - for _, execute := range g.getExecuteFuncs(generator) { + httpUpstreams := g.createUpstreams(conf.Upstreams, upstreamsettings.NewProcessor()) + keepAliveCheck := newKeepAliveChecker(httpUpstreams) + + for _, execute := range g.getExecuteFuncs(generator, httpUpstreams, keepAliveCheck) { results := execute(conf) for _, res := range results { fileBytes[res.dest] = append(fileBytes[res.dest], res.data...) @@ -156,12 +161,16 @@ func (g GeneratorImpl) executeConfigTemplates( return files } -func (g GeneratorImpl) getExecuteFuncs(generator policies.Generator) []executeFunc { +func (g GeneratorImpl) getExecuteFuncs( + generator policies.Generator, + upstreams []http.Upstream, + keepAliveCheck keepAliveChecker, +) []executeFunc { return []executeFunc{ executeMainConfig, executeBaseHTTPConfig, - g.newExecuteServersFunc(generator), - g.executeUpstreams, + g.newExecuteServersFunc(generator, keepAliveCheck), + newExecuteUpstreamsFunc(upstreams), executeSplitClients, executeMaps, executeTelemetry, diff --git a/internal/mode/static/nginx/config/http/config.go b/internal/mode/static/nginx/config/http/config.go index 24aecaa3e4..f1c6caae46 100644 --- a/internal/mode/static/nginx/config/http/config.go +++ b/internal/mode/static/nginx/config/http/config.go @@ -1,6 +1,8 @@ package http -import "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/shared" +import ( + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/shared" +) const ( InternalRoutePathPrefix = "/_ngf-internal" @@ -82,9 +84,18 @@ const ( // Upstream holds all configuration for an HTTP upstream. type Upstream struct { - Name string - ZoneSize string // format: 512k, 1m - Servers []UpstreamServer + Name string + ZoneSize string // format: 512k, 1m + KeepAlive UpstreamKeepAlive + Servers []UpstreamServer +} + +// UpstreamKeepAlive holds the keepalive configuration for an HTTP upstream. +type UpstreamKeepAlive struct { + Time string + Timeout string + Connections int32 + Requests int32 } // UpstreamServer holds all configuration for an HTTP upstream server. diff --git a/internal/mode/static/nginx/config/policies/upstreamsettings/processor.go b/internal/mode/static/nginx/config/policies/upstreamsettings/processor.go new file mode 100644 index 0000000000..5df29eed64 --- /dev/null +++ b/internal/mode/static/nginx/config/policies/upstreamsettings/processor.go @@ -0,0 +1,67 @@ +package upstreamsettings + +import ( + ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/http" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" +) + +// Processor processes UpstreamSettingsPolicies. +type Processor struct{} + +// UpstreamSettings contains settings from UpstreamSettingsPolicy. +type UpstreamSettings struct { + // ZoneSize is the zone size setting. + ZoneSize string + // KeepAlive contains the keepalive settings. + KeepAlive http.UpstreamKeepAlive +} + +// NewProcessor returns a new Processor. +func NewProcessor() Processor { + return Processor{} +} + +// Process processes policies into an UpstreamSettings object. The policies are already validated and are guaranteed +// to not contain overlapping settings. This method merges all fields in the policies into a single UpstreamSettings +// object. +func (g Processor) Process(pols []policies.Policy) UpstreamSettings { + return processPolicies(pols) +} + +func processPolicies(pols []policies.Policy) UpstreamSettings { + upstreamSettings := UpstreamSettings{} + + for _, pol := range pols { + usp, ok := pol.(*ngfAPI.UpstreamSettingsPolicy) + if !ok { + continue + } + + // we can assume that there will be no instance of two or more policies setting the same + // field for the same service + if usp.Spec.ZoneSize != nil { + upstreamSettings.ZoneSize = string(*usp.Spec.ZoneSize) + } + + if usp.Spec.KeepAlive != nil { + if usp.Spec.KeepAlive.Connections != nil { + upstreamSettings.KeepAlive.Connections = *usp.Spec.KeepAlive.Connections + } + + if usp.Spec.KeepAlive.Requests != nil { + upstreamSettings.KeepAlive.Requests = *usp.Spec.KeepAlive.Requests + } + + if usp.Spec.KeepAlive.Time != nil { + upstreamSettings.KeepAlive.Time = string(*usp.Spec.KeepAlive.Time) + } + + if usp.Spec.KeepAlive.Timeout != nil { + upstreamSettings.KeepAlive.Timeout = string(*usp.Spec.KeepAlive.Timeout) + } + } + } + + return upstreamSettings +} diff --git a/internal/mode/static/nginx/config/policies/upstreamsettings/processor_test.go b/internal/mode/static/nginx/config/policies/upstreamsettings/processor_test.go new file mode 100644 index 0000000000..b7c785376f --- /dev/null +++ b/internal/mode/static/nginx/config/policies/upstreamsettings/processor_test.go @@ -0,0 +1,334 @@ +package upstreamsettings + +import ( + "testing" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/http" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" +) + +func TestProcess(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + expUpstreamSettings UpstreamSettings + policies []policies.Policy + }{ + { + name: "all fields populated", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + ZoneSize: "2m", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + { + name: "zone size set", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + ZoneSize: "2m", + }, + }, + { + name: "keep alive connections set", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, + }, + }, + { + name: "keep alive requests set", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Requests: helpers.GetPointer(int32(1)), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + KeepAlive: http.UpstreamKeepAlive{ + Requests: 1, + }, + }, + }, + { + name: "keep alive time set", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + KeepAlive: http.UpstreamKeepAlive{ + Time: "5s", + }, + }, + }, + { + name: "keep alive timeout set", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + KeepAlive: http.UpstreamKeepAlive{ + Timeout: "10s", + }, + }, + }, + { + name: "no fields populated", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{}, + }, + }, + expUpstreamSettings: UpstreamSettings{}, + }, + { + name: "multiple UpstreamSettingsPolicies", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-zonesize", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-connections", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-requests", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Requests: helpers.GetPointer(int32(1)), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-time", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-timeout", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + ZoneSize: "2m", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + { + name: "multiple UpstreamSettingsPolicies along with other policies", + policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-zonesize", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-connections", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-requests", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Requests: helpers.GetPointer(int32(1)), + }), + }, + }, + &ngfAPI.ClientSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "client-settings-policy", + Namespace: "test", + }, + Spec: ngfAPI.ClientSettingsPolicySpec{ + Body: &ngfAPI.ClientBody{ + MaxSize: helpers.GetPointer[ngfAPI.Size]("1m"), + }, + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-time", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp-keepalive-timeout", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + &ngfAPI.ObservabilityPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "observability-policy", + Namespace: "test", + }, + Spec: ngfAPI.ObservabilityPolicySpec{ + Tracing: &ngfAPI.Tracing{ + Strategy: ngfAPI.TraceStrategyRatio, + Ratio: helpers.GetPointer(int32(1)), + }, + }, + }, + }, + expUpstreamSettings: UpstreamSettings{ + ZoneSize: "2m", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + processor := NewProcessor() + + g.Expect(processor.Process(test.policies)).To(Equal(test.expUpstreamSettings)) + }) + } +} diff --git a/internal/mode/static/nginx/config/servers.go b/internal/mode/static/nginx/config/servers.go index 33ea858f31..e7ccef05fc 100644 --- a/internal/mode/static/nginx/config/servers.go +++ b/internal/mode/static/nginx/config/servers.go @@ -23,82 +23,41 @@ const ( rootPath = "/" ) -// httpBaseHeaders contains the constant headers set in each HTTP server block. -var httpBaseHeaders = []http.Header{ - { - Name: "Host", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", - }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, - { - Name: "X-Real-IP", - Value: "$remote_addr", - }, - { - Name: "X-Forwarded-Proto", - Value: "$scheme", - }, - { - Name: "X-Forwarded-Host", - Value: "$host", - }, - { - Name: "X-Forwarded-Port", - Value: "$server_port", - }, -} - -// grpcBaseHeaders contains the constant headers set in each gRPC server block. -var grpcBaseHeaders = []http.Header{ - { - Name: "Host", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", - }, - { - Name: "Authority", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Real-IP", - Value: "$remote_addr", - }, - { - Name: "X-Forwarded-Proto", - Value: "$scheme", - }, - { - Name: "X-Forwarded-Host", - Value: "$host", - }, - { - Name: "X-Forwarded-Port", - Value: "$server_port", - }, -} - -func (g GeneratorImpl) newExecuteServersFunc(generator policies.Generator) executeFunc { +var grpcAuthorityHeader = http.Header{ + Name: "Authority", + Value: "$gw_api_compliant_host", +} + +var httpConnectionHeader = http.Header{ + Name: "Connection", + Value: "$connection_upgrade", +} + +var unsetHTTPConnectionHeader = http.Header{ + Name: "Connection", + Value: "", +} + +var httpUpgradeHeader = http.Header{ + Name: "Upgrade", + Value: "$http_upgrade", +} + +func (g GeneratorImpl) newExecuteServersFunc( + generator policies.Generator, + keepAliveCheck keepAliveChecker, +) executeFunc { return func(configuration dataplane.Configuration) []executeResult { - return g.executeServers(configuration, generator) + return g.executeServers(configuration, generator, keepAliveCheck) } } -func (g GeneratorImpl) executeServers(conf dataplane.Configuration, generator policies.Generator) []executeResult { - servers, httpMatchPairs := createServers(conf, generator) +func (g GeneratorImpl) executeServers( + conf dataplane.Configuration, + generator policies.Generator, + keepAliveCheck keepAliveChecker, +) []executeResult { + servers, httpMatchPairs := createServers(conf, generator, keepAliveCheck) serverConfig := http.ServerConfig{ Servers: servers, @@ -145,7 +104,11 @@ func getIPFamily(baseHTTPConfig dataplane.BaseHTTPConfig) shared.IPFamily { return shared.IPFamily{IPv4: true, IPv6: true} } -func createServers(conf dataplane.Configuration, generator policies.Generator) ([]http.Server, httpMatchPairs) { +func createServers( + conf dataplane.Configuration, + generator policies.Generator, + keepAliveCheck keepAliveChecker, +) ([]http.Server, httpMatchPairs) { servers := make([]http.Server, 0, len(conf.HTTPServers)+len(conf.SSLServers)) finalMatchPairs := make(httpMatchPairs) sharedTLSPorts := make(map[int32]struct{}) @@ -156,7 +119,7 @@ func createServers(conf dataplane.Configuration, generator policies.Generator) ( for idx, s := range conf.HTTPServers { serverID := fmt.Sprintf("%d", idx) - httpServer, matchPairs := createServer(s, serverID, generator) + httpServer, matchPairs := createServer(s, serverID, generator, keepAliveCheck) servers = append(servers, httpServer) maps.Copy(finalMatchPairs, matchPairs) } @@ -164,7 +127,7 @@ func createServers(conf dataplane.Configuration, generator policies.Generator) ( for idx, s := range conf.SSLServers { serverID := fmt.Sprintf("SSL_%d", idx) - sslServer, matchPairs := createSSLServer(s, serverID, generator) + sslServer, matchPairs := createSSLServer(s, serverID, generator, keepAliveCheck) if _, portInUse := sharedTLSPorts[s.Port]; portInUse { sslServer.Listen = getSocketNameHTTPS(s.Port) sslServer.IsSocket = true @@ -180,6 +143,7 @@ func createSSLServer( virtualServer dataplane.VirtualServer, serverID string, generator policies.Generator, + keepAliveCheck keepAliveChecker, ) (http.Server, httpMatchPairs) { listen := fmt.Sprint(virtualServer.Port) if virtualServer.IsDefault { @@ -189,7 +153,7 @@ func createSSLServer( }, nil } - locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator) + locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, keepAliveCheck) server := http.Server{ ServerName: virtualServer.Hostname, @@ -218,6 +182,7 @@ func createServer( virtualServer dataplane.VirtualServer, serverID string, generator policies.Generator, + keepAliveCheck keepAliveChecker, ) (http.Server, httpMatchPairs) { listen := fmt.Sprint(virtualServer.Port) @@ -228,7 +193,7 @@ func createServer( }, nil } - locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator) + locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, keepAliveCheck) server := http.Server{ ServerName: virtualServer.Hostname, @@ -264,6 +229,7 @@ func createLocations( server *dataplane.VirtualServer, serverID string, generator policies.Generator, + keepAliveCheck keepAliveChecker, ) ([]http.Location, httpMatchPairs, bool) { maxLocs, pathsAndTypes := getMaxLocationCountAndPathMap(server.PathRules) locs := make([]http.Location, 0, maxLocs) @@ -292,7 +258,15 @@ func createLocations( if !needsInternalLocations(rule) { for _, r := range rule.MatchRules { - extLocations = updateLocations(r.Filters, extLocations, r, server.Port, rule.Path, rule.GRPC) + extLocations = updateLocations( + r.Filters, + extLocations, + r, + server.Port, + rule.Path, + rule.GRPC, + keepAliveCheck, + ) } locs = append(locs, extLocations...) @@ -314,6 +288,7 @@ func createLocations( server.Port, rule.Path, rule.GRPC, + keepAliveCheck, ) internalLocations = append(internalLocations, intLocation) @@ -450,6 +425,7 @@ func updateLocation( listenerPort int32, path string, grpc bool, + keepAliveCheck keepAliveChecker, ) http.Location { if filters.InvalidFilter != nil { location.Return = &http.Return{Code: http.StatusInternalServerError} @@ -465,7 +441,16 @@ func updateLocation( } rewrites := createRewritesValForRewriteFilter(filters.RequestURLRewrite, path) - proxySetHeaders := generateProxySetHeaders(&matchRule.Filters, grpc) + + extraHeaders := make([]http.Header, 0, 3) + if grpc { + extraHeaders = append(extraHeaders, grpcAuthorityHeader) + } else { + extraHeaders = append(extraHeaders, httpUpgradeHeader) + extraHeaders = append(extraHeaders, getConnectionHeader(keepAliveCheck, matchRule.BackendGroup.Backends)) + } + + proxySetHeaders := generateProxySetHeaders(&matchRule.Filters, createBaseProxySetHeaders(extraHeaders...)) responseHeaders := generateResponseHeaders(&matchRule.Filters) if rewrites != nil { @@ -502,11 +487,12 @@ func updateLocations( listenerPort int32, path string, grpc bool, + keepAliveCheck keepAliveChecker, ) []http.Location { updatedLocations := make([]http.Location, len(buildLocations)) for i, loc := range buildLocations { - updatedLocations[i] = updateLocation(filters, loc, matchRule, listenerPort, path, grpc) + updatedLocations[i] = updateLocation(filters, loc, matchRule, listenerPort, path, grpc, keepAliveCheck) } return updatedLocations @@ -760,32 +746,26 @@ func createMatchLocation(path string, grpc bool) http.Location { return loc } -func generateProxySetHeaders(filters *dataplane.HTTPFilters, grpc bool) []http.Header { - var headers []http.Header - if !grpc { - headers = make([]http.Header, len(httpBaseHeaders)) - copy(headers, httpBaseHeaders) - } else { - headers = make([]http.Header, len(grpcBaseHeaders)) - copy(headers, grpcBaseHeaders) - } - +func generateProxySetHeaders( + filters *dataplane.HTTPFilters, + baseHeaders []http.Header, +) []http.Header { if filters != nil && filters.RequestURLRewrite != nil && filters.RequestURLRewrite.Hostname != nil { - for i, header := range headers { + for i, header := range baseHeaders { if header.Name == "Host" { - headers[i].Value = *filters.RequestURLRewrite.Hostname + baseHeaders[i].Value = *filters.RequestURLRewrite.Hostname break } } } if filters == nil || filters.RequestHeaderModifiers == nil { - return headers + return baseHeaders } headerFilter := filters.RequestHeaderModifiers - headerLen := len(headerFilter.Add) + len(headerFilter.Set) + len(headerFilter.Remove) + len(headers) + headerLen := len(headerFilter.Add) + len(headerFilter.Set) + len(headerFilter.Remove) + len(baseHeaders) proxySetHeaders := make([]http.Header, 0, headerLen) if len(headerFilter.Add) > 0 { addHeaders := createHeadersWithVarName(headerFilter.Add) @@ -803,7 +783,7 @@ func generateProxySetHeaders(filters *dataplane.HTTPFilters, grpc bool) []http.H }) } - return append(proxySetHeaders, headers...) + return append(proxySetHeaders, baseHeaders...) } func generateResponseHeaders(filters *dataplane.HTTPFilters) http.ResponseHeaders { @@ -887,3 +867,48 @@ func getRewriteClientIPSettings(rewriteIPConfig dataplane.RewriteClientIPSetting ProxyProtocol: proxyProtocol, } } + +func createBaseProxySetHeaders(extraHeaders ...http.Header) []http.Header { + baseHeaders := []http.Header{ + { + Name: "Host", + Value: "$gw_api_compliant_host", + }, + { + Name: "X-Forwarded-For", + Value: "$proxy_add_x_forwarded_for", + }, + { + Name: "X-Real-IP", + Value: "$remote_addr", + }, + { + Name: "X-Forwarded-Proto", + Value: "$scheme", + }, + { + Name: "X-Forwarded-Host", + Value: "$host", + }, + { + Name: "X-Forwarded-Port", + Value: "$server_port", + }, + } + + baseHeaders = append(baseHeaders, extraHeaders...) + + return baseHeaders +} + +func getConnectionHeader(keepAliveCheck keepAliveChecker, backends []dataplane.Backend) http.Header { + for _, backend := range backends { + if keepAliveCheck(backend.UpstreamName) { + // if keep-alive settings are enabled on any upstream, the connection header value + // must be empty for the location + return unsetHTTPConnectionHeader + } + } + + return httpConnectionHeader +} diff --git a/internal/mode/static/nginx/config/servers_test.go b/internal/mode/static/nginx/config/servers_test.go index 112991b521..24b40d0f60 100644 --- a/internal/mode/static/nginx/config/servers_test.go +++ b/internal/mode/static/nginx/config/servers_test.go @@ -17,6 +17,12 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" ) +var ( + httpBaseHeaders = createBaseProxySetHeaders(httpUpgradeHeader, httpConnectionHeader) + grpcBaseHeaders = createBaseProxySetHeaders(grpcAuthorityHeader) + alwaysFalseKeepAliveChecker = func(_ string) bool { return false } +) + func TestExecuteServers(t *testing.T) { t.Parallel() @@ -182,7 +188,7 @@ func TestExecuteServers(t *testing.T) { ) gen := GeneratorImpl{} - results := gen.executeServers(conf, fakeGenerator) + results := gen.executeServers(conf, fakeGenerator, alwaysFalseKeepAliveChecker) g.Expect(results).To(HaveLen(len(expectedResults))) for _, res := range results { @@ -321,7 +327,8 @@ func TestExecuteServers_IPFamily(t *testing.T) { g := NewWithT(t) gen := GeneratorImpl{} - results := gen.executeServers(test.config, &policiesfakes.FakeGenerator{}) + results := gen.executeServers(test.config, &policiesfakes.FakeGenerator{}, alwaysFalseKeepAliveChecker) + g.Expect(results).To(HaveLen(2)) serverConf := string(results[0].data) httpMatchConf := string(results[1].data) @@ -439,7 +446,7 @@ func TestExecuteServers_RewriteClientIP(t *testing.T) { g := NewWithT(t) gen := GeneratorImpl{} - results := gen.executeServers(test.config, &policiesfakes.FakeGenerator{}) + results := gen.executeServers(test.config, &policiesfakes.FakeGenerator{}, alwaysFalseKeepAliveChecker) g.Expect(results).To(HaveLen(2)) serverConf := string(results[0].data) httpMatchConf := string(results[1].data) @@ -481,7 +488,7 @@ func TestExecuteServers_Plus(t *testing.T) { g := NewWithT(t) gen := GeneratorImpl{plus: true} - results := gen.executeServers(config, &policiesfakes.FakeGenerator{}) + results := gen.executeServers(config, &policiesfakes.FakeGenerator{}, alwaysFalseKeepAliveChecker) g.Expect(results).To(HaveLen(2)) serverConf := string(results[0].data) @@ -565,7 +572,7 @@ func TestExecuteForDefaultServers(t *testing.T) { g := NewWithT(t) gen := GeneratorImpl{} - serverResults := gen.executeServers(tc.conf, &policiesfakes.FakeGenerator{}) + serverResults := gen.executeServers(tc.conf, &policiesfakes.FakeGenerator{}, alwaysFalseKeepAliveChecker) g.Expect(serverResults).To(HaveLen(2)) serverConf := string(serverResults[0].data) httpMatchConf := string(serverResults[1].data) @@ -649,6 +656,18 @@ func TestCreateServers(t *testing.T) { }, } + keepAliveGroup := dataplane.BackendGroup{ + Source: hrNsName, + RuleIdx: 4, + Backends: []dataplane.Backend{ + { + UpstreamName: "test_keep_alive_80", + Valid: true, + Weight: 1, + }, + }, + } + filterGroup1 := dataplane.BackendGroup{Source: hrNsName, RuleIdx: 3} filterGroup2 := dataplane.BackendGroup{Source: hrNsName, RuleIdx: 4} @@ -969,6 +988,16 @@ func TestCreateServers(t *testing.T) { }, }, }, + { + Path: "/keep-alive-enabled", + PathType: dataplane.PathTypeExact, + MatchRules: []dataplane.MatchRule{ + { + Match: dataplane.Match{}, + BackendGroup: keepAliveGroup, + }, + }, + }, } conf := dataplane.Configuration{ @@ -1072,14 +1101,6 @@ func TestCreateServers(t *testing.T) { Name: "X-Forwarded-For", Value: "$proxy_add_x_forwarded_for", }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, { Name: "X-Real-IP", Value: "$remote_addr", @@ -1096,6 +1117,14 @@ func TestCreateServers(t *testing.T) { Name: "X-Forwarded-Port", Value: "$server_port", }, + { + Name: "Upgrade", + Value: "$http_upgrade", + }, + { + Name: "Connection", + Value: "$connection_upgrade", + }, } externalIncludes := []shared.Include{ @@ -1343,44 +1372,12 @@ func TestCreateServers(t *testing.T) { { Path: "/proxy-set-headers/", ProxyPass: "http://test_foo_80$request_uri", - ProxySetHeaders: []http.Header{ + ProxySetHeaders: append([]http.Header{ { Name: "my-header", Value: "${my_header_header_var}some-value-123", }, - { - Name: "Host", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", - }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, - { - Name: "X-Real-IP", - Value: "$remote_addr", - }, - { - Name: "X-Forwarded-Proto", - Value: "$scheme", - }, - { - Name: "X-Forwarded-Host", - Value: "$host", - }, - { - Name: "X-Forwarded-Port", - Value: "$server_port", - }, - }, + }, httpBaseHeaders...), ResponseHeaders: http.ResponseHeaders{ Add: []http.Header{ { @@ -1397,44 +1394,12 @@ func TestCreateServers(t *testing.T) { { Path: "= /proxy-set-headers", ProxyPass: "http://test_foo_80$request_uri", - ProxySetHeaders: []http.Header{ + ProxySetHeaders: append([]http.Header{ { Name: "my-header", Value: "${my_header_header_var}some-value-123", }, - { - Name: "Host", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", - }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, - { - Name: "X-Real-IP", - Value: "$remote_addr", - }, - { - Name: "X-Forwarded-Proto", - Value: "$scheme", - }, - { - Name: "X-Forwarded-Host", - Value: "$host", - }, - { - Name: "X-Forwarded-Port", - Value: "$server_port", - }, - }, + }, httpBaseHeaders...), ResponseHeaders: http.ResponseHeaders{ Add: []http.Header{ { @@ -1489,6 +1454,13 @@ func TestCreateServers(t *testing.T) { Type: http.InternalLocationType, Includes: internalIncludes, }, + { + Path: "= /keep-alive-enabled", + ProxyPass: "http://test_keep_alive_80$request_uri", + ProxySetHeaders: createBaseProxySetHeaders(httpUpgradeHeader, unsetHTTPConnectionHeader), + Type: http.ExternalLocationType, + Includes: externalIncludes, + }, } } @@ -1541,7 +1513,15 @@ func TestCreateServers(t *testing.T) { }, }) - result, httpMatchPair := createServers(conf, fakeGenerator) + keepAliveEnabledUpstream := http.Upstream{ + Name: "test_keep_alive_80", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, + } + keepAliveCheck := newKeepAliveChecker([]http.Upstream{keepAliveEnabledUpstream}) + + result, httpMatchPair := createServers(conf, fakeGenerator, keepAliveCheck) g.Expect(httpMatchPair).To(Equal(allExpMatchPair)) g.Expect(helpers.Diff(expectedServers, result)).To(BeEmpty()) @@ -1762,6 +1742,7 @@ func TestCreateServersConflicts(t *testing.T) { result, _ := createServers( dataplane.Configuration{HTTPServers: httpServers}, &policiesfakes.FakeGenerator{}, + alwaysFalseKeepAliveChecker, ) g.Expect(helpers.Diff(expectedServers, result)).To(BeEmpty()) }) @@ -1912,7 +1893,7 @@ func TestCreateServers_Includes(t *testing.T) { conf := dataplane.Configuration{HTTPServers: httpServers, SSLServers: sslServers} - actualServers, matchPairs := createServers(conf, fakeGenerator) + actualServers, matchPairs := createServers(conf, fakeGenerator, alwaysFalseKeepAliveChecker) g.Expect(matchPairs).To(BeEmpty()) g.Expect(actualServers).To(HaveLen(len(expServers))) @@ -2073,7 +2054,7 @@ func TestCreateLocations_Includes(t *testing.T) { }, }) - locations, matches, grpc := createLocations(&httpServer, "1", fakeGenerator) + locations, matches, grpc := createLocations(&httpServer, "1", fakeGenerator, alwaysFalseKeepAliveChecker) g := NewWithT(t) g.Expect(grpc).To(BeFalse()) @@ -2256,10 +2237,15 @@ func TestCreateLocationsRootPath(t *testing.T) { t.Parallel() g := NewWithT(t) - locs, httpMatchPair, grpc := createLocations(&dataplane.VirtualServer{ - PathRules: test.pathRules, - Port: 80, - }, "1", &policiesfakes.FakeGenerator{}) + locs, httpMatchPair, grpc := createLocations( + &dataplane.VirtualServer{ + PathRules: test.pathRules, + Port: 80, + }, + "1", + &policiesfakes.FakeGenerator{}, + alwaysFalseKeepAliveChecker, + ) g.Expect(locs).To(Equal(test.expLocations)) g.Expect(httpMatchPair).To(BeEmpty()) g.Expect(grpc).To(Equal(test.grpc)) @@ -2897,7 +2883,7 @@ func TestGenerateProxySetHeaders(t *testing.T) { filters *dataplane.HTTPFilters msg string expectedHeaders []http.Header - GRPC bool + baseHeaders []http.Header }{ { msg: "header filter", @@ -2918,7 +2904,7 @@ func TestGenerateProxySetHeaders(t *testing.T) { Remove: []string{"my-header"}, }, }, - expectedHeaders: []http.Header{ + expectedHeaders: append([]http.Header{ { Name: "Authorization", Value: "${authorization_header_var}my-auth", @@ -2931,39 +2917,8 @@ func TestGenerateProxySetHeaders(t *testing.T) { Name: "my-header", Value: "", }, - { - Name: "Host", - Value: "$gw_api_compliant_host", - }, - { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", - }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, - { - Name: "X-Real-IP", - Value: "$remote_addr", - }, - { - Name: "X-Forwarded-Proto", - Value: "$scheme", - }, - { - Name: "X-Forwarded-Host", - Value: "$host", - }, - { - Name: "X-Forwarded-Port", - Value: "$server_port", - }, - }, + }, httpBaseHeaders...), + baseHeaders: httpBaseHeaders, }, { msg: "with url rewrite hostname", @@ -2993,14 +2948,6 @@ func TestGenerateProxySetHeaders(t *testing.T) { Name: "X-Forwarded-For", Value: "$proxy_add_x_forwarded_for", }, - { - Name: "Upgrade", - Value: "$http_upgrade", - }, - { - Name: "Connection", - Value: "$connection_upgrade", - }, { Name: "X-Real-IP", Value: "$remote_addr", @@ -3017,11 +2964,19 @@ func TestGenerateProxySetHeaders(t *testing.T) { Name: "X-Forwarded-Port", Value: "$server_port", }, + { + Name: "Upgrade", + Value: "$http_upgrade", + }, + { + Name: "Connection", + Value: "$connection_upgrade", + }, }, + baseHeaders: createBaseProxySetHeaders(httpUpgradeHeader, httpConnectionHeader), }, { - msg: "header filter with gRPC", - GRPC: true, + msg: "header filter with gRPC", filters: &dataplane.HTTPFilters{ RequestHeaderModifiers: &dataplane.HTTPHeaderFilter{ Add: []dataplane.HTTPHeader{ @@ -3039,7 +2994,7 @@ func TestGenerateProxySetHeaders(t *testing.T) { Remove: []string{"my-header"}, }, }, - expectedHeaders: []http.Header{ + expectedHeaders: append([]http.Header{ { Name: "Authorization", Value: "${authorization_header_var}my-auth", @@ -3052,33 +3007,207 @@ func TestGenerateProxySetHeaders(t *testing.T) { Name: "my-header", Value: "", }, + }, grpcBaseHeaders...), + baseHeaders: grpcBaseHeaders, + }, + } + + for _, tc := range tests { + t.Run(tc.msg, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + headers := generateProxySetHeaders(tc.filters, tc.baseHeaders) + g.Expect(headers).To(Equal(tc.expectedHeaders)) + }) + } +} + +func TestCreateBaseProxySetHeaders(t *testing.T) { + t.Parallel() + + expBaseHeaders := []http.Header{ + { + Name: "Host", + Value: "$gw_api_compliant_host", + }, + { + Name: "X-Forwarded-For", + Value: "$proxy_add_x_forwarded_for", + }, + { + Name: "X-Real-IP", + Value: "$remote_addr", + }, + { + Name: "X-Forwarded-Proto", + Value: "$scheme", + }, + { + Name: "X-Forwarded-Host", + Value: "$host", + }, + { + Name: "X-Forwarded-Port", + Value: "$server_port", + }, + } + + tests := []struct { + msg string + additionalHeaders []http.Header + expBaseHeaders []http.Header + }{ + { + msg: "no additional headers", + additionalHeaders: []http.Header{}, + expBaseHeaders: expBaseHeaders, + }, + { + msg: "single additional headers", + additionalHeaders: []http.Header{ + grpcAuthorityHeader, + }, + expBaseHeaders: append(expBaseHeaders, grpcAuthorityHeader), + }, + { + msg: "multiple additional headers", + additionalHeaders: []http.Header{ + httpConnectionHeader, + httpUpgradeHeader, + }, + expBaseHeaders: append(expBaseHeaders, httpConnectionHeader, httpUpgradeHeader), + }, + { + msg: "unset connection header and upgrade header", + additionalHeaders: []http.Header{ + unsetHTTPConnectionHeader, + httpUpgradeHeader, + }, + expBaseHeaders: append(expBaseHeaders, unsetHTTPConnectionHeader, httpUpgradeHeader), + }, + } + + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + result := createBaseProxySetHeaders(test.additionalHeaders...) + g.Expect(result).To(Equal(test.expBaseHeaders)) + }) + } +} + +func TestGetConnectionHeader(t *testing.T) { + t.Parallel() + + tests := []struct { + msg string + upstreams []http.Upstream + expConnectionHeader http.Header + backends []dataplane.Backend + }{ + { + msg: "no upstreams with keepAlive enabled", + upstreams: []http.Upstream{ + { + Name: "upstream1", + }, { - Name: "Host", - Value: "$gw_api_compliant_host", + Name: "upstream2", }, { - Name: "X-Forwarded-For", - Value: "$proxy_add_x_forwarded_for", + Name: "upstream3", }, + }, + backends: []dataplane.Backend{ { - Name: "Authority", - Value: "$gw_api_compliant_host", + UpstreamName: "upstream1", }, { - Name: "X-Real-IP", - Value: "$remote_addr", + UpstreamName: "upstream2", }, { - Name: "X-Forwarded-Proto", - Value: "$scheme", + UpstreamName: "upstream3", }, + }, + expConnectionHeader: httpConnectionHeader, + }, + { + msg: "upstream with keepAlive enabled", + upstreams: []http.Upstream{ { - Name: "X-Forwarded-Host", - Value: "$host", + Name: "upstream", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, }, + }, + backends: []dataplane.Backend{ { - Name: "X-Forwarded-Port", - Value: "$server_port", + UpstreamName: "upstream", + }, + }, + expConnectionHeader: unsetHTTPConnectionHeader, + }, + { + msg: "multiple upstreams with keepAlive enabled", + upstreams: []http.Upstream{ + { + Name: "upstream1", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, + }, + { + Name: "upstream2", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 2, + Requests: 1, + }, + }, + { + Name: "upstream3", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 3, + Time: "5s", + }, + }, + }, + backends: []dataplane.Backend{ + { + UpstreamName: "upstream1", + }, + { + UpstreamName: "upstream2", + }, + { + UpstreamName: "upstream3", + }, + }, + expConnectionHeader: unsetHTTPConnectionHeader, + }, + { + msg: "mix of upstreams with keepAlive enabled and disabled", + expConnectionHeader: unsetHTTPConnectionHeader, + upstreams: []http.Upstream{ + { + Name: "upstream1", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, + }, + { + Name: "upstream2", + }, + }, + backends: []dataplane.Backend{ + { + UpstreamName: "upstream1", + }, + { + UpstreamName: "upstream2", }, }, }, @@ -3089,8 +3218,10 @@ func TestGenerateProxySetHeaders(t *testing.T) { t.Parallel() g := NewWithT(t) - headers := generateProxySetHeaders(tc.filters, tc.GRPC) - g.Expect(headers).To(Equal(tc.expectedHeaders)) + keepAliveCheck := newKeepAliveChecker(tc.upstreams) + + connectionHeader := getConnectionHeader(keepAliveCheck, tc.backends) + g.Expect(connectionHeader).To(Equal(tc.expConnectionHeader)) }) } } diff --git a/internal/mode/static/nginx/config/upstreams.go b/internal/mode/static/nginx/config/upstreams.go index 88c66c47fd..7722f6f0a9 100644 --- a/internal/mode/static/nginx/config/upstreams.go +++ b/internal/mode/static/nginx/config/upstreams.go @@ -6,11 +6,15 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/http" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/upstreamsettings" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/stream" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" ) -var upstreamsTemplate = gotemplate.Must(gotemplate.New("upstreams").Parse(upstreamsTemplateText)) +var ( + upstreamsTemplate = gotemplate.Must(gotemplate.New("upstreams").Parse(upstreamsTemplateText)) + streamUpstreamsTemplate = gotemplate.Must(gotemplate.New("streamUpstreams").Parse(streamUpstreamsTemplateText)) +) const ( // nginx503Server is used as a backend for services that cannot be resolved (have no IP address). @@ -29,9 +33,32 @@ const ( plusZoneSizeStream = "1m" ) -func (g GeneratorImpl) executeUpstreams(conf dataplane.Configuration) []executeResult { - upstreams := g.createUpstreams(conf.Upstreams) +// keepAliveChecker takes an upstream name and returns if it has keep alive settings enabled. +type keepAliveChecker func(upstreamName string) bool + +func newKeepAliveChecker(upstreams []http.Upstream) keepAliveChecker { + upstreamMap := make(map[string]http.Upstream) + + for _, upstream := range upstreams { + upstreamMap[upstream.Name] = upstream + } + + return func(upstreamName string) bool { + if upstream, exists := upstreamMap[upstreamName]; exists { + return upstream.KeepAlive.Connections != 0 + } + return false + } +} + +func newExecuteUpstreamsFunc(upstreams []http.Upstream) executeFunc { + return func(_ dataplane.Configuration) []executeResult { + return executeUpstreams(upstreams) + } +} + +func executeUpstreams(upstreams []http.Upstream) []executeResult { result := executeResult{ dest: httpConfigFile, data: helpers.MustExecuteTemplate(upstreamsTemplate, upstreams), @@ -45,7 +72,7 @@ func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []ex result := executeResult{ dest: streamConfigFile, - data: helpers.MustExecuteTemplate(upstreamsTemplate, upstreams), + data: helpers.MustExecuteTemplate(streamUpstreamsTemplate, upstreams), } return []executeResult{result} @@ -87,12 +114,15 @@ func (g GeneratorImpl) createStreamUpstream(up dataplane.Upstream) stream.Upstre } } -func (g GeneratorImpl) createUpstreams(upstreams []dataplane.Upstream) []http.Upstream { +func (g GeneratorImpl) createUpstreams( + upstreams []dataplane.Upstream, + processor upstreamsettings.Processor, +) []http.Upstream { // capacity is the number of upstreams + 1 for the invalid backend ref upstream ups := make([]http.Upstream, 0, len(upstreams)+1) for _, u := range upstreams { - ups = append(ups, g.createUpstream(u)) + ups = append(ups, g.createUpstream(u, processor)) } ups = append(ups, createInvalidBackendRefUpstream()) @@ -100,12 +130,21 @@ func (g GeneratorImpl) createUpstreams(upstreams []dataplane.Upstream) []http.Up return ups } -func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream { +func (g GeneratorImpl) createUpstream( + up dataplane.Upstream, + processor upstreamsettings.Processor, +) http.Upstream { + upstreamPolicySettings := processor.Process(up.Policies) + zoneSize := ossZoneSize if g.plus { zoneSize = plusZoneSize } + if upstreamPolicySettings.ZoneSize != "" { + zoneSize = upstreamPolicySettings.ZoneSize + } + if len(up.Endpoints) == 0 { return http.Upstream{ Name: up.Name, @@ -130,9 +169,10 @@ func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream { } return http.Upstream{ - Name: up.Name, - ZoneSize: zoneSize, - Servers: upstreamServers, + Name: up.Name, + ZoneSize: zoneSize, + Servers: upstreamServers, + KeepAlive: upstreamPolicySettings.KeepAlive, } } diff --git a/internal/mode/static/nginx/config/upstreams_template.go b/internal/mode/static/nginx/config/upstreams_template.go index a04915bec8..47fac2dd00 100644 --- a/internal/mode/static/nginx/config/upstreams_template.go +++ b/internal/mode/static/nginx/config/upstreams_template.go @@ -5,13 +5,41 @@ package config // NGINX Plus needs 1m to support roughly the same amount of http servers (556 upstream servers). // For stream upstream servers, 512k will support 576 in OSS and 1m will support 991 in NGINX Plus // https://github.com/nginxinc/nginx-gateway-fabric/issues/483 +// +// if the keepalive directive is present, it is necessary to activate the load balancing method before the directive. const upstreamsTemplateText = ` {{ range $u := . }} upstream {{ $u.Name }} { random two least_conn; {{ if $u.ZoneSize -}} zone {{ $u.Name }} {{ $u.ZoneSize }}; - {{ end -}} + {{- end }} + {{ range $server := $u.Servers }} + server {{ $server.Address }}; + {{- end }} + {{ if $u.KeepAlive.Connections -}} + keepalive {{ $u.KeepAlive.Connections }}; + {{- end }} + {{ if $u.KeepAlive.Requests -}} + keepalive_requests {{ $u.KeepAlive.Requests }}; + {{- end }} + {{ if $u.KeepAlive.Time -}} + keepalive_time {{ $u.KeepAlive.Time }}; + {{- end }} + {{ if $u.KeepAlive.Timeout -}} + keepalive_timeout {{ $u.KeepAlive.Timeout }}; + {{- end }} +} +{{ end -}} +` + +const streamUpstreamsTemplateText = ` +{{ range $u := . }} +upstream {{ $u.Name }} { + random two least_conn; + {{ if $u.ZoneSize -}} + zone {{ $u.Name }} {{ $u.ZoneSize }}; + {{- end }} {{ range $server := $u.Servers }} server {{ $server.Address }}; {{- end }} diff --git a/internal/mode/static/nginx/config/upstreams_test.go b/internal/mode/static/nginx/config/upstreams_test.go index 5b3a8268a3..8ce0cbd763 100644 --- a/internal/mode/static/nginx/config/upstreams_test.go +++ b/internal/mode/static/nginx/config/upstreams_test.go @@ -4,8 +4,13 @@ import ( "testing" . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/http" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/upstreamsettings" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/stream" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" @@ -47,6 +52,32 @@ func TestExecuteUpstreams(t *testing.T) { }, }, }, + { + Name: "up5-usp", + Endpoints: []resolver.Endpoint{ + { + Address: "12.0.0.0", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + }, } expectedSubStrings := []string{ @@ -54,21 +85,32 @@ func TestExecuteUpstreams(t *testing.T) { "upstream up2", "upstream up3", "upstream up4-ipv6", + "upstream up5-usp", "upstream invalid-backend-ref", + "server 10.0.0.0:80;", "server 11.0.0.0:80;", "server [2001:db8::1]:80", + "server 12.0.0.0:80;", "server unix:/var/run/nginx/nginx-503-server.sock;", + + "keepalive 1;", + "keepalive_requests 1;", + "keepalive_time 5s;", + "keepalive_timeout 10s;", + "zone up5-usp 2m;", } - upstreamResults := gen.executeUpstreams(dataplane.Configuration{Upstreams: stateUpstreams}) + upstreams := gen.createUpstreams(stateUpstreams, upstreamsettings.NewProcessor()) + + upstreamResults := executeUpstreams(upstreams) g := NewWithT(t) g.Expect(upstreamResults).To(HaveLen(1)) - upstreams := string(upstreamResults[0].data) + nginxUpstreams := string(upstreamResults[0].data) g.Expect(upstreamResults[0].dest).To(Equal(httpConfigFile)) for _, expSubString := range expectedSubStrings { - g.Expect(upstreams).To(ContainSubstring(expSubString)) + g.Expect(nginxUpstreams).To(ContainSubstring(expSubString)) } } @@ -116,6 +158,32 @@ func TestCreateUpstreams(t *testing.T) { }, }, }, + { + Name: "up5-usp", + Endpoints: []resolver.Endpoint{ + { + Address: "12.0.0.0", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + }, } expUpstreams := []http.Upstream{ @@ -161,6 +229,21 @@ func TestCreateUpstreams(t *testing.T) { }, }, }, + { + Name: "up5-usp", + ZoneSize: "2m", + Servers: []http.UpstreamServer{ + { + Address: "12.0.0.0:80", + }, + }, + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, { Name: invalidBackendRef, Servers: []http.UpstreamServer{ @@ -172,7 +255,7 @@ func TestCreateUpstreams(t *testing.T) { } g := NewWithT(t) - result := gen.createUpstreams(stateUpstreams) + result := gen.createUpstreams(stateUpstreams, upstreamsettings.NewProcessor()) g.Expect(result).To(Equal(expUpstreams)) } @@ -181,8 +264,8 @@ func TestCreateUpstream(t *testing.T) { gen := GeneratorImpl{} tests := []struct { msg string - stateUpstream dataplane.Upstream expectedUpstream http.Upstream + stateUpstream dataplane.Upstream }{ { stateUpstream: dataplane.Upstream{ @@ -273,13 +356,183 @@ func TestCreateUpstream(t *testing.T) { }, msg: "endpoint ipv6", }, + { + stateUpstream: dataplane.Upstream{ + Name: "single upstreamSettingsPolicy", + Endpoints: []resolver.Endpoint{ + { + Address: "10.0.0.1", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + }, + expectedUpstream: http.Upstream{ + Name: "single upstreamSettingsPolicy", + ZoneSize: "2m", + Servers: []http.UpstreamServer{ + { + Address: "10.0.0.1:80", + }, + }, + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + msg: "single upstreamSettingsPolicy", + }, + { + stateUpstream: dataplane.Upstream{ + Name: "multiple upstreamSettingsPolicies", + Endpoints: []resolver.Endpoint{ + { + Address: "10.0.0.1", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp1", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + ZoneSize: helpers.GetPointer[ngfAPI.Size]("2m"), + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp2", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + }), + }, + }, + }, + }, + expectedUpstream: http.Upstream{ + Name: "multiple upstreamSettingsPolicies", + ZoneSize: "2m", + Servers: []http.UpstreamServer{ + { + Address: "10.0.0.1:80", + }, + }, + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + msg: "multiple upstreamSettingsPolicies", + }, + { + stateUpstream: dataplane.Upstream{ + Name: "empty upstreamSettingsPolicies", + Endpoints: []resolver.Endpoint{ + { + Address: "10.0.0.1", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp1", + Namespace: "test", + }, + }, + }, + }, + expectedUpstream: http.Upstream{ + Name: "empty upstreamSettingsPolicies", + ZoneSize: ossZoneSize, + Servers: []http.UpstreamServer{ + { + Address: "10.0.0.1:80", + }, + }, + }, + msg: "empty upstreamSettingsPolicies", + }, + { + stateUpstream: dataplane.Upstream{ + Name: "upstreamSettingsPolicy with only keep alive settings", + Endpoints: []resolver.Endpoint{ + { + Address: "10.0.0.1", + Port: 80, + }, + }, + Policies: []policies.Policy{ + &ngfAPI.UpstreamSettingsPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "usp1", + Namespace: "test", + }, + Spec: ngfAPI.UpstreamSettingsPolicySpec{ + KeepAlive: helpers.GetPointer(ngfAPI.UpstreamKeepAlive{ + Connections: helpers.GetPointer(int32(1)), + Requests: helpers.GetPointer(int32(1)), + Time: helpers.GetPointer[ngfAPI.Duration]("5s"), + Timeout: helpers.GetPointer[ngfAPI.Duration]("10s"), + }), + }, + }, + }, + }, + expectedUpstream: http.Upstream{ + Name: "upstreamSettingsPolicy with only keep alive settings", + ZoneSize: ossZoneSize, + Servers: []http.UpstreamServer{ + { + Address: "10.0.0.1:80", + }, + }, + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + msg: "upstreamSettingsPolicy with only keep alive settings", + }, } for _, test := range tests { t.Run(test.msg, func(t *testing.T) { t.Parallel() g := NewWithT(t) - result := gen.createUpstream(test.stateUpstream) + result := gen.createUpstream(test.stateUpstream, upstreamsettings.NewProcessor()) g.Expect(result).To(Equal(test.expectedUpstream)) }) } @@ -308,7 +561,7 @@ func TestCreateUpstreamPlus(t *testing.T) { }, } - result := gen.createUpstream(stateUpstream) + result := gen.createUpstream(stateUpstream, upstreamsettings.NewProcessor()) g := NewWithT(t) g.Expect(result).To(Equal(expectedUpstream)) @@ -505,3 +758,215 @@ func TestCreateStreamUpstreamPlus(t *testing.T) { g := NewWithT(t) g.Expect(result).To(Equal(expectedUpstream)) } + +func TestKeepAliveChecker(t *testing.T) { + t.Parallel() + + tests := []struct { + msg string + upstreams []http.Upstream + expKeepAliveEnabled []bool + }{ + { + msg: "upstream with all keepAlive fields set", + upstreams: []http.Upstream{ + { + Name: "upAllKeepAliveFieldsSet", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + expKeepAliveEnabled: []bool{ + true, + }, + }, + { + msg: "upstream with keepAlive connection field set", + upstreams: []http.Upstream{ + { + Name: "upKeepAliveConnectionsSet", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + }, + }, + }, + expKeepAliveEnabled: []bool{ + true, + }, + }, + { + msg: "upstream with keepAlive requests field set", + upstreams: []http.Upstream{ + { + Name: "upKeepAliveRequestsSet", + KeepAlive: http.UpstreamKeepAlive{ + Requests: 1, + }, + }, + }, + expKeepAliveEnabled: []bool{ + false, + }, + }, + { + msg: "upstream with keepAlive time field set", + upstreams: []http.Upstream{ + { + Name: "upKeepAliveTimeSet", + KeepAlive: http.UpstreamKeepAlive{ + Time: "5s", + }, + }, + }, + expKeepAliveEnabled: []bool{ + false, + }, + }, + { + msg: "upstream with keepAlive timeout field set", + upstreams: []http.Upstream{ + { + Name: "upKeepAliveTimeoutSet", + KeepAlive: http.UpstreamKeepAlive{ + Timeout: "10s", + }, + }, + }, + expKeepAliveEnabled: []bool{ + false, + }, + }, + { + msg: "upstream with no keepAlive fields set", + upstreams: []http.Upstream{ + { + Name: "upNoKeepAliveFieldsSet", + }, + }, + expKeepAliveEnabled: []bool{ + false, + }, + }, + { + msg: "upstream with keepAlive fields set to empty values", + upstreams: []http.Upstream{ + { + Name: "upKeepAliveFieldsEmpty", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 0, + Requests: 0, + Time: "", + Timeout: "", + }, + }, + }, + expKeepAliveEnabled: []bool{ + false, + }, + }, + { + msg: "multiple upstreams with keepAlive fields set", + upstreams: []http.Upstream{ + { + Name: "upstream1", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + { + Name: "upstream2", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + { + Name: "upstream3", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + expKeepAliveEnabled: []bool{ + true, + true, + true, + }, + }, + { + msg: "mix of keepAlive enabled upstreams and disabled upstreams", + upstreams: []http.Upstream{ + { + Name: "upstream1", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + { + Name: "upstream2", + }, + { + Name: "upstream3", + KeepAlive: http.UpstreamKeepAlive{ + Connections: 1, + Requests: 1, + Time: "5s", + Timeout: "10s", + }, + }, + }, + expKeepAliveEnabled: []bool{ + true, + false, + true, + }, + }, + { + msg: "all upstreams without keepAlive fields set", + upstreams: []http.Upstream{ + { + Name: "upstream1", + }, + { + Name: "upstream2", + }, + { + Name: "upstream3", + }, + }, + expKeepAliveEnabled: []bool{ + false, + false, + false, + }, + }, + } + + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + keepAliveCheck := newKeepAliveChecker(test.upstreams) + + for index, upstream := range test.upstreams { + g.Expect(keepAliveCheck(upstream.Name)).To(Equal(test.expKeepAliveEnabled[index])) + } + }) + } +} diff --git a/internal/mode/static/state/dataplane/types.go b/internal/mode/static/state/dataplane/types.go index 274897c007..974efbb142 100644 --- a/internal/mode/static/state/dataplane/types.go +++ b/internal/mode/static/state/dataplane/types.go @@ -111,6 +111,8 @@ type Upstream struct { ErrorMsg string // Endpoints are the endpoints of the Upstream. Endpoints []resolver.Endpoint + // Policies contains the list of policies that are applied to this Upstream. + Policies []policies.Policy } // SSL is the SSL configuration for a server.