diff --git a/misc/helm-charts/environmentd/README.md b/misc/helm-charts/environmentd/README.md index 175ebf8f36f4f..8e3faf8356022 100644 --- a/misc/helm-charts/environmentd/README.md +++ b/misc/helm-charts/environmentd/README.md @@ -52,7 +52,7 @@ The following table lists the configurable parameters of the Materialize environ | `environment.environmentdMemoryAllocation` | | ``"1Gi"`` | | `environment.forceRollout` | | ``"33333333-3333-3333-3333-333333333333"`` | | `environment.inPlaceRollout` | | ``false`` | -| `environment.name` | | ``"environment-12345678-1234-1234-1234-123456789012"`` | +| `environment.name` | | ``"12345678-1234-1234-1234-123456789012"`` | | `environment.requestRollout` | | ``"22222222-2222-2222-2222-222222222222"`` | | `environment.secret.metadataBackendUrl` | | ``"postgres://materialize_user:materialize_pass@postgres.materialize-environment.svc.cluster.local:5432/materialize_db?sslmode=disable"`` | | `environment.secret.persistBackendUrl` | | ``"s3://minio:minio123@bucket/12345678-1234-1234-1234-123456789012?endpoint=http%3A%2F%2Fminio.materialize-environment.svc.cluster.local%3A9000&region=minio"`` | diff --git a/misc/helm-charts/environmentd/values.yaml b/misc/helm-charts/environmentd/values.yaml index c9ff40a57194a..7ed849df83d85 100644 --- a/misc/helm-charts/environmentd/values.yaml +++ b/misc/helm-charts/environmentd/values.yaml @@ -17,7 +17,7 @@ namespace: # Environment configuration for Materialize environment: # The name of the environment to be deployed, this should be a UUID - name: "environment-12345678-1234-1234-1234-123456789012" + name: "12345678-1234-1234-1234-123456789012" # Docker image reference for the Materialize `environmentd` service environmentdImageRef: "materialize/environmentd:v0.122.0-dev.0--pr.g8bb641fc00c77f98ba5556dcdca43670776eacfa" # Optional additional arguments to be passed to `environmentd` diff --git a/misc/helm-charts/operator/README.md b/misc/helm-charts/operator/README.md index a4b1f789ebc8a..6030867b19815 
100644 --- a/misc/helm-charts/operator/README.md +++ b/misc/helm-charts/operator/README.md @@ -37,6 +37,8 @@ The following table lists the configurable parameters of the Materialize operato | Parameter | Description | Default | |-----------|-------------|---------| +| `clusterd.nodeSelector` | | ``{}`` | +| `environmentd.nodeSelector` | | ``{}`` | | `namespace.create` | | ``false`` | | `namespace.name` | | ``"materialize"`` | | `networkPolicies.enabled` | | ``false`` | @@ -44,13 +46,13 @@ The following table lists the configurable parameters of the Materialize operato | `observability.enabled` | | ``false`` | | `observability.prometheus.enabled` | | ``false`` | | `operator.args.cloudProvider` | | ``"local"`` | -| `operator.args.environmentdTargetArch` | | ``"arm64"`` | | `operator.args.localDevelopment` | | ``true`` | | `operator.args.region` | | ``"kind"`` | | `operator.args.startupLogFilter` | | ``"INFO,mz_orchestratord=TRACE"`` | | `operator.image.pullPolicy` | | ``"IfNotPresent"`` | | `operator.image.repository` | | ``"materialize/orchestratord"`` | | `operator.image.tag` | | ``"v0.122.0-dev.0--pr.g8bb641fc00c77f98ba5556dcdca43670776eacfa"`` | +| `operator.nodeSelector` | | ``{}`` | | `operator.resources.limits.cpu` | | ``"500m"`` | | `operator.resources.limits.memory` | | ``"512Mi"`` | | `operator.resources.requests.cpu` | | ``"100m"`` | diff --git a/misc/helm-charts/operator/templates/deployment.yaml b/misc/helm-charts/operator/templates/deployment.yaml index ac47aa86b8d9d..fc1e67722e6a4 100644 --- a/misc/helm-charts/operator/templates/deployment.yaml +++ b/misc/helm-charts/operator/templates/deployment.yaml @@ -26,6 +26,10 @@ spec: {{- include "materialize-operator.selectorLabels" . | nindent 8 }} spec: serviceAccountName: {{ include "materialize-operator.serviceAccountName" . 
}} + {{- if .Values.operator.nodeSelector }} + nodeSelector: + {{- toYaml .Values.operator.nodeSelector | nindent 8 }} + {{- end }} containers: - name: {{ .Chart.Name }} image: "{{ .Values.operator.image.repository }}:{{ .Values.operator.image.tag }}" @@ -37,6 +41,12 @@ spec: {{- if .Values.operator.args.localDevelopment }} - "--local-development" {{- end }} - - "--environmentd-target-arch={{ .Values.operator.args.environmentdTargetArch }}" + - "--image-pull-policy={{ kebabcase .Values.operator.image.pullPolicy }}" + {{- range $key, $value := .Values.environmentd.nodeSelector }} + - "--environmentd-node-selector={{ $key }}={{ $value }}" + {{- end }} + {{- range $key, $value := .Values.clusterd.nodeSelector }} + - "--clusterd-node-selector={{ $key }}={{ $value }}" + {{- end }} resources: {{- toYaml .Values.operator.resources | nindent 10 }} diff --git a/misc/helm-charts/operator/templates/service.yaml b/misc/helm-charts/operator/templates/service.yaml deleted file mode 100644 index b11cce0ac9330..0000000000000 --- a/misc/helm-charts/operator/templates/service.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. - ---- -apiVersion: v1 -kind: Service -metadata: - name: {{ include "materialize-operator.fullname" . }} - labels: - {{- include "materialize-operator.labels" . | nindent 4 }} -spec: - type: ClusterIP - ports: - - port: 80 - targetPort: http - protocol: TCP - name: http - selector: - {{- include "materialize-operator.selectorLabels" . 
| nindent 4 }} diff --git a/misc/helm-charts/operator/templates/serviceaccount.yaml b/misc/helm-charts/operator/templates/serviceaccount.yaml index bc15813638ec3..9a5af4a68f17c 100644 --- a/misc/helm-charts/operator/templates/serviceaccount.yaml +++ b/misc/helm-charts/operator/templates/serviceaccount.yaml @@ -12,6 +12,7 @@ apiVersion: v1 kind: ServiceAccount metadata: name: {{ include "materialize-operator.serviceAccountName" . }} + namespace: {{ .Values.namespace.name }} labels: {{- include "materialize-operator.labels" . | nindent 4 }} diff --git a/misc/helm-charts/operator/values.yaml b/misc/helm-charts/operator/values.yaml index 171e38dc7e5d7..d3d4b93d0a3ad 100644 --- a/misc/helm-charts/operator/values.yaml +++ b/misc/helm-charts/operator/values.yaml @@ -25,8 +25,8 @@ operator: region: "kind" # Flag to indicate whether the environment is for local development localDevelopment: true - # Target architecture for environmentd (set to "arm64" for ARM systems, can also be "amd64" for Intel/AMD) - environmentdTargetArch: "arm64" # amd64 + # Node selector to use for the operator pod + nodeSelector: {} resources: # Resources requested by the operator for CPU and memory requests: @@ -36,6 +36,14 @@ operator: limits: memory: 512Mi +environmentd: + # Node selector to use for environmentd pods spawned by the operator + nodeSelector: {} + +clusterd: + # Node selector to use for clusterd pods spawned by the operator + nodeSelector: {} + # RBAC (Role-Based Access Control) settings rbac: # Whether to create necessary RBAC roles and bindings diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs index 9d048ee84372a..d6d7eb961a797 100644 --- a/src/cloud-resources/src/crd/materialize.rs +++ b/src/cloud-resources/src/crd/materialize.rs @@ -82,26 +82,6 @@ pub mod v1alpha1 { format!("materialize-backend-{}", self.name_unchecked()) } - pub fn cockroach_database_name(&self) -> String { - format!("materialize_{}", self.name_unchecked()) - } 
- - pub fn cockroach_role_name(&self) -> String { - self.cockroach_database_name() - } - - pub fn oidc_sub(&self) -> String { - format!( - "system:serviceaccount:{}:{}", - self.name_unchecked(), - self.service_account_name() - ) - } - - pub fn persistence_bucket_prefix(&self) -> String { - self.oidc_sub() - } - pub fn namespace(&self) -> String { self.meta().namespace.clone().unwrap() } @@ -118,28 +98,20 @@ pub mod v1alpha1 { self.name_unchecked() } - pub fn environmentd_statefulset_name(generation: u64) -> String { - format!("environmentd-{generation}") - } - - pub fn environmentd_container_name() -> String { - "environmentd".to_string() - } - - pub fn environmentd_service_name() -> String { - "environmentd".to_string() + pub fn environmentd_statefulset_name(&self, generation: u64) -> String { + format!("environmentd-{}-{generation}", self.name_unchecked()) } - pub fn environmentd_generation_service_name(generation: u64) -> String { - format!("environmentd-{generation}") + pub fn environmentd_service_name(&self) -> String { + format!("environmentd-{}", self.name_unchecked()) } - pub fn environmentd_pod_name(generation: u64) -> String { - format!("{}-0", Self::environmentd_statefulset_name(generation)) + pub fn environmentd_generation_service_name(&self, generation: u64) -> String { + format!("environmentd-{}-{generation}", self.name_unchecked()) } - pub fn persist_pubsub_service_name(generation: u64) -> String { - format!("persist-pubsub-{generation}") + pub fn persist_pubsub_service_name(&self, generation: u64) -> String { + format!("persist-pubsub-{}-{generation}", self.name_unchecked()) } pub fn default_labels(&self) -> BTreeMap { diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs index 9ac661231ba92..c8247125f2788 100644 --- a/src/orchestrator-kubernetes/src/lib.rs +++ b/src/orchestrator-kubernetes/src/lib.rs @@ -116,6 +116,16 @@ impl fmt::Display for KubernetesImagePullPolicy { } } +impl KubernetesImagePullPolicy { + 
pub fn as_kebab_case_str(&self) -> &'static str { + match self { + Self::Always => "always", + Self::IfNotPresent => "if-not-present", + Self::Never => "never", + } + } +} + /// An orchestrator backed by Kubernetes. pub struct KubernetesOrchestrator { client: Client, diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 4fbabbf8c577a..fe9f344d826de 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -21,9 +21,9 @@ use tracing::{debug, trace}; use crate::metrics::Metrics; use mz_cloud_resources::crd::materialize::v1alpha1::{Materialize, MaterializeStatus}; +use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; use mz_orchestrator_tracing::TracingCliArgs; -use mz_ore::cast::CastFrom; -use mz_ore::instrument; +use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument}; mod resources; @@ -35,26 +35,21 @@ pub struct Args { region: String, #[clap(long)] local_development: bool, - #[clap(long)] - environmentd_target_arch: String, #[clap(flatten)] aws_info: AwsInfo, #[clap(long)] - persist_bucket: Option, - - #[clap(long)] - frontegg_jwk: Option, - #[clap(long)] - frontegg_url: Option, + scheduler_name: Option, #[clap(long)] - frontegg_admin_role: Option, + enable_security_context: bool, #[clap(long)] - scheduler_name: Option, + environmentd_node_selector: Vec>, #[clap(long)] - enable_security_context: bool, + clusterd_node_selector: Vec>, + #[clap(long, default_value = "always", arg_enum)] + image_pull_policy: KubernetesImagePullPolicy, #[clap(long, default_value_t = default_cluster_replica_sizes())] environmentd_cluster_replica_sizes: String, @@ -348,7 +343,7 @@ impl k8s_controller::Context for Context { resources.promote_services(&client, &mz.namespace()).await?; if increment_generation { resources - .teardown_generation(&client, &mz.namespace(), active_generation) + .teardown_generation(&client, mz, active_generation) .await?; } 
self.update_status( @@ -376,7 +371,7 @@ impl k8s_controller::Context for Context { } Err(e) => { resources - .teardown_generation(&client, &mz.namespace(), next_generation) + .teardown_generation(&client, mz, next_generation) .await?; self.update_status( &mz_api, @@ -410,7 +405,7 @@ impl k8s_controller::Context for Context { let mut needs_update = mz.conditions_need_update(); if mz.update_in_progress() { resources - .teardown_generation(&client, &mz.namespace(), next_generation) + .teardown_generation(&client, mz, next_generation) .await?; needs_update = true; } @@ -448,7 +443,7 @@ impl k8s_controller::Context for Context { let mut needs_update = mz.conditions_need_update() || mz.rollout_requested(); if mz.update_in_progress() { resources - .teardown_generation(&client, &mz.namespace(), next_generation) + .teardown_generation(&client, mz, next_generation) .await?; needs_update = true; } diff --git a/src/orchestratord/src/controller/materialize/resources.rs b/src/orchestratord/src/controller/materialize/resources.rs index 53e010cd44147..075627afb3209 100644 --- a/src/orchestratord/src/controller/materialize/resources.rs +++ b/src/orchestratord/src/controller/materialize/resources.rs @@ -14,8 +14,7 @@ use k8s_openapi::{ api::{ apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy}, core::v1::{ - Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, NodeAffinity, - NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, Pod, PodSecurityContext, + Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile, SecretKeySelector, SecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, @@ -160,7 +159,7 @@ impl Resources { } let environmentd_url = - environmentd_internal_http_address(args, namespace, self.generation); + environmentd_internal_http_address(args, namespace, &*self.generation_service); let 
http_client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(10)) @@ -236,15 +235,7 @@ impl Resources { } } else { trace!("restarting environmentd pod to pick up statefulset changes"); - let Some(generation) = self - .statefulset - .annotations() - .get("materialize.cloud/generation") - .and_then(|generation| generation.parse().ok()) - else { - bail!("failed to restart environmentd pod: missing generation"); - }; - delete_resource(&pod_api, &Materialize::environmentd_pod_name(generation)).await?; + delete_resource(&pod_api, &statefulset_pod_name(&*self.statefulset, 0)).await?; } Ok(None) @@ -268,30 +259,26 @@ impl Resources { pub async fn teardown_generation( &self, client: &Client, - namespace: &str, + mz: &Materialize, generation: u64, ) -> Result<(), anyhow::Error> { - let service_api: Api = Api::namespaced(client.clone(), namespace); - let statefulset_api: Api = Api::namespaced(client.clone(), namespace); + let service_api: Api = Api::namespaced(client.clone(), &mz.namespace()); + let statefulset_api: Api = Api::namespaced(client.clone(), &mz.namespace()); trace!("deleting environmentd statefulset for generation {generation}"); delete_resource( &statefulset_api, - &Materialize::environmentd_statefulset_name(generation), + &mz.environmentd_statefulset_name(generation), ) .await?; trace!("deleting persist pubsub service for generation {generation}"); - delete_resource( - &service_api, - &Materialize::persist_pubsub_service_name(generation), - ) - .await?; + delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?; trace!("deleting environmentd per-generation service for generation {generation}"); delete_resource( &service_api, - &Materialize::environmentd_generation_service_name(generation), + &mz.environmentd_generation_service_name(generation), ) .await?; @@ -484,12 +471,7 @@ fn create_public_service_object( mz: &Materialize, generation: u64, ) -> Service { - create_base_service_object( - config, - mz, - generation, 
- &Materialize::environmentd_service_name(), - ) + create_base_service_object(config, mz, generation, &mz.environmentd_service_name()) } fn create_generation_service_object( @@ -501,7 +483,7 @@ fn create_generation_service_object( config, mz, generation, - &Materialize::environmentd_generation_service_name(generation), + &mz.environmentd_generation_service_name(generation), ) } @@ -538,7 +520,7 @@ fn create_base_service_object( }, ]; - let selector = btreemap! {"materialize.cloud/name".to_string() => Materialize::environmentd_pod_name(generation)}; + let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)}; let spec = if config.local_development { ServiceSpec { @@ -571,12 +553,12 @@ fn create_persist_pubsub_service( generation: u64, ) -> Service { Service { - metadata: mz.managed_resource_meta(Materialize::persist_pubsub_service_name(generation)), + metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)), spec: Some(ServiceSpec { type_: Some("ClusterIP".to_string()), cluster_ip: Some("None".to_string()), selector: Some(btreemap! 
{ - "materialize.cloud/name".to_string() => Materialize::environmentd_pod_name(generation), + "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation), }), ports: Some(vec![ServicePort { name: Some("grpc".to_string()), @@ -774,13 +756,10 @@ fn create_statefulset_object( args.push(format!("--aws-account-id={account_id}")); } - args.extend([ - format!("--aws-secrets-controller-tags=Owner={}", mz.oidc_sub()), - format!( - "--aws-secrets-controller-tags=Environment={}", - mz.name_unchecked() - ), - ]); + args.extend([format!( + "--aws-secrets-controller-tags=Environment={}", + mz.name_unchecked() + )]); args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags); } @@ -791,9 +770,17 @@ fn create_statefulset_object( "--orchestrator-kubernetes-service-account={}", &mz.service_account_name() ), - "--orchestrator-kubernetes-service-node-selector=workload=materialize-instance".into(), - "--orchestrator-kubernetes-image-pull-policy=if-not-present".into(), + format!( + "--orchestrator-kubernetes-image-pull-policy={}", + config.image_pull_policy.as_kebab_case_str(), + ), ]); + for selector in &config.clusterd_node_selector { + args.push(format!( + "--orchestrator-kubernetes-service-node-selector={}={}", + selector.key, selector.value, + )); + } if let Some(scheduler_name) = &config.scheduler_name { args.push(format!( "--orchestrator-kubernetes-scheduler-name={}", @@ -857,32 +844,10 @@ fn create_statefulset_object( ), ]); - // Add Frontegg arguments. 
- if let (Some(jwk_pem), Some(url), Some(admin_role)) = ( - &config.frontegg_jwk, - &config.frontegg_url, - &config.frontegg_admin_role, - ) { - args.extend([ - format!("--frontegg-tenant={}", mz.name_unchecked()), - format!("--frontegg-jwk={jwk_pem}"), - format!("--frontegg-api-token-url={url}/identity/resources/auth/v1/api-token",), - format!("--frontegg-admin-role={admin_role}"), - ]); - } - - // Add storage retention arguments -- see - // https://github.com/MaterializeInc/cloud/issues/5142#issuecomment-1385496948 - // for why 14 months in particular - let retention_days = 427; - args.push(format!( - "--storage-usage-retention-period={retention_days}days" - )); - // Add Persist PubSub arguments args.push(format!( "--persist-pubsub-url=http://{}:{}", - Materialize::persist_pubsub_service_name(generation), + mz.persist_pubsub_service_name(generation), config.environmentd_internal_persist_pubsub_port, )); args.push(format!( @@ -963,8 +928,9 @@ fn create_statefulset_object( ]; let container = Container { - name: Materialize::environmentd_container_name().to_owned(), + name: "environmentd".to_owned(), image: Some(mz.spec.environmentd_image_ref.to_owned()), + image_pull_policy: Some(config.image_pull_policy.to_string()), ports: Some(ports), command, args: Some(args), @@ -983,11 +949,11 @@ fn create_statefulset_object( let mut pod_template_labels = mz.default_labels(); pod_template_labels.insert( "materialize.cloud/name".to_owned(), - Materialize::environmentd_pod_name(generation), + mz.environmentd_statefulset_name(generation), ); pod_template_labels.insert( "materialize.cloud/app".to_owned(), - Materialize::environmentd_service_name(), + mz.environmentd_service_name(), ); pod_template_labels.insert("app".to_owned(), "environmentd".to_string()); @@ -1001,31 +967,6 @@ fn create_statefulset_object( "materialize.cloud/generation".to_owned() => generation.to_string(), }; - let affinity = Some(Affinity { - node_affinity: Some(NodeAffinity { - 
preferred_during_scheduling_ignored_during_execution: None, - required_during_scheduling_ignored_during_execution: Some(NodeSelector { - node_selector_terms: vec![NodeSelectorTerm { - match_expressions: Some(vec![ - NodeSelectorRequirement { - key: "workload".to_owned(), - operator: "In".to_owned(), - values: Some(vec!["materialize-instance".to_owned()]), - }, - NodeSelectorRequirement { - key: "kubernetes.io/arch".to_owned(), - operator: "In".to_owned(), - values: Some(vec![config.environmentd_target_arch.clone()]), - }, - ]), - match_fields: None, - }], - }), - }), - pod_affinity: None, - pod_anti_affinity: None, - }); - let tolerations = Some(vec![ // When the node becomes `NotReady` it indicates there is a problem with the node, // By default kubernetes waits 300s (5 minutes) before doing anything in this case, @@ -1056,7 +997,13 @@ fn create_statefulset_object( }), spec: Some(PodSpec { containers: vec![container], - affinity, + node_selector: Some( + config + .environmentd_node_selector + .iter() + .map(|selector| (selector.key.clone(), selector.value.clone())) + .collect(), + ), scheduler_name: config.scheduler_name.clone(), service_account_name: Some(mz.service_account_name()), volumes: None, @@ -1095,7 +1042,7 @@ fn create_statefulset_object( let mut match_labels = BTreeMap::new(); match_labels.insert( "materialize.cloud/name".to_owned(), - Materialize::environmentd_pod_name(generation), + mz.environmentd_statefulset_name(generation), ); let statefulset_spec = StatefulSetSpec { @@ -1105,7 +1052,7 @@ fn create_statefulset_object( rolling_update: None, type_: Some("OnDelete".to_owned()), }), - service_name: Materialize::environmentd_service_name(), + service_name: mz.environmentd_service_name(), selector: LabelSelector { match_expressions: None, match_labels: Some(match_labels), @@ -1119,7 +1066,7 @@ fn create_statefulset_object( "materialize.cloud/generation".to_owned() => generation.to_string(), "materialize.cloud/force".to_owned() => 
mz.spec.force_rollout.to_string(), }), - ..mz.managed_resource_meta(Materialize::environmentd_statefulset_name(generation)) + ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation)) }, spec: Some(statefulset_spec), status: None, @@ -1141,16 +1088,20 @@ enum BecomeLeaderResult { fn environmentd_internal_http_address( args: &super::Args, namespace: &str, - generation: u64, + generation_service: &Service, ) -> String { let host = if let Some(host_override) = &args.environmentd_internal_http_host_override { host_override.to_string() } else { format!( "{}.{}.svc.cluster.local", - Materialize::environmentd_generation_service_name(generation), + generation_service.name_unchecked(), namespace, ) }; format!("{}:{}", host, args.environmentd_internal_http_port) } + +fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String { + format!("{}-{}", statefulset.name_unchecked(), idx) +}