diff --git a/charts/restate-operator-helm/templates/rbac.yaml b/charts/restate-operator-helm/templates/rbac.yaml index b4a5eb2..0e3ca21 100644 --- a/charts/restate-operator-helm/templates/rbac.yaml +++ b/charts/restate-operator-helm/templates/rbac.yaml @@ -47,6 +47,7 @@ rules: - statefulsets - persistentvolumeclaims - pods + - securitygrouppolicies verbs: - get - list @@ -57,16 +58,19 @@ rules: - '' - apps - networking.k8s.io + - vpcresources.k8s.aws - resources: - statefulsets - networkpolicies - pods + - securitygrouppolicies verbs: - delete apiGroups: - '' - apps - networking.k8s.io + - vpcresources.k8s.aws {{- if .Values.awsPodIdentityAssociationCluster }} - resources: - podidentityassociations diff --git a/crd/RestateCluster.pkl b/crd/RestateCluster.pkl index 771d0b9..d0ccbca 100644 --- a/crd/RestateCluster.pkl +++ b/crd/RestateCluster.pkl @@ -6,6 +6,7 @@ module dev.restate.v1.RestateCluster extends "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/K8sResource.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/apimachinery/pkg/apis/meta/v1/ObjectMeta.pkl" +import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/PodSpec.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/EnvVar.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/ResourceRequirements.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/networking/v1/NetworkPolicy.pkl" @@ -40,6 +41,15 @@ class Spec { /// Compute configuration class Compute { + /// Specifies the DNS parameters of the Restate pod. Parameters specified here will be merged to the + /// generated DNS configuration based on DNSPolicy. + dnsConfig: PodSpec.PodDNSConfig? + + /// Set DNS policy for the pod. Defaults to "ClusterFirst". Valid values are 'ClusterFirstWithHostNet', + /// 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the + /// policy selected with DNSPolicy. + dnsPolicy: String? + /// List of environment variables to set in the container; these may override defaults env: Listing? @@ -61,10 +71,14 @@ class Compute { /// Security configuration class Security { - /// if set, create a AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod + /// If set, create an AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod /// access to this role and allow the cluster to reach the Pod Identity agent. awsPodIdentityAssociationRoleArn: String? + /// If set, create an AWS SecurityGroupPolicy CRD object to place the Restate pod into these security + /// groups + awsPodSecurityGroups: Listing? + /// Egress rules to allow the cluster to make outbound requests; this is in addition to the default of /// allowing public internet access and cluster DNS access. Providing a single empty rule will allow /// all outbound traffic - not recommended diff --git a/crd/crd.yaml b/crd/crd.yaml index 5bb9b14..e919603 100644 --- a/crd/crd.yaml +++ b/crd/crd.yaml @@ -44,6 +44,37 @@ spec: compute: description: Compute configuration properties: + dnsConfig: + description: Specifies the DNS parameters of the Restate pod. Parameters specified here will be merged to the generated DNS configuration based on DNSPolicy. + nullable: true + properties: + nameservers: + description: A list of DNS name server IP addresses. This will be appended to the base nameservers generated from DNSPolicy. Duplicated nameservers will be removed. + items: + type: string + type: array + options: + description: A list of DNS resolver options. This will be merged with the base options generated from DNSPolicy. Duplicated entries will be removed. Resolution options given in Options will override those that appear in the base DNSPolicy. + items: + description: PodDNSConfigOption defines DNS resolver options of a pod. + properties: + name: + description: Required. + type: string + value: + type: string + type: object + type: array + searches: + description: A list of DNS search domains for host-name lookup. This will be appended to the base search paths generated from DNSPolicy. Duplicated search paths will be removed. + items: + type: string + type: array + type: object + dnsPolicy: + description: Set DNS policy for the pod. Defaults to "ClusterFirst". Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. + nullable: true + type: string env: description: List of environment variables to set in the container; these may override defaults items: @@ -178,9 +209,15 @@ spec: nullable: true properties: awsPodIdentityAssociationRoleArn: - description: if set, create a AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod access to this role and allow the cluster to reach the Pod Identity agent. + description: If set, create an AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod access to this role and allow the cluster to reach the Pod Identity agent. nullable: true type: string + awsPodSecurityGroups: + description: If set, create an AWS SecurityGroupPolicy CRD object to place the Restate pod into these security groups + items: + type: string + nullable: true + type: array networkEgressRules: description: Egress rules to allow the cluster to make outbound requests; this is in addition to the default of allowing public internet access and cluster DNS access. Providing a single empty rule will allow all outbound traffic - not recommended items: diff --git a/crd/pklgen/generate.pkl b/crd/pklgen/generate.pkl index d199c61..64085d9 100644 --- a/crd/pklgen/generate.pkl +++ b/crd/pklgen/generate.pkl @@ -1,6 +1,7 @@ amends "package://pkg.pkl-lang.org/pkl-pantry/k8s.contrib.crd@1.0.0#/generate.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/ResourceRequirements.pkl" +import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/PodSpec.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/networking/v1/NetworkPolicy.pkl" import "package://pkg.pkl-lang.org/pkl-k8s/k8s@1.0.1#/api/core/v1/EnvVar.pkl" @@ -10,6 +11,7 @@ converters { ["restateclusters.restate.dev"] { [List("spec", "compute", "env", "env")] = EnvVar [List("spec", "compute", "resources")] = ResourceRequirements + [List("spec", "compute", "dnsConfig")] = PodSpec.PodDNSConfig [List("spec", "security", "networkEgressRules", "networkEgressRule")] = NetworkPolicy.NetworkPolicyEgressRule [List("spec", "security", "networkPeers", "admin", "admin")] = NetworkPolicy.NetworkPolicyPeer [List("spec", "security", "networkPeers", "ingress", "ingres")] = NetworkPolicy.NetworkPolicyPeer diff --git a/src/controller.rs b/src/controller.rs index b151a6a..f08bfc0 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -6,10 +6,12 @@ use chrono::{DateTime, Utc}; use futures::StreamExt; use k8s_openapi::api::apps::v1::StatefulSet; use k8s_openapi::api::core::v1::{ - EnvVar, Namespace, PersistentVolumeClaim, Pod, ResourceRequirements, Service, ServiceAccount, + EnvVar, Namespace, PersistentVolumeClaim, Pod, PodDNSConfig, ResourceRequirements, Service, + ServiceAccount, }; use k8s_openapi::api::networking::v1; use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::APIGroup; use kube::core::PartialObjectMeta; use kube::runtime::reflector::{ObjectRef, Store}; use kube::runtime::{metadata_watcher, reflector, watcher, WatchStreamExt}; @@ -35,6 +37,7 @@ use crate::podidentityassociations::PodIdentityAssociation; use crate::reconcilers::compute::reconcile_compute; use crate::reconcilers::network_policies::reconcile_network_policies; use crate::reconcilers::object_meta; +use crate::securitygrouppolicies::SecurityGroupPolicy; use crate::{telemetry, Error, Metrics, Result}; pub static RESTATE_CLUSTER_FINALIZER: &str = "clusters.restate.dev"; @@ -171,6 +174,10 @@ pub struct RestateClusterCompute { pub env: Option>, /// Compute Resources for the Restate container. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ pub resources: Option, + /// Specifies the DNS parameters of the Restate pod. Parameters specified here will be merged to the generated DNS configuration based on DNSPolicy. + pub dns_config: Option, + /// Set DNS policy for the pod. Defaults to "ClusterFirst". Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. + pub dns_policy: Option, } fn env_schema(g: &mut schemars::gen::SchemaGenerator) -> Schema { @@ -190,9 +197,11 @@ fn env_schema(g: &mut schemars::gen::SchemaGenerator) -> Schema { pub struct RestateClusterSecurity { pub service_annotations: Option>, pub service_account_annotations: Option>, - /// if set, create a AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod access to this role and + /// If set, create an AWS PodIdentityAssociation using the ACK CRD in order to give the Restate pod access to this role and /// allow the cluster to reach the Pod Identity agent. pub aws_pod_identity_association_role_arn: Option, + /// If set, create an AWS SecurityGroupPolicy CRD object to place the Restate pod into these security groups + pub aws_pod_security_groups: Option>, /// Network peers to allow inbound access to restate ports /// If unset, will not allow any new traffic. Set any of these to [] to allow all traffic - not recommended. pub network_peers: Option, @@ -312,6 +321,8 @@ pub struct Context { pub ss_store: Store, // If set, watch PodIdentityAssociation resources, and if requested create them against this cluster pub aws_pod_identity_association_cluster: Option, + // Whether the EKS SecurityGroupPolicy CRD is installed + pub security_group_policy_installed: bool, /// Diagnostics read by the web server pub diagnostics: Arc>, /// Prometheus metrics @@ -593,12 +604,14 @@ impl State { client: Client, pvc_meta_store: Store>, ss_store: Store, + security_group_policy_installed: bool, ) -> Arc { Arc::new(Context { client, pvc_meta_store, ss_store, aws_pod_identity_association_cluster: self.aws_pod_identity_association_cluster.clone(), + security_group_policy_installed, metrics: Metrics::default().register(&self.registry).unwrap(), diagnostics: self.diagnostics.clone(), }) @@ -610,6 +623,29 @@ pub async fn run(state: State) { let client = Client::try_default() .await .expect("failed to create kube Client"); + + let api_groups = match client.list_api_groups().await { + Ok(list) => list, + Err(e) => { + error!("Could not list api groups: {e:?}"); + std::process::exit(1); + } + }; + + let (security_group_policy_installed, pod_identity_association_installed) = api_groups + .groups + .iter() + .fold((false, false), |(sgp, pia), group| { + fn group_matches>(group: &APIGroup) -> bool { + group.name == R::group(&()) + && group.versions.iter().any(|v| v.version == R::version(&())) + } + ( + sgp || group_matches::(group), + pia || group_matches::(group), + ) + }); + let rc_api = Api::::all(client.clone()); let ns_api = Api::::all(client.clone()); let ss_api = Api::::all(client.clone()); @@ -619,12 +655,11 @@ pub async fn run(state: State) { let np_api = Api::::all(client.clone()); let pia_api = Api::::all(client.clone()); let pod_api = Api::::all(client.clone()); + let sgp_api = Api::::all(client.clone()); - if state.aws_pod_identity_association_cluster.is_some() { - if let Err(e) = pia_api.list(&ListParams::default().limit(1)).await { - error!("PodIdentityAssociation is not queryable; {e:?}. Is the CRD installed?"); - std::process::exit(1); - } + if state.aws_pod_identity_association_cluster.is_some() && !pod_identity_association_installed { + error!("PodIdentityAssociation is not available on apiserver, but a pod identity association cluster was provided. Is the CRD installed?"); + std::process::exit(1); } if let Err(e) = rc_api.list(&ListParams::default().limit(1)).await { @@ -668,18 +703,28 @@ pub async fn run(state: State) { Some(ObjectRef::new(instance)) }, ); - let controller = if state.aws_pod_identity_association_cluster.is_some() { + let controller = if pod_identity_association_installed { controller .owns(pia_api, cfg.clone()) .owns(pod_api, cfg.clone()) } else { controller }; + let controller = if security_group_policy_installed { + controller.owns(sgp_api, cfg.clone()) + } else { + controller + }; controller .run( reconcile, error_policy, - state.to_context(client, pvc_meta_store, ss_store), + state.to_context( + client, + pvc_meta_store, + ss_store, + security_group_policy_installed, + ), ) .filter_map(|x| async move { Result::ok(x) }) .for_each(|_| futures::future::ready(())) diff --git a/src/lib.rs b/src/lib.rs index 9748025..2bb527f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,3 +51,4 @@ mod reconcilers; /// External CRDs mod podidentityassociations; +mod securitygrouppolicies; diff --git a/src/reconcilers/compute.rs b/src/reconcilers/compute.rs index 40ab628..01cf87a 100644 --- a/src/reconcilers/compute.rs +++ b/src/reconcilers/compute.rs @@ -21,6 +21,9 @@ use tracing::{debug, warn}; use crate::podidentityassociations::{PodIdentityAssociation, PodIdentityAssociationSpec}; use crate::reconcilers::{label_selector, object_meta, resource_labels}; +use crate::securitygrouppolicies::{ + SecurityGroupPolicy, SecurityGroupPolicySecurityGroups, SecurityGroupPolicySpec, +}; use crate::{Context, Error, RestateClusterCompute, RestateClusterSpec, RestateClusterStorage}; fn restate_service_account( @@ -65,6 +68,22 @@ fn restate_pod_identity_association( } } +fn restate_security_group_policy( + oref: &OwnerReference, + aws_security_groups: &[String], +) -> SecurityGroupPolicy { + SecurityGroupPolicy { + metadata: object_meta(oref, "restate"), + spec: SecurityGroupPolicySpec { + security_groups: Some(SecurityGroupPolicySecurityGroups { + group_ids: Some(aws_security_groups.into()), + }), + pod_selector: Some(label_selector(&oref.name)), + service_account_selector: None, + }, + } +} + fn restate_service( oref: &OwnerReference, annotations: Option<&BTreeMap>, @@ -161,6 +180,8 @@ fn restate_statefulset( }), spec: Some(PodSpec { automount_service_account_token: Some(false), + dns_policy: compute.dns_policy.clone(), + dns_config: compute.dns_config.clone(), containers: vec![Container { name: "restate".into(), image: Some(compute.image.clone()), @@ -266,6 +287,7 @@ pub async fn reconcile_compute( let svcacc_api: Api = Api::namespaced(ctx.client.clone(), namespace); let pia_api: Api = Api::namespaced(ctx.client.clone(), namespace); let pod_api: Api = Api::namespaced(ctx.client.clone(), namespace); + let sgp_api: Api = Api::namespaced(ctx.client.clone(), namespace); apply_service_account( namespace, @@ -279,8 +301,9 @@ pub async fn reconcile_compute( ) .await?; - // Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta - let pod_annotations: Option> = match ( + let mut pod_annotations: Option> = None; + + match ( ctx.aws_pod_identity_association_cluster.as_ref(), spec.security .as_ref() @@ -310,28 +333,57 @@ pub async fn reconcile_compute( return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), requeue_after: Some(Duration::from_secs(2)) }); } - Some(BTreeMap::from([ - ( - "restate.dev/aws-pod-identity-association-cluster".into(), - aws_pod_identity_association_cluster.clone(), - ), - ( - "restate.dev/aws-pod-identity-association-role-arn".into(), - aws_pod_identity_association_role_arn.clone(), - ), - ])) + // Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta + let pod_annotations = pod_annotations.get_or_insert_with(Default::default); + pod_annotations.insert( + "restate.dev/aws-pod-identity-association-cluster".into(), + aws_pod_identity_association_cluster.clone(), + ); + pod_annotations.insert( + "restate.dev/aws-pod-identity-association-role-arn".into(), + aws_pod_identity_association_role_arn.clone(), + ); } (Some(_), None) => { delete_pod_identity_association(namespace, &pia_api, "restate").await?; - None } (None, Some(aws_pod_identity_association_role_arn)) => { warn!("Ignoring AWS pod identity association role ARN {aws_pod_identity_association_role_arn} as the operator is not configured with --aws-pod-identity-association-cluster"); - None } - (None, None) => None, + (None, None) => {} }; + match spec + .security + .as_ref() + .and_then(|s| s.aws_pod_security_groups.as_deref()) + { + Some(aws_pod_security_groups) + if ctx.security_group_policy_installed && !aws_pod_security_groups.is_empty() => + { + apply_security_group_policy( + namespace, + &sgp_api, + restate_security_group_policy(oref, aws_pod_security_groups), + ) + .await?; + + let pod_annotations = pod_annotations.get_or_insert_with(Default::default); + // Pods MUST roll when these change, so we will apply the groups as annotations to the pod meta + pod_annotations.insert( + "restate.dev/aws-security-groups".into(), + aws_pod_security_groups.join(","), + ); + } + None | Some(_) if ctx.security_group_policy_installed => { + delete_security_group_policy(namespace, &sgp_api, "restate").await?; + } + Some(aws_pod_security_groups) if !aws_pod_security_groups.is_empty() => { + warn!("Ignoring AWS pod security groups {} as the SecurityGroupPolicy CRD is not installed", aws_pod_security_groups.join(",")); + } + None | Some(_) => {} + } + apply_service( namespace, &svc_api, @@ -508,6 +560,37 @@ async fn delete_pod_identity_association( } } +async fn apply_security_group_policy( + namespace: &str, + pia_api: &Api, + pia: SecurityGroupPolicy, +) -> Result<(), Error> { + let name = pia.metadata.name.as_ref().unwrap(); + let params: PatchParams = PatchParams::apply("restate-operator").force(); + debug!( + "Applying SecurityGroupPolicy {} in namespace {}", + name, namespace + ); + pia_api.patch(name, ¶ms, &Patch::Apply(&pia)).await?; + Ok(()) +} + +async fn delete_security_group_policy( + namespace: &str, + sgp_api: &Api, + name: &str, +) -> Result<(), Error> { + debug!( + "Ensuring SecurityGroupPolicy {} in namespace {} does not exist", + name, namespace + ); + match sgp_api.delete(name, &DeleteParams::default()).await { + Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()), + Err(err) => Err(err.into()), + Ok(_) => Ok(()), + } +} + async fn resize_statefulset_storage( namespace: &str, oref: &OwnerReference, diff --git a/src/securitygrouppolicies.rs b/src/securitygrouppolicies.rs new file mode 100644 index 0000000..a8b0d78 --- /dev/null +++ b/src/securitygrouppolicies.rs @@ -0,0 +1,49 @@ +// WARNING: generated by kopium - manual changes will be overwritten +// kopium command: kopium securitygrouppolicies.vpcresources.k8s.aws -A +// kopium version: 0.16.5 + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// SecurityGroupPolicySpec defines the desired state of SecurityGroupPolicy +#[derive(CustomResource, Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[kube( + group = "vpcresources.k8s.aws", + version = "v1beta1", + kind = "SecurityGroupPolicy", + plural = "securitygrouppolicies" +)] +#[kube(namespaced)] +pub struct SecurityGroupPolicySpec { + /// A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. + #[serde( + default, + skip_serializing_if = "Option::is_none", + rename = "podSelector" + )] + pub pod_selector: Option, + /// GroupIds contains the list of security groups that will be applied to the network interface of the pod matching the criteria. + #[serde( + default, + skip_serializing_if = "Option::is_none", + rename = "securityGroups" + )] + pub security_groups: Option, + /// A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. + #[serde( + default, + skip_serializing_if = "Option::is_none", + rename = "serviceAccountSelector" + )] + pub service_account_selector: Option, +} + +/// GroupIds contains the list of security groups that will be applied to the network interface of the pod matching the criteria. +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] +pub struct SecurityGroupPolicySecurityGroups { + /// Groups is the list of EC2 Security Groups Ids that need to be applied to the ENI of a Pod. + #[serde(default, skip_serializing_if = "Option::is_none", rename = "groupIds")] + pub group_ids: Option>, +}