diff --git a/changes/pr3596.yaml b/changes/pr3596.yaml new file mode 100644 index 000000000000..2fd493942642 --- /dev/null +++ b/changes/pr3596.yaml @@ -0,0 +1,5 @@ +enhancement: + - "Allow use of multiple image pull secrets in `KubernetesAgent`, `DaskKubernetesEnvironment` - [#3596](https://github.com/PrefectHQ/prefect/pull/3596)" + +contributor: + - "[James Lamb](https://github.com/jameslamb)" diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index 4f60d46234a7..8fac28a5ca72 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -303,7 +303,10 @@ def generate_job_spec_from_environment( - `IMAGE_PULL_POLICY`: policy for pulling images. Defaults to `"IfNotPresent"`. - `IMAGE_PULL_SECRETS`: name of an existing k8s secret that can be used to pull images. This is necessary if your flow uses an image that is in a non-public - container registry, such as Amazon ECR. + container registry, such as Amazon ECR, or in a public registry that requires + authentication to avoid hitting rate limits. To specify multiple image pull + secrets, provide a comma-delimited string with no spaces, like + `"some-secret,other-secret"`. - `SERVICE_ACCOUNT_NAME`: name of a service account to run the job as. By default, none is specified. - `YAML_TEMPLATE`: a path to where the YAML template should be loaded from. defaults @@ -366,9 +369,19 @@ def generate_job_spec_from_environment( # Use image pull secrets if provided image_pull_secrets = os.getenv("IMAGE_PULL_SECRETS") if image_pull_secrets: - job["spec"]["template"]["spec"]["imagePullSecrets"][0][ - "name" - ] = image_pull_secrets + secrets = image_pull_secrets.split(",") + for idx, secret_name in enumerate(secrets): + # this check preserves behavior from previous releases, + # where prefect would only overwrite the first entry in + # imagePullSecrets + if idx == 0: + job["spec"]["template"]["spec"]["imagePullSecrets"][0] = { + "name": secret_name + } + else: + job["spec"]["template"]["spec"]["imagePullSecrets"].append( + {"name": secret_name} + ) else: del job["spec"]["template"]["spec"]["imagePullSecrets"] diff --git a/src/prefect/environments/execution/dask/k8s.py b/src/prefect/environments/execution/dask/k8s.py index 7d658ed665d7..8c2e2ebc28a0 100644 --- a/src/prefect/environments/execution/dask/k8s.py +++ b/src/prefect/environments/execution/dask/k8s.py @@ -79,7 +79,9 @@ class DaskKubernetesEnvironment(Environment): - scheduler_spec_file (str, optional): Path to a scheduler spec YAML file - worker_spec_file (str, optional): Path to a worker spec YAML file - image_pull_secret (str, optional): optional name of an `imagePullSecret` to use for - the scheduler and worker pods. For more information go + the scheduler and worker pods. To specify multiple image pull secrets, provide a comma + delimited string with no spaces, like `"some-secret,other-secret"`. + For more information go [here](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/). `Warning`: `image_pull_secret` if provided won't be appended to your custom `worker_spec_file` or `scheduler_spec_file`. If you want it, don't forget to add it in @@ -389,7 +391,9 @@ def _populate_job_yaml(self, yaml_obj: dict, docker_name: str) -> dict: pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"}) elif self.image_pull_secret: pod_spec["imagePullSecrets"] = [] - pod_spec["imagePullSecrets"].append({"name": self.image_pull_secret}) + secrets = self.image_pull_secret.split(",") + for secret_name in secrets: + pod_spec["imagePullSecrets"].append({"name": secret_name}) env[0]["value"] = prefect.config.cloud.graphql env[1]["value"] = prefect.config.cloud.auth_token @@ -432,7 +436,9 @@ def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict: pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"}) elif self.image_pull_secret: pod_spec["imagePullSecrets"] = [] - pod_spec["imagePullSecrets"].append({"name": self.image_pull_secret}) + secrets = self.image_pull_secret.split(",") + for secret_name in secrets: + pod_spec["imagePullSecrets"].append({"name": secret_name}) # set image yaml_obj["spec"]["containers"][0]["image"] = prefect.context.get( diff --git a/tests/agent/test_k8s_agent.py b/tests/agent/test_k8s_agent.py index dfe0a1be325f..35350edb6f68 100644 --- a/tests/agent/test_k8s_agent.py +++ b/tests/agent/test_k8s_agent.py @@ -273,6 +273,45 @@ def test_k8s_agent_replace_yaml_uses_user_env_vars(monkeypatch, cloud_api): ) assert job["spec"]["template"]["spec"]["serviceAccountName"] == "svc_name" + assert job["spec"]["template"]["spec"]["imagePullSecrets"] == [ + {"name": "my-secret"} + ] + + +def test_k8s_agent_replace_yaml_respects_multiple_image_secrets(monkeypatch, cloud_api): + get_jobs = MagicMock(return_value=[]) + monkeypatch.setattr( + "prefect.agent.kubernetes.agent.KubernetesAgent.manage_jobs", + get_jobs, + ) + + monkeypatch.setenv("IMAGE_PULL_SECRETS", "some-secret,other-secret") + monkeypatch.setenv("IMAGE_PULL_POLICY", "custom_policy") + + flow_run = GraphQLResult( + { + "flow": GraphQLResult( + { + "storage": Docker( + registry_url="test", image_name="name", image_tag="tag" + ).serialize(), + "environment": LocalEnvironment().serialize(), + "id": "new_id", + "core_version": "0.13.0", + } + ), + "id": "id", + } + ) + + with set_temporary_config( + {"cloud.agent.auth_token": "token", "logging.log_to_cloud": True} + ): + agent = KubernetesAgent(env_vars=dict(AUTH_THING="foo", PKG_SETTING="bar")) + job = agent.generate_job_spec_from_environment(flow_run, image="test/name:tag") + expected_secrets = [{"name": "some-secret"}, {"name": "other-secret"}] + assert job["spec"]["template"]["spec"]["imagePullSecrets"] == expected_secrets + def test_k8s_agent_replace_yaml(monkeypatch, cloud_api): get_jobs = MagicMock(return_value=[]) diff --git a/tests/environments/execution/test_dask_k8s_environment.py b/tests/environments/execution/test_dask_k8s_environment.py index afd0e7513667..f34a5d1355ff 100644 --- a/tests/environments/execution/test_dask_k8s_environment.py +++ b/tests/environments/execution/test_dask_k8s_environment.py @@ -56,6 +56,20 @@ def test_create_dask_environment_args(): assert environment.image_pull_secret == "secret" +def test_create_dask_environment_multiple_image_secrets_in_args(): + environment = DaskKubernetesEnvironment( + min_workers=5, + max_workers=6, + work_stealing=False, + scheduler_logs=True, + private_registry=True, + docker_secret="docker", + metadata={"test": "here"}, + image_pull_secret="some-cred,different-cred", + ) + assert environment.image_pull_secret == "some-cred,different-cred" + + def test_create_dask_environment_labels(): environment = DaskKubernetesEnvironment(labels=["foo"]) assert environment.labels == set(["foo"]) @@ -288,6 +302,32 @@ def test_populate_job_yaml(): ) +def test_populate_job_yaml_multiple_image_secrets(): + environment = DaskKubernetesEnvironment( + image_pull_secret="good-secret,dangerous-secret" + ) + + file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__) + + with open(path.join(file_path, "job.yaml")) as job_file: + job = yaml.safe_load(job_file) + + with set_temporary_config( + { + "cloud.graphql": "gql_test", + "cloud.auth_token": "auth_test", + "logging.extra_loggers": ["test_logger"], + } + ): + with prefect.context(flow_run_id="id_test", namespace="namespace_test"): + yaml_obj = environment._populate_job_yaml( + yaml_obj=job, docker_name="test1/test2:test3" + ) + + expected_secrets = [dict(name="good-secret"), dict(name="dangerous-secret")] + assert yaml_obj["spec"]["template"]["spec"]["imagePullSecrets"] == expected_secrets + + def test_populate_worker_pod_yaml(): environment = DaskKubernetesEnvironment() @@ -363,6 +403,28 @@ def test_populate_worker_pod_yaml_with_image_pull_secret(): yaml_obj["spec"]["imagePullSecrets"][0] == dict(name="mysecret") +def test_populate_worker_pod_yaml_with_multiple_image_pull_secrets(): + environment = DaskKubernetesEnvironment(image_pull_secret="some-secret,another-one") + + file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__) + + with open(path.join(file_path, "worker_pod.yaml")) as pod_file: + pod = yaml.safe_load(pod_file) + + with set_temporary_config( + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} + ): + with prefect.context( + flow_run_id="id_test", image="my_image", namespace="foo-man" + ): + yaml_obj = environment._populate_worker_pod_yaml(yaml_obj=pod) + + assert yaml_obj["spec"]["imagePullSecrets"] == [ + dict(name="some-secret"), + dict(name="another-one"), + ] + + def test_initialize_environment_with_spec_populates(monkeypatch): with tempfile.TemporaryDirectory() as directory: