Skip to content

Commit

Permalink
job-mapping rename job_versions_io_mapping to job_io_mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Nov 16, 2023
1 parent 78325b6 commit d7166cc
Show file tree
Hide file tree
Showing 13 changed files with 636 additions and 90 deletions.
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DbRetention.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ CREATE OR REPLACE FUNCTION delete_datasets_older_than_x_days()
BEGIN
CREATE TEMPORARY TABLE used_datasets_as_io_in_x_days AS (
SELECT dataset_uuid
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
ON jvio.job_version_uuid = jv.uuid
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
);
Expand Down Expand Up @@ -621,7 +621,7 @@ CREATE OR REPLACE FUNCTION estimate_number_of_rows_older_than_x_days(retention_q
"""
CREATE TEMPORARY TABLE used_datasets_as_input_in_x_days AS (
SELECT dataset_uuid
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
ON jvio.job_version_uuid = jv.uuid
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
AND jvio.io_type = 'INPUT'
Expand Down
103 changes: 74 additions & 29 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ WITH job_version_io AS (
JSON_AGG(json_build_object('namespace', ds.namespace_name,
'name', ds.name))
FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets
FROM job_versions_io_mapping io
FROM job_io_mapping io
INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
Expand Down Expand Up @@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion(
String namespaceName);

/**
* Used to link an input dataset to a given job version.
* Used to upsert an input or output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param datasetUuid The unique ID of the input or output dataset.
* @param ioType The {@link IoType} of the dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
}
// Inserts a job_io_mapping row flagged is_job_version_current = TRUE. When a row with the same
// (job_version_uuid, dataset_uuid, io_type, job_uuid) already exists, the existing row is kept
// and only its is_job_version_current flag is set back to TRUE.
// symlinkTargetJobUuid is written to the symlink_target_job_uuid column on insert only; the
// ON CONFLICT branch does not touch that column.
@SqlUpdate(
"""
INSERT INTO job_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, symlink_target_job_uuid, is_job_version_current)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE)
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid,
UUID datasetUuid,
UUID jobUuid,
UUID symlinkTargetJobUuid,
IoType ioType);

