Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runless events - refactor job_versions_io_mapping #2654

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 71 additions & 26 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion(
String namespaceName);

/**
* Used to link an input dataset to a given job version.
* Used to upsert an input or output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
}
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW())
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid,
UUID datasetUuid,
UUID jobUuid,
UUID symlinkTargetJobUuid,
IoType ioType);

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_current_job_version = FALSE
WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid)
AND job_version_uuid != :jobVersionUuid
AND io_type = :ioType
AND is_current_job_version = TRUE;
""")
void markInputOrOutputDatasetAsPreviousFor(UUID jobVersionUuid, UUID jobUuid, IoType ioType);

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_current_job_version = FALSE
WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid)
AND io_type = :ioType
AND is_current_job_version = TRUE;
""")
void markInputOrOutputDatasetAsPreviousFor(UUID jobUuid, IoType ioType);

/**
* Used to link an output dataset to a given job version.
* Used to link an input dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
default void upsertInputDatasetFor(
UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.INPUT);
upsertCurrentInputOrOutputDatasetFor(
jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.INPUT);
}

/**
* Used to upsert an input or output dataset to a given job version.
* Used to link an output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param jobUuid The unique ID of the job.
*/
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type)
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
ON CONFLICT DO NOTHING
""")
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
default void upsertOutputDatasetFor(
UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.OUTPUT);
upsertCurrentInputOrOutputDatasetFor(
jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.OUTPUT);
}

/**
* Returns the input datasets to a given job version.
Expand Down Expand Up @@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
inputs.forEach(
i -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
i.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
outputs.forEach(
o -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
o.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
Expand Down Expand Up @@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the job version to the run.
Expand Down
76 changes: 39 additions & 37 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu
@SqlQuery(
"""
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
job_io AS (
SELECT
io.job_uuid AS job_uuid,
io.job_symlink_target_uuid AS job_symlink_target_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
WHERE io.is_current_job_version = TRUE
GROUP BY io.job_symlink_target_uuid, io.job_uuid
),
lineage(job_uuid, job_symlink_target_uuid, inputs, outputs) AS (
SELECT job_uuid,
job_symlink_target_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM job_io
WHERE job_uuid IN (<jobIds>) OR job_symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.job_symlink_target_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io, lineage l
WHERE (io.job_uuid != l.job_uuid) AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth),
lineage_outside_job_io(job_uuid) AS (
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
SELECT
param_jobs.param_job_uuid as job_uuid,
j.symlink_target_uuid,
Array[]::uuid[] AS inputs,
Array[]::uuid[] AS outputs,
0 AS depth
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
WHERE l.job_uuid IS NULL
)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uuid)
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand Down
11 changes: 9 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
Expand Down Expand Up @@ -362,27 +363,33 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true);
datasetInputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
}
bag.setInputs(Optional.ofNullable(datasetInputs));

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false);
datasetOutputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.migrations;

import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

@Slf4j
public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration {

public static final String UPDATE_QUERY =
"""
UPDATE job_versions_io_mapping
SET
job_uuid = j.uuid,
job_symlink_target_uuid = j.symlink_target_uuid,
is_current_job_version = (jv.uuid = j.current_version_uuid)::BOOLEAN,
made_current_at = NOW()
FROM job_versions jv
INNER JOIN jobs_view j ON j.uuid = jv.job_uuid
WHERE jv.uuid = job_versions_io_mapping.job_version_uuid
""";

@Override
public MigrationVersion getVersion() {
return MigrationVersion.fromVersion("67.2");
}

@Override
public void migrate(Context context) throws Exception {
Jdbi jdbi = Jdbi.create(context.getConnection());
jdbi.withHandle(h -> h.createUpdate(UPDATE_QUERY).execute());
}

@Override
public String getDescription() {
return "Back fill job_uuid and is_current_job_version in job_versions_io_mapping table";
}

@Override
public Integer getChecksum() {
return null;
}

@Override
public boolean isUndo() {
return false;
}

@Override
public boolean canExecuteInTransaction() {
return false;
}

@Override
public boolean isBaselineMigration() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ BEGIN
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
) j
WHERE jobs.uuid=j.uuid;
UPDATE job_versions_io_mapping
SET job_symlink_target_uuid=j.symlink_target_uuid
FROM jobs j
WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid;
END IF;
SELECT * INTO inserted_job FROM jobs_view
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN job_symlink_target_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN is_current_job_version boolean DEFAULT FALSE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a made_current_at column?

ALTER TABLE job_versions_io_mapping ADD COLUMN made_current_at TIMESTAMP;

-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise)
ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey;
ALTER TABLE job_versions_io_mapping ALTER COLUMN job_version_uuid DROP NOT NULL;

CREATE INDEX job_versions_io_mapping_job_uuid_job_symlink_target_uuid ON job_versions_io_mapping (job_uuid, job_symlink_target_uuid);

ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_mapping_pkey UNIQUE (job_version_uuid, dataset_uuid, io_type, job_uuid);
Loading
Loading