From 51213498cea2c565c913c71ee0714c4364d66aac Mon Sep 17 00:00:00 2001 From: Apurup Chevuru <60630804+achevuru@users.noreply.github.com> Date: Wed, 28 Feb 2024 14:12:14 -0800 Subject: [PATCH] Network Policy - Strict mode support (#209) * Network Policy - Strict mode support * CR updates * Bump up conntrack map size and UTs * Minor cleanup --- controllers/policyendpoints_controller.go | 47 ++- .../policyendpoints_controller_test.go | 306 ++++++++++++++++++ go.mod | 20 +- go.sum | 33 ++ main.go | 4 + pkg/ebpf/bpf_client.go | 10 + pkg/ebpf/bpf_client_test.go | 61 ++++ pkg/ebpf/c/v4events.bpf.c | 2 +- pkg/ebpf/c/v6events.bpf.c | 2 +- pkg/rpc/rpc_handler.go | 115 +++++++ 10 files changed, 585 insertions(+), 15 deletions(-) create mode 100644 pkg/rpc/rpc_handler.go diff --git a/controllers/policyendpoints_controller.go b/controllers/policyendpoints_controller.go index 65ca53e..2515d09 100644 --- a/controllers/policyendpoints_controller.go +++ b/controllers/policyendpoints_controller.go @@ -94,7 +94,6 @@ func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger, // Start prometheus prometheusRegister() } - return r, err } @@ -544,16 +543,15 @@ func (r *PolicyEndpointsReconciler) deriveTargetPods(ctx context.Context, // by the Host IP value. nodeIP := net.ParseIP(r.nodeIP) for _, pod := range policyEndpoint.Spec.PodSelectorEndpoints { + podIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace) if nodeIP.Equal(net.ParseIP(string(pod.HostIP))) { r.log.Info("Found a matching Pod: ", "name: ", pod.Name, "namespace: ", pod.Namespace) targetPods = append(targetPods, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}) - podIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace) podIdentifiers[podIdentifier] = true r.log.Info("Derived ", "Pod identifier: ", podIdentifier) - r.updatePodIdentifierToPEMap(ctx, podIdentifier, parentPEList) } + r.updatePodIdentifierToPEMap(ctx, podIdentifier, parentPEList) } - return targetPods, podIdentifiers } @@ -665,3 +663,44 @@ func (r *PolicyEndpointsReconciler) derivePolicyEndpointsOfParentNP(ctx context. } return parentPolicyEndpointList } + +func (r *PolicyEndpointsReconciler) GeteBPFClient() ebpf.BpfClient { + return r.ebpfClient +} + +func (r *PolicyEndpointsReconciler) DeriveFireWallRulesPerPodIdentifier(podIdentifier string, podNamespace string) ([]ebpf.EbpfFirewallRules, + []ebpf.EbpfFirewallRules, error) { + + ingressRules, egressRules, isIngressIsolated, isEgressIsolated, err := r.deriveIngressAndEgressFirewallRules(context.Background(), podIdentifier, + podNamespace, "", false) + if err != nil { + r.log.Error(err, "Error deriving firewall rules") + return ingressRules, egressRules, nil + } + + if len(ingressRules) == 0 && !isIngressIsolated { + // No active ingress rules for this pod, but we only should land here + // if there are active egress rules. So, we need to add an allow-all entry to ingress rule set + r.log.Info("No Ingress rules and no ingress isolation - Appending catch all entry") + r.addCatchAllEntry(context.Background(), &ingressRules) + } + + if len(egressRules) == 0 && !isEgressIsolated { + // No active egress rules for this pod but we only should land here + // if there are active ingress rules. So, we need to add an allow-all entry to egress rule set + r.log.Info("No Egress rules and no egress isolation - Appending catch all entry") + r.addCatchAllEntry(context.Background(), &egressRules) + } + + return ingressRules, egressRules, nil +} + +func (r *PolicyEndpointsReconciler) ArePoliciesAvailableInLocalCache(podIdentifier string) bool { + if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok { + if len(policyEndpointList.([]string)) > 0 { + r.log.Info("Active policies available against", "podIdentifier", podIdentifier) + return true + } + } + return false +} diff --git a/controllers/policyendpoints_controller_test.go b/controllers/policyendpoints_controller_test.go index 7419b4c..ef2297c 100644 --- a/controllers/policyendpoints_controller_test.go +++ b/controllers/policyendpoints_controller_test.go @@ -712,3 +712,309 @@ func TestDeriveDefaultPodIsolation(t *testing.T) { }) } } + +func TestArePoliciesAvailableInLocalCache(t *testing.T) { + type want struct { + activePoliciesAvailable bool + } + + tests := []struct { + name string + podIdentifier string + policyEndpointName []string + want want + }{ + { + name: "Active policies present against the PodIdentifier", + podIdentifier: "foo-bar", + policyEndpointName: []string{"foo", "bar"}, + want: want{ + activePoliciesAvailable: true, + }, + }, + + { + name: "No Active policies present against the PodIdentifier", + podIdentifier: "foo-bar", + want: want{ + activePoliciesAvailable: false, + }, + }, + } + + for _, tt := range tests { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := mock_client.NewMockClient(ctrl) + policyEndpointReconciler, _ := NewPolicyEndpointsReconciler(mockClient, logr.New(&log.NullLogSink{}), + false, false, false, false, 300) + var policyEndpointsList []string + policyEndpointsList = append(policyEndpointsList, tt.policyEndpointName...) + policyEndpointReconciler.podIdentifierToPolicyEndpointMap.Store(tt.podIdentifier, policyEndpointsList) + + t.Run(tt.name, func(t *testing.T) { + activePoliciesAvailable := policyEndpointReconciler.ArePoliciesAvailableInLocalCache(tt.podIdentifier) + assert.Equal(t, tt.want.activePoliciesAvailable, activePoliciesAvailable) + }) + } +} + +func TestDeriveFireWallRulesPerPodIdentifier(t *testing.T) { + protocolTCP := corev1.ProtocolTCP + protocolUDP := corev1.ProtocolUDP + var port80 int32 = 80 + + type policyendpointGetCall struct { + peRef types.NamespacedName + pe *policyendpoint.PolicyEndpoint + err error + } + + type want struct { + ingressRules []ebpf.EbpfFirewallRules + egressRules []ebpf.EbpfFirewallRules + isIngressIsolated bool + isEgressIsolated bool + } + + ingressAndEgressPolicy := policyendpoint.PolicyEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + Spec: policyendpoint.PolicyEndpointSpec{ + PodSelector: &metav1.LabelSelector{}, + PolicyRef: policyendpoint.PolicyReference{ + Name: "foo", + Namespace: "bar", + }, + Ingress: []policyendpoint.EndpointInfo{ + { + CIDR: "1.1.1.1/32", + Ports: []policyendpoint.Port{ + { + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + }, + Egress: []policyendpoint.EndpointInfo{ + { + CIDR: "2.2.2.2/32", + Ports: []policyendpoint.Port{ + { + Port: &port80, + Protocol: &protocolUDP, + }, + }, + }, + }, + }, + } + + ingressRulesOnlyPolicy := policyendpoint.PolicyEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + Spec: policyendpoint.PolicyEndpointSpec{ + PodSelector: &metav1.LabelSelector{}, + PolicyRef: policyendpoint.PolicyReference{ + Name: "foo", + Namespace: "bar", + }, + PodIsolation: []networking.PolicyType{ + networking.PolicyTypeIngress, + networking.PolicyTypeEgress, + }, + Ingress: []policyendpoint.EndpointInfo{ + { + CIDR: "1.1.1.1/32", + Ports: []policyendpoint.Port{ + { + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + }, + }, + } + + egressRulesOnlyPolicy := policyendpoint.PolicyEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + Spec: policyendpoint.PolicyEndpointSpec{ + PodSelector: &metav1.LabelSelector{}, + PolicyRef: policyendpoint.PolicyReference{ + Name: "foo", + Namespace: "bar", + }, + PodIsolation: []networking.PolicyType{ + networking.PolicyTypeIngress, + networking.PolicyTypeEgress, + }, + Egress: []policyendpoint.EndpointInfo{ + { + CIDR: "2.2.2.2/32", + Ports: []policyendpoint.Port{ + { + Port: &port80, + Protocol: &protocolUDP, + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + podIdentifier string + resourceNamespace string + policyEndpointName string + policyendpointGetCall []policyendpointGetCall + want want + wantErr error + }{ + { + name: "Ingress and Egress Policy", + podIdentifier: "foo-bar", + resourceNamespace: "bar", + policyEndpointName: "foo", + policyendpointGetCall: []policyendpointGetCall{ + { + peRef: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + pe: &ingressAndEgressPolicy, + }, + }, + want: want{ + ingressRules: []ebpf.EbpfFirewallRules{ + { + IPCidr: "1.1.1.1/32", + L4Info: []policyendpoint.Port{ + { + Protocol: &protocolTCP, + Port: &port80, + }, + }, + }, + }, + egressRules: []ebpf.EbpfFirewallRules{ + { + IPCidr: "2.2.2.2/32", + L4Info: []policyendpoint.Port{ + { + Protocol: &protocolUDP, + Port: &port80, + }, + }, + }, + }, + isIngressIsolated: false, + isEgressIsolated: false, + }, + wantErr: nil, + }, + { + name: "Ingress Only Policy", + podIdentifier: "foo-bar", + resourceNamespace: "bar", + policyEndpointName: "foo", + policyendpointGetCall: []policyendpointGetCall{ + { + peRef: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + pe: &ingressRulesOnlyPolicy, + }, + }, + want: want{ + ingressRules: []ebpf.EbpfFirewallRules{ + { + IPCidr: "1.1.1.1/32", + L4Info: []policyendpoint.Port{ + { + Protocol: &protocolTCP, + Port: &port80, + }, + }, + }, + }, + isIngressIsolated: false, + isEgressIsolated: true, + }, + wantErr: nil, + }, + + { + name: "Egress Only Policy", + podIdentifier: "foo-bar", + resourceNamespace: "bar", + policyEndpointName: "foo", + policyendpointGetCall: []policyendpointGetCall{ + { + peRef: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + pe: &egressRulesOnlyPolicy, + }, + }, + want: want{ + egressRules: []ebpf.EbpfFirewallRules{ + { + IPCidr: "2.2.2.2/32", + L4Info: []policyendpoint.Port{ + { + Protocol: &protocolUDP, + Port: &port80, + }, + }, + }, + }, + isIngressIsolated: true, + isEgressIsolated: false, + }, + wantErr: nil, + }, + } + + for _, tt := range tests { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := mock_client.NewMockClient(ctrl) + policyEndpointReconciler, _ := NewPolicyEndpointsReconciler(mockClient, logr.New(&log.NullLogSink{}), + false, false, false, false, 300) + var policyEndpointsList []string + policyEndpointsList = append(policyEndpointsList, tt.policyEndpointName) + policyEndpointReconciler.podIdentifierToPolicyEndpointMap.Store(tt.podIdentifier, policyEndpointsList) + for _, item := range tt.policyendpointGetCall { + call := item + mockClient.EXPECT().Get(gomock.Any(), call.peRef, gomock.Any()).DoAndReturn( + func(ctx context.Context, key types.NamespacedName, currentPE *policyendpoint.PolicyEndpoint, opts ...client.GetOption) error { + if call.pe != nil { + *currentPE = *call.pe + } + return call.err + }, + ).AnyTimes() + } + + t.Run(tt.name, func(t *testing.T) { + gotIngressRules, gotEgressRules, gotError := policyEndpointReconciler.DeriveFireWallRulesPerPodIdentifier(tt.podIdentifier, tt.resourceNamespace) + assert.Equal(t, tt.want.ingressRules, gotIngressRules) + assert.Equal(t, tt.want.egressRules, gotEgressRules) + assert.Equal(t, tt.wantErr, gotError) + }) + } +} diff --git a/go.mod b/go.mod index 2bae447..c4703e4 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,11 @@ module github.com/aws/aws-network-policy-agent go 1.21 require ( - github.com/aws/amazon-vpc-cni-k8s v1.16.2 + github.com/aws/amazon-vpc-cni-k8s v1.12.1-0.20240206190757-c7ed274bef5f github.com/aws/aws-ebpf-sdk-go v1.0.7 github.com/aws/aws-sdk-go v1.50.9 github.com/go-logr/logr v1.4.1 - github.com/go-logr/zapr v1.2.4 + github.com/go-logr/zapr v1.3.0 github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.4.0 @@ -19,11 +19,12 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2 go.uber.org/zap v1.26.0 golang.org/x/sys v0.16.0 + google.golang.org/grpc v1.61.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 k8s.io/api v0.29.1 k8s.io/apimachinery v0.29.1 k8s.io/client-go v0.29.1 - sigs.k8s.io/controller-runtime v0.16.3 + sigs.k8s.io/controller-runtime v0.17.0 ) require ( @@ -31,8 +32,8 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/evanphx/json-patch/v5 v5.6.0 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/evanphx/json-patch/v5 v5.8.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect @@ -59,22 +60,23 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect golang.org/x/net v0.19.0 // indirect - golang.org/x/oauth2 v0.13.0 // indirect + golang.org/x/oauth2 v0.14.0 // indirect golang.org/x/term v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.28.3 // indirect - k8s.io/component-base v0.28.3 // indirect + k8s.io/apiextensions-apiserver v0.29.0 // indirect + k8s.io/component-base v0.29.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index ade8e59..28bce25 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aws/amazon-vpc-cni-k8s v1.12.1-0.20240206190757-c7ed274bef5f h1:YQAUgbzD0Mq3/cuFtw04vc+tHqOPeXMgOHYFEjvLLkU= +github.com/aws/amazon-vpc-cni-k8s v1.12.1-0.20240206190757-c7ed274bef5f/go.mod h1:rgIw5ADNv+hAuC2d2bzBIMmlsFpSDeW2TxrgO4dgdX8= github.com/aws/amazon-vpc-cni-k8s v1.16.2 h1:gwno1KikgwrrxvK7XGpjpgNz15tYGeVqZh7ERo+/LyM= github.com/aws/amazon-vpc-cni-k8s v1.16.2/go.mod h1:ppYA2CLyu8YigHrG8g1QgtU6fpu5rFb6WVVha7Xb4gc= github.com/aws/aws-ebpf-sdk-go v1.0.6 h1:f4f2HKmDEA2hQLd4Sz5jA0YYAFLmgLVnopZIycKcxnA= @@ -20,16 +22,24 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= +github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1u0KQro= +github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/zapr v1.2.4 h1:QHVo+6stLbfJmYGkQ7uGHUCu5hnAFAj6mDe6Ea0SeOo= github.com/go-logr/zapr v1.2.4/go.mod h1:FyHWQIzQORZ0QVE1BtVHv3cKtNLuXsbNLtpuhNapBOA= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= @@ -72,8 +82,10 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -95,8 +107,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs= github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM= +github.com/onsi/ginkgo/v2 v2.14.0 h1:vSmGj2Z5YPb9JwCWT6z6ihcUvDhuXLc3sJiqd3jMKAY= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -142,6 +156,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= @@ -171,6 +186,8 @@ golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= +golang.org/x/oauth2 v0.14.0 h1:P0Vrf/2538nmC0H+pEQ3MNFRRnVR7RlqyVw+bvm26z0= +golang.org/x/oauth2 v0.14.0/go.mod h1:lAtNWgaWfL4cm7j2OV8TxGi9Qb7ECORx8DktCY74OwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -220,6 +237,14 @@ gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= +google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= +google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= @@ -243,12 +268,16 @@ k8s.io/api v0.29.1 h1:DAjwWX/9YT7NQD4INu49ROJuZAAAP/Ijki48GUPzxqw= k8s.io/api v0.29.1/go.mod h1:7Kl10vBRUXhnQQI8YR/R327zXC8eJ7887/+Ybta+RoQ= k8s.io/apiextensions-apiserver v0.28.3 h1:Od7DEnhXHnHPZG+W9I97/fSQkVpVPQx2diy+2EtmY08= k8s.io/apiextensions-apiserver v0.28.3/go.mod h1:NE1XJZ4On0hS11aWWJUTNkmVB03j9LM7gJSisbRt8Lc= +k8s.io/apiextensions-apiserver v0.29.0 h1:0VuspFG7Hj+SxyF/Z/2T0uFbI5gb5LRgEyUVE3Q4lV0= +k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc= k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc= k8s.io/apimachinery v0.29.1/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A= k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks= k8s.io/component-base v0.28.3 h1:rDy68eHKxq/80RiMb2Ld/tbH8uAE75JdCqJyi6lXMzI= k8s.io/component-base v0.28.3/go.mod h1:fDJ6vpVNSk6cRo5wmDa6eKIG7UlIQkaFmZN2fYgIUD8= +k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s= +k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M= k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= @@ -257,9 +286,13 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= +sigs.k8s.io/controller-runtime v0.17.0 h1:fjJQf8Ukya+VjogLO6/bNX9HE6Y2xpsO5+fyS26ur/s= +sigs.k8s.io/controller-runtime v0.17.0/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/main.go b/main.go index c9d943e..83131bc 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,8 @@ package main import ( "os" + "github.com/aws/aws-network-policy-agent/pkg/rpc" + "github.com/aws/aws-network-policy-agent/pkg/logger" "github.com/go-logr/logr" @@ -106,12 +108,14 @@ func main() { } go metrics.ServeMetrics() + go rpc.RunRPCHandler(policyEndpointController) setupLog.Info("starting manager") if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } + } // loadControllerConfig loads the controller configuration diff --git a/pkg/ebpf/bpf_client.go b/pkg/ebpf/bpf_client.go index 9300620..0b9f8e5 100644 --- a/pkg/ebpf/bpf_client.go +++ b/pkg/ebpf/bpf_client.go @@ -89,6 +89,7 @@ type BpfClient interface { DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) + IsMapUpdateRequired(podIdentifier string) bool } type EvProgram struct { @@ -701,6 +702,15 @@ func (l *bpfClient) IsEBPFProbeAttached(podName string, podNamespace string) (bo return ingress, egress } +func (l *bpfClient) IsMapUpdateRequired(podIdentifier string) bool { + mapUpdateRequired := false + if _, ok := l.policyEndpointeBPFContext.Load(podIdentifier); !ok { + l.logger.Info("No map instance found") + mapUpdateRequired = true + } + return mapUpdateRequired +} + func (l *bpfClient) updateEbpfMap(mapToUpdate goebpfmaps.BpfMap, firewallRules []EbpfFirewallRules) error { start := time.Now() duration := msSince(start) diff --git a/pkg/ebpf/bpf_client_test.go b/pkg/ebpf/bpf_client_test.go index 31d2474..c658b82 100644 --- a/pkg/ebpf/bpf_client_test.go +++ b/pkg/ebpf/bpf_client_test.go @@ -833,6 +833,67 @@ func TestMergeDuplicateL4Info(t *testing.T) { } } +func TestIsMapUpdateRequired(t *testing.T) { + sampleIngressPgmInfo := goelf.BpfData{ + Program: goebpfprogs.BpfProgram{ + ProgID: 2, + ProgFD: 3, + }, + } + sampleEgressPgmInfo := goelf.BpfData{ + Program: goebpfprogs.BpfProgram{ + ProgID: 4, + ProgFD: 5, + }, + } + + tests := []struct { + name string + podIdentifier string + isIngressPgmInfoPresent bool + isEgressPgmInfoPresent bool + want bool + }{ + { + name: "PodIdentifier with existing maps", + podIdentifier: "foo-bar", + isIngressPgmInfoPresent: true, + isEgressPgmInfoPresent: true, + want: false, + }, + { + name: "PodIdentifier without existing maps", + podIdentifier: "foo-bar", + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testBpfClient := &bpfClient{ + nodeIP: "10.1.1.1", + logger: logr.New(&log.NullLogSink{}), + enableIPv6: false, + hostMask: "/32", + policyEndpointeBPFContext: new(sync.Map), + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + if tt.isIngressPgmInfoPresent || tt.isEgressPgmInfoPresent { + sampleBPFContext := BPFContext{ + ingressPgmInfo: sampleIngressPgmInfo, + egressPgmInfo: sampleEgressPgmInfo, + } + testBpfClient.policyEndpointeBPFContext.Store(tt.podIdentifier, sampleBPFContext) + } + gotIsMapUpdateRequired := testBpfClient.IsMapUpdateRequired(tt.podIdentifier) + assert.Equal(t, tt.want, gotIsMapUpdateRequired) + }) + } + +} + func Int32Ptr(i int32) *int32 { return &i } diff --git a/pkg/ebpf/c/v4events.bpf.c b/pkg/ebpf/c/v4events.bpf.c index 57584e1..15ae1cb 100644 --- a/pkg/ebpf/c/v4events.bpf.c +++ b/pkg/ebpf/c/v4events.bpf.c @@ -43,7 +43,7 @@ struct bpf_map_def_pvt SEC("maps") aws_conntrack_map = { .type = BPF_MAP_TYPE_LRU_HASH, .key_size =sizeof(struct conntrack_key), .value_size = sizeof(struct conntrack_value), - .max_entries = 65536, + .max_entries = 256 * 1024, .pinning = PIN_GLOBAL_NS, }; diff --git a/pkg/ebpf/c/v6events.bpf.c b/pkg/ebpf/c/v6events.bpf.c index ed2c4b3..19d359a 100644 --- a/pkg/ebpf/c/v6events.bpf.c +++ b/pkg/ebpf/c/v6events.bpf.c @@ -44,7 +44,7 @@ struct bpf_map_def_pvt SEC("maps") aws_conntrack_map = { .type = BPF_MAP_TYPE_LRU_HASH, .key_size =sizeof(struct conntrack_key), .value_size = sizeof(struct conntrack_value), - .max_entries = 65536, + .max_entries = 256 * 1024, .pinning = PIN_GLOBAL_NS, }; diff --git a/pkg/rpc/rpc_handler.go b/pkg/rpc/rpc_handler.go new file mode 100644 index 0000000..5d174fa --- /dev/null +++ b/pkg/rpc/rpc_handler.go @@ -0,0 +1,115 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package rpc + +import ( + "context" + "net" + + "github.com/aws/aws-network-policy-agent/controllers" + "github.com/aws/aws-network-policy-agent/pkg/utils" + + "github.com/aws/amazon-vpc-cni-k8s/rpc" + "github.com/go-logr/logr" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" +) + +const ( + npgRPCaddress = "127.0.0.1:50052" + grpcHealthServiceName = "grpc.health.v1.np-agent" +) + +// server controls RPC service responses. +type server struct { + policyReconciler *controllers.PolicyEndpointsReconciler + log logr.Logger +} + +// EnforceNpToPod processes CNI Enforce NP network request +func (s *server) EnforceNpToPod(ctx context.Context, in *rpc.EnforceNpRequest) (*rpc.EnforceNpReply, error) { + s.log.Info("Received Enforce Network Policy Request for Pod", "Name", in.K8S_POD_NAME, "Namespace", in.K8S_POD_NAMESPACE) + var err error + + podIdentifier := utils.GetPodIdentifier(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE) + isMapUpdateRequired := s.policyReconciler.GeteBPFClient().IsMapUpdateRequired(podIdentifier) + err = s.policyReconciler.GeteBPFClient().AttacheBPFProbes(types.NamespacedName{Name: in.K8S_POD_NAME, Namespace: in.K8S_POD_NAMESPACE}, + podIdentifier, true, true) + if err != nil { + s.log.Error(err, "Attaching eBPF probe failed for", "pod", in.K8S_POD_NAME, "namespace", in.K8S_POD_NAMESPACE) + return nil, err + } + + // We attempt to program eBPF firewall map entries for this pod, if the local agent is aware of the policies + // configured against it. For example, if this is a new replica of an existing pod/deployment then the local + // node agent will have the policy information available to it. If not, we will leave the pod in default deny state + // until the Network Policy controller reconciles existing policies against this pod. + + // Check if there are active policies against the new pod and if there are other pods on the local node that share + // the eBPF firewall maps with the newly launched pod, if already present we can skip the map update and return + if s.policyReconciler.ArePoliciesAvailableInLocalCache(podIdentifier) && isMapUpdateRequired { + // If we're here, then the local agent knows the list of active policies that apply to this pod and + // this is the first pod of it's type to land on the local node/cluster + s.log.Info("Active policies present against this pod and this is a new Pod to the local node, configuring firewall rules....") + + //Derive Ingress and Egress Firewall Rules and Update the relevant eBPF maps + ingressRules, egressRules, _ := + s.policyReconciler.DeriveFireWallRulesPerPodIdentifier(podIdentifier, in.K8S_POD_NAMESPACE) + + err = s.policyReconciler.GeteBPFClient().UpdateEbpfMaps(podIdentifier, ingressRules, egressRules) + if err != nil { + s.log.Error(err, "Map update(s) failed for, ", "podIdentifier ", podIdentifier) + return nil, err + } + } else { + s.log.Info("Pod either has no active policies or shares the eBPF firewall maps with other local pods. No Map update required..") + } + + resp := rpc.EnforceNpReply{ + Success: err == nil, + } + return &resp, nil +} + +// RunRPCHandler handles request from gRPC +func RunRPCHandler(policyReconciler *controllers.PolicyEndpointsReconciler) error { + rpcLog := ctrl.Log.WithName("rpc-handler") + + rpcLog.Info("Serving RPC Handler", "Address", npgRPCaddress) + listener, err := net.Listen("tcp", npgRPCaddress) + if err != nil { + rpcLog.Error(err, "Failed to listen gRPC port") + return errors.Wrap(err, "network policy agent: failed to listen to gRPC port") + } + grpcServer := grpc.NewServer() + rpc.RegisterNPBackendServer(grpcServer, &server{policyReconciler: policyReconciler, log: rpcLog}) + healthServer := health.NewServer() + // No need to ever change this to HealthCheckResponse_NOT_SERVING since it's a local service only + healthServer.SetServingStatus(grpcHealthServiceName, healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(grpcServer, healthServer) + + // Register reflection service on gRPC server. + reflection.Register(grpcServer) + if err := grpcServer.Serve(listener); err != nil { + rpcLog.Error(err, "Failed to start server on gRPC port: %v", err) + return errors.Wrap(err, "network policy agent: failed to start server on gPRC port") + } + rpcLog.Info("Done with RPC Handler initialization") + return nil +}