Skip to content

Commit

Permalink
NET-5073 - ProxyConfiguration: implement various connection options
Browse files Browse the repository at this point in the history
  • Loading branch information
jmurret committed Oct 14, 2023
1 parent 105ebfd commit e093aa7
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (b *Builder) buildExposePaths(workload *pbcatalog.Workload) {
buildListener()

b.addExposePathsRoute(exposePath, clusterName).
addLocalAppCluster(clusterName).
addLocalAppCluster(clusterName, fmt.Sprintf("%d", exposePath.LocalPathPort)).
addLocalAppStaticEndpoints(clusterName, exposePath.LocalPathPort)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package builder
import (
"fmt"

pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
Expand All @@ -31,13 +34,13 @@ func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload, ctp *pbauth.Comput

if port.Protocol != pbcatalog.Protocol_PROTOCOL_MESH {
foundInboundNonMeshPorts = true
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName]).
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName], b.proxyCfg.GetDynamicConfig().GetInboundConnections()).
addInboundTLS()

if isL7(port.Protocol) {
b.addLocalAppRoute(routeName, clusterName)
}
b.addLocalAppCluster(clusterName).
b.addLocalAppCluster(clusterName, portName).
addLocalAppStaticEndpoints(clusterName, port.GetPort())
}
}
Expand Down Expand Up @@ -264,10 +267,16 @@ func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload)
// Add TLS inspection capability to be able to parse ALPN and/or SNI information from inbound connections.
listener.Capabilities = append(listener.Capabilities, pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION)

if b.proxyCfg != nil && b.proxyCfg.DynamicConfig != nil && b.proxyCfg.DynamicConfig.InboundConnections != nil {
listener.BalanceConnections = pbproxystate.BalanceConnections(b.proxyCfg.DynamicConfig.InboundConnections.BalanceInboundConnections)
}
return b.NewListenerBuilder(listener)
}

func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions) *ListenerBuilder {
func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions,
ic *pbmesh.InboundConnectionsConfig) *ListenerBuilder {

if l.listener == nil {
return l
}
Expand All @@ -289,6 +298,15 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}

if ic != nil {
// MaxInboundConnections is uint32 that is used on:
// - router destinations MaxInboundConnection (uint64)
// - cluster circuit breakers UpstreamLimits.MaxConnections (uint32)
// It is cast to a uint64 here similarly as it is to the proxystateconverter code
r.GetL4().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}

l.listener.Routers = append(l.listener.Routers, r)
} else if isL7(port.Protocol) {
r := &pbproxystate.Router{
Expand Down Expand Up @@ -373,9 +391,9 @@ func isL7(protocol pbcatalog.Protocol) bool {
return false
}

func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
func (b *Builder) addLocalAppCluster(clusterName, portName string) *Builder {
// Make cluster for this router destination.
b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{
cluster := &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Expand All @@ -384,20 +402,34 @@ func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
},
},
}

// configure inbound connections or connection timeout if either is defined
if b.proxyCfg.GetDynamicConfig() != nil {
lc, lcOK := b.proxyCfg.DynamicConfig.LocalConnection[portName]

if lcOK || b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().Config = &pbproxystate.StaticEndpointGroupConfig{}

if lcOK {
cluster.GetEndpointGroup().GetStatic().GetConfig().ConnectTimeout = lc.ConnectTimeout
}

if b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().GetConfig().CircuitBreakers = &pbproxystate.CircuitBreakers{
UpstreamLimits: &pbproxystate.UpstreamLimits{
MaxConnections: &wrapperspb.UInt32Value{Value: b.proxyCfg.DynamicConfig.InboundConnections.MaxInboundConnections},
},
}
}
}
}

b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster
return b
}

func (b *Builder) addBlackHoleCluster() *Builder {
b.proxyStateTemplate.ProxyState.Clusters[xdscommon.BlackHoleClusterName] = &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Static: &pbproxystate.StaticEndpointGroup{},
},
},
},
}
return b
return b.addLocalAppCluster(xdscommon.BlackHoleClusterName, "")
}

func (b *Builder) addLocalAppStaticEndpoints(clusterName string, port uint32) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package builder

