Skip to content

Commit

Permalink
extract migration into sql run storage
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Aug 25, 2022
1 parent 25202a6 commit d48096e
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 31 deletions.
54 changes: 31 additions & 23 deletions python_modules/dagster/dagster/_cli/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import click
from tqdm import tqdm

from dagster import __version__ as dagster_version
from dagster._cli.workspace.cli_target import (
get_external_pipeline_or_job_from_kwargs,
job_target_argument,
)
from dagster._core.instance import DagsterInstance


Expand Down Expand Up @@ -73,37 +78,46 @@ def run_wipe_command(force):
raise click.ClickException("Exiting without deleting all run history and event logs.")


@run_cli.command(name="migrate", help="Migrate the run history from one repository to another.")
@run_cli.command(
name="migrate-repository",
help="Migrate the run history for a job from a historic repository to its current repository.",
)
@click.option(
"--from",
"-f",
"from_label",
help="The repository from which to migrate (format: <repository_name>@<location_name>)",
prompt_required=True,
)
@click.option(
"--to",
"-t",
"to_label",
help="The repository name to migrate to (format: <repository_name>@<location_name>)",
prompt_required=True,
)
@click.argument("job_name")
def run_migrate_command(job_name, from_label, to_label):
@job_target_argument
def run_migrate_command(from_label, **kwargs):
from dagster._core.storage.pipeline_run import RunsFilter
from dagster._core.storage.runs.schema import RunTagsTable
from dagster._core.storage.runs.sql_run_storage import SqlRunStorage
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG

if not from_label or not to_label:
raise click.UsageError("Please specify both a --from and --to repository label")
if not from_label:
raise click.UsageError("Must specify a --from repository label")

if not is_valid_repo_label(from_label) or not is_valid_repo_label(to_label):
if not is_valid_repo_label(from_label):
raise click.UsageError(
"`--from` and `--to` arguments must be of the format: <repository_name>@<location_name>"
"`--from` argument must be of the format: <repository_name>@<location_name>"
)

with DagsterInstance.get() as instance:
with get_external_pipeline_or_job_from_kwargs(
instance, version=dagster_version, kwargs=kwargs, using_job_op_graph_apis=True
) as external_pipeline:
new_job_origin = external_pipeline.get_external_origin()
job_name = external_pipeline.name
to_label = new_job_origin.external_repository_origin.get_label()

if not to_label:
raise click.UsageError("Must specify valid job targets to migrate history to.")

if to_label == from_label:
click.echo(f"Migrating runs from {from_label} to {to_label} is a no-op.")
return

records = instance.get_run_records(
filters=RunsFilter(job_name=job_name, tags={REPOSITORY_LABEL_TAG: from_label})
)
Expand All @@ -117,19 +131,13 @@ def run_migrate_command(job_name, from_label, to_label):

count = len(records)
confirmation = click.prompt(
f"Are you sure you want to migrate the run history for {job_name} ({count} runs)? Type MIGRATE"
f"Are you sure you want to migrate the run history for {job_name} from {from_label} to {to_label} ({count} runs)? Type MIGRATE"
)
should_migrate = confirmation == "MIGRATE"

if should_migrate:
for record in tqdm(records):
with instance.run_storage.connect() as conn:
conn.execute(
RunTagsTable.update() # pylint: disable=no-value-for-parameter
.where(RunTagsTable.c.run_id == record.pipeline_run.run_id)
.where(RunTagsTable.c.key == REPOSITORY_LABEL_TAG)
.values(value=to_label)
)
instance.run_storage.replace_job_origin(record.pipeline_run, new_job_origin)
click.echo(f"Migrated the run history for {job_name} from {from_label} to {to_label}.")
else:
raise click.ClickException("Exiting without migrating.")
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/storage/pipeline_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ def with_status(self, status):

return self._replace(status=status)

def with_job_origin(self, origin):
from dagster._core.host_representation.origin import ExternalPipelineOrigin

check.inst_param(origin, "origin", ExternalPipelineOrigin)
return self._replace(external_pipeline_origin=origin)

def with_mode(self, mode):
return self._replace(mode=mode)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
from dagster._core.events import EVENT_TYPE_TO_PIPELINE_RUN_STATUS, DagsterEvent, DagsterEventType
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.bulk_actions import BulkActionType
from dagster._core.host_representation.origin import ExternalPipelineOrigin
from dagster._core.snap import (
ExecutionPlanSnapshot,
PipelineSnapshot,
create_execution_plan_snapshot_id,
create_pipeline_snapshot_id,
)
from dagster._core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, ROOT_RUN_ID_TAG
from dagster._core.storage.tags import (
PARTITION_NAME_TAG,
PARTITION_SET_TAG,
REPOSITORY_LABEL_TAG,
ROOT_RUN_ID_TAG,
)
from dagster._daemon.types import DaemonHeartbeat
from dagster._serdes import (
deserialize_as,
Expand Down Expand Up @@ -1077,6 +1083,24 @@ def kvs_set(self, pairs: Dict[str, str]) -> None:
.values(value=db.sql.case(pairs, value=KeyValueStoreTable.c.key))
)

