This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Allow checkpoint resume when recovering a workflow #486

Merged
merged 11 commits into from
Oct 6, 2022

Conversation

andrewwdye (Contributor) commented Sep 25, 2022

TL;DR

This change propagates task-level checkpoint info between failed and recovered node executions. Previously, intra-task checkpointing was only supported for task-level retries within a single node execution.

Related PR: flyteorg/flyteadmin#479

NOTE: cannot merge this until #467 is resolved

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

This change

  • Saves the checkpoint path from successful or failed task nodes in TaskNodeMetadata
  • Sends it to flyteadmin as part of the NodeExecutionEvent, to be stored in the db
  • When attempting to recover a node execution, reads the checkpoint path from NodeExecution.Closure and stores it in the ExecutableNodeStatus (persisted to the CRD) so that it's available for later phase processing
  • Provides this previous checkpoint path to the task on attempt 0; on later attempts, continues passing the path from attempt N-1 in the current node execution
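The selection rule in the last bullet can be sketched roughly as follows. This is an illustrative Python model only, not the propeller code (which is Go); `NodeStatus`, `checkpoint_path_for_attempt`, `previous_checkpoint_path`, and the path layout are all hypothetical names made up for this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeStatus:
    """Hypothetical stand-in for the per-node state persisted to the CRD."""

    # Checkpoint path copied from the earlier node execution when recovering;
    # None when this node is not being recovered.
    recovered_checkpoint_path: Optional[str] = None


def checkpoint_path_for_attempt(output_prefix: str, attempt: int) -> str:
    """Hypothetical layout: each attempt writes under its own directory."""
    return f"{output_prefix}/{attempt}/checkpoints"


def previous_checkpoint_path(
    status: NodeStatus, output_prefix: str, attempt: int
) -> Optional[str]:
    """Pick the checkpoint path a task attempt should read from."""
    if attempt == 0:
        # First attempt in this node execution: the only possible source is
        # the path recovered from the earlier (failed) node execution.
        return status.recovered_checkpoint_path
    # Later attempts keep reading from attempt N-1 of the current execution.
    return checkpoint_path_for_attempt(output_prefix, attempt - 1)
```

Under these assumptions, a non-recovered node gets no previous checkpoint on attempt 0, which matches the behavior before this change.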

Testing

Added various unit tests

Verified with a local version of Flyte, following setup steps here.

  1. Run a task indefinitely that checkpoints every 10s (see code below)
  2. Use kubectl to kill the pod (simulate infra failure)
  3. Recover the workflow
  4. On recovery, the task will find a checkpoint and exit successfully
import logging
import time

from flytekit import current_context, task, workflow

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__file__)


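@task
def t():
    cp = current_context().checkpoint
    # Returns the previous checkpoint contents, if any were written.
    prev = cp.read()
    if prev:
        # A checkpoint exists: this is a retry or a recovered execution, so exit successfully.
        logger.info(f"Recovering from iteration {prev.decode()}")
        return
    iterations = 0
    # No checkpoint yet: loop forever, writing the iteration count every 10s.
    while True:
        time.sleep(10)
        iterations += 1
        logger.info(iterations)
        cp.write(f"{iterations}".encode())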


@workflow
def wf():
    t()
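
The test task above exits as soon as it finds a checkpoint. A task that resumes its work instead might look roughly like the sketch below. To keep it runnable without a Flyte cluster, it uses a hypothetical `InMemoryCheckpoint` stand-in that exposes only the `read()` and `write()` calls used above; `run` and `total` are likewise illustrative names.

```python
from typing import Optional


class InMemoryCheckpoint:
    """Hypothetical stand-in exposing only read() and write()."""

    def __init__(self, prev: Optional[bytes] = None):
        self._prev = prev  # what an earlier attempt or execution left behind
        self.latest: Optional[bytes] = None  # what this attempt has written

    def read(self) -> Optional[bytes]:
        return self._prev

    def write(self, data: bytes) -> None:
        self.latest = data


def run(cp, total: int) -> int:
    """Process iterations up to `total`, resuming after the last checkpoint.

    Returns the iteration index the loop started from.
    """
    prev = cp.read()
    start = int(prev.decode()) if prev else 0
    for i in range(start, total):
        # ... one unit of real work would go here ...
        cp.write(f"{i + 1}".encode())  # record the completed iteration count
    return start
```

With this shape, a recovered execution that is handed a checkpoint containing `b"2"` would skip the first two iterations rather than starting over.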

Tracking Issue

flyteorg/flyte#2254

Follow-up issue

Uncovered a few issues along the way
flyteorg/flyte#2894
flyteorg/flytesnacks#894 (PR)
flyteorg/flytekit#1189 (PR)

flyte-bot and others added 7 commits September 9, 2022 19:46

codecov bot commented Sep 25, 2022

Codecov Report

Merging #486 (dd86a60) into master (560bb1b) will decrease coverage by 0.03%.
The diff coverage is 53.33%.

Signed-off-by: Andrew Dye <[email protected]>
…into node-execution-checkpoints

Signed-off-by: Andrew Dye <[email protected]>
Signed-off-by: Andrew Dye <[email protected]>
@andrewwdye andrewwdye marked this pull request as ready for review October 5, 2022 18:18
@hamersaw hamersaw merged commit a9b831b into flyteorg:master Oct 6, 2022
eapolinario pushed a commit to eapolinario/flytepropeller that referenced this pull request Aug 9, 2023
* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* Fix build break

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* Save/restore CheckpointUri from NodeExecution

Signed-off-by: Andrew Dye <[email protected]>

* Lints, generate

Signed-off-by: Andrew Dye <[email protected]>

* Fix log line

Signed-off-by: Andrew Dye <[email protected]>

Signed-off-by: Flyte-Bot <[email protected]>
Signed-off-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Andrew Dye <[email protected]>
Co-authored-by: flyte-bot <[email protected]>
Co-authored-by: Haytham Abuelfutuh <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
4 participants