From 7e78e71ed9334e0400a49db254ac4905f0525d24 Mon Sep 17 00:00:00 2001 From: Raghav Mangla <97332401+RaghavMangla@users.noreply.github.com> Date: Wed, 23 Oct 2024 22:24:47 +0530 Subject: [PATCH 1/8] Clarify behavior of interruptible map tasks (#5845) * Update map_tasks.md Removed the retries option from map_tasks.md Signed-off-by: Raghav Mangla <97332401+RaghavMangla@users.noreply.github.com> * reverted back changes in map_tasks and made suggested changes in task retries docs Signed-off-by: RaghavMangla * minor fix in optimizing_tasks.md Signed-off-by: RaghavMangla * changes made Signed-off-by: RaghavMangla * updated Signed-off-by: RaghavMangla --------- Signed-off-by: Raghav Mangla <97332401+RaghavMangla@users.noreply.github.com> Signed-off-by: RaghavMangla --- .../flyte_fundamentals/optimizing_tasks.md | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/docs/user_guide/flyte_fundamentals/optimizing_tasks.md b/docs/user_guide/flyte_fundamentals/optimizing_tasks.md index 00b3c693f1..6796876bb6 100644 --- a/docs/user_guide/flyte_fundamentals/optimizing_tasks.md +++ b/docs/user_guide/flyte_fundamentals/optimizing_tasks.md @@ -52,16 +52,26 @@ represents the cache key. Learn more in the {ref}`User Guide float: @@ -70,10 +80,28 @@ def compute_mean(data: List[float]) -> float: return sum(data) / len(data) ``` -```{note} -Retries only take effect when running a task on a Flyte cluster. -See {ref}`Fault Tolerance ` for details on the types of errors that will be retried. -``` + +- **System Errors**: Managed at the platform level through settings like `max-node-retries-system-failures` in the FlytePropeller configuration. This setting helps manage retries without requiring changes to the task code. + + Additionally, the `interruptible-failure-threshold` option in the node-config key defines how many system-level retries are considered interruptible. This is particularly useful for tasks running on preemptible instances. 
+ + For more details, refer to the [Flyte Propeller Configuration](https://docs.flyte.org/en/latest/deployment/configuration/generated/flytepropeller_config.html#config-nodeconfig). + + +### Interruptible Tasks and Map Tasks + +Tasks marked as interruptible can be preempted and retried without counting against the USER error budget. This is useful for tasks running on preemptible compute resources like spot instances. + +For map tasks, the interruptible behavior aligns with that of regular tasks. The `retries` field in the task annotation is not necessary for handling SYSTEM errors, as these are managed by the platform's configuration. The USER error budget, by contrast, is set by the `retries` value in the task decorator. + +Map Tasks: The behavior of interruptible tasks extends to map tasks unchanged. The platform's configuration manages SYSTEM errors, so retry handling stays consistent across task types without additional task-level settings. + +### Advanced Retry Policies + +Flyte supports advanced configurations that allow more granular control over retry behavior, such as specifying the number of retries that can be interruptible. This helps tune task executions to a task's criticality and the resources available. + +For a deeper dive into configuring retries and understanding their impact, see the [Fault Tolerance](https://docs.flyte.org/en/latest/concepts/fault-tolerance.html) section in the Flyte documentation. 
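As a rough illustration of the platform-level settings described above, the fragment below sketches where `max-node-retries-system-failures` and `interruptible-failure-threshold` might sit in a FlytePropeller configuration. The two setting names and the `node-config` key come from the text; the enclosing `propeller` key and both numeric values are assumptions chosen for illustration, so check them against the Flyte Propeller Configuration reference for your deployment.

```yaml
# Sketch only: the nesting under `propeller` and both values are assumptions.
propeller:
  node-config:
    # Budget for SYSTEM-error retries, handled by the platform (illustrative value).
    max-node-retries-system-failures: 3
    # How many system-level retries are considered interruptible (illustrative value).
    interruptible-failure-threshold: 1
```

Because these keys live in platform configuration, changing them does not require touching task code; the `retries` value in the task decorator continues to govern the USER error budget.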
+ ## Timeouts From 8bd573e7c83e09f7e59fce8e8b1dee5698121861 Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Wed, 23 Oct 2024 23:51:09 +0530 Subject: [PATCH 2/8] [Docs] Simplifying for better user understanding (#5878) * Doc simplifying for better user understanding Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> * Caching Docs Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> * Reviewed changes and suggestions applied Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --------- Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- docs/user_guide/development_lifecycle/caching.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/user_guide/development_lifecycle/caching.md b/docs/user_guide/development_lifecycle/caching.md index 7fc4237ec6..ea6a5af574 100644 --- a/docs/user_guide/development_lifecycle/caching.md +++ b/docs/user_guide/development_lifecycle/caching.md @@ -19,15 +19,23 @@ Let's watch a brief explanation of caching and a demo in this video, followed by ``` +### Input Caching + +In Flyte, input caching allows tasks to automatically cache the input data required for execution. This feature is particularly useful in scenarios where tasks may need to be re-executed, such as during retries due to failures or when manually triggered by users. By caching input data, Flyte optimizes workflow performance and resource usage, preventing unnecessary recomputation of task inputs. + +### Output Caching + +Output caching in Flyte allows users to cache the results of tasks to avoid redundant computations. This feature is especially valuable for tasks that perform expensive or time-consuming operations where the results are unlikely to change frequently. + There are four parameters and one command-line flag related to caching. ## Parameters * `cache`(`bool`): Enables or disables caching of the workflow, task, or launch plan. 
By default, caching is disabled to avoid unintended consequences when caching executions with side effects. -To enable caching set `cache=True`. +To enable caching, set `cache=True`. * `cache_version` (`str`): Part of the cache key. -A change to this parameter will invalidate the cache. +Changing this version number tells Flyte to ignore previous cached results and run the task again if the task's function has changed. This allows you to explicitly indicate when a change has been made to the task that should invalidate any existing cached results. Note that this is not the only change that will invalidate the cache (see below). Also, note that you can manually trigger cache invalidation per execution using the [`overwrite-cache` flag](#overwrite-cache-flag). @@ -35,7 +43,7 @@ Also, note that you can manually trigger cache invalidation per execution using When enabled, Flyte ensures that a single instance of the task is run before any other instances that would otherwise run concurrently. This allows the initial instance to cache its result and lets the later instances reuse the resulting cached outputs. Cache serialization is disabled by default. -* `cache_ignore_input_vars` (`Tuple[str, ...]`): Input variables that should not be included when calculating hash for cache. By default, no input variables are ignored. This parameter only applies to task serialization. +* `cache_ignore_input_vars` (`Tuple[str, ...]`): Input variables that Flyte should ignore when deciding if a task’s result can be reused (hash calculation). By default, no input variables are ignored. This parameter only applies to task serialization. Task caching parameters can be specified at task definition time within `@task` decorator or at task invocation time using `with_overrides` method. @@ -127,7 +135,7 @@ Task executions can be cached across different versions of the task because a ch ### How does local caching work? 
-The flytekit package uses the [diskcache](https://github.com/grantjenks/python-diskcache) package, specifically [diskcache.Cache](http://www.grantjenks.com/docs/diskcache/tutorial.html#cache), to aid in the memoization of task executions. The results of local task executions are stored under `~/.flyte/local-cache/` and cache keys are composed of **Cache Version**, **Task Signature**, and **Task Input Values**. +Flyte uses a tool called [diskcache](https://github.com/grantjenks/python-diskcache), specifically [diskcache.Cache](http://www.grantjenks.com/docs/diskcache/tutorial.html#cache), to save task results so they don’t need to be recomputed if the same task is executed again, a technique known as ``memoization``. The results of local task executions are stored under `~/.flyte/local-cache/` and cache keys are composed of **Cache Version**, **Task Signature**, and **Task Input Values**. Similar to the remote case, a local cache entry for a task will be invalidated if either the `cache_version` or the task signature is modified. In addition, the local cache can also be emptied by running the following command: `pyflyte local-cache clear`, which essentially obliterates the contents of the `~/.flyte/local-cache/` directory. To disable the local cache, you can set the `local.cache_enabled` config option (e.g. by setting the environment variable `FLYTE_LOCAL_CACHE_ENABLED=False`). From 8890b381823d426abab651747baeb25b8da2ad23 Mon Sep 17 00:00:00 2001 From: Amin Maghsodi Date: Wed, 23 Oct 2024 21:59:39 +0330 Subject: [PATCH 3/8] Update ray.go not to fail when going suspend state. 
(#5816) Signed-off-by: Amin Maghsodi Signed-off-by: Kevin Su Co-authored-by: Kevin Su --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 4 ++++ flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 90388b46a5..95a87f4efa 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -574,6 +574,10 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont phaseInfo, err = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil case rayv1.JobDeploymentStatusComplete: phaseInfo, err = pluginsCore.PhaseInfoSuccess(info), nil + case rayv1.JobDeploymentStatusSuspended: + phaseInfo, err = pluginsCore.PhaseInfoQueuedWithTaskInfo(time.Now(), pluginsCore.DefaultPhaseVersion, "Suspended", info), nil + case rayv1.JobDeploymentStatusSuspending: + phaseInfo, err = pluginsCore.PhaseInfoQueuedWithTaskInfo(time.Now(), pluginsCore.DefaultPhaseVersion, "Suspending", info), nil case rayv1.JobDeploymentStatusFailed: failInfo := fmt.Sprintf("Failed to run Ray job %s with error: [%s] %s", rayJob.Name, rayJob.Status.Reason, rayJob.Status.Message) phaseInfo, err = pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, failInfo, info), nil diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 7b555e9f23..38b2f56785 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -755,7 +755,8 @@ func TestGetTaskPhase(t *testing.T) { {rayv1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning, false}, {rayv1.JobDeploymentStatusComplete, pluginsCore.PhaseSuccess, false}, {rayv1.JobDeploymentStatusFailed, pluginsCore.PhasePermanentFailure, false}, - {rayv1.JobDeploymentStatusSuspended, pluginsCore.PhaseUndefined, 
true}, + {rayv1.JobDeploymentStatusSuspended, pluginsCore.PhaseQueued, false}, + {rayv1.JobDeploymentStatusSuspending, pluginsCore.PhaseQueued, false}, } for _, tc := range testCases { From 5065f9b2f11307a7680de1e744881d351c726822 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Wed, 23 Oct 2024 18:45:03 -0400 Subject: [PATCH 4/8] Add dependency review gh workflow (#5902) * Add dependency review gh workflow Signed-off-by: Eduardo Apolinario * Comma-separated list of licenses Signed-off-by: Eduardo Apolinario * Fix use of comment-on-pr Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .github/workflows/dependency-review.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 .github/workflows/dependency-review.yml diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml new file mode 100644 index 0000000000..a9a7d2ed8d --- /dev/null +++ b/.github/workflows/dependency-review.yml @@ -0,0 +1,18 @@ +name: 'Dependency Review' +on: [pull_request] +permissions: + contents: read + pull-requests: write +jobs: + dependency-review: + runs-on: ubuntu-latest + steps: + - name: 'Checkout Repository' + uses: actions/checkout@v4 + - name: Dependency Review + uses: actions/dependency-review-action@v4 + with: + comment-summary-in-pr: on-failure + # Licenses need to come from https://spdx.org/licenses/ + deny-licenses: GPL-1.0-only, GPL-1.0-or-later, GPL-2.0-only, GPL-2.0-or-later, GPL-3.0-only, GPL-3.0-or-later + From 67858874f6c04b18a8195ef3414ffda81e9af410 Mon Sep 17 00:00:00 2001 From: Yenting Chen <42114946+DenChenn@users.noreply.github.com> Date: Thu, 24 Oct 2024 11:22:11 +0800 Subject: [PATCH 5/8] [Docs] improve contribute docs (#5862) Signed-off-by: DenChenn --- docs/community/contribute/contribute_docs.md | 63 ++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git 
a/docs/community/contribute/contribute_docs.md b/docs/community/contribute/contribute_docs.md index 3b5d996abf..f97152032b 100644 --- a/docs/community/contribute/contribute_docs.md +++ b/docs/community/contribute/contribute_docs.md @@ -57,3 +57,66 @@ Deployment and API docs mostly use reStructured Text. For more information, see You can cross-reference multiple Python modules, functions, classes, methods, and global data in documentations. For more information, see the [Sphinx documentation](https://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#cross-referencing-python-objects). +### Quickstart + +Flyte Documentation is primarily maintained in two locations: [flyte](https://github.com/flyteorg/flyte) and [flytesnacks](https://github.com/flyteorg/flytesnacks). + +#### Tips +The following are some tips for including various kinds of content: +* **Images** + Flyte maintains all static resources in [static-resources-repo](https://github.com/flyteorg/static-resources). + You should upload your images to this repo and open a PR, and then refer to the image in the documentation. + Notice that the image URL should be in the format `https://raw.githubusercontent.com/flyteorg/static-resources//`. +* **Source code references (Link format)**
+ `.rst` example: + ```{code-block} + .. raw:: html + + <a href="https://github.com/flyteorg//blob//#L-L">View source code on GitHub</a> + ``` + + `.md` example: + ```{code-block} + [View source code on GitHub]("https://github.com/flyteorg//blob//#L-L") + ``` +* **Source code references (Embedded format)**
+ `.rst` example: + ```{code-block} + .. rli:: https://raw.githubusercontent.com/flyteorg/// + :lines: - + ``` + + `.md` example: + ````{code-block} + ```{rli} https://raw.githubusercontent.com/flyteorg/// + lines: - + ``` + ```` + +This way, the nested code block is properly displayed without breaking the Markdown structure. + +#### Open a pull request +[This is an example PR](https://github.com/flyteorg/flyte/pull/5844) + +Each time you update your PR, it triggers the CI build, so there’s no need to build the docs locally. Flyte uses the CI process `"docs/readthedocs.org:flyte"`, which builds the documentation after each PR. +Be sure to include the following CI-build preview link in your PR description so reviewers can easily preview the changes: +```{code-block} +https://flyte--.org.readthedocs.build/en//.html +``` +The relative path is based on the `docs` directory. +For example, if the full path is `flyte/docs/user_guide/advanced_composition/chaining_flyte_entities.md`, then the relative path would be `user_guide/advanced_composition/chaining_flyte_entities` + `.html`. + +#### Important note +In the `flytesnacks` repository, most Python comments using `# xxxx` are not imported into the documentation. +You may notice some overlap between `flytesnacks` and `flyte` docs, but what is displayed primarily comes from the `flyte` repository. + +Otherwise, take care of the following points: +````{important} +* Make sure `:lines:` are aligned correctly. +* Use gitsha to specify the example code instead of using the master branch or a relative path, as this ensures 100% accuracy. +* Build the documentation by submitting a PR instead of building it locally. +* For `flytesnacks`, run `make fmt` before submitting the PR. +* Before uploading commits, use `git commit -s` to sign off. This step is often forgotten during the first submission. +* Run `codespell` on the modified files to check for any spelling mistakes before pushing. 
+* When using reference code or images, use gitsha along with GitHub raw content links. +```` From 761f122d863f39a5267ee02d66b9870b3b8bedcf Mon Sep 17 00:00:00 2001 From: Tai <139771199+taieeuu@users.noreply.github.com> Date: Thu, 24 Oct 2024 11:31:01 +0800 Subject: [PATCH 6/8] Fix CONTRIBUTING.md and update (#5873) * Fix CONTRIBUTING.md and update Signed-off-by: taieeuu * merge Signed-off-by: Future-Outlier * mrege Signed-off-by: Future-Outlier --------- Signed-off-by: taieeuu Signed-off-by: Future-Outlier Co-authored-by: Future-Outlier --- CONTRIBUTING.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f380c33d93..62daf171f2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,3 +1,15 @@ # Contributing to Flyte For information related to contributing to Flyte, please check out the [Contributing to Flyte](https://docs.flyte.org/en/latest/community/contribute/index.html) section of the documentation. + +## Recommendation Order (For Beginners) +* Setup dev environment +* Read the following and run at least 5 examples. Pay close attention to the generated outputs, the Graph view, task + logs, etc. Repeat with as many examples as you need to have an initial understanding of what an execution looks like: + * https://docs.flyte.org/en/latest/user_guide/introduction.html + * https://docs.flyte.org/en/latest/flytesnacks/userguide.html +* Finish reading the [Concepts](https://docs.flyte.org/en/latest/user_guide/concepts/main_concepts/index.html) +* Finish reading the [Control Plane](https://docs.flyte.org/en/latest/user_guide/concepts/control_plane/index.html) +* Finish reading the [Component Architecture](https://docs.flyte.org/en/latest/user_guide/concepts/component_architecture/index.html) +* Choose 2 good first issues from the following and start solving them with the knowledge you have read. 
+* Get familiar with using [ImageSpec to push images to localhost for development](https://docs.flyte.org/en/latest/user_guide/customizing_dependencies/imagespec.html#image-spec-example) From ffd72a09cb09079beccec5908ebd45b430ff81c5 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 24 Oct 2024 06:31:43 -0700 Subject: [PATCH 7/8] Add pod template support for init containers (#5750) * Add pod template support for init containers Signed-off-by: Jason Parraga * Update docs Signed-off-by: Jason Parraga * fix comment Signed-off-by: Jason Parraga --------- Signed-off-by: Jason Parraga --- docs/deployment/configuration/general.rst | 3 + .../tasks/pluginmachinery/flytek8s/copilot.go | 16 +++-- .../pluginmachinery/flytek8s/copilot_test.go | 24 +++++-- .../pluginmachinery/flytek8s/pod_helper.go | 64 +++++++++++++++-- .../flytek8s/pod_helper_test.go | 70 +++++++++++++++++-- 5 files changed, 152 insertions(+), 25 deletions(-) diff --git a/docs/deployment/configuration/general.rst b/docs/deployment/configuration/general.rst index 0b97f8b8ff..5db7786c9a 100644 --- a/docs/deployment/configuration/general.rst +++ b/docs/deployment/configuration/general.rst @@ -128,6 +128,9 @@ as the base container configuration for all primary containers. If both containe names exist in the default PodTemplate, Flyte first applies the default configuration, followed by the primary configuration. +Note: Init containers can be configured with similar granularity using "default-init" +and "primary-init" init container names. + The ``containers`` field is required in each k8s PodSpec. If no default configuration is desired, specifying a container with a name other than "default" or "primary" (for example, "noop") is considered best practice. 
Since Flyte only diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go index 8e89e58d3d..eaee5bce6c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go @@ -201,14 +201,15 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c return nil } -func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) error { +func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) (string, error) { if pilot == nil || !pilot.Enabled { - return nil + return "", nil } logger.Infof(ctx, "CoPilot Enabled for task [%s]", taskExecMetadata.GetTaskExecutionID().GetID().TaskId.Name) shareProcessNamespaceEnabled := true coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled + primaryInitContainerName := "" if iFace != nil { if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 { inPath := cfg.DefaultInputDataPath @@ -231,13 +232,14 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot // Lets add the Inputs init container args, err := DownloadCommandArgs(inputPaths.GetInputPath(), outputPaths.GetOutputPrefixPath(), inPath, format, iFace.Inputs) if err != nil { - return err + return primaryInitContainerName, err } downloader, err := FlyteCoPilotContainer(flyteInitContainerName, cfg, args, inputsVolumeMount) if err != nil { - return err + return primaryInitContainerName, err } coPilotPod.InitContainers = append(coPilotPod.InitContainers, downloader) + 
primaryInitContainerName = downloader.Name } if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 { @@ -260,15 +262,15 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot // Lets add the Inputs init container args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, iFace) if err != nil { - return err + return primaryInitContainerName, err } sidecar, err := FlyteCoPilotContainer(flyteSidecarContainerName, cfg, args, outputsVolumeMount) if err != nil { - return err + return primaryInitContainerName, err } coPilotPod.Containers = append(coPilotPod.Containers, sidecar) } } - return nil + return primaryInitContainerName, nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go index 09a9fbf52b..aba18c85ac 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go @@ -533,7 +533,9 @@ func TestAddCoPilotToPod(t *testing.T) { InputPath: "in", OutputPath: "out", } - assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot) + assert.NoError(t, err) + assert.Equal(t, "test-downloader", primaryInitContainerName) assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) @@ -545,7 +547,9 @@ func TestAddCoPilotToPod(t *testing.T) { InputPath: "in", OutputPath: "out", } - assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot) + assert.NoError(t, err) + assert.Empty(t, primaryInitContainerName) assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, nil, &pod) }) @@ 
-565,7 +569,9 @@ func TestAddCoPilotToPod(t *testing.T) { InputPath: "in", OutputPath: "out", } - assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot) + assert.NoError(t, err) + assert.Equal(t, "test-downloader", primaryInitContainerName) assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) @@ -584,7 +590,9 @@ func TestAddCoPilotToPod(t *testing.T) { InputPath: "in", OutputPath: "out", } - assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot) + assert.NoError(t, err) + assert.Empty(t, primaryInitContainerName) assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) @@ -603,11 +611,15 @@ func TestAddCoPilotToPod(t *testing.T) { InputPath: "in", OutputPath: "out", } - assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot) + assert.NoError(t, err) + assert.Empty(t, primaryInitContainerName) assert.Len(t, pod.Volumes, 0) }) t.Run("nil", func(t *testing.T) { - assert.NoError(t, AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil)) + primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil) + assert.NoError(t, err) + assert.Empty(t, primaryInitContainerName) }) } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index e8252090df..229f963968 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -28,7 +28,9 @@ const 
PrimaryContainerNotFound = "PrimaryContainerNotFound" const SIGKILL = 137 const defaultContainerTemplateName = "default" +const defaultInitContainerTemplateName = "default-init" const primaryContainerTemplateName = "primary" +const primaryInitContainerTemplateName = "primary-init" const PrimaryContainerKey = "primary_container_name" // AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement @@ -387,14 +389,17 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut dataLoadingConfig = pod.GetDataConfig() } + primaryInitContainerName := "" + if dataLoadingConfig != nil { if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, primaryContainer, taskTemplate.Interface, dataLoadingConfig); err != nil { return nil, nil, err } - if err := AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(), - tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig); err != nil { + primaryInitContainerName, err = AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(), + tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig) + if err != nil { return nil, nil, err } } @@ -406,7 +411,7 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut } // merge PodSpec and ObjectMeta with configuration pod template (if exists) - podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName) + podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName, primaryInitContainerName) if err != nil { return nil, nil, err } @@ -495,7 +500,7 @@ func getBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionConte // MergeWithBasePodTemplate attempts to merge the provided PodSpec and ObjectMeta with the configuration PodTemplate for // this task. 
func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, - podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) { + podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) { // attempt to retrieve base PodTemplate podTemplate, err := getBasePodTemplate(ctx, tCtx, DefaultPodTemplateStore) @@ -507,7 +512,7 @@ func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutio } // merge podSpec with podTemplate - mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName) + mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName) if err != nil { return nil, nil, err } @@ -524,7 +529,7 @@ func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutio // mergePodSpecs merges the two provided PodSpecs. This process uses the first as the base configuration, where values // set by the first PodSpec are overwritten by the second in the return value. Additionally, this function applies // container-level configuration from the basePodSpec. 
-func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string) (*v1.PodSpec, error) { +func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, error) { if basePodSpec == nil || podSpec == nil { return nil, errors.New("neither the basePodSpec or the podSpec can be nil") } @@ -539,6 +544,16 @@ func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContaine } } + // extract defaultInitContainerTemplate and primaryInitContainerTemplate + var defaultInitContainerTemplate, primaryInitContainerTemplate *v1.Container + for i := 0; i < len(basePodSpec.InitContainers); i++ { + if basePodSpec.InitContainers[i].Name == defaultInitContainerTemplateName { + defaultInitContainerTemplate = &basePodSpec.InitContainers[i] + } else if basePodSpec.InitContainers[i].Name == primaryInitContainerTemplateName { + primaryInitContainerTemplate = &basePodSpec.InitContainers[i] + } + } + // merge PodTemplate PodSpec with podSpec var mergedPodSpec *v1.PodSpec = basePodSpec.DeepCopy() if err := mergo.Merge(mergedPodSpec, podSpec, mergo.WithOverride, mergo.WithAppendSlice); err != nil { @@ -580,6 +595,43 @@ func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContaine } mergedPodSpec.Containers = mergedContainers + + // merge PodTemplate init containers + var mergedInitContainers []v1.Container + for _, initContainer := range podSpec.InitContainers { + // if applicable start with defaultContainerTemplate + var mergedInitContainer *v1.Container + if defaultInitContainerTemplate != nil { + mergedInitContainer = defaultInitContainerTemplate.DeepCopy() + } + + // if applicable merge with primaryInitContainerTemplate + if initContainer.Name == primaryInitContainerName && primaryInitContainerTemplate != nil { + if mergedInitContainer == nil { + mergedInitContainer = primaryInitContainerTemplate.DeepCopy() + } else { + err := 
mergo.Merge(mergedInitContainer, primaryInitContainerTemplate, mergo.WithOverride, mergo.WithAppendSlice) + if err != nil { + return nil, err + } + } + } + + // if applicable merge with existing init initContainer + if mergedInitContainer == nil { + mergedInitContainers = append(mergedInitContainers, initContainer) + } else { + err := mergo.Merge(mergedInitContainer, initContainer, mergo.WithOverride, mergo.WithAppendSlice) + if err != nil { + return nil, err + } + + mergedInitContainers = append(mergedInitContainers, *mergedInitContainer) + } + } + + mergedPodSpec.InitContainers = mergedInitContainers + return mergedPodSpec, nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 0c2e9ef5cc..9797b5e05b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -1934,6 +1934,14 @@ func TestMergeWithBasePodTemplate(t *testing.T) { Name: "bar", }, }, + InitContainers: []v1.Container{ + v1.Container{ + Name: "foo-init", + }, + v1.Container{ + Name: "foo-bar", + }, + }, } objectMeta := metav1.ObjectMeta{ @@ -1954,7 +1962,7 @@ func TestMergeWithBasePodTemplate(t *testing.T) { tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, nil, "")) tCtx.OnTaskReader().Return(taskReader) - resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo") + resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo", "foo-init") assert.Nil(t, err) assert.True(t, reflect.DeepEqual(podSpec, *resultPodSpec)) assert.True(t, reflect.DeepEqual(objectMeta, *resultObjectMeta)) @@ -1966,6 +1974,11 @@ func TestMergeWithBasePodTemplate(t *testing.T) { TerminationMessagePath: "/dev/primary-termination-log", } + primaryInitContainerTemplate := v1.Container{ 
+ Name: primaryInitContainerTemplateName, + TerminationMessagePath: "/dev/primary-init-termination-log", + } + podTemplate := v1.PodTemplate{ ObjectMeta: metav1.ObjectMeta{ Name: "fooTemplate", @@ -1982,6 +1995,9 @@ func TestMergeWithBasePodTemplate(t *testing.T) { Containers: []v1.Container{ primaryContainerTemplate, }, + InitContainers: []v1.Container{ + primaryInitContainerTemplate, + }, }, }, } @@ -2008,13 +2024,16 @@ func TestMergeWithBasePodTemplate(t *testing.T) { tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, nil, "")) tCtx.OnTaskReader().Return(taskReader) - resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo") + resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo", "foo-init") assert.Nil(t, err) // test that template podSpec is merged primaryContainer := resultPodSpec.Containers[0] assert.Equal(t, podSpec.Containers[0].Name, primaryContainer.Name) assert.Equal(t, primaryContainerTemplate.TerminationMessagePath, primaryContainer.TerminationMessagePath) + primaryInitContainer := resultPodSpec.InitContainers[0] + assert.Equal(t, podSpec.InitContainers[0].Name, primaryInitContainer.Name) + assert.Equal(t, primaryInitContainerTemplate.TerminationMessagePath, primaryInitContainer.TerminationMessagePath) // test that template object metadata is copied assert.Contains(t, resultObjectMeta.Labels, "fooKey") @@ -2027,13 +2046,13 @@ func TestMergeWithBasePodTemplate(t *testing.T) { func TestMergePodSpecs(t *testing.T) { var priority int32 = 1 - podSpec1, _ := mergePodSpecs(nil, nil, "foo") + podSpec1, _ := mergePodSpecs(nil, nil, "foo", "foo-init") assert.Nil(t, podSpec1) - podSpec2, _ := mergePodSpecs(&v1.PodSpec{}, nil, "foo") + podSpec2, _ := mergePodSpecs(&v1.PodSpec{}, nil, "foo", "foo-init") assert.Nil(t, podSpec2) - podSpec3, _ := mergePodSpecs(nil, &v1.PodSpec{}, "foo") + podSpec3, _ := 
mergePodSpecs(nil, &v1.PodSpec{}, "foo", "foo-init") assert.Nil(t, podSpec3) podSpec := v1.PodSpec{ @@ -2051,6 +2070,20 @@ func TestMergePodSpecs(t *testing.T) { Name: "bar", }, }, + InitContainers: []v1.Container{ + v1.Container{ + Name: "primary-init", + VolumeMounts: []v1.VolumeMount{ + { + Name: "nccl", + MountPath: "abc", + }, + }, + }, + v1.Container{ + Name: "bar-init", + }, + }, NodeSelector: map[string]string{ "baz": "bar", }, @@ -2076,11 +2109,25 @@ func TestMergePodSpecs(t *testing.T) { TerminationMessagePath: "/dev/primary-termination-log", } + defaultInitContainerTemplate := v1.Container{ + Name: defaultInitContainerTemplateName, + TerminationMessagePath: "/dev/default-init-termination-log", + } + + primaryInitContainerTemplate := v1.Container{ + Name: primaryInitContainerTemplateName, + TerminationMessagePath: "/dev/primary-init-termination-log", + } + podTemplateSpec := v1.PodSpec{ Containers: []v1.Container{ defaultContainerTemplate, primaryContainerTemplate, }, + InitContainers: []v1.Container{ + defaultInitContainerTemplate, + primaryInitContainerTemplate, + }, HostNetwork: true, NodeSelector: map[string]string{ "foo": "bar", @@ -2093,7 +2140,7 @@ func TestMergePodSpecs(t *testing.T) { }, } - mergedPodSpec, err := mergePodSpecs(&podTemplateSpec, &podSpec, "primary") + mergedPodSpec, err := mergePodSpecs(&podTemplateSpec, &podSpec, "primary", "primary-init") assert.Nil(t, err) // validate a PodTemplate-only field @@ -2117,6 +2164,17 @@ func TestMergePodSpecs(t *testing.T) { defaultContainer := mergedPodSpec.Containers[1] assert.Equal(t, podSpec.Containers[1].Name, defaultContainer.Name) assert.Equal(t, defaultContainerTemplate.TerminationMessagePath, defaultContainer.TerminationMessagePath) + + // validate primary init container + primaryInitContainer := mergedPodSpec.InitContainers[0] + assert.Equal(t, podSpec.InitContainers[0].Name, primaryInitContainer.Name) + assert.Equal(t, primaryInitContainerTemplate.TerminationMessagePath, 
primaryInitContainer.TerminationMessagePath) + assert.Equal(t, 1, len(primaryInitContainer.VolumeMounts)) + + // validate default init container + defaultInitContainer := mergedPodSpec.InitContainers[1] + assert.Equal(t, podSpec.InitContainers[1].Name, defaultInitContainer.Name) + assert.Equal(t, defaultInitContainerTemplate.TerminationMessagePath, defaultInitContainer.TerminationMessagePath) } func TestAddFlyteCustomizationsToContainer_SetConsoleUrl(t *testing.T) { From 7f92953f1221431faa1f77a782b84472f0588bd8 Mon Sep 17 00:00:00 2001 From: David Espejo <82604841+davidmirror-ops@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:46:11 -0500 Subject: [PATCH 8/8] Update monitoring docs (#5903) * Update refs to public dashboards and instructions Signed-off-by: davidmirror-ops * Fix group tab error v1 Signed-off-by: davidmirror-ops * Apply review suggestions Signed-off-by: davidmirror-ops --------- Signed-off-by: davidmirror-ops --- docs/deployment/configuration/monitoring.rst | 102 ++++++++++++------- 1 file changed, 65 insertions(+), 37 deletions(-) diff --git a/docs/deployment/configuration/monitoring.rst b/docs/deployment/configuration/monitoring.rst index 48239288f4..449c147754 100644 --- a/docs/deployment/configuration/monitoring.rst +++ b/docs/deployment/configuration/monitoring.rst @@ -5,7 +5,7 @@ Monitoring .. tags:: Infrastructure, Advanced -.. tip:: The Flyte core team publishes and maintains Grafana dashboards built using Prometheus data sources, which can be found `here `__. +.. tip:: The Flyte core team publishes and maintains Grafana dashboards built using Prometheus data sources. You can import them to your Grafana instance from the `Grafana marketplace `__. Metrics for Executions ====================== @@ -87,53 +87,81 @@ Flyte Backend is written in Golang and exposes stats using Prometheus. The stats Both ``flyteadmin`` and ``flytepropeller`` are instrumented to expose metrics. 
To visualize these metrics, Flyte provides three Grafana dashboards, each with a different focus: -- **User-facing dashboards**: Dashboards that can be used to triage/investigate/observe performance and characteristics of workflows and tasks. - The user-facing dashboard is published under ID `13980 `__ in the Grafana marketplace. +- **User-facing dashboard**: it can be used to investigate performance and characteristics of workflow and task executions. It's published under ID `22146 `__ in the Grafana marketplace. - **System Dashboards**: Dashboards that are useful for the system maintainer to investigate the status and performance of their Flyte deployments. These are further divided into: - - `DataPlane/FlytePropeller `__: execution engine status and performance. - - `ControlPlane/Flyteadmin `__: API-level monitoring. + - Data plane (``flytepropeller``): `21719 `__: execution engine status and performance. + - Control plane (``flyteadmin``): `21720 `__: API-level monitoring. -The corresponding JSON files for each dashboard are also located at ``deployment/stats/prometheus``. +The corresponding JSON files for each dashboard are also located in the ``flyte`` repository at `deployment/stats/prometheus `__. .. note:: The dashboards are basic dashboards and do not include all the metrics exposed by Flyte. Feel free to use the scripts provided `here `__ to improve and -hopefully- contribute the improved dashboards. -How to use the dashboards -~~~~~~~~~~~~~~~~~~~~~~~~~ - -1. We recommend installing and configuring the Prometheus operator as described in `their docs `__. -This is especially true if you plan to use the Service Monitors provided by the `flyte-core `__ Helm chart. - -2. Enable the Prometheus instance to use Service Monitors in the namespace where Flyte is running, configuring the following keys in the ``prometheus`` resource: - -.. code-block:: yaml - - spec: - serviceMonitorSelector: {} - serviceMonitorNamespaceSelector: {} - -.. 
note:: - - The above example configuration lets Prometheus use any ``ServiceMonitor`` in any namespace in the cluster. Adjust the configuration to reduce the scope if needed. - -3. Once you have installed and configured the Prometheus operator, enable the Service Monitors in the Helm chart by configuring the following keys in your ``values`` file: - -.. code-block:: yaml - - flyteadmin: - serviceMonitor: - enabled: true - - flytepropeller: - serviceMonitor: - enabled: true - +Setup instructions +~~~~~~~~~~~~~~~~~~ + +The dashboards rely on a working Prometheus deployment with access to your Kubernetes cluster and Flyte pods. +Additionally, the user dashboard uses metrics that come from ``kube-state-metrics``. Both of these requirements can be fulfilled by installing the `kube-prometheus-stack `__. + +Once the prerequisites are in place, follow the instructions in this section to configure metrics scraping for the corresponding Helm chart: + +.. tabs:: + + .. group-tab:: flyte-core + + Save the following in a ``flyte-monitoring-overrides.yaml`` file and run a ``helm upgrade`` operation pointing to that ``--values`` file: + + .. code-block:: yaml + + flyteadmin: + serviceMonitor: + enabled: true + labels: + release: kube-prometheus-stack # This is particular to the kube-prometheus-stack + selectorLabels: + - app.kubernetes.io/name: flyteadmin + flytepropeller: + serviceMonitor: + enabled: true + labels: + release: kube-prometheus-stack + selectorLabels: + - app.kubernetes.io/name: flytepropeller + service: + enabled: true + + The above configuration enables the ``serviceMonitor`` that Prometheus can then use to automatically discover services and scrape metrics from them. + + .. group-tab:: flyte-binary + + Save the following in a ``flyte-monitoring-overrides.yaml`` file and run a ``helm upgrade`` operation pointing to that ``--values`` file: + + ..
code-block:: yaml + + configuration: + inline: + propeller: + prof-port: 10254 + metrics-prefix: "flyte:" + scheduler: + profilerPort: 10254 + metricsScope: "flyte:" + flyteadmin: + profilerPort: 10254 + service: + extraPorts: + - name: http-metrics + protocol: TCP + port: 10254 + + The above configuration enables the ``serviceMonitor`` that Prometheus can then use to automatically discover services and scrape metrics from them. + .. note:: By default, the ``ServiceMonitor`` is configured with a ``scrapeTimeout`` of 30s and ``interval`` of 60s. You can customize these values if needed. -With the above configuration in place you should be able to import the dashboards in your Grafana instance. +With the above configuration completed, you should be able to import the dashboards in your Grafana instance.
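
The note in the hunk above says the ``ServiceMonitor`` defaults to a ``scrapeTimeout`` of 30s and an ``interval`` of 60s and that both can be customized. A minimal sketch of such an override for the ``flyte-core`` chart follows. It assumes the chart exposes ``interval`` and ``scrapeTimeout`` keys directly under ``serviceMonitor``; the patch does not show these key names, so confirm them against the chart's ``values.yaml`` before relying on this. The 30s and 15s values are illustrative only.

.. code-block:: yaml

   # Hypothetical override; the key names under serviceMonitor are assumed, not taken from the patch.
   flyteadmin:
     serviceMonitor:
       enabled: true
       interval: 30s        # assumed key; the documented default is 60s
       scrapeTimeout: 15s   # assumed key; the documented default is 30s
   flytepropeller:
     serviceMonitor:
       enabled: true
       interval: 30s        # assumed key
       scrapeTimeout: 15s   # assumed key

Prometheus requires the scrape timeout to be no larger than the scrape interval, so lower both together when shortening the interval.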