diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index b8ac4ec64f..9c7deb4e24 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -30,6 +30,7 @@ import marquez.common.models.JobVersionId; import marquez.common.models.NamespaceName; import marquez.common.models.RunId; +import marquez.common.models.RunState; import marquez.db.BaseDao; import marquez.db.DatasetDao; import marquez.db.DatasetVersionDao; @@ -37,11 +38,13 @@ import marquez.db.models.JobRow; import marquez.db.models.RunArgsRow; import marquez.db.models.RunRow; +import marquez.db.models.RunStateRow; import marquez.db.models.UpdateLineageRow; import marquez.service.RunTransitionListener.JobInputUpdate; import marquez.service.RunTransitionListener.JobOutputUpdate; import marquez.service.RunTransitionListener.RunInput; import marquez.service.RunTransitionListener.RunOutput; +import marquez.service.RunTransitionListener.RunTransition; import marquez.service.models.LineageEvent; import marquez.service.models.RunMeta; @@ -91,6 +94,7 @@ public CompletableFuture createAsync(LineageEvent event) { buildJobOutputUpdate(update).ifPresent(runService::notify); } buildJobInputUpdate(update).ifPresent(runService::notify); + buildRunTransition(update).ifPresent(runService::notify); } }); @@ -222,4 +226,15 @@ private DatasetVersionId buildDatasetVersionId(ExtendedDatasetVersionRow ds) { .name(DatasetName.of(ds.getDatasetName())) .build(); } + + private Optional buildRunTransition(UpdateLineageRow record) { + RunId runId = RunId.of(record.getRun().getUuid()); + RunStateRow runStateRow = record.getRunState(); + if (runStateRow == null) { + return Optional.empty(); + } + RunState newState = RunState.valueOf(runStateRow.getState()); + RunState oldState = newState.isStarting() ? null : RunState.RUNNING; + return Optional.of(new RunTransition(runId, oldState, newState)); + } } diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index 103c749bce..f6eaf51eeb 100644 --- a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -45,6 +45,7 @@ import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.RunTransitionListener.JobInputUpdate; import marquez.service.RunTransitionListener.JobOutputUpdate; +import marquez.service.RunTransitionListener.RunTransition; import marquez.service.models.Dataset; import marquez.service.models.Job; import marquez.service.models.LineageEvent; @@ -79,6 +80,7 @@ public class OpenLineageServiceIntegrationTest { private DatasetVersionDao datasetVersionDao; private ArgumentCaptor runInputListener; private ArgumentCaptor runOutputListener; + private ArgumentCaptor runTransitionListener; private OpenLineageService lineageService; public static String EVENT_REQUIRED_ONLY = "open_lineage/event_required_only.json"; @@ -145,6 +147,8 @@ public void setup(Jdbi jdbi) throws SQLException { doNothing().when(runService).notify(runInputListener.capture()); runOutputListener = ArgumentCaptor.forClass(JobOutputUpdate.class); doNothing().when(runService).notify(runOutputListener.capture()); + runTransitionListener = ArgumentCaptor.forClass(RunTransition.class); + doNothing().when(runService).notify(runTransitionListener.capture()); lineageService = new OpenLineageService(openLineageDao, runService); datasetDao = jdbi.onDemand(DatasetDao.class); @@ -243,6 +247,19 @@ public void testRunListenerOutput(List uris, ExpectedResults expectedResult } } + @ParameterizedTest + @MethodSource("getData") + public void testRunTransition(List uris, ExpectedResults expectedResults) { + initEvents(uris); + + if (expectedResults.inputEventCount > 0) { + Assertions.assertEquals( + uris.size(), + runTransitionListener.getAllValues().size(), + "RunTransition happens once for each run"); + } + } + @ParameterizedTest @MethodSource({"getData"}) public void serviceCalls(List uris, ExpectedResults expectedResults) {