From bf33c31c176fa499e330c3100ac5e93b579ae8dd Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Tue, 24 Oct 2023 09:45:41 +0200 Subject: [PATCH] Runless events - run upsert builder Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 2 +- .../main/java/marquez/db/OpenLineageDao.java | 152 ++++++++---------- api/src/main/java/marquez/db/RunDao.java | 52 ++++++ .../java/marquez/db/models/ModelDaos.java | 5 + 4 files changed, 125 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44c1fea7da..8380a08a0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Added * API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - * Save into Marquez model datasets sent via `DatasetEvent` event type + *Save into Marquez model datasets sent via `DatasetEvent` event type ## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20 ### Added diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 149af0457a..28a2e07d1b 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,8 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.RunDao.RunUpsert; +import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; @@ -149,7 +151,6 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { daos.initBaseDao(this); - Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); UpdateLineageRow bag = new UpdateLineageRow(); @@ -164,9 +165,9 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map Dataset dataset = event.getDataset(); List datasetOutputs = new ArrayList<>(); - DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, null, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, null, null, now, daos); + insertOutputFacets(dataset, record, null, null, now); daos.getDatasetDao() .updateVersion( @@ -192,18 +193,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); - Instant nominalStartTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalStartTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); - Instant nominalEndTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalEndTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); + Instant nominalStartTime = getNominalStartTime(event); + Instant nominalEndTime = getNominalEndTime(event); Optional parentRun = Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent); @@ -230,51 +221,25 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper bag.setRunArgs(runArgs); final UUID runUuid = runToUuid(event.getRun().getRunId()); - RunRow run; + RunUpsertBuilder runUpsertBuilder = + RunUpsert.builder() + .runUuid(runUuid) + .parentRunUuid(parentUuid.orElse(null)) + .externalId(event.getRun().getRunId()) + .now(now) + .jobUuid(job.getUuid()) + .jobVersionUuid(null) + .runArgsUuid(runArgs.getUuid()) + .namespaceName(namespace.getName()) + .jobName(job.getName()) + .location(job.getLocation()); + if (event.getEventType() != null) { - RunState runStateType = getRunState(event.getEventType()); - run = - daos.getRunDao() - .upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - runStateType, - now, - namespace.getName(), - job.getName(), - job.getLocation()); - // Add ... - Optional.ofNullable(event.getRun().getFacets()) - .ifPresent( - runFacet -> - daos.getRunFacetsDao() - .insertRunFacetsFor( - runUuid, now, event.getEventType(), event.getRun().getFacets())); - } else { - run = - daos.getRunDao() - .upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - namespace.getName(), - job.getName(), - job.getLocation()); + runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now); } + run = daos.getRunDao().upsert(runUpsertBuilder.build()); + insertRunFacets(event, runUuid, now); bag.setRun(run); if (event.getEventType() != null) { @@ -290,37 +255,28 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper } } - // Add ... - Optional.ofNullable(event.getJob().getFacets()) - .ifPresent( - jobFacet -> - daos.getJobFacetsDao() - .insertJobFacetsFor( - job.getUuid(), - runUuid, - now, - event.getEventType(), - event.getJob().getFacets())); + insertJobFacets(event, job.getUuid(), runUuid, now); // RunInput list uses null as a sentinel value List datasetInputs = null; if (event.getInputs() != null) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true); datasetInputs.add(record); - insertInputFacets(dataset, record, runUuid, event.getEventType(), now, daos); + insertInputFacets(dataset, record, runUuid, event.getEventType(), now); } } bag.setInputs(Optional.ofNullable(datasetInputs)); + // RunInput list uses null as a sentinel value List datasetOutputs = null; if (event.getOutputs() != null) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, runUuid, event.getEventType(), now, daos); + insertOutputFacets(dataset, record, runUuid, event.getEventType(), now); } } @@ -328,13 +284,44 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper return bag; } + private static Instant getNominalStartTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalStartTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private static Instant getNominalEndTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalEndTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getRun().getFacets()) + .ifPresent( + runFacet -> + daos.getRunFacetsDao() + .insertRunFacetsFor( + runUuid, now, event.getEventType(), event.getRun().getFacets())); + } + + private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getJob().getFacets()) + .ifPresent( + jobFacet -> + daos.getJobFacetsDao() + .insertJobFacetsFor( + jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets())); + } + private void insertInputFacets( - Dataset dataset, - DatasetRecord record, - UUID runUuid, - String eventType, - Instant now, - ModelDaos daos) { + Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( @@ -363,12 +350,7 @@ private void insertInputFacets( } private void insertOutputFacets( - Dataset dataset, - DatasetRecord record, - UUID runUuid, - String eventType, - Instant now, - ModelDaos daos) { + Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( @@ -665,7 +647,7 @@ default JobType getJobType(Job job) { } default DatasetRecord upsertLineageDataset( - Dataset ds, Instant now, UUID runUuid, boolean isInput, ModelDaos daos) { + Dataset ds, Instant now, UUID runUuid, boolean isInput) { NamespaceRow dsNamespace = daos.getNamespaceDao() .upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index fa22eba479..32c39cf4a2 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -14,6 +14,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.Builder; import lombok.NonNull; import marquez.common.Utils; import marquez.common.models.DatasetId; @@ -301,6 +302,40 @@ RunRow upsert( String jobName, String location); + default RunRow upsert(RunUpsert runUpsert) { + if (runUpsert.runStateType == null) { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } else { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.runStateType(), + runUpsert.runStateTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } + } + @SqlUpdate( "INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) " + "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING") @@ -452,4 +487,21 @@ default RunRow upsertRunMeta( ) """) Optional findByLatestJob(String namespace, String jobName); + + @Builder + record RunUpsert( + UUID runUuid, + UUID parentRunUuid, + String externalId, + Instant now, + UUID jobUuid, + UUID jobVersionUuid, + UUID runArgsUuid, + Instant nominalStartTime, + Instant nominalEndTime, + RunState runStateType, + Instant runStateTime, + String namespaceName, + String jobName, + String location) {} } diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java index 307d789f41..1e2f2f8f85 100644 --- a/api/src/main/java/marquez/db/models/ModelDaos.java +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -1,3 +1,8 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + package marquez.db.models; import marquez.db.BaseDao;