Skip to content

Commit

Permalink
add cli command to migrate job runs from one repo to another
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Aug 15, 2022
1 parent 6239194 commit dd35a38
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 2 deletions.
70 changes: 69 additions & 1 deletion python_modules/dagster/dagster/_cli/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import click
from tqdm import tqdm

from dagster._core.instance import DagsterInstance

Expand All @@ -16,7 +17,7 @@ def run_list_command(limit):
with DagsterInstance.get() as instance:
for run in instance.get_runs(limit=limit):
click.echo("Run: {}".format(run.run_id))
click.echo(" Pipeline: {}".format(run.pipeline_name))
click.echo(" Job: {}".format(run.pipeline_name))


@run_cli.command(
Expand Down Expand Up @@ -70,3 +71,70 @@ def run_wipe_command(force):
click.echo("Deleted all run history and event logs.")
else:
raise click.ClickException("Exiting without deleting all run history and event logs.")


@run_cli.command(name="migrate", help="Migrate the run history from one repository to another.")
@click.option(
    "--from",
    "-f",
    "from_label",
    help="The repository from which to migrate (format: <repository_name>@<location_name>)",
    prompt_required=True,
)
@click.option(
    "--to",
    "-t",
    "to_label",
    help="The repository name to migrate to (format: <repository_name>@<location_name>)",
    prompt_required=True,
)
@click.argument("job_name")
def run_migrate_command(job_name, from_label, to_label):
    """Rewrite the repository-label tag of a job's runs from ``from_label`` to ``to_label``.

    Looks up every run record for ``job_name`` tagged with ``from_label`` and, after the
    user types ``MIGRATE`` at the confirmation prompt, updates that tag's value to
    ``to_label`` in the run tags table.

    Args:
        job_name: Name of the job whose runs should be migrated.
        from_label: Source label, formatted ``<repository_name>@<location_name>``.
        to_label: Destination label, formatted ``<repository_name>@<location_name>``.

    Raises:
        click.UsageError: If either label is missing or malformed.
        click.ClickException: If the run storage is not SQL-backed, or the user does not
            confirm the migration.
    """
    from dagster._core.storage.pipeline_run import RunsFilter
    from dagster._core.storage.runs.schema import RunTagsTable
    from dagster._core.storage.runs.sql_run_storage import SqlRunStorage
    from dagster._core.storage.tags import REPOSITORY_LABEL_TAG

    if not from_label or not to_label:
        raise click.UsageError("Please specify both a --from and --to repository label")

    if not is_valid_repo_label(from_label) or not is_valid_repo_label(to_label):
        raise click.UsageError(
            "`--from` and `--to` arguments must be of the format: <repository_name>@<location_name>"
        )

    with DagsterInstance.get() as instance:
        records = instance.get_run_records(
            filters=RunsFilter(job_name=job_name, tags={REPOSITORY_LABEL_TAG: from_label})
        )

        if not records:
            click.echo(f"No runs found for {job_name} in {from_label}.")
            return

        # The update below is issued directly against the run tags table, so it only
        # works for SQL-backed storage. Fail loudly rather than exiting as if the
        # migration had succeeded.
        if not isinstance(instance.run_storage, SqlRunStorage):
            raise click.ClickException(
                "Run migration is only supported for SQL-backed run storage; "
                "no runs were migrated."
            )

        count = len(records)
        confirmation = click.prompt(
            f"Are you sure you want to migrate the run history for {job_name} ({count} runs)? Type MIGRATE"
        )
        if confirmation != "MIGRATE":
            raise click.ClickException("Exiting without migrating.")

        for record in tqdm(records):
            with instance.run_storage.connect() as conn:
                conn.execute(
                    RunTagsTable.update()  # pylint: disable=no-value-for-parameter
                    .where(RunTagsTable.c.run_id == record.pipeline_run.run_id)
                    .where(RunTagsTable.c.key == REPOSITORY_LABEL_TAG)
                    .values(value=to_label)
                )
        click.echo(f"Migrated the run history for {job_name} from {from_label} to {to_label}.")


def is_valid_repo_label(label):
    """Return True when ``label`` looks like ``<repository_name>@<location_name>``.

    A valid label contains exactly one ``@`` with a non-empty string on each side.
    """
    # Zero or several separators can never form a two-part label.
    if label.count("@") != 1:
        return False
    repository_name, _, location_name = label.partition("@")
    return repository_name != "" and location_name != ""
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
)
from dagster._cli import ENV_PREFIX, cli
from dagster._cli.job import job_execute_command
from dagster._cli.run import run_delete_command, run_list_command, run_wipe_command
from dagster._cli.run import (
run_delete_command,
run_list_command,
run_migrate_command,
run_wipe_command,
)
from dagster._core.definitions.decorators.sensor_decorator import sensor
from dagster._core.definitions.partition import PartitionedConfig, StaticPartitionsDefinition
from dagster._core.definitions.sensor_definition import RunRequest
Expand Down Expand Up @@ -897,3 +902,57 @@ def test_use_env_vars_for_cli_option():
result = runner.invoke(cli, ["debug"], auto_envvar_prefix=ENV_PREFIX)
assert __version__ in result.output
assert result.exit_code == 0