# Migrating run history
def replace_job_origin(self, run: PipelineRun, job_origin: ExternalPipelineOrigin):
new_label = job_origin.external_repository_origin.get_label()
with self.connect() as conn:
conn.execute(
RunsTable.update() # pylint: disable=no-value-for-parameter
.where(RunsTable.c.run_id == run.run_id)
.values(
run_body=serialize_dagster_namedtuple(run.with_job_origin(job_origin)),
)
)
conn.execute(
RunTagsTable.update() # pylint: disable=no-value-for-parameter
.where(RunTagsTable.c.run_id == run.run_id)
.where(RunTagsTable.c.key == REPOSITORY_LABEL_TAG)
.values(value=new_label)
)


GET_PIPELINE_SNAPSHOT_QUERY_ID = "get-pipeline-snapshot"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ def multiproc():
branch_op(out_1)
branch_op(out_2)

@job
def my_job():
my_op()

@repository
def my_other_repo():
return [my_job]


# default executor_def is multiproc
multiproc_job = multiproc.to_job()
Expand Down Expand Up @@ -856,19 +864,19 @@ def test_run_list_limit():
result = runner.invoke(run_list_command, args="--limit 1")
assert result.exit_code == 0
assert result.output.count("Run: ") == 1
assert result.output.count("Pipeline: double_adder_job") == 1
assert result.output.count("Job: double_adder_job") == 1

# Shows two runs because of the limit argument is now 2
two_results = runner.invoke(run_list_command, args="--limit 2")
assert two_results.exit_code == 0
assert two_results.output.count("Run: ") == 2
assert two_results.output.count("Pipeline: double_adder_job") == 2
assert two_results.output.count("Job: double_adder_job") == 2

# Should only shows two runs although the limit argument is 3 because there are only 2 runs
shows_two_results = runner.invoke(run_list_command, args="--limit 3")
assert shows_two_results.exit_code == 0
assert shows_two_results.output.count("Run: ") == 2
assert shows_two_results.output.count("Pipeline: double_adder_job") == 2
assert shows_two_results.output.count("Job: double_adder_job") == 2


def runner_pipeline_or_job_execute(runner, cli_args):
Expand Down Expand Up @@ -906,7 +914,7 @@ def test_use_env_vars_for_cli_option():

def create_repo_run(instance):
from dagster._core.test_utils import create_run_for_test
from dagster._core.workspace import WorkspaceProcessContext
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._core.workspace.load_target import PythonFileTarget

with WorkspaceProcessContext(
Expand Down Expand Up @@ -942,17 +950,26 @@ def test_run_migrate_command():
with instance_for_test() as instance:
create_repo_run(instance)
old_repo_label = "my_repo@repo_pipeline_and_job.py"
new_repo_label = "my_new_repo@lalalala"
new_repo_label = "my_other_repo@test_cli_commands.py"

assert len(get_repo_runs(instance, old_repo_label)) == 1
assert len(get_repo_runs(instance, new_repo_label)) == 0

runner = CliRunner()
runner.invoke(
run_migrate_command,
args=["my_job", "--from", old_repo_label, "--to", new_repo_label],
args=[
"--from",
old_repo_label,
"-f",
__file__,
"-r",
"my_other_repo",
"-j",
"my_job"
],
input="MIGRATE",
)

assert len(get_repo_runs(instance, old_repo_label)) == 0
assert len(get_repo_runs(instance, new_repo_label)) == 1
assert len(get_repo_runs(instance, new_repo_label)) == 1
Original file line number Diff line number Diff line change
Expand Up @@ -1505,3 +1505,46 @@ def test_kvs(self, storage):

storage.kvs_set({"foo": "1", "bar": "2", "key": "3"})
assert storage.kvs_get({"foo", "bar", "key"}) == {"foo": "1", "bar": "2", "key": "3"}

def test_migrate_repo(self, storage):
assert storage
self._skip_in_memory(storage)

one = make_new_run_id()
two = make_new_run_id()
job_name = "some_job"

origin_one = self.fake_job_origin(job_name, "fake_repo_one")
origin_two = self.fake_job_origin(job_name, "fake_repo_two")
storage.add_run(
TestRunStorage.build_run(
run_id=one, pipeline_name=job_name, external_pipeline_origin=origin_one
)
)
storage.add_run(
TestRunStorage.build_run(
run_id=two, pipeline_name=job_name, external_pipeline_origin=origin_one
)
)

one_runs = storage.get_runs(
RunsFilter(tags={REPOSITORY_LABEL_TAG: "fake_repo_one@fake:fake"})
)
assert len(one_runs) == 2
two_runs = storage.get_runs(
RunsFilter(tags={REPOSITORY_LABEL_TAG: "fake_repo_two@fake:fake"})
)
assert len(two_runs) == 0

# replace job origin for run one
storage.replace_job_origin(one_runs[1], origin_two)

one_runs = storage.get_runs(
RunsFilter(tags={REPOSITORY_LABEL_TAG: "fake_repo_one@fake:fake"})
)
assert len(one_runs) == 1
two_runs = storage.get_runs(
RunsFilter(tags={REPOSITORY_LABEL_TAG: "fake_repo_two@fake:fake"})
)
assert len(two_runs) == 1
assert two_runs[0].run_id == one

0 comments on commit d48096e

Please sign in to comment.