Skip to content

Commit

Permalink
Add LoadBalancer IR to HTTPRoute
Browse files Browse the repository at this point in the history
Relates to envoyproxy#1105

Signed-off-by: Arko Dasgupta <[email protected]>
  • Loading branch information
arkodg committed Oct 19, 2023
1 parent 23c91a6 commit 03ca370
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 21 deletions.
64 changes: 64 additions & 0 deletions internal/ir/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrAddHeaderDuplicate = errors.New("header modifier filter attempts to add the same header more than once (case insensitive)")
ErrRemoveHeaderDuplicate = errors.New("header modifier filter attempts to remove the same header more than once (case insensitive)")
ErrRequestAuthenRequiresJwt = errors.New("jwt field is required when request authentication is set")
ErrLoadBalancerInvalid = errors.New("loadBalancer setting is invalid, only one setting can be set")
)

// Xds holds the intermediate representation of a Gateway and is
Expand Down Expand Up @@ -276,6 +277,8 @@ type HTTPRoute struct {
RequestAuthentication *RequestAuthentication `json:"requestAuthentication,omitempty" yaml:"requestAuthentication,omitempty"`
// Timeout is the time until which entire response is received from the upstream.
Timeout *metav1.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// load balancer policy to use when routing to the backend endpoints.
LoadBalancer *LoadBalancer `json:"loadBalancer,omitempty" yaml:"loadBalancer,omitempty"`
// ExtensionRefs holds unstructured resources that were introduced by an extension and used on the HTTPRoute as extensionRef filters
ExtensionRefs []*UnstructuredRef `json:"extensionRefs,omitempty" yaml:"extensionRefs,omitempty"`
}
Expand Down Expand Up @@ -420,6 +423,12 @@ func (h HTTPRoute) Validate() error {
}
}
}
if h.LoadBalancer != nil {
if err := h.LoadBalancer.Validate(); err != nil {
errs = multierror.Append(errs, err)
}
}

return errs
}

Expand Down Expand Up @@ -952,3 +961,58 @@ type TCPKeepalive struct {
// Defaults to `75s`.
Interval *uint32 `json:"interval,omitempty" yaml:"interval,omitempty"`
}

// LoadBalancer defines the load balancer settings.
// +k8s:deepcopy-gen=true
type LoadBalancer struct {
// RoundRobin load balacning policy
RoundRobin *RoundRobin `json:"roundRobin,omitempty" yaml:"roundRobin,omitempty"`
// LeastRequest load balancer policy
LeastRequest *LeastRequest `json:"leastRequest,omitempty" yaml:"leastRequest,omitempty"`
// Random load balancer policy
Random *Random `json:"random,omitempty" yaml:"random,omitempty"`
// ConsistentHash load balancer policy
ConsistentHash *ConsistentHash `json:"consistentHash,omitempty" yaml:"consistentHash,omitempty"`
}

// Validate the fields within the LoadBalancer structure
func (l *LoadBalancer) Validate() error {
var errs error
matchCount := 0
if l.RoundRobin != nil {
matchCount++
}
if l.LeastRequest != nil {
matchCount++
}
if l.Random != nil {
matchCount++
}
if l.ConsistentHash != nil {
matchCount++
}
if matchCount != 1 {
errs = multierror.Append(errs, ErrLoadBalancerInvalid)
}

return errs
}

// RoundRobin load balancer settings
// +k8s:deepcopy-gen=true
type RoundRobin struct{}

// LeastRequest load balancer settings
// +k8s:deepcopy-gen=true
type LeastRequest struct{}

// Random load balancer settings
// +k8s:deepcopy-gen=true
type Random struct{}

// ConsistentHash load balancer settings
// +k8s:deepcopy-gen=true
type ConsistentHash struct {
// Hash based on the Source IP Address
SourceIP *bool `json:"sourceIP,omitempty" yaml:"sourceIP,omitempty"`
}
34 changes: 34 additions & 0 deletions internal/ir/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,40 @@ func TestValidateJwtRequestAuthentication(t *testing.T) {
}
}

func TestValidateLoadBalancer(t *testing.T) {
tests := []struct {
name string
input LoadBalancer
want error
}{
{
name: "random",
input: LoadBalancer{
Random: &Random{},
},
want: nil,
},
{
name: "least request and random set",
input: LoadBalancer{
Random: &Random{},
LeastRequest: &LeastRequest{},
},
want: ErrLoadBalancerInvalid,
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
if test.want == nil {
require.NoError(t, test.input.Validate())
} else {
require.EqualError(t, test.input.Validate(), test.want.Error())
}
})
}
}

