From dde889f25fa5226e2863759c81d3ea4886ff190e Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Thu, 5 Oct 2023 15:35:02 +0200 Subject: [PATCH 1/6] Runless events - consume dataset event Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 4 + .../java/marquez/api/OpenLineageResource.java | 27 +++--- .../java/marquez/db/DatasetFacetsDao.java | 5 +- .../main/java/marquez/db/OpenLineageDao.java | 86 ++++++++++++++++++- .../marquez/service/OpenLineageService.java | 25 ++++++ ...set_facets_lineage_event_type_nullable.sql | 1 + .../marquez/OpenLineageIntegrationTest.java | 34 +++++--- .../java/marquez/db/LineageTestUtils.java | 39 +++++++++ .../java/marquez/db/OpenLineageDaoTest.java | 16 ++++ .../OpenLineageServiceIntegrationTest.java | 45 ++++++++++ 10 files changed, 259 insertions(+), 23 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 13c8a535c3..52e1a2074a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,10 @@ API: remove usage of `current_job_context_uuid` column [`#2622`](https://github. 
Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub) *Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.* +### Added +* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + * Save into Marquez model datasets sent via `DatasetEvent` event type + ## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20 ### Added * API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc) diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 259a500a53..3c37892b8f 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -39,6 +39,8 @@ import marquez.db.OpenLineageDao; import marquez.service.ServiceFactory; import marquez.service.models.BaseEvent; +import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -67,16 +69,12 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon if (event instanceof LineageEvent) { openLineageService .createAsync((LineageEvent) event) - .whenComplete( - (result, err) -> { - if (err != null) { - log.error("Unexpected error while processing request", err); - asyncResponse.resume(Response.status(determineStatusCode(err)).build()); - } else { - asyncResponse.resume(Response.status(201).build()); - } - }); - } else { + .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); + } else if 
(event instanceof DatasetEvent) { + openLineageService + .createAsync((DatasetEvent) event) + .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); + } else if (event instanceof JobEvent) { log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName()); // return serialized event @@ -84,6 +82,15 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon } } + private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) { + if (err != null) { + log.error("Unexpected error while processing request", err); + asyncResponse.resume(Response.status(determineStatusCode(err)).build()); + } else { + asyncResponse.resume(Response.status(201).build()); + } + } + private int determineStatusCode(Throwable e) { if (e instanceof CompletionException) { return determineStatusCode(e.getCause()); diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 679a9bfaa3..ce4a2556cd 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -12,6 +12,7 @@ import java.util.Spliterators; import java.util.UUID; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import lombok.NonNull; import marquez.common.Utils; import marquez.service.models.LineageEvent; @@ -126,9 +127,9 @@ void insertDatasetFacet( default void insertDatasetFacetsFor( @NonNull UUID datasetUuid, @NonNull UUID datasetVersionUuid, - @NonNull UUID runUuid, + @Nullable UUID runUuid, @NonNull Instant lineageEventTime, - @NonNull String lineageEventType, + @Nullable String lineageEventType, @NonNull LineageEvent.DatasetFacets datasetFacets) { final Instant now = Instant.now(); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index c6f7559cec..41111ae55f 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ 
b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -48,6 +48,8 @@ import marquez.db.models.SourceRow; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; +import marquez.service.models.BaseEvent; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -93,6 +95,14 @@ void createLineageEvent( PGobject event, String producer); + @SqlUpdate( + "INSERT INTO lineage_events (" + + "event_time, " + + "event, " + + "producer) " + + "VALUES (?, ?, ?)") + void createDatasetEvent(Instant eventTime, PGobject event, String producer); + @SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid") List findLineageEventsByRunUuid(UUID runUuid); @@ -135,6 +145,80 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map return updateLineageRow; } + default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { + NamespaceDao namespaceDao = createNamespaceDao(); + DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao(); + DatasetDao datasetDao = createDatasetDao(); + SourceDao sourceDao = createSourceDao(); + DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); + DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); + RunDao runDao = createRunDao(); + DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao(); + ColumnLineageDao columnLineageDao = createColumnLineageDao(); + + Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); + + UpdateLineageRow bag = new UpdateLineageRow(); + NamespaceRow namespace = + namespaceDao.upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getDataset().getNamespace()), + DEFAULT_NAMESPACE_OWNER); + bag.setNamespace(namespace); + + Dataset dataset = event.getDataset(); + List datasetOutputs = new ArrayList<>(); + 
DatasetRecord record = + upsertLineageDataset( + dataset, + now, + null, + false, + namespaceDao, + datasetSymlinkDao, + sourceDao, + datasetDao, + datasetVersionDao, + datasetFieldDao, + runDao, + columnLineageDao); + datasetOutputs.add(record); + + // Facets ... + Optional.ofNullable(dataset.getFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + null, + now, + null, + facets)); + + // OutputFacets ... + Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + null, + now, + null, + facets)); + + datasetDao.updateVersion( + record.getDatasetVersionRow().getDatasetUuid(), + Instant.now(), + record.getDatasetVersionRow().getUuid()); + + bag.setOutputs(Optional.ofNullable(datasetOutputs)); + + return bag; + } + default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { NamespaceDao namespaceDao = createNamespaceDao(); DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao(); @@ -938,7 +1022,7 @@ default UUID runToUuid(String runId) { } } - default PGobject createJsonArray(LineageEvent event, ObjectMapper mapper) { + default PGobject createJsonArray(BaseEvent event, ObjectMapper mapper) { try { PGobject jsonObject = new PGobject(); jsonObject.setType("json"); diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index 8c441a0225..ebeb0ff10c 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -45,6 +45,7 @@ import marquez.service.RunTransitionListener.RunInput; import marquez.service.RunTransitionListener.RunOutput; import marquez.service.RunTransitionListener.RunTransition; +import marquez.service.models.DatasetEvent; 
import marquez.service.models.LineageEvent; import marquez.service.models.RunMeta; @@ -67,6 +68,30 @@ public OpenLineageService(BaseDao baseDao, RunService runService, Executor execu this.executor = executor; } + public CompletableFuture createAsync(DatasetEvent event) { + CompletableFuture openLineage = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> + createDatasetEvent( + event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + createJsonArray(event, mapper), + event.getProducer()))), + executor); + + CompletableFuture marquez = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> { + updateMarquezModel(event, mapper); + })), + executor); + + return CompletableFuture.allOf(marquez, openLineage); + } + public CompletableFuture createAsync(LineageEvent event) { UUID runUuid = runUuidFromEvent(event.getRun()); CompletableFuture openLineage = diff --git a/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql b/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql new file mode 100644 index 0000000000..0d106ce6f0 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql @@ -0,0 +1 @@ +ALTER TABLE dataset_facets ALTER COLUMN lineage_event_type DROP NOT NULL; diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 4729a64971..fec39865c7 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -55,7 +55,6 @@ import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; -import marquez.service.models.DatasetEvent; import marquez.service.models.JobEvent; import org.assertj.core.api.InstanceOfAssertFactories; import org.jdbi.v3.core.Jdbi; @@ -1376,7 +1375,7 @@ public void 
testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio } @Test - public void testSendDatasetEventIsDecoded() throws IOException { + public void testSendDatasetEvent() throws IOException { final String openLineageEventAsString = Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset()); @@ -1394,16 +1393,31 @@ public void testSendDatasetEventIsDecoded() throws IOException { // Ensure the event was received. Map respMap = resp.join(); - assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + assertThat(respMap.containsKey(201)).isTrue(); // (3) Convert the OpenLineage event to Json. - DatasetEvent datasetEvent = - marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); - assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name"); - assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1); - assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName()) - .isEqualTo("col_a"); - assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + final JsonNode openLineageEventAsJson = + Utils.fromJson(openLineageEventAsString, new TypeReference() {}); + + // (4) Verify dataset facet associated with the OpenLineage event. 
+ final JsonNode json = openLineageEventAsJson.path("dataset"); + + final String namespace = json.path("namespace").asText(); + final String output = json.path("name").asText(); + final JsonNode expectedFacets = json.path("facets"); + + final Dataset dataset = client.getDataset(namespace, output); + assertThat(Utils.getMapper().convertValue(dataset.getFacets(), JsonNode.class)) + .isEqualTo(expectedFacets); + + List datasetVersions = client.listDatasetVersions(namespace, output); + assertThat(datasetVersions).isNotEmpty(); + + DatasetVersion latestDatasetVersion = datasetVersions.get(0); + assertThat(latestDatasetVersion.getNamespace()).isEqualTo(namespace); + assertThat(latestDatasetVersion.getName()).isEqualTo(output); + assertThat(Utils.getMapper().convertValue(latestDatasetVersion.getFacets(), JsonNode.class)) + .isEqualTo(expectedFacets); } @Test diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 84dc6d18f1..c45a1a5bca 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -24,6 +24,7 @@ import marquez.common.Utils; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -173,6 +174,44 @@ public static UpdateLineageRow createLineageRow( return updateLineageRow; } + /** + * Create an {@link UpdateLineageRow} from a dataset by emulating an OpenLineage DatasetEvent.
+ * + * @param dao + * @param dataset + * @return + */ + public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset dataset) { + + DatasetEvent event = + DatasetEvent.builder() + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .dataset(dataset) + .producer(PRODUCER_URL.toString()) + .build(); + + // emulate an OpenLineage DatasetEvent + event + .getProperties() + .put( + "_schemaURL", + "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"); + UpdateLineageRow updateLineageRow = dao.updateMarquezModel(event, Utils.getMapper()); + PGobject jsonObject = new PGobject(); + jsonObject.setType("json"); + try { + jsonObject.setValue(Utils.toJson(event)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + dao.createDatasetEvent( + event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + jsonObject, + event.getProducer()); + + return updateLineageRow; + } + public static DatasetFacets newDatasetFacet(SchemaField... fields) { return newDatasetFacet(EMPTY_MAP, fields); } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index d0ddf7253b..2648d253e9 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -96,6 +96,22 @@ void testUpdateMarquezModel() { .isEqualTo(writeJob.getOutputs().get().get(0).getDatasetVersionRow()); } + @Test + void testUpdateMarquezModelWithDatasetEvent() { + UpdateLineageRow datasetEventRow = + LineageTestUtils.createLineageRow( + dao, new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)); + + assertThat(datasetEventRow.getOutputs()).isPresent(); + assertThat(datasetEventRow.getOutputs().get()).hasSize(1).first(); + assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetRow()) + .hasFieldOrPropertyWithValue("name", DATASET_NAME) + .hasFieldOrPropertyWithValue("namespaceName", LineageTestUtils.NAMESPACE); + + 
assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetVersionRow()) + .hasNoNullFieldsOrPropertiesExcept("runUuid"); + } + @Test void testUpdateMarquezModelLifecycleStateChangeFacet() { Dataset dataset = diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index 0a705ff91d..e1f9cc42ce 100644 --- a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -5,6 +5,8 @@ package marquez.service; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -28,6 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import marquez.common.Utils; +import marquez.common.models.FieldName; import marquez.common.models.JobType; import marquez.common.models.RunState; import marquez.db.DatasetDao; @@ -47,11 +50,14 @@ import marquez.service.RunTransitionListener.JobOutputUpdate; import marquez.service.RunTransitionListener.RunTransition; import marquez.service.models.Dataset; +import marquez.service.models.DatasetEvent; import marquez.service.models.Job; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.DatasourceDatasetFacet; import marquez.service.models.LineageEvent.RunFacet; +import marquez.service.models.LineageEvent.SchemaDatasetFacet; +import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Run; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.Assertions; @@ -437,6 +443,45 @@ void testJobIsNotHiddenAfterSubsequentOLEvent() throws ExecutionException, Inter assertThat(jobService.findJobByName(NAMESPACE, 
name)).isNotEmpty(); } + @Test + void testDatasetEvent() throws ExecutionException, InterruptedException { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder() + .name(DATASET_NAME) + .namespace(NAMESPACE) + .facets( + DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField("col", "STRING", "my name")))) + .dataSource( + DatasourceDatasetFacet.builder() + .name("theDatasource") + .uri("http://thedatasource") + .build()) + .build()) + .build(); + + lineageService + .createAsync( + DatasetEvent.builder() + .eventTime(Instant.now().atZone(TIMEZONE)) + .dataset(dataset) + .build()) + .get(); + + Optional datasetRow = datasetDao.findDatasetByName(NAMESPACE, DATASET_NAME); + assertThat(datasetRow).isPresent().map(Dataset::getCurrentVersion).isPresent(); + assertThat(datasetRow.get().getSourceName().getValue()).isEqualTo("theDatasource"); + assertThat(datasetRow.get().getFields()) + .hasSize(1) + .first() + .hasFieldOrPropertyWithValue("name", FieldName.of("col")) + .hasFieldOrPropertyWithValue("type", "STRING"); + } + private void checkExists(LineageEvent.Dataset ds) { DatasetService datasetService = new DatasetService(openLineageDao, runService); From 91f8ff1b4b23365c07bb0d94e7203dafe514d87d Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 23 Oct 2023 14:10:54 +0200 Subject: [PATCH 2/6] Runless events - add container to store daos Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/OpenLineageDao.java | 463 ++++++++---------- .../java/marquez/db/models/ModelDaos.java | 141 ++++++ 2 files changed, 350 insertions(+), 254 deletions(-) create mode 100644 api/src/main/java/marquez/db/models/ModelDaos.java diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 41111ae55f..fbdb84ea62 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -41,6 +41,7 
@@ import marquez.db.models.DatasetVersionRow; import marquez.db.models.InputFieldData; import marquez.db.models.JobRow; +import marquez.db.models.ModelDaos; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; import marquez.db.models.RunRow; @@ -75,6 +76,7 @@ public interface OpenLineageDao extends BaseDao { String DEFAULT_SOURCE_NAME = "default"; String DEFAULT_NAMESPACE_OWNER = "anonymous"; + ModelDaos daos = new ModelDaos(); @SqlUpdate( "INSERT INTO lineage_events (" @@ -146,104 +148,73 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map } default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { - NamespaceDao namespaceDao = createNamespaceDao(); - DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao(); - DatasetDao datasetDao = createDatasetDao(); - SourceDao sourceDao = createSourceDao(); - DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); - DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); - RunDao runDao = createRunDao(); - DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao(); - ColumnLineageDao columnLineageDao = createColumnLineageDao(); + daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); UpdateLineageRow bag = new UpdateLineageRow(); NamespaceRow namespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), - now, - formatNamespaceName(event.getDataset().getNamespace()), - DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getDataset().getNamespace()), + DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); Dataset dataset = event.getDataset(); List datasetOutputs = new ArrayList<>(); - DatasetRecord record = - upsertLineageDataset( - dataset, - now, - null, - false, - namespaceDao, - datasetSymlinkDao, - sourceDao, - datasetDao, - datasetVersionDao, - datasetFieldDao, - runDao, - 
columnLineageDao); + DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos); datasetOutputs.add(record); // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( facets -> - datasetFacetsDao.insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - null, - now, - null, - facets)); + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + null, + now, + null, + facets)); // OutputFacets ... Optional.ofNullable(dataset.getOutputFacets()) .ifPresent( facets -> - datasetFacetsDao.insertOutputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - null, - now, - null, - facets)); + daos.getDatasetFacetsDao() + .insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + null, + now, + null, + facets)); - datasetDao.updateVersion( - record.getDatasetVersionRow().getDatasetUuid(), - Instant.now(), - record.getDatasetVersionRow().getUuid()); + daos.getDatasetDao() + .updateVersion( + record.getDatasetVersionRow().getDatasetUuid(), + Instant.now(), + record.getDatasetVersionRow().getUuid()); bag.setOutputs(Optional.ofNullable(datasetOutputs)); - return bag; } default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { - NamespaceDao namespaceDao = createNamespaceDao(); - DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao(); - DatasetDao datasetDao = createDatasetDao(); - SourceDao sourceDao = createSourceDao(); - JobDao jobDao = createJobDao(); - JobFacetsDao jobFacetsDao = createJobFacetsDao(); - DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); - DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); - DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao(); - RunDao runDao = createRunDao(); - RunArgsDao runArgsDao = createRunArgsDao(); - RunStateDao 
runStateDao = createRunStateDao(); - ColumnLineageDao columnLineageDao = createColumnLineageDao(); - RunFacetsDao runFacetsDao = createRunFacetsDao(); - + daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); UpdateLineageRow bag = new UpdateLineageRow(); NamespaceRow namespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), - now, - formatNamespaceName(event.getJob().getNamespace()), - DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getJob().getNamespace()), + DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); Instant nominalStartTime = @@ -265,14 +236,22 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper JobRow job = buildJobFromEvent( - event, mapper, jobDao, now, namespace, nominalStartTime, nominalEndTime, parentRun); + event, + mapper, + daos.getJobDao(), + now, + namespace, + nominalStartTime, + nominalEndTime, + parentRun); bag.setJob(job); Map runArgsMap = createRunArgs(event); RunArgsRow runArgs = - runArgsDao.upsertRunArgs( - UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap)); + daos.getRunArgsDao() + .upsertRunArgs( + UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap)); bag.setRunArgs(runArgs); final UUID runUuid = runToUuid(event.getRun().getRunId()); @@ -281,42 +260,45 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper if (event.getEventType() != null) { RunState runStateType = getRunState(event.getEventType()); run = - runDao.upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - runStateType, - now, - namespace.getName(), - job.getName(), - job.getLocation()); + daos.getRunDao() + .upsert( + runUuid, + parentUuid.orElse(null), + event.getRun().getRunId(), + now, + 
job.getUuid(), + null, + runArgs.getUuid(), + nominalStartTime, + nominalEndTime, + runStateType, + now, + namespace.getName(), + job.getName(), + job.getLocation()); // Add ... Optional.ofNullable(event.getRun().getFacets()) .ifPresent( runFacet -> - runFacetsDao.insertRunFacetsFor( - runUuid, now, event.getEventType(), event.getRun().getFacets())); + daos.getRunFacetsDao() + .insertRunFacetsFor( + runUuid, now, event.getEventType(), event.getRun().getFacets())); } else { run = - runDao.upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - namespace.getName(), - job.getName(), - job.getLocation()); + daos.getRunDao() + .upsert( + runUuid, + parentUuid.orElse(null), + event.getRun().getRunId(), + now, + job.getUuid(), + null, + runArgs.getUuid(), + nominalStartTime, + nominalEndTime, + namespace.getName(), + job.getName(), + job.getLocation()); } bag.setRun(run); @@ -324,12 +306,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper RunState runStateType = getRunState(event.getEventType()); RunStateRow runState = - runStateDao.upsert(UUID.randomUUID(), now, run.getUuid(), runStateType); + daos.getRunStateDao().upsert(UUID.randomUUID(), now, run.getUuid(), runStateType); bag.setRunState(runState); if (runStateType.isDone()) { - runDao.updateEndState(run.getUuid(), now, runState.getUuid()); + daos.getRunDao().updateEndState(run.getUuid(), now, runState.getUuid()); } else if (runStateType.isStarting()) { - runDao.updateStartState(run.getUuid(), now, runState.getUuid()); + daos.getRunDao().updateStartState(run.getUuid(), now, runState.getUuid()); } } @@ -337,53 +319,47 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper Optional.ofNullable(event.getJob().getFacets()) .ifPresent( jobFacet -> - jobFacetsDao.insertJobFacetsFor( - job.getUuid(), runUuid, now, event.getEventType(), 
event.getJob().getFacets())); + daos.getJobFacetsDao() + .insertJobFacetsFor( + job.getUuid(), + runUuid, + now, + event.getEventType(), + event.getJob().getFacets())); // RunInput list uses null as a sentinel value List datasetInputs = null; if (event.getInputs() != null) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { - DatasetRecord record = - upsertLineageDataset( - dataset, - now, - runUuid, - true, - namespaceDao, - datasetSymlinkDao, - sourceDao, - datasetDao, - datasetVersionDao, - datasetFieldDao, - runDao, - columnLineageDao); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos); datasetInputs.add(record); // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( facets -> - datasetFacetsDao.insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); // InputFacets ... 
Optional.ofNullable(dataset.getInputFacets()) .ifPresent( facets -> - datasetFacetsDao.insertInputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + daos.getDatasetFacetsDao() + .insertInputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -392,45 +368,34 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper if (event.getOutputs() != null) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { - DatasetRecord record = - upsertLineageDataset( - dataset, - now, - runUuid, - false, - namespaceDao, - datasetSymlinkDao, - sourceDao, - datasetDao, - datasetVersionDao, - datasetFieldDao, - runDao, - columnLineageDao); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos); datasetOutputs.add(record); // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( facets -> - datasetFacetsDao.insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); // OutputFacets ... 
Optional.ofNullable(dataset.getOutputFacets()) .ifPresent( facets -> - datasetFacetsDao.insertOutputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + daos.getDatasetFacetsDao() + .insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } @@ -707,35 +672,25 @@ default JobType getJobType(Job job) { } default DatasetRecord upsertLineageDataset( - Dataset ds, - Instant now, - UUID runUuid, - boolean isInput, - NamespaceDao namespaceDao, - DatasetSymlinkDao datasetSymlinkDao, - SourceDao sourceDao, - DatasetDao datasetDao, - DatasetVersionDao datasetVersionDao, - DatasetFieldDao datasetFieldDao, - RunDao runDao, - ColumnLineageDao columnLineageDao) { + Dataset ds, Instant now, UUID runUuid, boolean isInput, ModelDaos daos) { NamespaceRow dsNamespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); SourceRow source; if (ds.getFacets() != null && ds.getFacets().getDataSource() != null) { source = - sourceDao.upsert( - UUID.randomUUID(), - getSourceType(ds), - now, - ds.getFacets().getDataSource().getName(), - getUrlOrNull(ds.getFacets().getDataSource().getUri())); + daos.getSourceDao() + .upsert( + UUID.randomUUID(), + getSourceType(ds), + now, + ds.getFacets().getDataSource().getName(), + getUrlOrNull(ds.getFacets().getDataSource().getUri())); } else { source = - sourceDao.upsertOrDefault( - UUID.randomUUID(), getSourceType(ds), now, DEFAULT_SOURCE_NAME, ""); + daos.getSourceDao() + .upsertOrDefault(UUID.randomUUID(), getSourceType(ds), now, DEFAULT_SOURCE_NAME, ""); } String dsDescription = null; @@ -744,20 +699,22 @@ default DatasetRecord upsertLineageDataset( } NamespaceRow 
datasetNamespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), - now, - formatNamespaceName(ds.getNamespace()), - DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(ds.getNamespace()), + DEFAULT_NAMESPACE_OWNER); DatasetSymlinkRow symlink = - datasetSymlinkDao.upsertDatasetSymlinkRow( - UUID.randomUUID(), - formatDatasetName(ds.getName()), - dsNamespace.getUuid(), - true, - null, - now); + daos.getDatasetSymlinkDao() + .upsertDatasetSymlinkRow( + UUID.randomUUID(), + formatDatasetName(ds.getName()), + dsNamespace.getUuid(), + true, + null, + now); Optional.ofNullable(ds.getFacets()) .map(facets -> facets.getSymlinks()) @@ -766,19 +723,20 @@ default DatasetRecord upsertLineageDataset( el.getIdentifiers().stream() .forEach( id -> - datasetSymlinkDao.doUpsertDatasetSymlinkRow( - symlink.getUuid(), - id.getName(), - namespaceDao - .upsertNamespaceRow( - UUID.randomUUID(), - now, - id.getNamespace(), - DEFAULT_NAMESPACE_OWNER) - .getUuid(), - false, - id.getType(), - now))); + daos.getDatasetSymlinkDao() + .doUpsertDatasetSymlinkRow( + symlink.getUuid(), + id.getName(), + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + id.getNamespace(), + DEFAULT_NAMESPACE_OWNER) + .getUuid(), + false, + id.getType(), + now))); String dslifecycleState = Optional.ofNullable(ds.getFacets()) .map(DatasetFacets::getLifecycleStateChange) @@ -786,18 +744,19 @@ default DatasetRecord upsertLineageDataset( .orElse(""); DatasetRow datasetRow = - datasetDao.upsert( - symlink.getUuid(), - getDatasetType(ds), - now, - datasetNamespace.getUuid(), - datasetNamespace.getName(), - source.getUuid(), - source.getName(), - formatDatasetName(ds.getName()), - ds.getName(), - dsDescription, - dslifecycleState.equalsIgnoreCase("DROP")); + daos.getDatasetDao() + .upsert( + symlink.getUuid(), + getDatasetType(ds), + now, + datasetNamespace.getUuid(), + datasetNamespace.getName(), + source.getUuid(), 
+ source.getName(), + formatDatasetName(ds.getName()), + ds.getName(), + dsDescription, + dslifecycleState.equalsIgnoreCase("DROP")); List fields = Optional.ofNullable(ds.getFacets()) @@ -810,7 +769,7 @@ default DatasetRecord upsertLineageDataset( datasetRow .getCurrentVersionUuid() .filter(v -> isInput) // only fetch the current version if this is a read - .flatMap(datasetVersionDao::findRowByUuid) + .flatMap(daos.getDatasetVersionDao()::findRowByUuid) // if this is a write _or_ if the dataset has no current version, // create a new version .orElseGet( @@ -826,16 +785,17 @@ default DatasetRecord upsertLineageDataset( runUuid) .getValue(); DatasetVersionRow row = - datasetVersionDao.upsert( - UUID.randomUUID(), - now, - dsRow.getUuid(), - versionUuid, - isInput ? null : runUuid, - datasetVersionDao.toPgObjectSchemaFields(fields), - dsNamespace.getName(), - ds.getName(), - dslifecycleState); + daos.getDatasetVersionDao() + .upsert( + UUID.randomUUID(), + now, + dsRow.getUuid(), + versionUuid, + isInput ? 
null : runUuid, + daos.getDatasetVersionDao().toPgObjectSchemaFields(fields), + dsNamespace.getName(), + ds.getName(), + dslifecycleState); return row; }); List datasetFieldMappings = new ArrayList<>(); @@ -843,28 +803,29 @@ default DatasetRecord upsertLineageDataset( if (fields != null) { for (SchemaField field : fields) { DatasetFieldRow datasetFieldRow = - datasetFieldDao.upsert( - UUID.randomUUID(), - now, - field.getName(), - field.getType(), - field.getDescription(), - datasetRow.getUuid()); + daos.getDatasetFieldDao() + .upsert( + UUID.randomUUID(), + now, + field.getName(), + field.getType(), + field.getDescription(), + datasetRow.getUuid()); datasetFields.add(datasetFieldRow); datasetFieldMappings.add( new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid())); } } - datasetFieldDao.updateFieldMapping(datasetFieldMappings); + daos.getDatasetFieldDao().updateFieldMapping(datasetFieldMappings); if (isInput) { - runDao.updateInputMapping(runUuid, datasetVersionRow.getUuid()); + daos.getRunDao().updateInputMapping(runUuid, datasetVersionRow.getUuid()); // TODO - this is a short term fix until // https://github.com/MarquezProject/marquez/issues/1361 // is fully thought out if (datasetRow.getCurrentVersionUuid().isEmpty()) { - datasetDao.updateVersion(dsRow.getUuid(), now, datasetVersionRow.getUuid()); + daos.getDatasetDao().updateVersion(dsRow.getUuid(), now, datasetVersionRow.getUuid()); datasetRow = datasetRow.withCurrentVersionUuid(datasetVersionRow.getUuid()); } } @@ -872,14 +833,7 @@ default DatasetRecord upsertLineageDataset( List columnLineageRows = Collections.emptyList(); if (!isInput) { columnLineageRows = - upsertColumnLineage( - runUuid, - ds, - now, - datasetFields, - columnLineageDao, - datasetFieldDao, - datasetVersionRow); + upsertColumnLineage(runUuid, ds, now, datasetFields, datasetVersionRow, daos); } return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows); @@ -890,13 +844,13 @@ 
private List upsertColumnLineage( Dataset ds, Instant now, List datasetFields, - ColumnLineageDao columnLineageDao, - DatasetFieldDao datasetFieldDao, - DatasetVersionRow datasetVersionRow) { + DatasetVersionRow datasetVersionRow, + ModelDaos daos) { Logger log = LoggerFactory.getLogger(OpenLineageDao.class); // get all the fields related to this particular run - List runFields = datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid); + List runFields = + daos.getDatasetFieldDao().findInputFieldsDataAssociatedWithRun(runUuid); log.debug("Found input datasets fields for run '{}': {}", runUuid, runFields); return Optional.ofNullable(ds.getFacets()) @@ -948,7 +902,8 @@ private List upsertColumnLineage( outputField.get().getName(), datasetVersionRow.getUuid(), inputFields); - return columnLineageDao + return daos + .getColumnLineageDao() .upsertColumnLineageRow( datasetVersionRow.getUuid(), outputField.get().getUuid(), diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java new file mode 100644 index 0000000000..307d789f41 --- /dev/null +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -0,0 +1,141 @@ +package marquez.db.models; + +import marquez.db.BaseDao; +import marquez.db.ColumnLineageDao; +import marquez.db.DatasetDao; +import marquez.db.DatasetFacetsDao; +import marquez.db.DatasetFieldDao; +import marquez.db.DatasetSymlinkDao; +import marquez.db.DatasetVersionDao; +import marquez.db.JobDao; +import marquez.db.JobFacetsDao; +import marquez.db.NamespaceDao; +import marquez.db.RunArgsDao; +import marquez.db.RunDao; +import marquez.db.RunFacetsDao; +import marquez.db.RunStateDao; +import marquez.db.SourceDao; + +/** + * Container for storing all the Dao classes which ensures parent interface methods are called + * exactly once. 
+ */ +public final class ModelDaos { + private static NamespaceDao namespaceDao = null; + private static DatasetSymlinkDao datasetSymlinkDao = null; + private static DatasetDao datasetDao = null; + private static SourceDao sourceDao = null; + private static DatasetVersionDao datasetVersionDao = null; + private static DatasetFieldDao datasetFieldDao = null; + private static RunDao runDao = null; + private static DatasetFacetsDao datasetFacetsDao = null; + private static ColumnLineageDao columnLineageDao = null; + private static JobDao jobDao = null; + private static JobFacetsDao jobFacetsDao = null; + private static RunArgsDao runArgsDao = null; + private static RunStateDao runStateDao = null; + private static RunFacetsDao runFacetsDao = null; + private BaseDao baseDao; + + public void initBaseDao(BaseDao baseDao) { + this.baseDao = baseDao; + } + + public NamespaceDao getNamespaceDao() { + if (namespaceDao == null) { + namespaceDao = baseDao.createNamespaceDao(); + } + return namespaceDao; + } + + public DatasetSymlinkDao getDatasetSymlinkDao() { + if (datasetSymlinkDao == null) { + datasetSymlinkDao = baseDao.createDatasetSymlinkDao(); + } + return datasetSymlinkDao; + } + + public DatasetDao getDatasetDao() { + if (datasetDao == null) { + datasetDao = baseDao.createDatasetDao(); + } + return datasetDao; + } + + public SourceDao getSourceDao() { + if (sourceDao == null) { + sourceDao = baseDao.createSourceDao(); + } + return sourceDao; + } + + public DatasetVersionDao getDatasetVersionDao() { + if (datasetVersionDao == null) { + datasetVersionDao = baseDao.createDatasetVersionDao(); + } + return datasetVersionDao; + } + + public DatasetFieldDao getDatasetFieldDao() { + if (datasetFieldDao == null) { + datasetFieldDao = baseDao.createDatasetFieldDao(); + } + return datasetFieldDao; + } + + public RunDao getRunDao() { + if (runDao == null) { + runDao = baseDao.createRunDao(); + } + return runDao; + } + + public ColumnLineageDao getColumnLineageDao() { + if 
(columnLineageDao == null) { + columnLineageDao = baseDao.createColumnLineageDao(); + } + return columnLineageDao; + } + + public DatasetFacetsDao getDatasetFacetsDao() { + if (datasetFacetsDao == null) { + datasetFacetsDao = baseDao.createDatasetFacetsDao(); + } + return datasetFacetsDao; + } + + public JobDao getJobDao() { + if (jobDao == null) { + jobDao = baseDao.createJobDao(); + } + return jobDao; + } + + public JobFacetsDao getJobFacetsDao() { + if (jobFacetsDao == null) { + jobFacetsDao = baseDao.createJobFacetsDao(); + } + return jobFacetsDao; + } + + public RunArgsDao getRunArgsDao() { + if (runArgsDao == null) { + runArgsDao = baseDao.createRunArgsDao(); + } + return runArgsDao; + } + + public RunStateDao getRunStateDao() { + if (runStateDao == null) { + runStateDao = baseDao.createRunStateDao(); + } + return runStateDao; + } + + public RunFacetsDao getRunFacetsDao() { + if (runFacetsDao == null) { + runFacetsDao = baseDao.createRunFacetsDao(); + } + return runFacetsDao; + } +} From 6d38a7d98dfe1385757cfbf1c64f3c7ebcf5a30d Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 23 Oct 2023 14:46:19 +0200 Subject: [PATCH 3/6] Runless events - extract common methods Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/OpenLineageDao.java | 149 +++++++++--------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index fbdb84ea62..149af0457a 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -166,32 +166,7 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map List datasetOutputs = new ArrayList<>(); DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos); datasetOutputs.add(record); - - // Facets ... 
- Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - null, - now, - null, - facets)); - - // OutputFacets ... - Optional.ofNullable(dataset.getOutputFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertOutputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - null, - now, - null, - facets)); + insertOutputFacets(dataset, record, null, null, now, daos); daos.getDatasetDao() .updateVersion( @@ -334,32 +309,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper for (Dataset dataset : event.getInputs()) { DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos); datasetInputs.add(record); - - // Facets ... - Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); - - // InputFacets ... - Optional.ofNullable(dataset.getInputFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertInputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + insertInputFacets(dataset, record, runUuid, event.getEventType(), now, daos); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -370,32 +320,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper for (Dataset dataset : event.getOutputs()) { DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos); datasetOutputs.add(record); - - // Facets ... 
- Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); - - // OutputFacets ... - Optional.ofNullable(dataset.getOutputFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertOutputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + insertOutputFacets(dataset, record, runUuid, event.getEventType(), now, daos); } } @@ -403,6 +328,74 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper return bag; } + private void insertInputFacets( + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now, + ModelDaos daos) { + // Facets ... + Optional.ofNullable(dataset.getFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + eventType, + facets)); + + // InputFacets ... + Optional.ofNullable(dataset.getInputFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertInputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + eventType, + facets)); + } + + private void insertOutputFacets( + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now, + ModelDaos daos) { + // Facets ... + Optional.ofNullable(dataset.getFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + eventType, + facets)); + + // OutputFacets ... 
+ Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + eventType, + facets)); + } + private JobRow buildJobFromEvent( LineageEvent event, ObjectMapper mapper, From 40bfe6b3000d6439054f90cd7e17df7d0b834a90 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Tue, 24 Oct 2023 09:45:41 +0200 Subject: [PATCH 4/6] Runless events - run upsert builder Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 2 +- .../main/java/marquez/db/OpenLineageDao.java | 152 ++++++++---------- api/src/main/java/marquez/db/RunDao.java | 52 ++++++ .../java/marquez/db/models/ModelDaos.java | 5 + build.gradle | 8 +- 5 files changed, 132 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52e1a2074a..3b0fb53b40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://git ### Added * API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - * Save into Marquez model datasets sent via `DatasetEvent` event type + *Save into Marquez model datasets sent via `DatasetEvent` event type ## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20 ### Added diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 149af0457a..28a2e07d1b 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,8 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.RunDao.RunUpsert; +import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import 
marquez.db.mappers.LineageEventMapper; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; @@ -149,7 +151,6 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { daos.initBaseDao(this); - Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); UpdateLineageRow bag = new UpdateLineageRow(); @@ -164,9 +165,9 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map Dataset dataset = event.getDataset(); List datasetOutputs = new ArrayList<>(); - DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, null, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, null, null, now, daos); + insertOutputFacets(dataset, record, null, null, now); daos.getDatasetDao() .updateVersion( @@ -192,18 +193,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); - Instant nominalStartTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalStartTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); - Instant nominalEndTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalEndTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); + Instant nominalStartTime = getNominalStartTime(event); + Instant nominalEndTime = getNominalEndTime(event); Optional parentRun = Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent); @@ -230,51 +221,25 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper 
bag.setRunArgs(runArgs); final UUID runUuid = runToUuid(event.getRun().getRunId()); - RunRow run; + RunUpsertBuilder runUpsertBuilder = + RunUpsert.builder() + .runUuid(runUuid) + .parentRunUuid(parentUuid.orElse(null)) + .externalId(event.getRun().getRunId()) + .now(now) + .jobUuid(job.getUuid()) + .jobVersionUuid(null) + .runArgsUuid(runArgs.getUuid()) + .namespaceName(namespace.getName()) + .jobName(job.getName()) + .location(job.getLocation()); + if (event.getEventType() != null) { - RunState runStateType = getRunState(event.getEventType()); - run = - daos.getRunDao() - .upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - runStateType, - now, - namespace.getName(), - job.getName(), - job.getLocation()); - // Add ... - Optional.ofNullable(event.getRun().getFacets()) - .ifPresent( - runFacet -> - daos.getRunFacetsDao() - .insertRunFacetsFor( - runUuid, now, event.getEventType(), event.getRun().getFacets())); - } else { - run = - daos.getRunDao() - .upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - namespace.getName(), - job.getName(), - job.getLocation()); + runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now); } + run = daos.getRunDao().upsert(runUpsertBuilder.build()); + insertRunFacets(event, runUuid, now); bag.setRun(run); if (event.getEventType() != null) { @@ -290,37 +255,28 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper } } - // Add ... 
- Optional.ofNullable(event.getJob().getFacets()) - .ifPresent( - jobFacet -> - daos.getJobFacetsDao() - .insertJobFacetsFor( - job.getUuid(), - runUuid, - now, - event.getEventType(), - event.getJob().getFacets())); + insertJobFacets(event, job.getUuid(), runUuid, now); // RunInput list uses null as a sentinel value List datasetInputs = null; if (event.getInputs() != null) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true); datasetInputs.add(record); - insertInputFacets(dataset, record, runUuid, event.getEventType(), now, daos); + insertInputFacets(dataset, record, runUuid, event.getEventType(), now); } } bag.setInputs(Optional.ofNullable(datasetInputs)); + // RunInput list uses null as a sentinel value List datasetOutputs = null; if (event.getOutputs() != null) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos); + DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, runUuid, event.getEventType(), now, daos); + insertOutputFacets(dataset, record, runUuid, event.getEventType(), now); } } @@ -328,13 +284,44 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper return bag; } + private static Instant getNominalStartTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalStartTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private static Instant getNominalEndTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> 
Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalEndTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getRun().getFacets()) + .ifPresent( + runFacet -> + daos.getRunFacetsDao() + .insertRunFacetsFor( + runUuid, now, event.getEventType(), event.getRun().getFacets())); + } + + private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getJob().getFacets()) + .ifPresent( + jobFacet -> + daos.getJobFacetsDao() + .insertJobFacetsFor( + jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets())); + } + private void insertInputFacets( - Dataset dataset, - DatasetRecord record, - UUID runUuid, - String eventType, - Instant now, - ModelDaos daos) { + Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( @@ -363,12 +350,7 @@ private void insertInputFacets( } private void insertOutputFacets( - Dataset dataset, - DatasetRecord record, - UUID runUuid, - String eventType, - Instant now, - ModelDaos daos) { + Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // Facets ... 
Optional.ofNullable(dataset.getFacets()) .ifPresent( @@ -665,7 +647,7 @@ default JobType getJobType(Job job) { } default DatasetRecord upsertLineageDataset( - Dataset ds, Instant now, UUID runUuid, boolean isInput, ModelDaos daos) { + Dataset ds, Instant now, UUID runUuid, boolean isInput) { NamespaceRow dsNamespace = daos.getNamespaceDao() .upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index fa22eba479..32c39cf4a2 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -14,6 +14,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.Builder; import lombok.NonNull; import marquez.common.Utils; import marquez.common.models.DatasetId; @@ -301,6 +302,40 @@ RunRow upsert( String jobName, String location); + default RunRow upsert(RunUpsert runUpsert) { + if (runUpsert.runStateType == null) { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } else { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.runStateType(), + runUpsert.runStateTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } + } + @SqlUpdate( "INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) " + "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING") @@ -452,4 +487,21 @@ default RunRow upsertRunMeta( ) """) Optional 
findByLatestJob(String namespace, String jobName); + + @Builder + record RunUpsert( + UUID runUuid, + UUID parentRunUuid, + String externalId, + Instant now, + UUID jobUuid, + UUID jobVersionUuid, + UUID runArgsUuid, + Instant nominalStartTime, + Instant nominalEndTime, + RunState runStateType, + Instant runStateTime, + String namespaceName, + String jobName, + String location) {} } diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java index 307d789f41..1e2f2f8f85 100644 --- a/api/src/main/java/marquez/db/models/ModelDaos.java +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -1,3 +1,8 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + package marquez.db.models; import marquez.db.BaseDao; diff --git a/build.gradle b/build.gradle index 71ec5660a1..d07e60e785 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,7 @@ buildscript { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.20.0' + classpath "io.freefair.gradle:lombok-plugin:8.4" } } @@ -39,6 +40,7 @@ subprojects { apply plugin: 'com.github.johnrengelman.shadow' apply plugin: "com.diffplug.spotless" apply plugin: "pmd" + apply plugin: "io.freefair.lombok" project(':api') { apply plugin: 'application' @@ -95,7 +97,11 @@ subprojects { archiveClassifier.set("sources") } - task javadocJar(type: Jar, dependsOn: javadoc) { + task delombokJavadocs(type: Javadoc) { + source = delombok + } + + task javadocJar(type: Jar, dependsOn: delombokJavadocs) { from javadoc.destinationDir archiveClassifier.set("javadoc") } From 3aacc39ece5e0998314752b7fc3f0b9dc03c9cde Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Wed, 25 Oct 2023 15:36:53 +0200 Subject: [PATCH 5/6] Runless events - review feedback Signed-off-by: Pawel Leszczynski --- 
.../java/marquez/api/OpenLineageResource.java | 3 +- .../main/java/marquez/db/OpenLineageDao.java | 29 +++++++------------ 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 3c37892b8f..253c8c46a7 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -40,7 +40,6 @@ import marquez.service.ServiceFactory; import marquez.service.models.BaseEvent; import marquez.service.models.DatasetEvent; -import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -74,7 +73,7 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon openLineageService .createAsync((DatasetEvent) event) .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); - } else if (event instanceof JobEvent) { + } else { log.warn("Unsupported event type {}. 
Skipping without error", event.getClass().getName()); // return serialized event diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 28a2e07d1b..d3a2d7f56b 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -167,7 +167,8 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map List datasetOutputs = new ArrayList<>(); DatasetRecord record = upsertLineageDataset(dataset, now, null, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, null, null, now); + insertDatasetFacets(dataset, record, null, null, now); + insertOutputDatasetFacets(dataset, record, null, null, now); daos.getDatasetDao() .updateVersion( @@ -264,7 +265,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper for (Dataset dataset : event.getInputs()) { DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true); datasetInputs.add(record); - insertInputFacets(dataset, record, runUuid, event.getEventType(), now); + insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now); + insertInputDatasetFacets(dataset, record, runUuid, event.getEventType(), now); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -276,7 +278,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper for (Dataset dataset : event.getOutputs()) { DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false); datasetOutputs.add(record); - insertOutputFacets(dataset, record, runUuid, event.getEventType(), now); + insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now); + insertOutputDatasetFacets(dataset, record, runUuid, event.getEventType(), now); } } @@ -320,7 +323,7 @@ private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Ins jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets())); } - 
private void insertInputFacets( + private void insertDatasetFacets( Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // Facets ... Optional.ofNullable(dataset.getFacets()) @@ -334,7 +337,10 @@ private void insertInputFacets( now, eventType, facets)); + } + private void insertInputDatasetFacets( + Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { // InputFacets ... Optional.ofNullable(dataset.getInputFacets()) .ifPresent( @@ -349,21 +355,8 @@ private void insertInputFacets( facets)); } - private void insertOutputFacets( + private void insertOutputDatasetFacets( Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { - // Facets ... - Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - daos.getDatasetFacetsDao() - .insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - eventType, - facets)); - // OutputFacets ... 
Optional.ofNullable(dataset.getOutputFacets()) .ifPresent( From 5587fb24fad3600cffb0da70c3f716b96ae79220 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Fri, 3 Nov 2023 13:50:13 +0100 Subject: [PATCH 6/6] fix daos container - speeds up tests twice Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/OpenLineageDao.java | 53 ++++++++++++------- .../java/marquez/db/models/ModelDaos.java | 28 +++++----- .../marquez/service/LineageServiceTest.java | 1 - 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index d3a2d7f56b..7519e200ef 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -78,7 +78,6 @@ public interface OpenLineageDao extends BaseDao { String DEFAULT_SOURCE_NAME = "default"; String DEFAULT_NAMESPACE_OWNER = "anonymous"; - ModelDaos daos = new ModelDaos(); @SqlUpdate( "INSERT INTO lineage_events (" @@ -150,6 +149,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map } default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { + ModelDaos daos = new ModelDaos(); daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); @@ -165,10 +165,10 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map Dataset dataset = event.getDataset(); List datasetOutputs = new ArrayList<>(); - DatasetRecord record = upsertLineageDataset(dataset, now, null, false); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, null, false); datasetOutputs.add(record); - insertDatasetFacets(dataset, record, null, null, now); - insertOutputDatasetFacets(dataset, record, null, null, now); + insertDatasetFacets(daos, dataset, record, null, null, now); + insertOutputDatasetFacets(daos, dataset, record, null, null, now); daos.getDatasetDao() 
.updateVersion( @@ -181,6 +181,7 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map } default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { + ModelDaos daos = new ModelDaos(); daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); @@ -240,7 +241,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now); } run = daos.getRunDao().upsert(runUpsertBuilder.build()); - insertRunFacets(event, runUuid, now); + insertRunFacets(daos, event, runUuid, now); bag.setRun(run); if (event.getEventType() != null) { @@ -256,17 +257,17 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper } } - insertJobFacets(event, job.getUuid(), runUuid, now); + insertJobFacets(daos, event, job.getUuid(), runUuid, now); // RunInput list uses null as a sentinel value List datasetInputs = null; if (event.getInputs() != null) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true); datasetInputs.add(record); - insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now); - insertInputDatasetFacets(dataset, record, runUuid, event.getEventType(), now); + insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); + insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -276,10 +277,10 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper if (event.getOutputs() != null) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { - DatasetRecord record = upsertLineageDataset(dataset, now, 
runUuid, false); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false); datasetOutputs.add(record); - insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now); - insertOutputDatasetFacets(dataset, record, runUuid, event.getEventType(), now); + insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); + insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } } @@ -303,7 +304,7 @@ private static Instant getNominalEndTime(LineageEvent event) { .orElse(null); } - private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) { + private void insertRunFacets(ModelDaos daos, LineageEvent event, UUID runUuid, Instant now) { // Add ... Optional.ofNullable(event.getRun().getFacets()) .ifPresent( @@ -313,7 +314,8 @@ private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) { runUuid, now, event.getEventType(), event.getRun().getFacets())); } - private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { + private void insertJobFacets( + ModelDaos daos, LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { // Add ... Optional.ofNullable(event.getJob().getFacets()) .ifPresent( @@ -324,7 +326,12 @@ private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Ins } private void insertDatasetFacets( - Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { // Facets ... Optional.ofNullable(dataset.getFacets()) .ifPresent( @@ -340,7 +347,12 @@ private void insertDatasetFacets( } private void insertInputDatasetFacets( - Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { // InputFacets ... 
Optional.ofNullable(dataset.getInputFacets()) .ifPresent( @@ -356,7 +368,12 @@ private void insertInputDatasetFacets( } private void insertOutputDatasetFacets( - Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) { + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { // OutputFacets ... Optional.ofNullable(dataset.getOutputFacets()) .ifPresent( @@ -640,7 +657,7 @@ default JobType getJobType(Job job) { } default DatasetRecord upsertLineageDataset( - Dataset ds, Instant now, UUID runUuid, boolean isInput) { + ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) { NamespaceRow dsNamespace = daos.getNamespaceDao() .upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java index 1e2f2f8f85..f123ee9f67 100644 --- a/api/src/main/java/marquez/db/models/ModelDaos.java +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -26,20 +26,20 @@ * exactly once. 
*/ public final class ModelDaos { - private static NamespaceDao namespaceDao = null; - private static DatasetSymlinkDao datasetSymlinkDao = null; - private static DatasetDao datasetDao = null; - private static SourceDao sourceDao = null; - private static DatasetVersionDao datasetVersionDao = null; - private static DatasetFieldDao datasetFieldDao = null; - private static RunDao runDao = null; - private static DatasetFacetsDao datasetFacetsDao = null; - private static ColumnLineageDao columnLineageDao = null; - private static JobDao jobDao = null; - private static JobFacetsDao jobFacetsDao = null; - private static RunArgsDao runArgsDao = null; - private static RunStateDao runStateDao = null; - private static RunFacetsDao runFacetsDao = null; + private NamespaceDao namespaceDao = null; + private DatasetSymlinkDao datasetSymlinkDao = null; + private DatasetDao datasetDao = null; + private SourceDao sourceDao = null; + private DatasetVersionDao datasetVersionDao = null; + private DatasetFieldDao datasetFieldDao = null; + private RunDao runDao = null; + private DatasetFacetsDao datasetFacetsDao = null; + private ColumnLineageDao columnLineageDao = null; + private JobDao jobDao = null; + private JobFacetsDao jobFacetsDao = null; + private RunArgsDao runArgsDao = null; + private RunStateDao runStateDao = null; + private RunFacetsDao runFacetsDao = null; private BaseDao baseDao; public void initBaseDao(BaseDao baseDao) { diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 7f6828dfa0..df6083146e 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -56,7 +56,6 @@ public class LineageServiceTest { private static LineageDao lineageDao; private static LineageService lineageService; private static OpenLineageDao openLineageDao; - private static DatasetDao datasetDao; private static JobDao jobDao;