def create_repo_run(instance):
    """Create and launch one run of ``my_job`` from ``repo_pipeline_and_job.py``.

    Loads the ``my_repo`` repository through a workspace built from the Python file
    that sits next to this test module, creates a run for ``my_job`` carrying that
    job's external and Python origins, launches it on ``instance`` and returns it.
    """
    from dagster._core.test_utils import create_run_for_test
    from dagster._core.workspace import WorkspaceProcessContext
    from dagster._core.workspace.load_target import PythonFileTarget

    load_target = PythonFileTarget(
        python_file=file_relative_path(__file__, "repo_pipeline_and_job.py"),
        attribute=None,
        working_directory=os.path.dirname(__file__),
        location_name=None,
    )

    with WorkspaceProcessContext(instance, load_target) as process_context:
        request_context = process_context.create_request_context()
        first_location = request_context.repository_locations[0]
        external_job = first_location.get_repository("my_repo").get_full_external_pipeline(
            "my_job"
        )
        created_run = create_run_for_test(
            instance,
            pipeline_name="my_job",
            external_pipeline_origin=external_job.get_external_origin(),
            pipeline_code_origin=external_job.get_python_origin(),
        )
        instance.launch_run(created_run.run_id, request_context)
        return created_run


def get_repo_runs(instance, repo_label):
    """Return the runs on ``instance`` whose repository-label tag equals ``repo_label``."""
    from dagster._core.storage.pipeline_run import RunsFilter
    from dagster._core.storage.tags import REPOSITORY_LABEL_TAG

    label_filter = RunsFilter(tags={REPOSITORY_LABEL_TAG: repo_label})
    return instance.get_runs(filters=label_filter)


def test_run_migrate_command():
    """Check that ``run migrate`` moves a job's run from the old repo label to the new one."""
    with instance_for_test() as instance:
        create_repo_run(instance)
        old_repo_label = "my_repo@repo_pipeline_and_job.py"
        new_repo_label = "my_new_repo@lalalala"

        # Before migrating, the single run is tagged with the old label only.
        assert len(get_repo_runs(instance, old_repo_label)) == 1
        assert len(get_repo_runs(instance, new_repo_label)) == 0

        runner = CliRunner()
        result = runner.invoke(
            run_migrate_command,
            args=["my_job", "--from", old_repo_label, "--to", new_repo_label],
            input="MIGRATE",
        )
        # CliRunner captures exceptions instead of propagating them, so check the exit
        # code explicitly; the captured output makes a failure diagnosable.
        assert result.exit_code == 0, result.output

        # After migrating, the run is found under the new label only.
        assert len(get_repo_runs(instance, old_repo_label)) == 0
        assert len(get_repo_runs(instance, new_repo_label)) == 1

0 comments on commit dd35a38

Please sign in to comment.