diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3fe533c057..9b772c2b94 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@
   *Improves experience with large graphs by adding a new tab to move between graph elements without looking at the graph itself.*
 * Web: add hover-over Tag tooltip to datasets [`#2630`](https://github.com/MarquezProject/marquez/pull/2630) [@davidsharp7](https://github.com/davidsharp7)
   *For parity with columns in the GUI, this adds a Tag tooltip to datasets.*
+* API: upstream run-level lineage API [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem](https://github.com/julienledem)
+  *When troubleshooting an issue and doing root cause analysis, it is useful to get the upstream run-level lineage to know exactly which version of each job and dataset a run depends on.*
 
 ### Changed
 * Docker: upgrade to Docker Compose V2 [`#2644`](https://github.com/MarquezProject/marquez/pull/2644) [@merobi-hub](https://github.com/merobi-hub)
diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java
index 253c8c46a7..ad7d36de04 100644
--- a/api/src/main/java/marquez/api/OpenLineageResource.java
+++ b/api/src/main/java/marquez/api/OpenLineageResource.java
@@ -36,6 +36,7 @@
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import marquez.api.models.SortDirection;
+import marquez.common.models.RunId;
 import marquez.db.OpenLineageDao;
 import marquez.service.ServiceFactory;
 import marquez.service.models.BaseEvent;
@@ -136,6 +137,28 @@ public Response getLineageEvents(
     return Response.ok(new Events(events, totalCount)).build();
   }
 
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
+  @Timed
+  @ResponseMetered
+  @ExceptionMetered
+  @GET
+  @Consumes(APPLICATION_JSON)
+  @Produces(APPLICATION_JSON)
+  @Path("/runlineage/upstream")
+  public Response getRunLineageUpstream(
+      @QueryParam("runId") @NotNull RunId runId,
+      @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
+    throwIfNotExists(runId);
+    return Response.ok(lineageService.upstream(runId, depth)).build();
+  }
+
   @Value
   static class Events {
     @NonNull
diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java
index dfdf67492a..fe7c1fcf5f 100644
--- a/api/src/main/java/marquez/db/Columns.java
+++ b/api/src/main/java/marquez/db/Columns.java
@@ -150,9 +150,7 @@ public static UUID uuidOrNull(final ResultSet results, final String column) thro
   }
 
   public static UUID uuidOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getObject(column, UUID.class);
   }
 
@@ -166,9 +164,7 @@ public static Instant timestampOrNull(final ResultSet results, final String colu
 
   public static Instant timestampOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getTimestamp(column).toInstant();
   }
 
@@ -182,9 +178,7 @@ public static String stringOrNull(final ResultSet results, final String column)
 
   public static String stringOrThrow(final ResultSet results, final String column)
      throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getString(column);
   }
 
@@ -199,40 +193,30 @@ public static boolean booleanOrDefault(
 
   public static boolean booleanOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBoolean(column);
   }
 
   public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getInt(column);
   }
 
   public static PGInterval pgIntervalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return new PGInterval(results.getString(column));
   }
 
   public static BigDecimal bigDecimalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBigDecimal(column);
   }
 
   public static List<UUID> uuidArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((UUID[]) results.getArray(column).getArray());
   }
 
@@ -246,9 +230,7 @@ public static List<UUID> uuidArrayOrEmpty(final ResultSet results, final String
 
   public static List<String> stringArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((String[]) results.getArray(column).getArray());
   }
 
@@ -311,4 +293,11 @@ public static PGobject toPgObject(@NonNull final Object object) {
     }
     return jsonObject;
   }
+
+  private static void checkNotNull(final ResultSet results, final String column)
+      throws SQLException {
+    if (results.getObject(column) == null) {
+      throw new IllegalArgumentException(column + " was null in the result set");
+    }
+  }
 }
diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index c45a06e5a9..5e520b22a6 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -5,15 +5,22 @@
 
 package marquez.db;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import javax.validation.constraints.NotNull;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.RunId;
 import marquez.db.mappers.DatasetDataMapper;
 import marquez.db.mappers.JobDataMapper;
 import marquez.db.mappers.JobRowMapper;
 import marquez.db.mappers.RunMapper;
+import marquez.db.mappers.UpstreamRunRowMapper;
 import marquez.service.models.DatasetData;
 import marquez.service.models.JobData;
 import marquez.service.models.Run;
@@ -25,8 +32,18 @@
 @RegisterRowMapper(DatasetDataMapper.class)
 @RegisterRowMapper(JobDataMapper.class)
 @RegisterRowMapper(RunMapper.class)
 @RegisterRowMapper(JobRowMapper.class)
+@RegisterRowMapper(UpstreamRunRowMapper.class)
 public interface LineageDao {
 
+  public record JobSummary(NamespaceName namespace, JobName name, UUID version) {}
+
+  public record RunSummary(RunId id, Instant start, Instant end, String status) {}
+
+  public record DatasetSummary(
+      NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {}
+
+  public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {}
+
   /**
    * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
    * input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
@@ -154,4 +171,51 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
       WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
       ORDER BY r.job_name, r.namespace_name, created_at DESC""")
   List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
+
+  @SqlQuery(
+      """
+      WITH RECURSIVE
+        upstream_runs(
+          r_uuid, -- run uuid
+          dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
+          u_r_uuid, -- upstream run that produced that dataset version
+          depth -- current depth of traversal
+        ) AS (
+
+          -- initial case: find the inputs of the initial run
+          SELECT r.uuid,
+            dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
+            dv.run_uuid,
+            0 AS depth -- starts at 0
+          FROM (SELECT :runId::uuid AS uuid) r -- initial run
+          LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
+          LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
+
+          UNION
+
+          -- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
+          SELECT
+            ur.u_r_uuid,
+            dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
+            dv2.run_uuid,
+            ur.depth + 1 AS depth -- increase depth to check end condition
+          FROM upstream_runs ur
+          LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
+          LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
+          -- end condition of the recursion: no input or depth is over the maximum set
+          -- also avoid following cycles (ex: merge statement)
+          WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
+        )
+
+      -- present the result: use DISTINCT as we may have traversed the same edge multiple times if there are diamonds in the graph.
+      SELECT * FROM ( -- we need the extra statement to sort after the DISTINCT
+        SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
+          upstream_runs.*,
+          r.started_at, r.ended_at, r.current_run_state as state,
+          r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
+        FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
+      ) sub
+      ORDER BY depth ASC, job_name ASC;
+      """)
+  List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
 }
diff --git a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
new file mode 100644
index 0000000000..37d49a4cad
--- /dev/null
+++ b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.db.mappers;
+
+import static marquez.db.Columns.stringOrNull;
+import static marquez.db.Columns.stringOrThrow;
+import static marquez.db.Columns.timestampOrNull;
+import static marquez.db.Columns.uuidOrThrow;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.UUID;
+import lombok.NonNull;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.RunId;
+import marquez.db.Columns;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
+import marquez.db.LineageDao.UpstreamRunRow;
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
+
+/** Maps the upstream query result set to an UpstreamRunRow */
+public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
+  @Override
+  public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
+      throws SQLException {
+    return new UpstreamRunRow(
+        new JobSummary(
+            new NamespaceName(stringOrThrow(results, "job_namespace")),
+            new JobName(stringOrThrow(results, "job_name")),
+            Optional.ofNullable(stringOrNull(results, "job_version_uuid"))
+                .map(UUID::fromString)
+                .orElse(null)),
+        new RunSummary(
+            new RunId(uuidOrThrow(results, "r_uuid")),
+            timestampOrNull(results, Columns.STARTED_AT),
+            timestampOrNull(results, Columns.ENDED_AT),
+            stringOrThrow(results, Columns.STATE)),
+        results.getObject("dataset_name") == null
+            ? null
+            : new DatasetSummary(
+                new NamespaceName(stringOrThrow(results, "dataset_namespace")),
+                new DatasetName(stringOrThrow(results, "dataset_name")),
+                UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
+                new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
+  }
+}
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 1c2dc34a05..7eddec4fd9 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -5,6 +5,9 @@
 
 package marquez.service;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
@@ -12,6 +15,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -21,14 +25,20 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.validation.constraints.NotNull;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.common.models.DatasetId;
 import marquez.common.models.JobId;
+import marquez.common.models.RunId;
 import marquez.db.JobDao;
 import marquez.db.LineageDao;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
 import marquez.db.models.JobRow;
 import marquez.service.DelegatingDaos.DelegatingLineageDao;
+import marquez.service.LineageService.UpstreamRunLineage;
 import marquez.service.models.DatasetData;
 import marquez.service.models.Edge;
 import marquez.service.models.Graph;
@@ -41,6 +51,11 @@
 @Slf4j
 public class LineageService extends DelegatingLineageDao {
+
+  public record UpstreamRunLineage(List<UpstreamRun> runs) {}
+
+  public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}
+
   private final JobDao jobDao;
 
   public LineageService(LineageDao delegate, JobDao jobDao) {
@@ -252,4 +267,32 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
           String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
     }
   }
+
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
+  public UpstreamRunLineage upstream(@NotNull RunId runId, int depth) {
+    List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
+    Map<RunId, List<UpstreamRunRow>> collect =
+        upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
+    List<UpstreamRun> runs =
+        collect.entrySet().stream()
+            .map(
+                row -> {
+                  UpstreamRunRow upstreamRunRow = row.getValue().get(0);
+                  List<DatasetSummary> inputs =
+                      row.getValue().stream()
+                          .map(UpstreamRunRow::input)
+                          .filter(i -> i != null)
+                          .collect(toList());
+                  return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
+                })
+            .collect(toList());
+    return new UpstreamRunLineage(runs);
+  }
 }
diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java
index 354ab495dc..aed23901a8 100644
--- a/api/src/test/java/marquez/db/LineageDaoTest.java
+++ b/api/src/test/java/marquez/db/LineageDaoTest.java
@@ -29,6 +29,7 @@
 import java.util.stream.Stream;
 import marquez.api.JdbiUtils;
 import marquez.common.models.JobType;
+import marquez.db.LineageDao.UpstreamRunRow;
 import marquez.db.LineageTestUtils.DatasetConsumerJob;
 import marquez.db.LineageTestUtils.JobLineage;
 import marquez.db.models.JobRow;
@@ -888,4 +889,88 @@ public void testGetCurrentRunsGetsLatestRun() {
         .extracting(r -> r.getId().getValue())
         .containsAll(expectedRunIds);
   }
+
+  @Test
+  public void testGetRunLineage() {
+
+    Dataset upstreamDataset = new Dataset(NAMESPACE, "upstreamDataset", null);
+
+    UpdateLineageRow upstreamJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "upstreamJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(upstreamDataset));
+
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(upstreamDataset),
+            Arrays.asList(dataset));
+    List<JobLineage> jobRows =
+        writeDownstreamLineage(
+            openLineageDao,
+            new LinkedList<>(
+                Arrays.asList(
+                    new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
+                    new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
+            jobFacet,
+            dataset);
+
+    // don't expect a failed job in the returned lineage
+    UpdateLineageRow failedJobRow =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "readJobFailed",
+            "FAILED",
+            jobFacet,
+            Arrays.asList(dataset),
+            Arrays.asList());
+
+    // don't expect a disjoint job in the returned lineage
+    UpdateLineageRow disjointJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeRandomDataset",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(
+                new Dataset(
+                    NAMESPACE,
+                    "randomDataset",
+                    newDatasetFacet(
+                        new SchemaField("firstname", "string", "the first name"),
+                        new SchemaField("lastname", "string", "the last name")))),
+            Arrays.asList());
+
+    {
+      List<UpstreamRunRow> upstream =
+          lineageDao.getUpstreamRuns(failedJobRow.getRun().getUuid(), 10);
+
+      assertThat(upstream).size().isEqualTo(3);
+      assertThat(upstream.get(0).job().name().getValue())
+          .isEqualTo(failedJobRow.getJob().getName());
+      assertThat(upstream.get(0).input().name().getValue()).isEqualTo(dataset.getName());
+      assertThat(upstream.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+      assertThat(upstream.get(1).input().name().getValue()).isEqualTo(upstreamDataset.getName());
+      assertThat(upstream.get(2).job().name().getValue())
+          .isEqualTo(upstreamJob.getJob().getName());
+    }
+
+    {
+      List<UpstreamRunRow> upstream2 = lineageDao.getUpstreamRuns(jobRows.get(0).getRunId(), 10);
+
+      assertThat(upstream2).size().isEqualTo(3);
+      assertThat(upstream2.get(0).job().name().getValue()).isEqualTo(jobRows.get(0).getName());
+      assertThat(upstream2.get(0).input().name().getValue()).isEqualTo(dataset.getName());
+      assertThat(upstream2.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+      assertThat(upstream2.get(1).input().name().getValue()).isEqualTo(upstreamDataset.getName());
+      assertThat(upstream2.get(2).job().name().getValue())
+          .isEqualTo(upstreamJob.getJob().getName());
+    }
+  }
 }
diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java
index df6083146e..97f6b9ce02 100644
--- a/api/src/test/java/marquez/service/LineageServiceTest.java
+++ b/api/src/test/java/marquez/service/LineageServiceTest.java
@@ -9,11 +9,13 @@
 import static marquez.db.LineageTestUtils.newDatasetFacet;
 import static marquez.db.LineageTestUtils.writeDownstreamLineage;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import marquez.api.JdbiUtils;
 import marquez.common.models.DatasetName;
 import marquez.common.models.InputDatasetVersion;
@@ -30,7 +32,9 @@
 import marquez.db.OpenLineageDao;
 import marquez.db.models.UpdateLineageRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.LineageService.UpstreamRunLineage;
 import marquez.service.models.Edge;
+import marquez.service.models.Job;
 import marquez.service.models.JobData;
 import marquez.service.models.Lineage;
 import marquez.service.models.LineageEvent.Dataset;
@@ -194,6 +198,27 @@ && jobNameEquals(n, "downstreamJob0<-outputData<-readJob0<-commonDataset"))
             NodeId.of(
                 new NamespaceName(NAMESPACE),
                 new DatasetName("outputData<-readJob0<-commonDataset")));
+
+    List<Job> jobs = jobDao.findAllWithRun(NAMESPACE, 1000, 0);
+    jobs =
+        jobs.stream()
+            .filter(j -> j.getName().getValue().contains("newDownstreamJob"))
+            .collect(Collectors.toList());
+    assertTrue(jobs.size() > 0);
+    Job job = jobs.get(0);
+    assertTrue(job.getLatestRun().isPresent());
+    UpstreamRunLineage upstreamLineage =
+        lineageService.upstream(job.getLatestRun().get().getId(), 10);
+    assertThat(upstreamLineage.runs()).size().isEqualTo(3);
+    assertThat(upstreamLineage.runs().get(0).job().name().getValue())
+        .matches("newDownstreamJob.*<-outputData.*<-newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(0).inputs().get(0).name().getValue())
+        .matches("outputData.*<-newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(1).job().name().getValue())
+        .matches("newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(1).inputs().get(0).name().getValue())
+        .isEqualTo("commonDataset");
+    assertThat(upstreamLineage.runs().get(2).job().name().getValue()).isEqualTo("writeJob");
   }
 
   @Test
diff --git a/spec/openapi.yml b/spec/openapi.yml
index 5b79d0dbfa..b8d4d0488d 100644
--- a/spec/openapi.yml
+++ b/spec/openapi.yml
@@ -603,6 +603,23 @@ paths:
               schema:
                 $ref: '#/components/schemas/LineageGraph'
 
+  /runlineage/upstream:
+    get:
+      operationId: getRunLineageUpstream
+      parameters:
+        - $ref: '#/components/parameters/runId'
+        - $ref: '#/components/parameters/depth'
+      tags:
+        - Lineage
+      summary: Get the upstream lineage for a given run
+      responses:
+        '200':
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/UpstreamRunLineage'
+
   /column-lineage:
     get:
       operationId: getLineage
@@ -1913,6 +1930,64 @@ components:
       example:
         description: My first tag!
 
+    UpstreamRunLineage:
+      type: object
+      properties:
+        runs:
+          description: the list of upstream runs, including the selected run, in upstream topological order
+          type: array
+          items:
+            type: object
+            properties:
+              job:
+                type: object
+                description: identifier of the job version for this run
+                properties:
+                  namespace:
+                    type: string
+                  name:
+                    type: string
+                  version:
+                    type: string
+                    format: uuid
+              run:
+                description: run information
+                type: object
+                properties:
+                  id:
+                    description: the run identifier
+                    type: string
+                    format: uuid
+                  start:
+                    description: start time of the run
+                    type: string
+                    format: date-time
+                  end:
+                    description: end time of the run
+                    type: string
+                    format: date-time
+                  status:
+                    description: whether the run completed successfully or not
+                    type: string
+              inputs:
+                description: upstream input dataset versions
+                type: array
+                items:
+                  description: identifier of the upstream dataset version consumed by this run
+                  type: object
+                  properties:
+                    namespace:
+                      type: string
+                    name:
+                      type: string
+                    version:
+                      type: string
+                      format: uuid
+                    producedByRunId:
+                      description: the run that produced this dataset version
+                      type: string
+                      format: uuid
+
   LineageEvent:
     example:
       eventType: COMPLETE
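
Reviewer note: below is a minimal client sketch for exercising the new endpoint once this lands. The host, port, and run UUID are hypothetical, and the /api/v1 prefix is an assumption based on where the other OpenLineageResource endpoints are served; the response shape follows the UpstreamRunLineage schema added to spec/openapi.yml above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpstreamRunLineageExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical values: point these at a real Marquez instance and run UUID.
    String baseUrl = "http://localhost:5000/api/v1";
    String runId = "d3a1b6f0-7c4e-4d2a-9e5b-8f0c12345678";
    int depth = 10; // maximum number of run -> input dataset -> producing run hops to follow

    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/runlineage/upstream?runId=" + runId + "&depth=" + depth))
            .header("Accept", "application/json")
            .GET()
            .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Expected shape, per the UpstreamRunLineage schema:
    // {"runs": [{"job": {...}, "run": {...}, "inputs": [{..., "producedByRunId": "..."}]}, ...]}
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}

Since the DAO orders rows by depth, the first element of runs is the queried run itself; its ancestors follow in upstream order, capped at depth levels.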