Skip to content

Commit

Permalink
feat: Adding extension server policy handling. (#3371)
Browse files Browse the repository at this point in the history
* Adding extension server policy handling.

Signed-off-by: Lior Okman <[email protected]>

* added missing license header

Signed-off-by: Lior Okman <[email protected]>

* Make the linter happy

Signed-off-by: Lior Okman <[email protected]>

* Make spell-checking linter satisfied

Signed-off-by: Lior Okman <[email protected]>

* Added some clarifying comments, fixed watchable key deletion in the
runner

Signed-off-by: Lior Okman <[email protected]>

* Added tests

Signed-off-by: Lior Okman <[email protected]>

* Add tests

Signed-off-by: Lior Okman <[email protected]>

* Finishing the rebase to the latest main.

Signed-off-by: Lior Okman <[email protected]>

* Added an XDS level unit test.

Signed-off-by: Lior Okman <[email protected]>

* Made the GCI linter happy

Signed-off-by: Lior Okman <[email protected]>

* Some more unit tests.

Signed-off-by: Lior Okman <[email protected]>

* Resolved comments provided in the review.

Signed-off-by: Lior Okman <[email protected]>

* Regenerate deepcopies.

Signed-off-by: Lior Okman <[email protected]>

* Removed the logger from the Translator type.

Signed-off-by: Lior Okman <[email protected]>

* Only start the status updater for extension server policies if there is
an extension server configured.

Signed-off-by: Lior Okman <[email protected]>

* Changed the Translate function signature to return an error, and log any
returned errors in the runner.

Signed-off-by: Lior Okman <[email protected]>

---------

Signed-off-by: Lior Okman <[email protected]>
  • Loading branch information
liorokman authored May 31, 2024
1 parent dc201ba commit 11349b1
Show file tree
Hide file tree
Showing 41 changed files with 2,002 additions and 117 deletions.
9 changes: 8 additions & 1 deletion api/v1alpha1/envoygateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,18 @@ type RateLimitRedisSettings struct {
// ExtensionManager defines the configuration for registering an extension manager to
// the Envoy Gateway control plane.
type ExtensionManager struct {
// Resources defines the set of K8s resources the extension will handle.
// Resources defines the set of K8s resources the extension will handle as route
// filter resources
//
// +optional
Resources []GroupVersionKind `json:"resources,omitempty"`

// PolicyResources defines the set of K8S resources the extension server will handle
// as directly attached GatewayAPI policies
//
// +optional
PolicyResources []GroupVersionKind `json:"policyResources,omitempty"`

// Hooks defines the set of hooks the extension supports
//
// +kubebuilder:validation:Required
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/cmd/egctl/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func translateGatewayAPIToIR(resources *gatewayapi.Resources) (*gatewayapi.Trans
}
}

result := t.Translate(resources)
result, _ := t.Translate(resources)

return result, nil
}
Expand All @@ -310,7 +310,7 @@ func translateGatewayAPIToGatewayAPI(resources *gatewayapi.Resources) (gatewayap
EndpointRoutingDisabled: true,
EnvoyPatchPolicyEnabled: true,
}
gRes := gTranslator.Translate(resources)
gRes, _ := gTranslator.Translate(resources)
// Update the status of the GatewayClass based on EnvoyProxy validation
epInvalid := false
if resources.EnvoyProxy != nil {
Expand Down Expand Up @@ -342,7 +342,7 @@ func translateGatewayAPIToXds(dnsDomain string, resourceType string, resources *
EndpointRoutingDisabled: true,
EnvoyPatchPolicyEnabled: true,
}
gRes := gTranslator.Translate(resources)
gRes, _ := gTranslator.Translate(resources)

keys := []string{}
for key := range gRes.XdsIR {
Expand Down
29 changes: 22 additions & 7 deletions internal/extension/registry/xds_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ type XDSHook struct {
grpcClient extension.EnvoyGatewayExtensionClient
}

func (h *XDSHook) PostRouteModifyHook(route *route.Route, routeHostnames []string, extensionResources []*unstructured.Unstructured) (*route.Route, error) {
// Take all of the unstructured resources for the extension and package them into bytes
func translateUnstructuredToUnstructuredBytes(e []*unstructured.Unstructured) ([]*extension.ExtensionResource, error) {
extensionResourceBytes := []*extension.ExtensionResource{}
for _, res := range extensionResources {
for _, res := range e {
if res != nil {
unstructuredBytes, err := res.MarshalJSON()
// This is probably a programming error, but just return the unmodified route if so
if err != nil {
return route, err
return nil, err
}

extensionResourceBytes = append(extensionResourceBytes,
Expand All @@ -42,6 +41,15 @@ func (h *XDSHook) PostRouteModifyHook(route *route.Route, routeHostnames []strin
)
}
}
return extensionResourceBytes, nil
}

func (h *XDSHook) PostRouteModifyHook(route *route.Route, routeHostnames []string, extensionResources []*unstructured.Unstructured) (*route.Route, error) {
// Take all of the unstructured resources for the extension and package them into bytes
extensionResourceBytes, err := translateUnstructuredToUnstructuredBytes(extensionResources)
if err != nil {
return route, err
}

// Make the request to the extension server
ctx := context.Background()
Expand Down Expand Up @@ -75,13 +83,20 @@ func (h *XDSHook) PostVirtualHostModifyHook(vh *route.VirtualHost) (*route.Virtu
return resp.VirtualHost, nil
}

func (h *XDSHook) PostHTTPListenerModifyHook(l *listener.Listener) (*listener.Listener, error) {
func (h *XDSHook) PostHTTPListenerModifyHook(l *listener.Listener, extensionResources []*unstructured.Unstructured) (*listener.Listener, error) {
// Take all of the unstructured resources for the extension and package them into bytes
extensionResourceBytes, err := translateUnstructuredToUnstructuredBytes(extensionResources)
if err != nil {
return l, err
}
// Make the request to the extension server
ctx := context.Background()
resp, err := h.grpcClient.PostHTTPListenerModify(ctx,
&extension.PostHTTPListenerModifyRequest{
Listener: l,
PostListenerContext: &extension.PostHTTPListenerExtensionContext{},
Listener: l,
PostListenerContext: &extension.PostHTTPListenerExtensionContext{
ExtensionResources: extensionResourceBytes,
},
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/extension/types/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type XDSHookClient interface {
// PostHTTPListenerModifyHook allows an extension to make changes to a Listener generated by Envoy Gateway before it is finalized.
// PostHTTPListenerModifyHook is always executed when an extension is loaded. An extension may return nil
// in order to not make any changes to it.
PostHTTPListenerModifyHook(*listener.Listener) (*listener.Listener, error)
PostHTTPListenerModifyHook(listener *listener.Listener, extensionResources []*unstructured.Unstructured) (*listener.Listener, error)

// PostTranslateModifyHook allows an extension to modify the clusters and secrets in the xDS config.
// This allows for inserting clusters that may change along with extension specific configuration to be dynamically created rather than
Expand Down
205 changes: 205 additions & 0 deletions internal/gatewayapi/extensionserverpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package gatewayapi

import (
"encoding/json"
"errors"
"fmt"
"sort"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
gwv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/envoyproxy/gateway/internal/gatewayapi/status"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/utils"
)

func (t *Translator) ProcessExtensionServerPolicies(policies []unstructured.Unstructured,
gateways []*GatewayContext,
xdsIR XdsIRMap,
) ([]unstructured.Unstructured, error) {
res := []unstructured.Unstructured{}

// Sort based on timestamp
sort.Slice(policies, func(i, j int) bool {
iTime := policies[i].GetCreationTimestamp()
jTime := policies[j].GetCreationTimestamp()
return iTime.Before(&jTime)
})

// First build a map out of the gateways for faster lookup
gatewayMap := map[types.NamespacedName]*policyGatewayTargetContext{}
for _, gw := range gateways {
key := utils.NamespacedName(gw)
gatewayMap[key] = &policyGatewayTargetContext{GatewayContext: gw}
}

var errs error
// Process the policies targeting Gateways. Only update the policy status if it was accepted.
// A policy is considered accepted if at least one targetRef contained inside matched a listener.
for _, policy := range policies {
policy := policy.DeepCopy()
var policyStatus gwv1a2.PolicyStatus
accepted := false
targetRefs, err := extractTargetRefs(policy)
if err != nil {
errs = errors.Join(errs, fmt.Errorf("error finding targetRefs for policy %s: %w", policy.GetName(), err))
continue
}
for _, currTarget := range targetRefs {
if currTarget.Kind != KindGateway {
errs = errors.Join(errs, fmt.Errorf("extension policy %s doesn't target a Gateway", policy.GetName()))
continue
}

// Negative statuses have already been assigned so its safe to skip
gateway, resolveErr := resolveExtServerPolicyGatewayTargetRef(policy, currTarget, gatewayMap)
if gateway == nil {
// unable to find a matching Gateway for policy
continue
}

// Skip the gateway. Don't add anything to the policy status.
if resolveErr != nil {
// The targetRef part is somehow wrong, this policy can't be attached.
continue
}

// Set conditions for translation if it got any
if t.translateExtServerPolicyForGateway(policy, gateway, currTarget, xdsIR) {
// Set Accepted condition if it is unset
// Only add a status condition if the policy was added into the IR
// Find its ancestor reference by resolved gateway, even with resolve error
gatewayNN := utils.NamespacedName(gateway)
ancestorRefs := []gwv1a2.ParentReference{
getAncestorRefForPolicy(gatewayNN, currTarget.SectionName),
}
status.SetAcceptedForPolicyAncestors(&policyStatus, ancestorRefs, t.GatewayControllerName)
accepted = true
}
}
if accepted {
res = append(res, *policy)
policy.Object["status"] = policyStatusToUnstructured(policyStatus)
}
}

return res, errs
}

func extractTargetRefs(policy *unstructured.Unstructured) ([]gwv1a2.LocalPolicyTargetReferenceWithSectionName, error) {
ret := []gwv1a2.LocalPolicyTargetReferenceWithSectionName{}
spec, found := policy.Object["spec"].(map[string]any)
if !found {
return nil, fmt.Errorf("no targets found for the policy")
}
targetRefs, found := spec["targetRefs"]
if found {
if refArr, ok := targetRefs.([]any); ok {
for i := range refArr {
ref, err := extractSingleTargetRef(refArr[i])
if err != nil {
return nil, err
}
ret = append(ret, ref)
}
} else {
return nil, fmt.Errorf("targetRefs is not an array")
}
}
targetRef, found := spec["targetRef"]
if found {
ref, err := extractSingleTargetRef(targetRef)
if err != nil {
return nil, err
}
ret = append(ret, ref)
}
if len(ret) == 0 {
return nil, fmt.Errorf("no targets found for the policy")
}
return ret, nil
}

func extractSingleTargetRef(data any) (gwv1a2.LocalPolicyTargetReferenceWithSectionName, error) {
var currRef gwv1a2.LocalPolicyTargetReferenceWithSectionName
d, err := json.Marshal(data)
if err != nil {
return currRef, err
}
if err := json.Unmarshal(d, &currRef); err != nil {
return currRef, err
}
if currRef.Group == "" || currRef.Name == "" || currRef.Kind == "" {
return currRef, fmt.Errorf("invalid targetRef found: %s", string(d))
}
return currRef, nil
}

func policyStatusToUnstructured(policyStatus gwv1a2.PolicyStatus) map[string]any {
ret := map[string]any{}
// No need to check the marshal/unmarshal error here
d, _ := json.Marshal(policyStatus)
_ = json.Unmarshal(d, &ret)
return ret
}

func resolveExtServerPolicyGatewayTargetRef(policy *unstructured.Unstructured, target gwv1a2.LocalPolicyTargetReferenceWithSectionName, gateways map[types.NamespacedName]*policyGatewayTargetContext) (*GatewayContext, *status.PolicyResolveError) {
targetNs := ptr.To(gwv1b1.Namespace(policy.GetNamespace()))

// Check if the gateway exists
key := types.NamespacedName{
Name: string(target.Name),
Namespace: string(*targetNs),
}
gateway, ok := gateways[key]

// Gateway not found
if !ok {
return nil, nil
}

// Ensure Policy and target are in the same namespace
if policy.GetNamespace() != string(*targetNs) {
message := fmt.Sprintf("Namespace:%s TargetRef.Namespace:%s, extension server policies can only target a resource in the same namespace.",
policy.GetNamespace(), *targetNs)

return gateway.GatewayContext, &status.PolicyResolveError{
Reason: gwv1a2.PolicyReasonInvalid,
Message: message,
}
}

return gateway.GatewayContext, nil
}

func (t *Translator) translateExtServerPolicyForGateway(
policy *unstructured.Unstructured,
gateway *GatewayContext,
target gwv1a2.LocalPolicyTargetReferenceWithSectionName,
xdsIR XdsIRMap,
) bool {
irKey := t.getIRKey(gateway.Gateway)
gwIR := xdsIR[irKey]
found := false
for _, currListener := range gwIR.HTTP {
listenerName := currListener.Name[strings.LastIndex(currListener.Name, "/")+1:]
if target.SectionName != nil && string(*target.SectionName) != listenerName {
continue
}
currListener.ExtensionRefs = append(currListener.ExtensionRefs, &ir.UnstructuredRef{
Object: policy,
})
found = true
}
return found
}
Loading

0 comments on commit 11349b1

Please sign in to comment.