func TestPrintable(t *testing.T) {
tests := []struct {
name string
Expand Down
105 changes: 105 additions & 0 deletions internal/ir/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/xds/translator/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func processClusterForAccessLog(tCtx *types.ResourceVersionTable, al *ir.AccessL
Weight: ptr.To(uint32(1)),
Endpoints: []*ir.DestinationEndpoint{ir.NewDestEndpoint(otel.Host, otel.Port)},
}
if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: clusterName,
settings: []*ir.DestinationSetting{ds},
tSocket: nil,
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/translator/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func createJwksClusters(tCtx *types.ResourceVersionTable, routes []*ir.HTTPRoute
if err != nil {
return err
}
if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: jwks.name,
settings: []*ir.DestinationSetting{ds},
tSocket: tSocket,
Expand Down
29 changes: 21 additions & 8 deletions internal/xds/translator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ const (
tcpClusterPerConnectionBufferLimitBytes = 32768
)

func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protocol ProtocolType, endpointType EndpointType) *clusterv3.Cluster {
func buildXdsCluster(args *xdsClusterArgs) *clusterv3.Cluster {
cluster := &clusterv3.Cluster{
Name: clusterName,
Name: args.name,
ConnectTimeout: durationpb.New(10 * time.Second),
LbPolicy: clusterv3.Cluster_LEAST_REQUEST,
DnsLookupFamily: clusterv3.Cluster_V4_ONLY,
CommonLbConfig: &clusterv3.Cluster_CommonLbConfig{
LocalityConfigSpecifier: &clusterv3.Cluster_CommonLbConfig_LocalityWeightedLbConfig_{
Expand All @@ -40,14 +39,14 @@ func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protoc
PerConnectionBufferLimitBytes: wrapperspb.UInt32(tcpClusterPerConnectionBufferLimitBytes),
}

if tSocket != nil {
cluster.TransportSocket = tSocket
if args.tSocket != nil {
cluster.TransportSocket = args.tSocket
}

if endpointType == Static {
if args.endpointType == Static {
cluster.ClusterDiscoveryType = &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}
cluster.EdsClusterConfig = &clusterv3.Cluster_EdsClusterConfig{
ServiceName: clusterName,
ServiceName: args.name,
EdsConfig: &corev3.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
Expand All @@ -61,10 +60,24 @@ func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protoc
cluster.RespectDnsTtl = true
}

if protocol == HTTP2 {
if args.protocol == HTTP2 {
cluster.TypedExtensionProtocolOptions = buildTypedExtensionProtocolOptions()
}

// Set Load Balancer policy
//nolint:gocritic
if args.loadBalancer == nil {
cluster.LbPolicy = clusterv3.Cluster_LEAST_REQUEST
} else if args.loadBalancer.LeastRequest != nil {
cluster.LbPolicy = clusterv3.Cluster_LEAST_REQUEST
} else if args.loadBalancer.RoundRobin != nil {
cluster.LbPolicy = clusterv3.Cluster_ROUND_ROBIN
} else if args.loadBalancer.Random != nil {
cluster.LbPolicy = clusterv3.Cluster_RANDOM
} else if args.loadBalancer.ConsistentHash != nil {
cluster.LbPolicy = clusterv3.Cluster_MAGLEV
}

return cluster
}

Expand Down
8 changes: 7 additions & 1 deletion internal/xds/translator/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ const (
func TestBuildXdsCluster(t *testing.T) {
bootstrapXdsCluster := getXdsClusterObjFromBootstrap(t)

dynamicXdsCluster := buildXdsCluster(bootstrapXdsCluster.Name, bootstrapXdsCluster.TransportSocket, HTTP2, DefaultEndpointType)
args := &xdsClusterArgs{
name: bootstrapXdsCluster.Name,
tSocket: bootstrapXdsCluster.TransportSocket,
protocol: HTTP2,
endpointType: DefaultEndpointType,
}
dynamicXdsCluster := buildXdsCluster(args)

require.Equal(t, bootstrapXdsCluster.Name, dynamicXdsCluster.Name)
require.Equal(t, bootstrapXdsCluster.ClusterDiscoveryType, dynamicXdsCluster.ClusterDiscoveryType)
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/translator/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (t *Translator) createRateLimitServiceCluster(tCtx *types.ResourceVersionTa
return err
}

if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: clusterName,
settings: []*ir.DestinationSetting{ds},
tSocket: tSocket,
Expand Down
Loading

0 comments on commit 03ca370

Please sign in to comment.