import (
"google.golang.org/protobuf/types/known/durationpb"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -139,17 +142,61 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) {
},
},
},
"source/local-and-inbound-connections": {
workload: &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: "10.0.0.1",
},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
"port3": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
},
},
proxyCfg: &pbmesh.ComputedProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
LocalConnection: map[string]*pbmesh.ConnectionConfig{
"port1": {
ConnectTimeout: durationpb.New(6 * time.Second),
RequestTimeout: durationpb.New(7 * time.Second)},
"port3": {
ConnectTimeout: durationpb.New(8 * time.Second),
RequestTimeout: durationpb.New(9 * time.Second)},
},
InboundConnections: &pbmesh.InboundConnectionsConfig{
MaxInboundConnections: 123,
BalanceInboundConnections: pbmesh.BalanceConnections(pbproxystate.BalanceConnections_BALANCE_CONNECTIONS_EXACT),
},
},
},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", true, c.proxyCfg).
BuildLocalApp(c.workload, nil).
Build()

// sort routers because of test flakes where order was flip flopping.
actualRouters := proxyTmpl.ProxyState.Listeners[0].Routers
sort.Slice(actualRouters, func(i, j int) bool {
return actualRouters[i].String() < actualRouters[j].String()
})

actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name+".golden")
expected := JSONToProxyTemplate(t, golden.GetBytes(t, actual, name+".golden"))

require.JSONEq(t, expected, actual)
// sort routers on listener from golden file
expectedRouters := expected.ProxyState.Listeners[0].Routers
sort.Slice(expectedRouters, func(i, j int) bool {
return expectedRouters[i].String() < expectedRouters[j].String()
})

// convert back to json after sorting so that test output does not contain extraneous fields.
require.Equal(t, protoToJSON(t, expected), protoToJSON(t, proxyTmpl))
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
{
"proxyState": {
"clusters": {
"local_app:port1": {
"endpointGroup": {
"static": {
"config": {
"connectTimeout": "6s",
"circuitBreakers": {
"upstreamLimits": {
"maxConnections": 123
}
}
}
}
},
"name": "local_app:port1"
},
"local_app:port3": {
"endpointGroup": {
"static": {
"config": {
"connectTimeout": "8s",
"circuitBreakers": {
"upstreamLimits": {
"maxConnections": 123
}
}
}
}
},
"name": "local_app:port3"
}
},
"endpoints": {
"local_app:port1": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 8080
}
}
]
},
"local_app:port3": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 8081
}
}
]
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "auth",
"groupVersion": "v2beta1",
"kind": "WorkloadIdentity"
}
},
"listeners": [
{
"capabilities": [
"CAPABILITY_L4_TLS_INSPECTION"
],
"direction": "DIRECTION_INBOUND",
"hostPort": {
"host": "10.0.0.1",
"port": 20000
},
"name": "public_listener",
"balanceConnections": "BALANCE_CONNECTIONS_EXACT",
"routers": [
{
"inboundTls": {
"inboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"trustBundlePeerNameKeys": [
"local"
]
}
}
},
"l4": {
"cluster": {
"name": "local_app:port1"
},
"maxInboundConnections": 123,
"statPrefix": "public_listener",
"trafficPermissions": {}
},
"match": {
"alpnProtocols": [
"consul~port1"
]
}
},
{
"inboundTls": {
"inboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"trustBundlePeerNameKeys": [
"local"
]
}
}
},
"l4": {
"cluster": {
"name": "local_app:port3"
},
"maxInboundConnections": 123,
"statPrefix": "public_listener",
"trafficPermissions": {}
},
"match": {
"alpnProtocols": [
"consul~port3"
]
}
}
]
}
]
},
"requiredLeafCertificates": {
"test-identity": {
"name": "test-identity",
"namespace": "default",
"partition": "default"
}
},
"requiredTrustBundles": {
"local": {
"peer": "local"
}
}
}
6 changes: 3 additions & 3 deletions proto-public/pbmesh/v2beta1/connection.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto-public/pbmesh/v2beta1/connection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ message ConnectionConfig {

// Referenced by ProxyConfiguration
message InboundConnectionsConfig {
uint64 max_inbound_connections = 12;
uint32 max_inbound_connections = 12;
BalanceConnections balance_inbound_connections = 13;
}

Expand Down

0 comments on commit e093aa7

Please sign in to comment.