diff --git a/internal/controller/datadogagent/feature/otlp/feature.go b/internal/controller/datadogagent/feature/otlp/feature.go index fde79fc6b..f0d8e1fa6 100644 --- a/internal/controller/datadogagent/feature/otlp/feature.go +++ b/internal/controller/datadogagent/feature/otlp/feature.go @@ -12,15 +12,18 @@ import ( "strings" corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" apiutils "github.com/DataDog/datadog-operator/api/utils" + "github.com/DataDog/datadog-operator/pkg/cilium/v1" "github.com/go-logr/logr" apicommon "github.com/DataDog/datadog-operator/api/datadoghq/common" "github.com/DataDog/datadog-operator/internal/controller/datadogagent/common" + "github.com/DataDog/datadog-operator/internal/controller/datadogagent/component/objects" "github.com/DataDog/datadog-operator/internal/controller/datadogagent/feature" ) @@ -59,6 +62,9 @@ type otlpFeature struct { forceEnableLocalService bool localServiceName string + createKubernetesNetworkPolicy bool + createCiliumNetworkPolicy bool + owner metav1.Object } @@ -121,6 +127,15 @@ func (f *otlpFeature) Configure(dda *v2alpha1.DatadogAgent) (reqComp feature.Req reqComp.Agent.Containers = append(reqComp.Agent.Containers, apicommon.TraceAgentContainerName) } } + if f.grpcEnabled || f.httpEnabled { + if enabled, flavor := v2alpha1.IsNetworkPolicyEnabled(dda); enabled { + if flavor == v2alpha1.NetworkPolicyFlavorCilium { + f.createCiliumNetworkPolicy = true + } else { + f.createKubernetesNetworkPolicy = true + } + } + } return reqComp } @@ -130,13 +145,17 @@ func (f *otlpFeature) Configure(dda *v2alpha1.DatadogAgent) (reqComp feature.Req func (f *otlpFeature) ManageDependencies(managers feature.ResourceManagers, components feature.RequiredComponents) error { platformInfo := managers.Store().GetPlatformInfo() versionInfo := platformInfo.GetVersionInfo() + if f.grpcEnabled { + port, err := extractPortEndpoint(f.grpcEndpoint) + if err != nil { + f.logger.Error(err, "failed to extract port from OTLP/gRPC endpoint") + return fmt.Errorf("failed to extract port from OTLP/gRPC endpoint: %w", err) + } + if f.grpcHostPortEnabled && f.grpcCustomHostPort != 0 { + port = f.grpcCustomHostPort + } if common.ShouldCreateAgentLocalService(versionInfo, f.forceEnableLocalService) { - port, err := extractPortEndpoint(f.grpcEndpoint) - if err != nil { - f.logger.Error(err, "failed to extract port from OTLP/gRPC endpoint") - return fmt.Errorf("failed to extract port from OTLP/gRPC endpoint: %w", err) - } servicePort := []corev1.ServicePort{ { Protocol: corev1.ProtocolTCP, @@ -150,14 +169,72 @@ func (f *otlpFeature) ManageDependencies(managers feature.ResourceManagers, comp return err } } + //network policies for gRPC OTLP + policyName, podSelector := objects.GetNetworkPolicyMetadata(f.owner, v2alpha1.NodeAgentComponentName) + if f.createKubernetesNetworkPolicy { + protocolTCP := corev1.ProtocolTCP + ingressRules := []netv1.NetworkPolicyIngressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: port, + }, + Protocol: &protocolTCP, + }, + }, + }, + } + if err := managers.NetworkPolicyManager().AddKubernetesNetworkPolicy( + policyName, + f.owner.GetNamespace(), + podSelector, + nil, + ingressRules, + nil, + ); err != nil { + return err + } + } else if f.createCiliumNetworkPolicy { + policySpecs := []cilium.NetworkPolicySpec{ + { + Description: "Ingress for gRPC OTLP", + EndpointSelector: podSelector, + Ingress: []cilium.IngressRule{ + { + FromEndpoints: []metav1.LabelSelector{ + {}, + }, + ToPorts: []cilium.PortRule{ + { + Ports: []cilium.PortProtocol{ + { + Port: strconv.Itoa(int(port)), + Protocol: cilium.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + } + if err := managers.CiliumPolicyManager().AddCiliumPolicy(policyName, f.owner.GetNamespace(), policySpecs); err != nil { + return err + } + } } if f.httpEnabled { + port, err := extractPortEndpoint(f.httpEndpoint) + if err != nil { + f.logger.Error(err, "failed to extract port from OTLP/HTTP endpoint") + return fmt.Errorf("failed to extract port from OTLP/HTTP endpoint: %w", err) + } + if f.httpHostPortEnabled && f.httpCustomHostPort != 0 { + port = f.httpCustomHostPort + } if common.ShouldCreateAgentLocalService(versionInfo, f.forceEnableLocalService) { - port, err := extractPortEndpoint(f.httpEndpoint) - if err != nil { - f.logger.Error(err, "failed to extract port from OTLP/HTTP endpoint") - return fmt.Errorf("failed to extract port from OTLP/HTTP endpoint: %w", err) - } servicePort := []corev1.ServicePort{ { Protocol: corev1.ProtocolTCP, @@ -171,6 +248,61 @@ func (f *otlpFeature) ManageDependencies(managers feature.ResourceManagers, comp return err } } + //network policies for HTTP OTLP + policyName, podSelector := objects.GetNetworkPolicyMetadata(f.owner, v2alpha1.NodeAgentComponentName) + if f.createKubernetesNetworkPolicy { + protocolTCP := corev1.ProtocolTCP + ingressRules := []netv1.NetworkPolicyIngressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: port, + }, + Protocol: &protocolTCP, + }, + }, + }, + } + if err := managers.NetworkPolicyManager().AddKubernetesNetworkPolicy( + policyName, + f.owner.GetNamespace(), + podSelector, + nil, + ingressRules, + nil, + ); err != nil { + return err + } + } else if f.createCiliumNetworkPolicy { + policySpecs := []cilium.NetworkPolicySpec{ + { + Description: "Ingress for HTTP OTLP", + EndpointSelector: podSelector, + Ingress: []cilium.IngressRule{ + { + FromEndpoints: []metav1.LabelSelector{ + {}, + }, + ToPorts: []cilium.PortRule{ + { + Ports: []cilium.PortProtocol{ + { + Port: strconv.Itoa(int(port)), + Protocol: cilium.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + } + if err := managers.CiliumPolicyManager().AddCiliumPolicy(policyName, f.owner.GetNamespace(), policySpecs); err != nil { + return err + } + } } return nil }