Skip to content

Commit

Permalink
Runless events - run upsert builder
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Oct 24, 2023
1 parent c4c3698 commit bf33c31
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 86 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
* Save into Marquez model datasets sent via `DatasetEvent` event type
*Save into Marquez model datasets sent via `DatasetEvent` event type

## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
### Added
Expand Down
152 changes: 67 additions & 85 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetFieldRow;
Expand Down Expand Up @@ -149,7 +151,6 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map

default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) {
daos.initBaseDao(this);

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

UpdateLineageRow bag = new UpdateLineageRow();
Expand All @@ -164,9 +165,9 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map

Dataset dataset = event.getDataset();
List<DatasetRecord> datasetOutputs = new ArrayList<>();
DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos);
DatasetRecord record = upsertLineageDataset(dataset, now, null, false);
datasetOutputs.add(record);
insertOutputFacets(dataset, record, null, null, now, daos);
insertOutputFacets(dataset, record, null, null, now);

daos.getDatasetDao()
.updateVersion(
Expand All @@ -192,18 +193,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
DEFAULT_NAMESPACE_OWNER);
bag.setNamespace(namespace);

Instant nominalStartTime =
Optional.ofNullable(event.getRun().getFacets())
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
.map(NominalTimeRunFacet::getNominalStartTime)
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
.orElse(null);
Instant nominalEndTime =
Optional.ofNullable(event.getRun().getFacets())
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
.map(NominalTimeRunFacet::getNominalEndTime)
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
.orElse(null);
Instant nominalStartTime = getNominalStartTime(event);
Instant nominalEndTime = getNominalEndTime(event);

Optional<ParentRunFacet> parentRun =
Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
Expand All @@ -230,51 +221,25 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
bag.setRunArgs(runArgs);

final UUID runUuid = runToUuid(event.getRun().getRunId());

RunRow run;
RunUpsertBuilder runUpsertBuilder =
RunUpsert.builder()
.runUuid(runUuid)
.parentRunUuid(parentUuid.orElse(null))
.externalId(event.getRun().getRunId())
.now(now)
.jobUuid(job.getUuid())
.jobVersionUuid(null)
.runArgsUuid(runArgs.getUuid())
.namespaceName(namespace.getName())
.jobName(job.getName())
.location(job.getLocation());

if (event.getEventType() != null) {
RunState runStateType = getRunState(event.getEventType());
run =
daos.getRunDao()
.upsert(
runUuid,
parentUuid.orElse(null),
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
nominalEndTime,
runStateType,
now,
namespace.getName(),
job.getName(),
job.getLocation());
// Add ...
Optional.ofNullable(event.getRun().getFacets())
.ifPresent(
runFacet ->
daos.getRunFacetsDao()
.insertRunFacetsFor(
runUuid, now, event.getEventType(), event.getRun().getFacets()));
} else {
run =
daos.getRunDao()
.upsert(
runUuid,
parentUuid.orElse(null),
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
nominalEndTime,
namespace.getName(),
job.getName(),
job.getLocation());
runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now);
}
run = daos.getRunDao().upsert(runUpsertBuilder.build());
insertRunFacets(event, runUuid, now);
bag.setRun(run);

if (event.getEventType() != null) {
Expand All @@ -290,51 +255,73 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
}
}

// Add ...
Optional.ofNullable(event.getJob().getFacets())
.ifPresent(
jobFacet ->
daos.getJobFacetsDao()
.insertJobFacetsFor(
job.getUuid(),
runUuid,
now,
event.getEventType(),
event.getJob().getFacets()));
insertJobFacets(event, job.getUuid(), runUuid, now);

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos);
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true);
datasetInputs.add(record);
insertInputFacets(dataset, record, runUuid, event.getEventType(), now, daos);
insertInputFacets(dataset, record, runUuid, event.getEventType(), now);
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos);
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false);
datasetOutputs.add(record);
insertOutputFacets(dataset, record, runUuid, event.getEventType(), now, daos);
insertOutputFacets(dataset, record, runUuid, event.getEventType(), now);
}
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
return bag;
}

private static Instant getNominalStartTime(LineageEvent event) {
return Optional.ofNullable(event.getRun().getFacets())
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
.map(NominalTimeRunFacet::getNominalStartTime)
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
.orElse(null);
}

private static Instant getNominalEndTime(LineageEvent event) {
return Optional.ofNullable(event.getRun().getFacets())
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
.map(NominalTimeRunFacet::getNominalEndTime)
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
.orElse(null);
}

private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) {
// Add ...
Optional.ofNullable(event.getRun().getFacets())
.ifPresent(
runFacet ->
daos.getRunFacetsDao()
.insertRunFacetsFor(
runUuid, now, event.getEventType(), event.getRun().getFacets()));
}

private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) {
// Add ...
Optional.ofNullable(event.getJob().getFacets())
.ifPresent(
jobFacet ->
daos.getJobFacetsDao()
.insertJobFacetsFor(
jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets()));
}

private void insertInputFacets(
Dataset dataset,
DatasetRecord record,
UUID runUuid,
String eventType,
Instant now,
ModelDaos daos) {
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
// Facets ...
Optional.ofNullable(dataset.getFacets())
.ifPresent(
Expand Down Expand Up @@ -363,12 +350,7 @@ private void insertInputFacets(
}

private void insertOutputFacets(
Dataset dataset,
DatasetRecord record,
UUID runUuid,
String eventType,
Instant now,
ModelDaos daos) {
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
// Facets ...
Optional.ofNullable(dataset.getFacets())
.ifPresent(
Expand Down Expand Up @@ -665,7 +647,7 @@ default JobType getJobType(Job job) {
}

default DatasetRecord upsertLineageDataset(
Dataset ds, Instant now, UUID runUuid, boolean isInput, ModelDaos daos) {
Dataset ds, Instant now, UUID runUuid, boolean isInput) {
NamespaceRow dsNamespace =
daos.getNamespaceDao()
.upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
Expand Down Expand Up @@ -301,6 +302,40 @@ RunRow upsert(
String jobName,
String location);

default RunRow upsert(RunUpsert runUpsert) {
if (runUpsert.runStateType == null) {
return upsert(
runUpsert.runUuid(),
runUpsert.parentRunUuid(),
runUpsert.externalId(),
runUpsert.now(),
runUpsert.jobUuid(),
runUpsert.jobVersionUuid(),
runUpsert.runArgsUuid(),
runUpsert.nominalStartTime(),
runUpsert.nominalEndTime(),
runUpsert.namespaceName(),
runUpsert.jobName(),
runUpsert.location());
} else {
return upsert(
runUpsert.runUuid(),
runUpsert.parentRunUuid(),
runUpsert.externalId(),
runUpsert.now(),
runUpsert.jobUuid(),
runUpsert.jobVersionUuid(),
runUpsert.runArgsUuid(),
runUpsert.nominalStartTime(),
runUpsert.nominalEndTime(),
runUpsert.runStateType(),
runUpsert.runStateTime(),
runUpsert.namespaceName(),
runUpsert.jobName(),
runUpsert.location());
}
}

@SqlUpdate(
"INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) "
+ "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING")
Expand Down Expand Up @@ -452,4 +487,21 @@ default RunRow upsertRunMeta(
)
""")
Optional<Run> findByLatestJob(String namespace, String jobName);

@Builder
record RunUpsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Instant nominalEndTime,
RunState runStateType,
Instant runStateTime,
String namespaceName,
String jobName,
String location) {}
}
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/db/models/ModelDaos.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import marquez.db.BaseDao;
Expand Down

0 comments on commit bf33c31

Please sign in to comment.