/**
* Clears the {@code is_job_version_current} flag on the {@code job_io_mapping} rows of the given
* I/O type that reference the job (through {@code job_uuid} or {@code symlink_target_job_uuid})
* and belong to any job version other than the one given. Rows of the given job version are left
* untouched.
*
* @param jobVersionUuid The unique ID of the job version whose rows must stay current.
* @param jobUuid The unique ID of the job.
* @param ioType The {@link IoType} of the rows to update.
*/
@SqlUpdate(
"""
UPDATE job_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND job_version_uuid != :jobVersionUuid
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingObsolete(UUID jobVersionUuid, UUID jobUuid, IoType ioType);

/**
* Clears the {@code is_job_version_current} flag on every {@code job_io_mapping} row of the given
* I/O type that references the job (through {@code job_uuid} or {@code symlink_target_job_uuid}),
* regardless of job version.
*
* @param jobUuid The unique ID of the job.
* @param ioType The {@link IoType} of the rows to update.
*/
@SqlUpdate(
"""
UPDATE job_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingObsolete(UUID jobUuid, IoType ioType);

/**
* Used to link an output dataset to a given job version.
* Used to link an input dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
default void upsertInputDatasetFor(
    UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
  final IoType direction = IoType.INPUT;

  // Step 1: input mappings of this job held by other job versions stop being current.
  markVersionIOMappingObsolete(jobVersionUuid, jobUuid, direction);

  // Step 2: insert the mapping for this job version, or flag the existing one as current again.
  upsertCurrentInputOrOutputDatasetFor(
      jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, direction);
}

/**
* Used to upsert an input or output dataset to a given job version.
* Used to link an output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param jobUuid The unique ID of the job.
*/
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type)
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
ON CONFLICT DO NOTHING
""")
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
default void upsertOutputDatasetFor(
    UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
  final IoType direction = IoType.OUTPUT;

  // Step 1: output mappings of this job held by other job versions stop being current.
  markVersionIOMappingObsolete(jobVersionUuid, jobUuid, direction);

  // Step 2: insert the mapping for this job version, or flag the existing one as current again.
  upsertCurrentInputOrOutputDatasetFor(
      jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, direction);
}

/**
* Returns the input datasets to a given job version.
Expand Down Expand Up @@ -256,7 +289,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
@SqlQuery(
"""
SELECT dataset_uuid
FROM job_versions_io_mapping
FROM job_io_mapping
WHERE job_version_uuid = :jobVersionUuid
AND io_type = :ioType
""")
Expand All @@ -265,7 +298,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
@SqlQuery(
"""
SELECT d.namespace_name, d.name, io.io_type
FROM job_versions_io_mapping io
FROM job_io_mapping io
INNER JOIN jobs_view j ON j.current_version_uuid = io.job_version_uuid
INNER JOIN datasets_view d on d.uuid = io.dataset_uuid
WHERE j.name = :jobName AND j.namespace_name=:jobNamespace
Expand Down Expand Up @@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
inputs.forEach(
i -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
i.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
outputs.forEach(
o -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
o.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
Expand Down Expand Up @@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the job version to the run.
Expand Down
78 changes: 40 additions & 38 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu
@SqlQuery(
"""
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
job_io AS (
SELECT
io.job_uuid AS job_uuid,
io.symlink_target_job_uuid AS symlink_target_job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
FROM job_io_mapping io
WHERE io.is_job_version_current = TRUE
GROUP BY io.symlink_target_job_uuid, io.job_uuid
),
lineage(job_uuid, symlink_target_job_uuid, inputs, outputs) AS (
SELECT job_uuid,
symlink_target_job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM job_io
WHERE job_uuid IN (<jobIds>) OR symlink_target_job_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.symlink_target_job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io, lineage l
WHERE (io.job_uuid != l.job_uuid) AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth),
lineage_outside_job_io(job_uuid) AS (
SELECT
param_jobs.param_job_uuid as job_uuid,
j.symlink_target_uuid,
Array[]::uuid[] AS inputs,
Array[]::uuid[] AS outputs,
0 AS depth
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
WHERE l.job_uuid IS NULL
)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.symlink_target_job_uuid)
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand All @@ -116,7 +118,7 @@ WHERE ds.uuid IN (<dsUuids>)""")
"""
SELECT j.uuid FROM jobs j
INNER JOIN job_versions jv ON jv.job_uuid = j.uuid
INNER JOIN job_versions_io_mapping io ON io.job_version_uuid = jv.uuid
INNER JOIN job_io_mapping io ON io.job_version_uuid = jv.uuid
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName
ORDER BY io_type DESC, jv.created_at DESC
Expand Down
11 changes: 9 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
Expand Down Expand Up @@ -362,27 +363,33 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true);
datasetInputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_io_mapping as obsolete
daos.getJobVersionDao().markVersionIOMappingObsolete(job.getUuid(), IoType.INPUT);
}
bag.setInputs(Optional.ofNullable(datasetInputs));

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false);
datasetOutputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_io_mapping as obsolete
daos.getJobVersionDao().markVersionIOMappingObsolete(job.getUuid(), IoType.OUTPUT);
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.migrations;

import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

/**
 * Java-based Flyway migration, version {@code 67.2}, that back-fills the {@code job_uuid},
 * {@code symlink_target_job_uuid} and {@code is_job_version_current} columns of the
 * {@code job_io_mapping} table from {@code job_versions} joined with {@code jobs_view}.
 */
@Slf4j
public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration {

  /**
   * Single statement run by {@link #migrate(Context)}. For every {@code job_io_mapping} row it
   * looks up the owning job version and its job, copies the job's uuid and symlink target, and
   * sets {@code is_job_version_current} to whether that version is the job's current version.
   */
  public static final String UPDATE_QUERY =
      """
      UPDATE job_io_mapping
      SET
      job_uuid = j.uuid,
      symlink_target_job_uuid = j.symlink_target_uuid,
      is_job_version_current = (jv.uuid = j.current_version_uuid)::BOOLEAN
      FROM job_versions jv
      INNER JOIN jobs_view j ON j.uuid = jv.job_uuid
      WHERE jv.uuid = job_io_mapping.job_version_uuid
      """;

  /** Returns the migration version, {@code 67.2}. */
  @Override
  public MigrationVersion getVersion() {
    return MigrationVersion.fromVersion("67.2");
  }

  /** Returns the human-readable description reported for this migration. */
  @Override
  public String getDescription() {
    return "Back fill job_uuid and is_job_version_current in job_io_mapping table";
  }

  /** Returns {@code null}: this migration reports no checksum. */
  @Override
  public Integer getChecksum() {
    return null;
  }

  /** Returns {@code false}: this is a regular migration, not an undo migration. */
  @Override
  public boolean isUndo() {
    return false;
  }

  /** Returns {@code false}: this is not a baseline migration. */
  @Override
  public boolean isBaselineMigration() {
    return false;
  }

  /** Returns {@code false}, i.e. reports that the migration is not to be run in a transaction. */
  @Override
  public boolean canExecuteInTransaction() {
    return false;
  }

  /**
   * Executes {@link #UPDATE_QUERY} once on the connection supplied by the migration context.
   *
   * @param context The Flyway context providing the JDBC connection to use.
   * @throws Exception if the update fails.
   */
  @Override
  public void migrate(Context context) throws Exception {
    Jdbi.create(context.getConnection())
        .useHandle(handle -> handle.createUpdate(UPDATE_QUERY).execute());
  }
}
Loading

0 comments on commit d7166cc

Please sign in to comment.