Runless events - refactor job_versions_io_mapping
Signed-off-by: Pawel Leszczynski <[email protected]>
pawel-big-lebowski committed Oct 23, 2023
1 parent 2cb3e37 commit 9f1dedd
Showing 9 changed files with 117 additions and 48 deletions.
51 changes: 39 additions & 12 deletions api/src/main/java/marquez/db/JobVersionDao.java
@@ -183,38 +183,65 @@ ExtendedJobVersionRow upsertJobVersion(
/**
* Used to link an input dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid) {
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.INPUT);
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, jobUuid, IoType.INPUT);
// TODO: include this in test -> check if jobUuid is set
}

/**
* Used to link an output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid) {
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.OUTPUT);
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, jobUuid, IoType.OUTPUT);
// TODO: include this in test -> check if jobUuid is set
}

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND job_version_uuid != :jobVersionUuid
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingNotCurrent(UUID jobVersionUuid, UUID jobUuid, IoType ioType);

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingNotCurrent(UUID jobUuid, IoType ioType);

/**
* Used to upsert an input or output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param jobUuid The unique ID of the job.
*/
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type)
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
ON CONFLICT DO NOTHING
job_version_uuid, dataset_uuid, io_type, job_uuid, is_job_version_current)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, TRUE)
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
""")
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid, UUID datasetUuid, UUID jobUuid, IoType ioType);

/**
* Returns the input datasets to a given job version.
@@ -344,14 +371,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid(), jobRow.getUuid());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid(), jobRow.getUuid());
});

// Link the job version to the run.
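For orientation between the changed files, here is a minimal usage sketch of the refactored JobVersionDao methods. It is not part of the commit: the Jdbi connection string and the UUID values are placeholders, and in a real database they would have to reference existing jobs, job_versions, and datasets rows.

import java.util.UUID;
import marquez.db.JobVersionDao;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

// Sketch only: links an input dataset to a new job version and retires older mappings.
public class JobVersionIoMappingSketch {
  public static void main(String[] args) {
    Jdbi jdbi =
        Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez")
            .installPlugin(new SqlObjectPlugin());
    JobVersionDao jobVersionDao = jdbi.onDemand(JobVersionDao.class);

    UUID jobUuid = UUID.randomUUID();          // placeholder for jobs.uuid
    UUID jobVersionUuid = UUID.randomUUID();   // placeholder for job_versions.uuid
    UUID inputDatasetUuid = UUID.randomUUID(); // placeholder for datasets.uuid

    // First flips INPUT mappings of other versions of this job (or its symlink target)
    // to is_job_version_current = FALSE, then upserts this mapping as the current one.
    jobVersionDao.upsertInputDatasetFor(jobVersionUuid, inputDatasetUuid, jobUuid);
  }
}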
63 changes: 34 additions & 29 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -39,46 +39,51 @@ public interface LineageDao {
@SqlQuery(
"""
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
job_io AS (
SELECT
io.job_uuid AS job_uuid,
io.symlink_target_job_uuid AS symlink_target_job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
WHERE io.is_job_version_current = TRUE
GROUP BY io.symlink_target_job_uuid, io.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
lineage(job_uuid, symlink_target_job_uuid, inputs, outputs) AS (
SELECT job_uuid,
symlink_target_job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
FROM job_io
WHERE job_uuid IN (<jobIds>) OR symlink_target_job_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
SELECT io.job_uuid, io.symlink_target_job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io, lineage l
WHERE (io.job_uuid != l.job_uuid) AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
AND depth < :depth),
lineage_outside_job_io(job_uuid) AS (
SELECT
param_jobs.param_job_uuid as job_uuid,
j.symlink_target_uuid,
Array[]::uuid[] AS inputs,
Array[]::uuid[] AS outputs,
0 AS depth
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
WHERE l.job_uuid IS NULL
)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.symlink_target_job_uuid)
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

  // TODO: verify the size of the lineage without DISTINCT (recursion is growing but should not
  // happen)

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
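A hedged sketch of how the reworked lineage query is driven from Java follows. It is not part of the commit; the connection details and the seed UUID are placeholders, and the JobData import path is assumed. The new lineage_outside_job_io branch means a requested job with no current io mappings is still returned instead of silently disappearing from the graph.

import java.util.Set;
import java.util.UUID;
import marquez.db.LineageDao;
import marquez.service.models.JobData;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

// Sketch only: resolve lineage around a seed job, bounded by the recursion depth.
public class LineageQuerySketch {
  public static void main(String[] args) {
    Jdbi jdbi =
        Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez")
            .installPlugin(new SqlObjectPlugin());
    LineageDao lineageDao = jdbi.onDemand(LineageDao.class);

    UUID seedJobUuid = UUID.randomUUID(); // placeholder for an existing jobs.uuid
    // Only io mappings with is_job_version_current = TRUE are traversed, so edges
    // from superseded job versions no longer pull extra jobs into the graph.
    Set<JobData> lineage = lineageDao.getLineage(Set.of(seedJobUuid), 20);
    lineage.forEach(System.out::println);
  }
}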
13 changes: 11 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
@@ -33,6 +33,7 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetFieldRow;
@@ -225,6 +226,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
JobDao jobDao = createJobDao();
JobVersionDao jobVersionDao = createJobVersionDao();
JobFacetsDao jobFacetsDao = createJobFacetsDao();
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
@@ -342,7 +344,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record =
@@ -385,11 +387,15 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
event.getEventType(),
facets));
}
} else {
// mark job_versions_io_mapping as non-current
jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.INPUT);
}

bag.setInputs(Optional.ofNullable(datasetInputs));
// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record =
Expand Down Expand Up @@ -432,6 +438,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
event.getEventType(),
facets));
}
} else {
// mark job_versions_io_mapping as non-current
jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.OUTPUT);
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
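The two new else branches above boil down to the following sketch (not from the commit; the helper and its wiring are illustrative): for an event that reports neither inputs nor outputs, every current io mapping of the job is retired so the lineage query stops following stale edges.

import java.util.UUID;
import marquez.db.JobVersionDao;
import marquez.db.JobVersionDao.IoType;

// Sketch only: mirrors the else branches of updateBaseMarquezModel.
class ClearCurrentIoMappingsSketch {
  static void clearCurrentIoMappings(JobVersionDao jobVersionDao, UUID jobUuid) {
    // Current INPUT mappings of the job (or its symlink target) become non-current...
    jobVersionDao.markVersionIOMappingNotCurrent(jobUuid, IoType.INPUT);
    // ...and so do the OUTPUT mappings, without deleting the mapping rows themselves.
    jobVersionDao.markVersionIOMappingNotCurrent(jobUuid, IoType.OUTPUT);
  }
}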
@@ -111,6 +111,10 @@ BEGIN
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
) j
WHERE jobs.uuid=j.uuid;
UPDATE job_versions_io_mapping
SET symlink_target_job_uuid=j.symlink_target_uuid
FROM jobs j
WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid;
END IF;
SELECT * INTO inserted_job FROM jobs_view
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
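For the trigger change above, a small query sketch (not part of the commit; the UUID and connection details are placeholders) shows what the synchronization buys: once a job is symlinked, its existing io mapping rows carry symlink_target_job_uuid, which both markVersionIOMappingNotCurrent and the lineage query use to treat the alias and its target as one job.

import java.util.List;
import java.util.UUID;
import org.jdbi.v3.core.Jdbi;

// Sketch only: list the symlink targets recorded on a job's io mapping rows.
class SymlinkedIoMappingSketch {
  public static void main(String[] args) {
    Jdbi jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez");
    UUID aliasedJobUuid = UUID.randomUUID(); // placeholder for a symlinked jobs.uuid

    List<UUID> targets =
        jdbi.withHandle(handle ->
            handle.createQuery(
                    """
                    SELECT DISTINCT symlink_target_job_uuid
                      FROM job_versions_io_mapping
                     WHERE job_uuid = :jobUuid
                       AND symlink_target_job_uuid IS NOT NULL
                    """)
                .bind("jobUuid", aliasedJobUuid)
                .mapTo(UUID.class)
                .list());
    targets.forEach(System.out::println);
  }
}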
@@ -0,0 +1,15 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN symlink_target_job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN is_job_version_current boolean DEFAULT FALSE;
-- TODO: create index for lineage query and update

CREATE INDEX job_versions_io_mapping_job_uuid ON job_versions_io_mapping (job_uuid);

-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it as a unique constraint; note that since job_version_uuid can be NULL, we need to check that job_version_uuid is not null before inserting (duplicate rows otherwise)
ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey;
ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_pkey UNIQUE (job_version_uuid,dataset_uuid,io_type,job_uuid);

-- TODO: add a test which verifies correctness for UNIQUE <- adds multiple rows to job_versions_io_mapping

-- TODO: take care of is_current
-- TODO: take care of symlink_job_uuid
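Since the primary key is replaced by a four-column unique constraint, a short sketch of its interaction with the DAO's ON CONFLICT clause may help (not part of the commit; placeholder UUIDs, and a real insert must also satisfy the foreign keys): re-running the same upsert takes the DO UPDATE path and only refreshes is_job_version_current, so the mapping never duplicates.

import java.util.UUID;
import org.jdbi.v3.core.Jdbi;

// Sketch only: the second execution conflicts on
// (job_version_uuid, dataset_uuid, io_type, job_uuid) and updates in place.
class IoMappingConstraintSketch {
  public static void main(String[] args) {
    Jdbi jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez");
    UUID jobVersionUuid = UUID.randomUUID(); // placeholders; FKs must hold in practice
    UUID datasetUuid = UUID.randomUUID();
    UUID jobUuid = UUID.randomUUID();

    jdbi.useHandle(handle -> {
      String upsert =
          """
          INSERT INTO job_versions_io_mapping (
            job_version_uuid, dataset_uuid, io_type, job_uuid, is_job_version_current)
          VALUES (:jobVersionUuid, :datasetUuid, 'INPUT', :jobUuid, TRUE)
          ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid)
            DO UPDATE SET is_job_version_current = TRUE
          """;
      for (int i = 0; i < 2; i++) {
        handle.createUpdate(upsert)
            .bind("jobVersionUuid", jobVersionUuid)
            .bind("datasetUuid", datasetUuid)
            .bind("jobUuid", jobUuid)
            .execute();
      }
    });
  }
}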
@@ -0,0 +1,6 @@
UPDATE job_versions_io_mapping
SET job_uuid = job_versions.job_uuid
FROM job_versions
WHERE job_versions_io_mapping.job_version_uuid = job_versions.uuid;

-- TODO: include a test for that, can be a migration test
6 changes: 4 additions & 2 deletions api/src/test/java/marquez/db/JobVersionDaoTest.java
@@ -172,7 +172,8 @@ public void testGetJobVersion() {
.orElseThrow(
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));

jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), dataset.getUuid(), jobRow.getUuid());
}
for (DatasetId ds : jobMeta.getOutputs()) {
DatasetRow dataset =
@@ -181,7 +182,8 @@ public void testGetJobVersion() {
.orElseThrow(
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));

jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), dataset.getUuid(), jobVersionRow.getJobUuid());
}
Optional<JobVersion> jobVersion =
jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue());
1 change: 0 additions & 1 deletion api/src/test/java/marquez/db/LineageDaoTest.java
@@ -169,7 +169,6 @@ public void testGetLineage() {

@Test
public void testGetLineageForSymlinkedJob() throws SQLException {

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao,
6 changes: 4 additions & 2 deletions api/src/test/java/marquez/db/TestingDb.java
@@ -186,8 +186,10 @@ JobVersionRow upsert(@NonNull JobVersionRow row) {
row.getJobName(),
row.getNamespaceUuid(),
row.getNamespaceName());
row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in));
row.getInputUuids().forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out));
row.getInputUuids()
.forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in, row.getJobUuid()));
row.getInputUuids()
.forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out, row.getJobUuid()));
// ...
delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid());
return upserted;
