diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 9173c8109f..23f7a9f118 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion( String namespaceName); /** - * Used to link an input dataset to a given job version. + * Used to upsert an input or output dataset to a given job version. * * @param jobVersionUuid The unique ID of the job version. - * @param inputDatasetUuid The unique ID of the input dataset. + * @param datasetUuid The unique ID of the output dataset + * @param ioType The {@link IoType} of the dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT); - } + @SqlUpdate( + """ + INSERT INTO job_versions_io_mapping ( + job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at) + VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW()) + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING + """) + void upsertCurrentInputOrOutputDatasetFor( + UUID jobVersionUuid, + UUID datasetUuid, + UUID jobUuid, + UUID symlinkTargetJobUuid, + IoType ioType); + + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_current_job_version = FALSE + WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid) + AND job_version_uuid != :jobVersionUuid + AND io_type = :ioType + AND is_current_job_version = TRUE; + """) + void markInputOrOutputDatasetAsPreviousFor(UUID jobVersionUuid, UUID jobUuid, IoType ioType); + + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_current_job_version = FALSE + WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid) + AND io_type = :ioType + AND is_current_job_version = TRUE; + """) + void markInputOrOutputDatasetAsPreviousFor(UUID jobUuid, IoType ioType); /** - * Used to link an output dataset to a given job version. + * Used to link an input dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. - * @param outputDatasetUuid The unique ID of the output dataset. + * @param inputDatasetUuid The unique ID of the input dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT); + default void upsertInputDatasetFor( + UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) { + markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.INPUT); + upsertCurrentInputOrOutputDatasetFor( + jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.INPUT); } /** - * Used to upsert an input or output dataset to a given job version. + * Used to link an output dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. - * @param datasetUuid The unique ID of the output dataset - * @param ioType The {@link IoType} of the dataset. + * @param outputDatasetUuid The unique ID of the output dataset. + * @param jobUuid The unique ID of the job. */ - @SqlUpdate( - """ - INSERT INTO job_versions_io_mapping ( - job_version_uuid, dataset_uuid, io_type) - VALUES (:jobVersionUuid, :datasetUuid, :ioType) - ON CONFLICT DO NOTHING - """) - void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType); + default void upsertOutputDatasetFor( + UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) { + markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.OUTPUT); + upsertCurrentInputOrOutputDatasetFor( + jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.OUTPUT); + } /** * Returns the input datasets to a given job version. @@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion( inputs.forEach( i -> { jobVersionDao.upsertInputDatasetFor( - jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid()); + jobVersionRow.getUuid(), + i.getDatasetVersionRow().getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. outputs.forEach( o -> { jobVersionDao.upsertOutputDatasetFor( - jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid()); + jobVersionRow.getUuid(), + o.getDatasetVersionRow().getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid()); @@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( jobVersionInputs.forEach( jobVersionInput -> { jobVersionDao.upsertInputDatasetFor( - jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid()); + jobVersionRow.getUuid(), + jobVersionInput.getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. jobVersionOutputs.forEach( jobVersionOutput -> { jobVersionDao.upsertOutputDatasetFor( - jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid()); + jobVersionRow.getUuid(), + jobVersionOutput.getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the job version to the run. diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 5e520b22a6..6a550a04ba 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu @SqlQuery( """ WITH RECURSIVE - -- Find the current version of a job or its symlink target if the target has no - -- current_version_uuid. This ensures that we don't lose lineage for a job after it is - -- symlinked to another job but before that target job has run successfully. - job_current_version AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid - FROM jobs j - LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid - WHERE s.current_version_uuid IS NULL - ), - job_io AS ( - SELECT j.job_uuid, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs - FROM job_versions_io_mapping io - INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid - GROUP BY j.job_uuid - ), - lineage(job_uuid, inputs, outputs) AS ( - SELECT v.job_uuid AS job_uuid, - COALESCE(inputs, Array[]::uuid[]) AS inputs, - COALESCE(outputs, Array[]::uuid[]) AS outputs, - 0 AS depth - FROM jobs j - INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid - LEFT JOIN job_io io ON io.job_uuid=v.job_uuid - WHERE j.uuid IN () OR j.symlink_target_uuid IN () - UNION - SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 - FROM job_io io, - lineage l - WHERE io.job_uuid != l.job_uuid AND - array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) - AND depth < :depth) - SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids - FROM lineage l2 - INNER JOIN jobs_view j ON j.uuid=l2.job_uuid; + job_io AS ( + SELECT + io.job_uuid AS job_uuid, + io.job_symlink_target_uuid AS job_symlink_target_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs + FROM job_versions_io_mapping io + WHERE io.is_current_job_version = TRUE + GROUP BY io.job_symlink_target_uuid, io.job_uuid + ), + lineage(job_uuid, job_symlink_target_uuid, inputs, outputs) AS ( + SELECT job_uuid, + job_symlink_target_uuid, + COALESCE(inputs, Array[]::uuid[]) AS inputs, + COALESCE(outputs, Array[]::uuid[]) AS outputs, + 0 AS depth + FROM job_io + WHERE job_uuid IN () OR job_symlink_target_uuid IN () + UNION + SELECT io.job_uuid, io.job_symlink_target_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, lineage l + WHERE (io.job_uuid != l.job_uuid) AND + array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) + AND depth < :depth), + lineage_outside_job_io(job_uuid) AS ( + SELECT + param_jobs.param_job_uuid as job_uuid, + j.symlink_target_uuid, + Array[]::uuid[] AS inputs, + Array[]::uuid[] AS outputs, + 0 AS depth + FROM (SELECT unnest(ARRAY[]::UUID[]) AS param_job_uuid) param_jobs + LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid + INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid + WHERE l.job_uuid IS NULL + ) + SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids + FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2 + INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uuid) """) Set getLineage(@BindList Set jobIds, int depth); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 2663d7ed7e..a1512e755d 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,7 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.JobVersionDao.IoType; import marquez.db.RunDao.RunUpsert; import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; @@ -362,7 +363,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper // RunInput list uses null as a sentinel value List datasetInputs = null; - if (event.getInputs() != null) { + if (event.getInputs() != null && !event.getInputs().isEmpty()) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true); @@ -370,12 +371,15 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } + } else { + // mark job_versions_io_mapping as obsolete + daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT); } bag.setInputs(Optional.ofNullable(datasetInputs)); // RunInput list uses null as a sentinel value List datasetOutputs = null; - if (event.getOutputs() != null) { + if (event.getOutputs() != null && !event.getOutputs().isEmpty()) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false); @@ -383,6 +387,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } + } else { + // mark job_versions_io_mapping as obsolete + daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT); } bag.setOutputs(Optional.ofNullable(datasetOutputs)); diff --git a/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java b/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java new file mode 100644 index 0000000000..0e6690d708 --- /dev/null +++ b/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.migrations; + +import lombok.extern.slf4j.Slf4j; +import org.flywaydb.core.api.MigrationVersion; +import org.flywaydb.core.api.migration.Context; +import org.flywaydb.core.api.migration.JavaMigration; +import org.jdbi.v3.core.Jdbi; + +@Slf4j +public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration { + + public static final String UPDATE_QUERY = + """ + UPDATE job_versions_io_mapping + SET + job_uuid = j.uuid, + job_symlink_target_uuid = j.symlink_target_uuid, + is_current_job_version = (jv.uuid = j.current_version_uuid)::BOOLEAN, + made_current_at = NOW() + FROM job_versions jv + INNER JOIN jobs_view j ON j.uuid = jv.job_uuid + WHERE jv.uuid = job_versions_io_mapping.job_version_uuid + """; + + @Override + public MigrationVersion getVersion() { + return MigrationVersion.fromVersion("67.2"); + } + + @Override + public void migrate(Context context) throws Exception { + Jdbi jdbi = Jdbi.create(context.getConnection()); + jdbi.withHandle(h -> h.createUpdate(UPDATE_QUERY).execute()); + } + + @Override + public String getDescription() { + return "Back fill job_uuid and is_current_job_version in job_versions_io_mapping table"; + } + + @Override + public Integer getChecksum() { + return null; + } + + @Override + public boolean isUndo() { + return false; + } + + @Override + public boolean canExecuteInTransaction() { + return false; + } + + @Override + public boolean isBaselineMigration() { + return false; + } +} diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql index eb390b9d5c..8f22f987a2 100644 --- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql +++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql @@ -111,6 +111,10 @@ BEGIN LEFT JOIN aliases a ON a.link_target_uuid = j.uuid ) j WHERE jobs.uuid=j.uuid; + UPDATE job_versions_io_mapping + SET job_symlink_target_uuid=j.symlink_target_uuid + FROM jobs j + WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid; END IF; SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid); diff --git a/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql b/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql new file mode 100644 index 0000000000..3491add251 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql @@ -0,0 +1,12 @@ +ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN job_symlink_target_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN is_current_job_version boolean DEFAULT FALSE; +ALTER TABLE job_versions_io_mapping ADD COLUMN made_current_at TIMESTAMP; + +-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise) +ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey; +ALTER TABLE job_versions_io_mapping ALTER COLUMN job_version_uuid DROP NOT NULL; + +CREATE INDEX job_versions_io_mapping_job_uuid_job_symlink_target_uuid ON job_versions_io_mapping (job_uuid, job_symlink_target_uuid); + +ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_mapping_pkey UNIQUE (job_version_uuid, dataset_uuid, io_type, job_uuid); \ No newline at end of file diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 07b798ed95..16dd9ccfed 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -17,9 +17,12 @@ import java.util.Optional; import java.util.UUID; import marquez.common.Utils; +import marquez.common.models.DatasetType; +import marquez.db.models.DatasetRow; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; import marquez.db.models.RunRow; +import marquez.db.models.SourceRow; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.JobLink; @@ -115,8 +118,14 @@ INSERT INTO job_versions (uuid, created_at, updated_at, job_uuid, version, locat .job( new LineageEvent.Job( NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP))) - .inputs(Collections.emptyList()) - .outputs(Collections.emptyList()) + .inputs( + Collections.singletonList( + new LineageEvent.Dataset( + "namespace", "dataset_a", LineageEvent.DatasetFacets.builder().build()))) + .outputs( + Collections.singletonList( + new LineageEvent.Dataset( + "namespace", "dataset_b", LineageEvent.DatasetFacets.builder().build()))) .producer(PRODUCER_URL.toString()) .build(); PGobject eventJson = new PGobject(); @@ -158,4 +167,95 @@ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, name .first(); }); } + + public static UUID writeJobVersion( + Jdbi jdbi, UUID jobUuid, String location, String jobName, NamespaceRow namespace) + throws SQLException { + return jdbi.withHandle( + h -> { + return h.createQuery( + """ + INSERT INTO job_versions ( + uuid, + created_at, + updated_at, + job_uuid, + location, + version, + job_name, + namespace_uuid, + namespace_name + ) + VALUES ( + :uuid, + :created_at, + :updated_at, + :job_uuid, + :location, + :version, + :job_name, + :namespace_uuid, + :namespace_name + ) + RETURNING uuid + """) + .bind("uuid", UUID.randomUUID()) + .bind("created_at", Instant.now()) + .bind("updated_at", Instant.now()) + .bind("job_uuid", jobUuid) + .bind("location", location) + .bind("version", UUID.randomUUID()) + .bind("job_name", jobUuid) + .bind("namespace_uuid", namespace.getUuid()) + .bind("namespace_name", namespace.getName()) + .mapTo(UUID.class) + .first(); + }); + } + + public static DatasetRow writeDataset(Jdbi jdbi, NamespaceRow namespaceRow, String datasetName) { + DatasetDao datasetDao = jdbi.onDemand(DatasetDao.class); + + SourceRow sourceRow = + jdbi.onDemand(SourceDao.class) + .upsert(UUID.randomUUID(), "type", Instant.now(), "name", "http://a"); + + return datasetDao.upsert( + UUID.randomUUID(), + DatasetType.DB_TABLE, + Instant.now(), + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + "sourceName", + datasetName, + "", + "", + false); + } + + public static UUID writeJobIOMapping(Jdbi jdbi, UUID jobUuid, UUID datasetUuid) + throws SQLException { + return jdbi.withHandle( + h -> { + return h.createQuery( + """ + INSERT INTO job_versions_io_mapping ( + job_version_uuid, + dataset_uuid, + io_type, + job_uuid, + is_current_job_version + ) + VALUES (:job_version_uuid, :dataset_uuid, :io_type, :job_uuid, TRUE) + RETURNING uuid + """) + .bind("job_version_uuid", UUID.randomUUID()) + .bind("dataset_uuid", Instant.now()) + .bind("io_type", Instant.now()) + .bind("job_uuid", jobUuid) + .mapTo(UUID.class) + .first(); + }); + } } diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index b36f505693..aa0caaf4fc 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -6,12 +6,16 @@ package marquez.db; import static marquez.Generator.newTimestamp; +import static marquez.common.models.CommonModelGenerator.newDescription; import static marquez.common.models.CommonModelGenerator.newJobName; +import static marquez.common.models.CommonModelGenerator.newJobType; import static marquez.common.models.CommonModelGenerator.newLocation; import static marquez.common.models.CommonModelGenerator.newVersion; import static marquez.db.JobVersionDao.BagOfJobVersionInfo; import static marquez.db.models.DbModelGenerator.newRowUuid; +import static marquez.service.models.ServiceModelGenerator.newInputsWith; import static marquez.service.models.ServiceModelGenerator.newJobMetaWith; +import static marquez.service.models.ServiceModelGenerator.newOutputsWith; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,6 +56,7 @@ public class JobVersionDaoTest extends BaseIntegrationTest { static Jdbi jdbiForTesting; static DatasetVersionDao datasetVersionDao; + static DatasetDao datasetDao; static JobDao jobDao; static RunDao runDao; static OpenLineageDao openLineageDao; @@ -63,6 +68,7 @@ public class JobVersionDaoTest extends BaseIntegrationTest { @BeforeAll public static void setUpOnce(final Jdbi jdbi) { jdbiForTesting = jdbi; + datasetDao = jdbiForTesting.onDemand(DatasetDao.class); datasetVersionDao = jdbiForTesting.onDemand(DatasetVersionDao.class); jobDao = jdbi.onDemand(JobDao.class); runDao = jdbi.onDemand(RunDao.class); @@ -190,7 +196,11 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertInputDatasetFor( + jobVersionRow.getUuid(), + dataset.getUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); } for (DatasetId ds : jobMeta.getOutputs()) { DatasetRow dataset = @@ -199,7 +209,11 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertOutputDatasetFor( + jobVersionRow.getUuid(), + dataset.getUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); } Optional jobVersion = jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue()); @@ -425,4 +439,123 @@ public void testUpsertRunlessJobVersion() { .extracting(JobVersion::getInputs, InstanceOfAssertFactories.list(UUID.class)) .isNotEmpty(); } + + @Test + public void testUpsertDatasetMarksOtherRowsObsolete() { + // (1) Add a new job; the input and output datasets for the job will also be added. + final JobMeta jobMeta = + new JobMeta( + newJobType(), + newInputsWith(NamespaceName.of(namespaceRow.getName()), 1), + newOutputsWith(NamespaceName.of(namespaceRow.getName()), 1), + newLocation(), + newDescription(), + null); + + final JobRow jobRow = + DbTestUtils.newJobWith( + jdbiForTesting, namespaceRow.getName(), newJobName().getValue(), jobMeta); + + // (2) Get UUID of the datasets + DatasetId inputDatasetId = jobMeta.getInputs().stream().findFirst().get(); + DatasetId outputDatasetId = jobMeta.getOutputs().stream().findFirst().get(); + + UUID inputDatasetUuid = + datasetDao + .getUuid(inputDatasetId.getNamespace().getValue(), inputDatasetId.getName().getValue()) + .get() + .getUuid(); + UUID outputDatasetUuid = + datasetDao + .getUuid( + outputDatasetId.getNamespace().getValue(), outputDatasetId.getName().getValue()) + .get() + .getUuid(); + + // (3) Upsert job version row + UUID jobVersionUuid = + jobVersionDao + .upsertJobVersion( + newRowUuid(), + newTimestamp(), + jobRow.getUuid(), + newLocation().toString(), + UUID.randomUUID(), + jobRow.getName(), + namespaceRow.getUuid(), + namespaceRow.getName()) + .getUuid(); + + // (4) upsert job_versions_io rows for each dataset + jobVersionDao.upsertInputDatasetFor( + jobVersionUuid, inputDatasetUuid, jobRow.getUuid(), jobRow.getSymlinkTargetId()); + jobVersionDao.upsertOutputDatasetFor( + jobVersionUuid, outputDatasetUuid, jobRow.getUuid(), jobRow.getSymlinkTargetId()); + + // (5) there should be 2 rows in job_versions_io_mapping + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + "SELECT count(*) as cnt FROM job_versions_io_mapping WHERE job_uuid = :jobUuid AND is_current_job_version = TRUE") + .bind("jobUuid", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(2); + + // (2) Modify job - create a new version of it + UUID newJobVersion = UUID.randomUUID(); + ExtendedJobVersionRow newVersionRow = + DbTestUtils.newJobVersion( + jdbiForTesting, + jobRow.getUuid(), + newJobVersion, + jobRow.getName(), + namespaceRow.getUuid(), + namespaceRow.getName()); + + // (4) upsert job_versions_io rows for each dataset + jobVersionDao.upsertInputDatasetFor( + newVersionRow.getUuid(), + inputDatasetUuid, + jobRow.getUuid(), + jobRow.getUuid()); // for testing use symlink job uuid same as job uuid + jobVersionDao.upsertOutputDatasetFor( + newVersionRow.getUuid(), + outputDatasetUuid, + jobRow.getUuid(), + jobRow.getUuid()); // for testing use symlink job uuid same as job uuid + + // (5) Verify input and output datasets if they are the current ones + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + "SELECT count(*) as cnt FROM job_versions_io_mapping WHERE job_uuid = :jobUuid") + .bind("jobUuid", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(4); + + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + """ + SELECT count(*) as cnt FROM job_versions_io_mapping + WHERE job_uuid = :jobUuid AND is_current_job_version = TRUE + AND job_symlink_target_uuid = :symlinkTargetId + """) + .bind("jobUuid", jobRow.getUuid()) + .bind("symlinkTargetId", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(2); + } } diff --git a/api/src/test/java/marquez/db/TestingDb.java b/api/src/test/java/marquez/db/TestingDb.java index 0ccb2af65f..a1718d6cc5 100644 --- a/api/src/test/java/marquez/db/TestingDb.java +++ b/api/src/test/java/marquez/db/TestingDb.java @@ -186,8 +186,10 @@ JobVersionRow upsert(@NonNull JobVersionRow row) { row.getJobName(), row.getNamespaceUuid(), row.getNamespaceName()); - row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in)); - row.getInputUuids().forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out)); + row.getInputUuids() + .forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in, row.getJobUuid(), null)); + row.getInputUuids() + .forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out, row.getJobUuid(), null)); // ... delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid()); return upserted; diff --git a/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java b/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java new file mode 100644 index 0000000000..a47a82c91b --- /dev/null +++ b/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.migrations; + +import static marquez.db.BackfillTestUtils.writeDataset; +import static marquez.db.BackfillTestUtils.writeJob; +import static marquez.db.BackfillTestUtils.writeJobVersion; +import static marquez.db.LineageTestUtils.NAMESPACE; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.Instant; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import marquez.db.JobVersionDao.IoType; +import marquez.db.NamespaceDao; +import marquez.db.OpenLineageDao; +import marquez.db.models.DatasetRow; +import marquez.db.models.NamespaceRow; +import marquez.jdbi.JdbiExternalPostgresExtension.FlywaySkipRepeatable; +import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget; +import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import org.flywaydb.core.api.configuration.Configuration; +import org.flywaydb.core.api.migration.Context; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test to validate if a job_uuid, job_symlink_target_uuid and is_current_job_version are filled + * properly within job_versions_io_mapping table + */ +@ExtendWith(MarquezJdbiExternalPostgresExtension.class) +@FlywayTarget("67.2") +@FlywaySkipRepeatable() +@Slf4j +public class V67_2_JobFacetsBackfillJobVersionTest { + + private static V67_2_JobVersionsIOMappingBackfillJob migration = + new V67_2_JobVersionsIOMappingBackfillJob(); + static Jdbi jdbi; + private static OpenLineageDao openLineageDao; + + @BeforeAll + public static void setUpOnce(Jdbi jdbi) { + V67_2_JobFacetsBackfillJobVersionTest.jdbi = jdbi; + openLineageDao = jdbi.onDemand(OpenLineageDao.class); + } + + @Test + public void testBackFill() throws SQLException, JsonProcessingException { + NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class); + Instant now = Instant.now(); + NamespaceRow namespace = + namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me"); + + // (1) Write a job + UUID symlinkJobUuid = writeJob(jdbi, "symlink", now, namespace); + UUID jobUuid = writeJob(jdbi, "job", now, namespace); + + // (2) Write a job version + UUID oldJobVersion = writeJobVersion(jdbi, jobUuid, "location", "job", namespace); + UUID currentJobVersion = writeJobVersion(jdbi, jobUuid, "location", "job", namespace); + + jdbi.withHandle( + h -> + h.createUpdate( + """ + UPDATE jobs + SET current_version_uuid = :current_version_uuid, symlink_target_uuid = :symlink_target_uuid + WHERE uuid = :job_uuid + """) + .bind("current_version_uuid", currentJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .execute()); + + // (3) Write a dataset + DatasetRow dataset = writeDataset(jdbi, namespace, "some_dataset"); + + // (4) Write a job io mapping + insertJobIOMapping(oldJobVersion, dataset); + insertJobIOMapping(currentJobVersion, dataset); + + // (5) Run Migration + runMigration(); + + // (4) Verify job_version column in job_facets table is updated + assertThat( + jdbi.withHandle( + h -> + h.createQuery( + """ + SELECT count(*) FROM job_versions_io_mapping + WHERE job_version_uuid = :job_version_uuid + AND job_uuid = :job_uuid + AND is_current_job_version = TRUE + AND job_symlink_target_uuid = :symlink_target_uuid + """) + .bind("job_version_uuid", currentJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .mapTo(Integer.class) + .findFirst()) + .get()) + .isEqualTo(1); + + assertThat( + jdbi.withHandle( + h -> + h.createQuery( + """ + SELECT count(*) FROM job_versions_io_mapping + WHERE job_version_uuid = :job_version_uuid + AND job_uuid = :job_uuid + AND is_current_job_version = FALSE + AND job_symlink_target_uuid = :symlink_target_uuid + """) + .bind("job_version_uuid", oldJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .mapTo(Integer.class) + .findFirst()) + .get()) + .isEqualTo(1); + } + + private static void insertJobIOMapping(UUID jobVersion, DatasetRow dataset) { + jdbi.withHandle( + h -> { + return h.createQuery( + """ + INSERT INTO job_versions_io_mapping ( + job_version_uuid, dataset_uuid, io_type) + VALUES (:job_version_uuid, :dataset_uuid, :io_type) + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_current_job_version = TRUE + RETURNING job_version_uuid + """) + .bind("job_version_uuid", jobVersion) + .bind("dataset_uuid", dataset.getUuid()) + .bind("io_type", IoType.OUTPUT) + .mapTo(UUID.class) + .first(); + }); + } + + private static void runMigration() { + jdbi.useHandle( + handle -> { + try { + Context context = + new Context() { + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Connection getConnection() { + return handle.getConnection(); + } + }; + // apply migrations in order + new V67_2_JobVersionsIOMappingBackfillJob().migrate(context); + } catch (Exception e) { + throw new AssertionError("Unable to execute migration", e); + } + }); + } +}