diff --git a/CHANGELOG.md b/CHANGELOG.md index ecdec36bb5..6b075dbd89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added * Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index b982bca539..600d9fcdfc 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -13,6 +13,7 @@ import java.util.List; import lombok.Getter; import lombok.NonNull; +import marquez.api.ColumnLineageResource; import marquez.api.DatasetResource; import marquez.api.JobResource; import marquez.api.NamespaceResource; @@ -22,6 +23,7 @@ import marquez.api.TagResource; import marquez.api.exceptions.JdbiExceptionExceptionMapper; import marquez.db.BaseDao; +import marquez.db.ColumnLineageDao; import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; @@ -39,6 +41,7 @@ import marquez.db.TagDao; import marquez.graphql.GraphqlSchemaBuilder; import marquez.graphql.MarquezGraphqlServletBuilder; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import 
marquez.service.DatasetVersionService; @@ -70,6 +73,7 @@ public final class MarquezContext { @Getter private final TagDao tagDao; @Getter private final OpenLineageDao openLineageDao; @Getter private final LineageDao lineageDao; + @Getter private final ColumnLineageDao columnLineageDao; @Getter private final SearchDao searchDao; @Getter private final List runTransitionListeners; @@ -81,9 +85,11 @@ public final class MarquezContext { @Getter private final RunService runService; @Getter private final OpenLineageService openLineageService; @Getter private final LineageService lineageService; + @Getter private final ColumnLineageService columnLineageService; @Getter private final NamespaceResource namespaceResource; @Getter private final SourceResource sourceResource; @Getter private final DatasetResource datasetResource; + @Getter private final ColumnLineageResource columnLineageResource; @Getter private final JobResource jobResource; @Getter private final TagResource tagResource; @Getter private final OpenLineageResource openLineageResource; @@ -115,6 +121,7 @@ private MarquezContext( this.tagDao = jdbi.onDemand(TagDao.class); this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); this.lineageDao = jdbi.onDemand(LineageDao.class); + this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class); this.searchDao = jdbi.onDemand(SearchDao.class); this.runTransitionListeners = runTransitionListeners; @@ -128,6 +135,7 @@ private MarquezContext( this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); this.lineageService = new LineageService(lineageDao, jobDao); + this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() @@ -139,12 +147,14 @@ private MarquezContext( .openLineageService(openLineageService) .sourceService(sourceService) .lineageService(lineageService) + 
.columnLineageService(columnLineageService) .datasetFieldService(new DatasetFieldService(baseDao)) .datasetVersionService(new DatasetVersionService(baseDao)) .build(); this.namespaceResource = new NamespaceResource(serviceFactory); this.sourceResource = new SourceResource(serviceFactory); this.datasetResource = new DatasetResource(serviceFactory); + this.columnLineageResource = new ColumnLineageResource(serviceFactory); this.jobResource = new JobResource(serviceFactory, jobVersionDao); this.tagResource = new TagResource(serviceFactory); this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao); @@ -155,6 +165,7 @@ private MarquezContext( namespaceResource, sourceResource, datasetResource, + columnLineageResource, jobResource, tagResource, jdbiException, diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index ce15d31ab3..7b116ab596 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -25,6 +25,7 @@ import marquez.common.models.NamespaceName; import marquez.common.models.RunId; import marquez.common.models.SourceName; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; @@ -50,6 +51,7 @@ public class BaseResource { protected DatasetVersionService datasetVersionService; protected DatasetFieldService datasetFieldService; protected LineageService lineageService; + protected ColumnLineageService columnLineageService; public BaseResource(ServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) { this.datasetVersionService = serviceFactory.getDatasetVersionService(); this.datasetFieldService = serviceFactory.getDatasetFieldService(); this.lineageService = serviceFactory.getLineageService(); + this.columnLineageService = 
serviceFactory.getColumnLineageService(); } void throwIfNotExists(@NonNull NamespaceName namespaceName) { diff --git a/api/src/main/java/marquez/api/ColumnLineageResource.java b/api/src/main/java/marquez/api/ColumnLineageResource.java new file mode 100644 index 0000000000..154e2e7764 --- /dev/null +++ b/api/src/main/java/marquez/api/ColumnLineageResource.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api; + +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +import com.codahale.metrics.annotation.ExceptionMetered; +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import java.time.Instant; +import java.util.concurrent.ExecutionException; +import javax.validation.constraints.NotNull; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.service.ServiceFactory; +import marquez.service.models.NodeId; + +@Slf4j +@Path("/api/v1/column-lineage") +public class ColumnLineageResource extends BaseResource { + + private static final String DEFAULT_DEPTH = "20"; + + public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) { + super(serviceFactory); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Produces(APPLICATION_JSON) + public Response getLineage( + @QueryParam("nodeId") @NotNull NodeId nodeId, + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) + throws ExecutionException, InterruptedException { + return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build(); + } +} diff --git a/api/src/main/java/marquez/common/models/DatasetFieldId.java b/api/src/main/java/marquez/common/models/DatasetFieldId.java new file mode 100644 index 
0000000000..89aa505019 --- /dev/null +++ b/api/src/main/java/marquez/common/models/DatasetFieldId.java @@ -0,0 +1,27 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** ID for {@code DatasetField}. */ +@EqualsAndHashCode +@AllArgsConstructor +@ToString +public class DatasetFieldId { + + @Getter private final DatasetId datasetId; + @Getter private final FieldName fieldName; + + public static DatasetFieldId of(String namespace, String datasetName, String field) { + return new DatasetFieldId( + new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)), + FieldName.of(field)); + } +} diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index 3bc9410a2d..c472937cd1 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -5,20 +5,27 @@ package marquez.db; +import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING; + import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import marquez.db.mappers.ColumnLineageNodeDataMapper; import marquez.db.mappers.ColumnLineageRowMapper; +import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.ColumnLineageRow; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.customizer.BindBeanList; +import org.jdbi.v3.sqlobject.customizer.BindList; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(ColumnLineageRowMapper.class) +@RegisterRowMapper(ColumnLineageNodeDataMapper.class) public interface 
ColumnLineageDao extends BaseDao { default List<ColumnLineageRow> upsertColumnLineageRow( @@ -88,4 +95,59 @@ void doUpsertColumnLineageRow( }, value = "values") List rows); + + @SqlQuery( + """ + WITH RECURSIVE + dataset_fields_view AS ( + SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid + FROM dataset_fields df + INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid + ), + column_lineage_recursive AS ( + SELECT *, 0 as depth + FROM column_lineage + WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil + UNION + SELECT + upstream_node.output_dataset_version_uuid, + upstream_node.output_dataset_field_uuid, + upstream_node.input_dataset_version_uuid, + upstream_node.input_dataset_field_uuid, + upstream_node.transformation_description, + upstream_node.transformation_type, + upstream_node.created_at, + upstream_node.updated_at, + node.depth + 1 as depth + FROM column_lineage upstream_node, column_lineage_recursive node + WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid + AND node.depth < :depth + ) + SELECT + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + FROM column_lineage_recursive clr + INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered + LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid + GROUP BY + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + """) + Set<ColumnLineageNodeData> getLineage( + int depth, + @BindList(onEmpty = NULL_STRING) List<UUID>
datasetFieldUuids, + Instant createdAtUntil); } diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index 99a2630016..a54a271604 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -93,6 +93,25 @@ default Dataset updateTags( + "WHERE dataset_uuid = :datasetUuid AND name = :name") Optional findUuid(UUID datasetUuid, String name); + @SqlQuery( + """ + SELECT df.uuid + FROM dataset_fields df + INNER JOIN datasets_view AS d + ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace + """) + List findDatasetFieldsUuids(String namespace, String datasetName); + + @SqlQuery( + """ + SELECT df.uuid + FROM dataset_fields df + INNER JOIN datasets_view AS d + ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace + WHERE df.name = :name + """) + Optional findUuid(String namespace, String datasetName, String name); + @SqlQuery( "SELECT f.*, " + "ARRAY(SELECT t.name " diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 96611f9e65..a75f2b22f9 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -758,22 +758,27 @@ private List upsertColumnLineage( return Optional.ofNullable(ds.getFacets()) .map(DatasetFacets::getColumnLineage) - .map(LineageEvent.ColumnLineageFacet::getOutputColumnsList) + .map(LineageEvent.ColumnLineageDatasetFacet::getFields) + .map(LineageEvent.ColumnLineageDatasetFacetFields::getAdditional) .stream() - .flatMap(list -> list.stream()) + .flatMap(map -> map.keySet().stream()) + .filter( + columnName -> + ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName) + instanceof LineageEvent.ColumnLineageOutputColumn) .flatMap( - outputColumn -> { + columnName -> { + LineageEvent.ColumnLineageOutputColumn columnLineage = + 
ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName); Optional outputField = - datasetFields.stream() - .filter(dfr -> dfr.getName().equals(outputColumn.getName())) - .findAny(); + datasetFields.stream().filter(dfr -> dfr.getName().equals(columnName)).findAny(); if (outputField.isEmpty()) { Logger log = LoggerFactory.getLogger(OpenLineageDao.class); log.error( "Cannot produce column lineage for missing output field in output dataset: {}", - outputColumn.getName()); - return Stream.empty(); + columnName); + return Stream.empty(); } // get field uuids of input columns related to this run @@ -781,13 +786,12 @@ private List upsertColumnLineage( runFields.stream() .filter( fieldData -> - outputColumn.getInputFields().stream() + columnLineage.getInputFields().stream() .filter( of -> - of.getDatasetNamespace().equals(fieldData.getNamespace()) - && of.getDatasetName() - .equals(fieldData.getDatasetName()) - && of.getFieldName().equals(fieldData.getField())) + of.getNamespace().equals(fieldData.getNamespace()) + && of.getName().equals(fieldData.getDatasetName()) + && of.getField().equals(fieldData.getField())) .findAny() .isPresent()) .map( @@ -802,8 +806,8 @@ private List upsertColumnLineage( datasetVersionRow.getUuid(), outputField.get().getUuid(), inputFields, - outputColumn.getTransformationDescription(), - outputColumn.getTransformationType(), + columnLineage.getTransformationDescription(), + columnLineage.getTransformationType(), now) .stream(); }) diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java new file mode 100644 index 0000000000..1163a62e2b --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java @@ -0,0 +1,60 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static 
marquez.db.Columns.TRANSFORMATION_DESCRIPTION; +import static marquez.db.Columns.TRANSFORMATION_TYPE; +import static marquez.db.Columns.stringOrNull; +import static marquez.db.Columns.stringOrThrow; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.db.Columns; +import marquez.db.models.ColumnLineageNodeData; +import marquez.db.models.InputFieldNodeData; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; +import org.postgresql.jdbc.PgArray; + +@Slf4j +public class ColumnLineageNodeDataMapper implements RowMapper { + + private static final ObjectMapper MAPPER = Utils.getMapper(); + + @Override + public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException { + return new ColumnLineageNodeData( + stringOrThrow(results, Columns.NAMESPACE_NAME), + stringOrThrow(results, Columns.DATASET_NAME), + stringOrThrow(results, Columns.FIELD_NAME), + stringOrThrow(results, Columns.TYPE), + stringOrNull(results, TRANSFORMATION_DESCRIPTION), + stringOrNull(results, TRANSFORMATION_TYPE), + toInputFields(results, "inputFields")); + } + + public static ImmutableList toInputFields(ResultSet results, String column) + throws SQLException { + if (results.getObject(column) == null) { + return ImmutableList.of(); + } + + PgArray pgArray = (PgArray) results.getObject(column); + Object[] deserializedArray = (Object[]) pgArray.getArray(); + + return ImmutableList.copyOf( + Arrays.asList(deserializedArray).stream() + .map(o -> (String[]) o) + .map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2])) + .collect(Collectors.toList())); + } +} diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java 
b/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java index 6b6c3de099..9000fb5fdc 100644 --- a/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java @@ -7,7 +7,7 @@ import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION; import static marquez.db.Columns.TRANSFORMATION_TYPE; -import static marquez.db.Columns.stringOrThrow; +import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.timestampOrThrow; import static marquez.db.Columns.uuidOrThrow; @@ -29,8 +29,8 @@ public ColumnLineageRow map(@NonNull ResultSet results, @NonNull StatementContex uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID), uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID), uuidOrThrow(results, Columns.INPUT_DATASET_FIELD_UUID), - stringOrThrow(results, TRANSFORMATION_DESCRIPTION), - stringOrThrow(results, TRANSFORMATION_TYPE), + stringOrNull(results, TRANSFORMATION_DESCRIPTION), + stringOrNull(results, TRANSFORMATION_TYPE), timestampOrThrow(results, Columns.CREATED_AT), timestampOrThrow(results, Columns.UPDATED_AT)); } diff --git a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java new file mode 100644 index 0000000000..3c54dc1fe0 --- /dev/null +++ b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +public class ColumnLineageNodeData implements NodeData { + @NonNull String namespace; + @NonNull String dataset; + @NonNull String field; + @NonNull String fieldType; + String transformationDescription; + String transformationType; + @NonNull List inputFields; +} diff --git 
a/api/src/main/java/marquez/db/models/ColumnLineageRow.java b/api/src/main/java/marquez/db/models/ColumnLineageRow.java index 8ccd47c580..9508122a7f 100644 --- a/api/src/main/java/marquez/db/models/ColumnLineageRow.java +++ b/api/src/main/java/marquez/db/models/ColumnLineageRow.java @@ -6,6 +6,7 @@ package marquez.db.models; import java.time.Instant; +import java.util.Optional; import java.util.UUID; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; @@ -21,8 +22,16 @@ public class ColumnLineageRow { @Getter @NonNull private final UUID outputDatasetFieldUuid; @Getter @NonNull private final UUID inputDatasetVersionUuid; @Getter @NonNull private final UUID inputDatasetFieldUuid; - @Getter @NonNull private final String transformationDescription; - @Getter @NonNull private final String transformationType; + private final String transformationDescription; + private final String transformationType; @Getter @NonNull private final Instant createdAt; @Getter @NonNull private Instant updatedAt; + + public Optional getTransformationDescription() { + return Optional.ofNullable(transformationDescription); + } + + public Optional getTransformationType() { + return Optional.ofNullable(transformationType); + } } diff --git a/api/src/main/java/marquez/db/models/InputFieldNodeData.java b/api/src/main/java/marquez/db/models/InputFieldNodeData.java new file mode 100644 index 0000000000..a068bf271f --- /dev/null +++ b/api/src/main/java/marquez/db/models/InputFieldNodeData.java @@ -0,0 +1,18 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +public class InputFieldNodeData { + @NonNull String namespace; + @NonNull String dataset; + @NonNull String field; +} diff --git a/api/src/main/java/marquez/db/models/NodeData.java 
b/api/src/main/java/marquez/db/models/NodeData.java index c2e7037b37..3a7bcc6dfc 100644 --- a/api/src/main/java/marquez/db/models/NodeData.java +++ b/api/src/main/java/marquez/db/models/NodeData.java @@ -14,6 +14,7 @@ property = "type") @JsonSubTypes({ @JsonSubTypes.Type(value = DatasetData.class, name = "DATASET"), - @JsonSubTypes.Type(value = JobData.class, name = "JOB") + @JsonSubTypes.Type(value = JobData.class, name = "JOB"), + @JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD") }) public interface NodeData {} diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java new file mode 100644 index 0000000000..66061b1d01 --- /dev/null +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -0,0 +1,127 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import com.google.common.collect.ImmutableSortedSet; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetId; +import marquez.db.ColumnLineageDao; +import marquez.db.DatasetFieldDao; +import marquez.db.models.ColumnLineageNodeData; +import marquez.service.models.Edge; +import marquez.service.models.Lineage; +import marquez.service.models.Node; +import marquez.service.models.NodeId; + +@Slf4j +public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao { + private final DatasetFieldDao datasetFieldDao; + + public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDao) { + super(dao); + this.datasetFieldDao = datasetFieldDao; + } + + public 
Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) { + List columnNodeUuids = getColumnNodeUuids(nodeId); + if (columnNodeUuids.isEmpty()) { + throw new NodeIdNotFoundException("Could not find node"); + } + + return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil)); + } + + private Lineage toLineage(Set lineageNodeData) { + Map graphNodes = new HashMap<>(); + Map> inEdges = new HashMap<>(); + Map> outEdges = new HashMap<>(); + + // create nodes + lineageNodeData.stream() + .forEach( + columnLineageNodeData -> { + NodeId nodeId = + NodeId.of( + DatasetFieldId.of( + columnLineageNodeData.getNamespace(), + columnLineageNodeData.getDataset(), + columnLineageNodeData.getField())); + graphNodes.put(nodeId, Node.datasetField().data(columnLineageNodeData).id(nodeId)); + columnLineageNodeData.getInputFields().stream() + .map( + i -> + NodeId.of( + DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField()))) + .forEach( + inputNodeId -> { + graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId)); + Optional.ofNullable(outEdges.get(inputNodeId)) + .ifPresentOrElse( + nodeEdges -> nodeEdges.add(nodeId), + () -> outEdges.put(inputNodeId, new LinkedList<>(List.of(nodeId)))); + Optional.ofNullable(inEdges.get(nodeId)) + .ifPresentOrElse( + nodeEdges -> nodeEdges.add(inputNodeId), + () -> inEdges.put(nodeId, new LinkedList<>(List.of(inputNodeId)))); + }); + }); + + // add edges between the nodes + inEdges.forEach( + (nodeId, nodes) -> { + graphNodes + .get(nodeId) + .inEdges( + nodes.stream() + .map(toNodeId -> new Edge(nodeId, toNodeId)) + .collect(Collectors.toSet())); + }); + outEdges.forEach( + (nodeId, nodes) -> { + graphNodes + .get(nodeId) + .outEdges( + nodes.stream() + .map(toNodeId -> new Edge(nodeId, toNodeId)) + .collect(Collectors.toSet())); + }); + + // build nodes and return as lineage + return new Lineage( + ImmutableSortedSet.copyOf( + graphNodes.values().stream().map(Node.Builder::build).collect(Collectors.toSet()))); + 
} + + List getColumnNodeUuids(NodeId nodeId) { + List columnNodeUuids = new ArrayList<>(); + if (nodeId.isDatasetType()) { + DatasetId datasetId = nodeId.asDatasetId(); + columnNodeUuids.addAll( + datasetFieldDao.findDatasetFieldsUuids( + datasetId.getNamespace().getValue(), datasetId.getName().getValue())); + } else if (nodeId.isDatasetFieldType()) { + DatasetFieldId datasetFieldId = nodeId.asDatasetFieldId(); + datasetFieldDao + .findUuid( + datasetFieldId.getDatasetId().getNamespace().getValue(), + datasetFieldId.getDatasetId().getName().getValue(), + datasetFieldId.getFieldName().getValue()) + .ifPresent(uuid -> columnNodeUuids.add(uuid)); + } + return columnNodeUuids; + } +} diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index 8184200ab4..5b980156af 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -7,6 +7,7 @@ import lombok.RequiredArgsConstructor; import lombok.experimental.Delegate; +import marquez.db.ColumnLineageDao; import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; @@ -98,4 +99,9 @@ public static class DelegatingTagDao implements TagDao { public static class DelegatingLineageDao implements LineageDao { @Delegate private final LineageDao delegate; } + + @RequiredArgsConstructor + public static class DelegatingColumnLineageDao implements ColumnLineageDao { + @Delegate private final ColumnLineageDao delegate; + } } diff --git a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java index 5a4b51465b..8b5365dadb 100644 --- a/api/src/main/java/marquez/service/ServiceFactory.java +++ b/api/src/main/java/marquez/service/ServiceFactory.java @@ -22,4 +22,5 @@ public class ServiceFactory { @NonNull DatasetVersionService datasetVersionService; @NonNull DatasetFieldService datasetFieldService; @NonNull 
LineageService lineageService; + @NonNull ColumnLineageService columnLineageService; } diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 8202028944..ad69188478 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -332,7 +332,7 @@ public static class DatasetFacets { @Valid private SchemaDatasetFacet schema; @Valid private LifecycleStateChangeFacet lifecycleStateChange; @Valid private DatasourceDatasetFacet dataSource; - @Valid private ColumnLineageFacet columnLineage; + @Valid private LineageEvent.ColumnLineageDatasetFacet columnLineage; @Valid private DatasetSymlinkFacet symlinks; private String description; @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); @@ -367,7 +367,7 @@ public DatasourceDatasetFacet getDataSource() { return dataSource; } - public ColumnLineageFacet getColumnLineage() { + public ColumnLineageDatasetFacet getColumnLineage() { return columnLineage; } @@ -490,22 +490,47 @@ public LifecycleStateChangeFacet( } } + @Getter + @Setter + @Valid + @ToString @NoArgsConstructor + @AllArgsConstructor + public static class ColumnLineageDatasetFacet extends BaseFacet { + @Valid private ColumnLineageDatasetFacetFields fields; + + @Builder + public ColumnLineageDatasetFacet( + @NotNull URI _producer, @NotNull URI _schemaURL, ColumnLineageDatasetFacetFields fields) { + super(_producer, _schemaURL); + this.fields = fields; + } + } + + @Builder @Getter @Setter @Valid @ToString - public static class ColumnLineageFacet extends BaseFacet { + @NoArgsConstructor + public static class ColumnLineageDatasetFacetFields { - private List outputColumnsList; + @Builder.Default @JsonIgnore + private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setFacet(String key, ColumnLineageOutputColumn value) { + additional.put(key, value); + } + + 
@JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } @Builder - public ColumnLineageFacet( - @NotNull URI _producer, - @NotNull URI _schemaURL, - List outputColumnsList) { - super(_producer, _schemaURL); - this.outputColumnsList = outputColumnsList; + public ColumnLineageDatasetFacetFields(Map additional) { + this.additional = additional; } } @@ -518,10 +543,9 @@ public ColumnLineageFacet( @ToString public static class ColumnLineageOutputColumn extends BaseJsonModel { - @NotNull private String name; @NotNull private List inputFields; - @NotNull private String transformationDescription; - @NotNull private String transformationType; + private String transformationDescription; + private String transformationType; } @Builder @@ -533,8 +557,8 @@ public static class ColumnLineageOutputColumn extends BaseJsonModel { @ToString public static class ColumnLineageInputField extends BaseJsonModel { - @NotNull private String datasetNamespace; - @NotNull private String datasetName; - @NotNull private String fieldName; + @NotNull private String namespace; + @NotNull private String name; + @NotNull private String field; } } diff --git a/api/src/main/java/marquez/service/models/Node.java b/api/src/main/java/marquez/service/models/Node.java index 7cddf633ea..e526108b5b 100644 --- a/api/src/main/java/marquez/service/models/Node.java +++ b/api/src/main/java/marquez/service/models/Node.java @@ -52,6 +52,10 @@ public static Builder dataset() { return new Builder(NodeType.DATASET); } + public static Builder datasetField() { + return new Builder(NodeType.DATASET_FIELD); + } + public static Builder job() { return new Builder(NodeType.JOB); } diff --git a/api/src/main/java/marquez/service/models/NodeId.java b/api/src/main/java/marquez/service/models/NodeId.java index de8c82f8e7..04cbf2b6ce 100644 --- a/api/src/main/java/marquez/service/models/NodeId.java +++ b/api/src/main/java/marquez/service/models/NodeId.java @@ -20,9 +20,11 @@ import lombok.Getter; import 
lombok.NonNull; import lombok.ToString; +import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; +import marquez.common.models.FieldName; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.JobVersionId; @@ -40,11 +42,14 @@ public final class NodeId implements Comparable { public static final Joiner ID_JOINER = Joiner.on(ID_DELIM); private static final String ID_PREFX_DATASET = "dataset"; + private static final String ID_PREFX_DATASET_FIELD = "datasetField"; private static final String ID_PREFX_JOB = "job"; private static final String ID_PREFX_RUN = "run"; private static final Pattern ID_PATTERN = Pattern.compile( - String.format("^(%s|%s|%s):.*$", ID_PREFX_DATASET, ID_PREFX_JOB, ID_PREFX_RUN)); + String.format( + "^(%s|%s|%s|%s):.*$", + ID_PREFX_DATASET, ID_PREFX_DATASET_FIELD, ID_PREFX_JOB, ID_PREFX_RUN)); public static final String VERSION_DELIM = "#"; @@ -53,9 +58,10 @@ public final class NodeId implements Comparable { public NodeId(final String value) { checkArgument( ID_PATTERN.matcher(value).matches(), - "node ID (%s) must start with '%s', '%s', or '%s'", + "node ID (%s) must start with '%s', '%s', '%s' or '%s'", value, ID_PREFX_DATASET, + ID_PREFX_DATASET_FIELD, ID_PREFX_JOB, ID_PREFX_RUN); this.value = value; @@ -110,6 +116,15 @@ public static NodeId of(@NonNull DatasetId datasetId) { return NodeId.of(datasetId.getNamespace(), datasetId.getName()); } + public static NodeId of(@NonNull DatasetFieldId datasetFieldIdId) { + return of( + ID_JOINER.join( + ID_PREFX_DATASET_FIELD, + datasetFieldIdId.getDatasetId().getNamespace().getValue(), + datasetFieldIdId.getDatasetId().getName().getValue(), + datasetFieldIdId.getFieldName().getValue())); + } + public static NodeId of(@NonNull JobId jobId) { return NodeId.of(jobId.getNamespace(), jobId.getName()); } @@ -133,7 +148,12 @@ private static String 
appendVersionTo(@NonNull final String value, @Nullable fin @JsonIgnore public boolean isDatasetType() { - return value.startsWith(ID_PREFX_DATASET); + return value.startsWith(ID_PREFX_DATASET + ID_DELIM); + } + + @JsonIgnore + public boolean isDatasetFieldType() { + return value.startsWith(ID_PREFX_DATASET_FIELD); } @JsonIgnore @@ -167,7 +187,8 @@ public boolean sameTypeAs(@NonNull NodeId o) { || (this.isDatasetVersionType() && o.isDatasetVersionType()) || (this.isJobType() && o.isJobType()) || (this.isJobVersionType() && o.isJobVersionType()) - || (this.isRunType() && o.isRunType()); + || (this.isRunType() && o.isRunType()) + || (this.isDatasetFieldType() && o.isDatasetFieldType()); } @JsonIgnore @@ -220,6 +241,14 @@ public DatasetId asDatasetId() { return new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])); } + @JsonIgnore + public DatasetFieldId asDatasetFieldId() { + String[] parts = parts(4, ID_PREFX_DATASET); + return new DatasetFieldId( + new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])), + FieldName.of(parts[3])); + } + @JsonIgnore public JobVersionId asJobVersionId() { String[] parts = parts(3, ID_PREFX_JOB); diff --git a/api/src/main/java/marquez/service/models/NodeType.java b/api/src/main/java/marquez/service/models/NodeType.java index 4951fb5371..2f1cb3616d 100644 --- a/api/src/main/java/marquez/service/models/NodeType.java +++ b/api/src/main/java/marquez/service/models/NodeType.java @@ -7,6 +7,7 @@ public enum NodeType { DATASET, + DATASET_FIELD, JOB, RUN; } diff --git a/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql b/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql index bced4eff50..ed5d604968 100644 --- a/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql +++ b/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql @@ -5,8 +5,8 @@ CREATE TABLE column_lineage ( output_dataset_field_uuid uuid REFERENCES dataset_fields(uuid), 
input_dataset_version_uuid uuid REFERENCES dataset_versions(uuid), -- speed up graph column lineage graph traversal input_dataset_field_uuid uuid REFERENCES dataset_fields(uuid), - transformation_description VARCHAR(255) NOT NULL, - transformation_type VARCHAR(255) NOT NULL, + transformation_description VARCHAR(255), + transformation_type VARCHAR(255), created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, UNIQUE (output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid) diff --git a/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql b/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql new file mode 100644 index 0000000000..54b26b5921 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql @@ -0,0 +1,4 @@ +/* SPDX-License-Identifier: Apache-2.0 */ + +create index dataset_fields_dataset_uuid + on dataset_fields (dataset_uuid); \ No newline at end of file diff --git a/api/src/test/java/marquez/api/ApiTestUtils.java b/api/src/test/java/marquez/api/ApiTestUtils.java index f9e181cf79..96c09f0e7b 100644 --- a/api/src/test/java/marquez/api/ApiTestUtils.java +++ b/api/src/test/java/marquez/api/ApiTestUtils.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.mock; import java.util.Map; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; @@ -33,6 +34,9 @@ public static ServiceFactory mockServiceFactory(Map mocks) { return ServiceFactory.builder() .lineageService( (LineageService) mocks.getOrDefault(LineageService.class, (mock(LineageService.class)))) + .columnLineageService( + (ColumnLineageService) + mocks.getOrDefault(ColumnLineageService.class, (mock(ColumnLineageService.class)))) .openLineageService( (OpenLineageService) mocks.getOrDefault(OpenLineageService.class, (mock(OpenLineageService.class)))) diff 
--git a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java new file mode 100644 index 0000000000..dd5c2ab03e --- /dev/null +++ b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSortedSet; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; +import io.dropwizard.testing.junit5.ResourceExtension; +import java.time.Instant; +import java.util.Map; +import marquez.common.Utils; +import marquez.service.ColumnLineageService; +import marquez.service.ServiceFactory; +import marquez.service.models.Lineage; +import marquez.service.models.Node; +import marquez.service.models.NodeId; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DropwizardExtensionsSupport.class) +public class ColumnLineageResourceTest { + + private static ResourceExtension UNDER_TEST; + private static Lineage LINEAGE; + + static { + ColumnLineageService lineageService = mock(ColumnLineageService.class); + + Node testNode = + Utils.fromJson( + ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"), + new TypeReference<>() {}); + LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); + when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class))) + .thenReturn(LINEAGE); + + ServiceFactory serviceFactory = + ApiTestUtils.mockServiceFactory(Map.of(ColumnLineageService.class, lineageService)); + + UNDER_TEST = + 
ResourceExtension.builder().addResource(new ColumnLineageResource(serviceFactory)).build(); + } + + @Test + public void testGetColumnLineageByDatasetField() { + final Lineage lineage = + UNDER_TEST + .target("/api/v1/column-lineage") + .queryParam("nodeId", "datasetField:namespace:commonDataset:col_a") + .request() + .get() + .readEntity(Lineage.class); + + assertEquals(lineage, LINEAGE); + } + + @Test + public void testGetColumnLineageByDataset() { + final Lineage lineage = + UNDER_TEST + .target("/api/v1/column-lineage") + .queryParam("nodeId", "dataset:namespace:commonDataset") + .request() + .get() + .readEntity(Lineage.class); + + assertEquals(lineage, LINEAGE); + } +} diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java index 6a8e7e2a1c..6586c8a57c 100644 --- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -5,7 +5,13 @@ package marquez.db; +import static marquez.db.ColumnLineageTestUtils.getDatasetA; +import static marquez.db.ColumnLineageTestUtils.getDatasetB; +import static marquez.db.ColumnLineageTestUtils.getDatasetC; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -13,14 +19,20 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import marquez.common.models.DatasetType; +import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetRow; import marquez.db.models.DatasetVersionRow; import marquez.db.models.NamespaceRow; import 
marquez.db.models.SourceRow; +import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.Dataset; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; @@ -32,6 +44,7 @@ @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class ColumnLineageDaoTest { + private static OpenLineageDao openLineageDao; private static ColumnLineageDao dao; private static DatasetFieldDao fieldDao; private static DatasetDao datasetDao; @@ -47,9 +60,11 @@ public class ColumnLineageDaoTest { private DatasetRow outputDatasetRow; private DatasetVersionRow inputDatasetVersionRow; private DatasetVersionRow outputDatasetVersionRow; + private LineageEvent.JobFacet jobFacet; @BeforeAll public static void setUpOnce(Jdbi jdbi) { + openLineageDao = jdbi.onDemand(OpenLineageDao.class); dao = jdbi.onDemand(ColumnLineageDao.class); fieldDao = jdbi.onDemand(DatasetFieldDao.class); datasetDao = jdbi.onDemand(DatasetDao.class); @@ -129,20 +144,13 @@ public void setup() { // insert output dataset field fieldDao.upsert( outputDatasetFieldUuid, now, "output-field", "string", "desc", outputDatasetRow.getUuid()); + + jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); } @AfterEach public void tearDown(Jdbi jdbi) { - jdbi.inTransaction( - handle -> { - handle.execute("DELETE FROM column_lineage"); - handle.execute("DELETE FROM dataset_versions"); - handle.execute("DELETE FROM dataset_fields"); - handle.execute("DELETE FROM datasets"); - handle.execute("DELETE FROM sources"); - handle.execute("DELETE FROM namespaces"); - return null; - }); + ColumnLineageTestUtils.tearDown(jdbi); } @Test @@ -172,8 +180,8 @@ void testUpsertMultipleColumns() { assertTrue( Arrays.asList(inputFieldUuid1, inputFieldUuid2) .contains(rows.get(0).getInputDatasetFieldUuid())); // ordering may differ per run - 
assertEquals(transformationDescription, rows.get(0).getTransformationDescription()); - assertEquals(transformationType, rows.get(0).getTransformationType()); + assertEquals(transformationDescription, rows.get(0).getTransformationDescription().get()); + assertEquals(transformationType, rows.get(0).getTransformationType().get()); assertEquals(now.getEpochSecond(), rows.get(0).getCreatedAt().getEpochSecond()); assertEquals(now.getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } @@ -219,4 +227,342 @@ void testUpsertOnUpdatePreventsDuplicates() { assertEquals( now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } + + // dataset_A (col_a, col_b) + // dataset_B (col_c) depends on (col_a, col_b) + // dataset_C (col_d) depends on col_c + @Test + void testGetLineage() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0); + UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); + Set lineage = + dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()); + + assertEquals(2, lineage.size()); + + ColumnLineageNodeData dataset_b = + lineage.stream().filter(cd -> cd.getDataset().equals("dataset_b")).findAny().get(); + ColumnLineageNodeData dataset_c = + lineage.stream().filter(cd -> cd.getDataset().equals("dataset_c")).findAny().get(); + + // test dataset_c + assertThat(dataset_c.getInputFields()).hasSize(1); + assertEquals("col_d", dataset_c.getField()); + assertEquals("namespace", 
dataset_c.getInputFields().get(0).getNamespace()); + assertEquals("dataset_b", dataset_c.getInputFields().get(0).getDataset()); + assertEquals("col_c", dataset_c.getInputFields().get(0).getField()); + assertEquals("type2", dataset_c.getTransformationType()); + assertEquals("description2", dataset_c.getTransformationDescription()); + + // test dataset_b + assertThat(dataset_b.getInputFields()).hasSize(2); + assertEquals("col_c", dataset_b.getField()); + assertEquals( + "col_b", + dataset_b.getInputFields().stream() + .filter(f -> f.getField().equals("col_b")) + .findAny() + .get() + .getField()); + assertEquals( + "col_a", + dataset_b.getInputFields().stream() + .filter(f -> f.getField().equals("col_a")) + .findAny() + .get() + .getField()); + + assertEquals("namespace", dataset_b.getInputFields().get(0).getNamespace()); + assertEquals("dataset_a", dataset_b.getInputFields().get(0).getDataset()); + assertEquals("type1", dataset_b.getTransformationType()); + assertEquals("description1", dataset_b.getTransformationDescription()); + } + + @Test + void testGetLineageWhenNoLineageForColumn() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow.DatasetRecord datasetRecord_a = lineageRow.getInputs().get().get(0); + UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); + + // assert lineage is empty + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty(); + } + + /** + * Create dataset_d built on top of dataset_c. 
Lineage of depth 1 of dataset_d should be of + * size 2 (instead of 3) + */ + @Test + void testGetLineageWithLimitedDepth() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + Dataset dataset_D = + new Dataset( + "namespace", + "dataset_d", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_e", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( + "col_e", + new LineageEvent.ColumnLineageOutputColumn( + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "", + ""))))) + .build()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_D)); + + UpdateLineageRow.DatasetRecord datasetRecord_d = lineageRow.getOutputs().get().get(0); + UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get(); + + // make sure dataset are constructed properly + assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now())) + .hasSize(3); + + // verify graph size is 2 when max depth is 1 + assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2); + } + + @Test + void testGetLineageWhenCycleExists() { + Dataset dataset_A = + new Dataset( + "namespace", + "dataset_a", + LineageEvent.DatasetFacets.builder() + .schema( 
+ new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.SchemaField("col_a", "STRING", ""), + new LineageEvent.SchemaField("col_b", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( + "col_a", + new LineageEvent.ColumnLineageOutputColumn( + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "description3", + "type3"))))) + .build()); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job3", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_A)); + + UpdateLineageRow.DatasetRecord datasetRecord_a = lineageRow.getOutputs().get().get(0); + UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getInputs().get().get(0); + + UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); + UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); + + // column lineages for col_a and col_e should be of size 3 + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())) + .hasSize(3); + assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now())) + .hasSize(3); + } + + /** + * Run two jobs that write to dataset_b using dataset_a and dataset_c. 
Both input fields should be + * returned + */ + @Test + void testGetLineageWhenTwoJobsWriteToSameDataset() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + Dataset dataset_B_another_job = + new Dataset( + "namespace", + "dataset_b", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( + "col_c", + new LineageEvent.ColumnLineageOutputColumn( + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "description1", + "type1"))))) + .build()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_B_another_job)); + + UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); + UUID field_col_c = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + + // assert input fields for col_c contain col_a, col_b and col_d + List inputFields = + dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream() + .filter(node -> node.getDataset().equals("dataset_b")) + .flatMap(node -> node.getInputFields().stream()) + .map(input -> input.getField()) + .collect(Collectors.toList()); + + assertThat(inputFields).hasSize(3).contains("col_a", "col_b", "col_d"); + } + + @Test + void testGetLineagePointInTime() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + + UpdateLineageRow lineageRow = 
LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); + UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + Instant columnLineageCreatedAt = + dao.findColumnLineageByDatasetVersionColumnAndOutputDatasetField( + datasetRecord_b.getDatasetVersionRow().getUuid(), field_col_b) + .get(0) + .getCreatedAt(); + + // assert lineage is empty before and present after + assertThat( + dao.getLineage( + 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1))) + .isEmpty(); + assertThat( + dao.getLineage( + 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1))) + .hasSize(1); + } } diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java new file mode 100644 index 0000000000..662f4c695f --- /dev/null +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -0,0 +1,126 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; + +import java.util.Arrays; +import java.util.Collections; +import marquez.service.models.LineageEvent; +import org.jdbi.v3.core.Jdbi; + +public class ColumnLineageTestUtils { + + public static void tearDown(Jdbi jdbi) { + jdbi.inTransaction( + handle -> { + handle.execute("DELETE FROM column_lineage"); + handle.execute("DELETE FROM lineage_events"); + handle.execute("DELETE FROM runs_input_mapping"); + handle.execute("DELETE FROM datasets_tag_mapping"); + handle.execute("DELETE FROM dataset_versions_field_mapping"); + handle.execute("DELETE FROM dataset_versions"); + handle.execute("UPDATE runs SET 
start_run_state_uuid=NULL, end_run_state_uuid=NULL"); + handle.execute("DELETE FROM run_states"); + handle.execute("DELETE FROM runs"); + handle.execute("DELETE FROM run_args"); + handle.execute("DELETE FROM job_versions_io_mapping"); + handle.execute("DELETE FROM job_versions"); + handle.execute("DELETE FROM jobs"); + handle.execute("DELETE FROM dataset_fields_tag_mapping"); + handle.execute("DELETE FROM dataset_fields"); + handle.execute("DELETE FROM datasets"); + handle.execute("DELETE FROM sources"); + handle.execute("DELETE FROM dataset_symlinks"); + handle.execute("DELETE FROM namespaces"); + return null; + }); + } + + // dataset_A (col_a, col_b) + // dataset_B (col_c) depends on (col_a, col_b) + // dataset_C (col_d) depends on col_c + public static LineageEvent.Dataset getDatasetA() { + return new LineageEvent.Dataset( + "namespace", + "dataset_a", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.SchemaField("col_a", "STRING", ""), + new LineageEvent.SchemaField("col_b", "STRING", "")))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) + .build()); + } + + // dataset_B (col_c) depends on (col_a, col_b) + public static LineageEvent.Dataset getDatasetB() { + return new LineageEvent.Dataset( + "namespace", + "dataset_b", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) + .columnLineage( + new LineageEvent.ColumnLineageDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( + "col_c", + new LineageEvent.ColumnLineageOutputColumn( + Arrays.asList( + new 
LineageEvent.ColumnLineageInputField( + "namespace", "dataset_a", "col_a"), + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_a", "col_b")), + "description1", + "type1"))))) + .build()); + } + + // dataset_C (col_d) depends on col_c + public static LineageEvent.Dataset getDatasetC() { + return new LineageEvent.Dataset( + "namespace", + "dataset_c", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_d", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( + "col_d", + new LineageEvent.ColumnLineageOutputColumn( + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_b", "col_c")), + "description2", + "type2"))))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) + .build()); + } +} diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index e6993e7656..9d06d18dbb 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -144,8 +144,8 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() { (ds) -> ds.getInputDatasetVersionUuid(), (ds) -> ds.getOutputDatasetFieldUuid(), (ds) -> ds.getOutputDatasetVersionUuid(), - (ds) -> ds.getTransformationDescription(), - (ds) -> ds.getTransformationType()) + (ds) -> ds.getTransformationDescription().get(), + (ds) -> ds.getTransformationType().get()) .containsExactly( Tuple.tuple( datasetFieldDao @@ -187,17 +187,18 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotEx DATASET_NAME, LineageEvent.DatasetFacets.builder() // schema is missing .columnLineage( - new 
LineageEvent.ColumnLineageFacet( + new LineageEvent.ColumnLineageDatasetFacet( PRODUCER_URL, SCHEMA_URL, - Collections.singletonList( - new LineageEvent.ColumnLineageOutputColumn( + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( OUTPUT_COLUMN, - Collections.singletonList( - new LineageEvent.ColumnLineageInputField( - INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), - TRANSFORMATION_DESCRIPTION, - TRANSFORMATION_TYPE)))) + new LineageEvent.ColumnLineageOutputColumn( + Collections.singletonList( + new LineageEvent.ColumnLineageInputField( + INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), + TRANSFORMATION_DESCRIPTION, + TRANSFORMATION_TYPE))))) .build()); JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); @@ -237,17 +238,18 @@ void testUpsertColumnLineageData() { SCHEMA_URL, Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name")))) .columnLineage( - new LineageEvent.ColumnLineageFacet( + new LineageEvent.ColumnLineageDatasetFacet( PRODUCER_URL, SCHEMA_URL, - Collections.singletonList( - new LineageEvent.ColumnLineageOutputColumn( + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( OUTPUT_COLUMN, - Collections.singletonList( - new LineageEvent.ColumnLineageInputField( - INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), - UPDATED_TRANSFORMATION_DESCRIPTION, - UPDATED_TRANSFORMATION_TYPE)))) + new LineageEvent.ColumnLineageOutputColumn( + Collections.singletonList( + new LineageEvent.ColumnLineageInputField( + INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), + UPDATED_TRANSFORMATION_DESCRIPTION, + UPDATED_TRANSFORMATION_TYPE))))) .build()); JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); @@ -530,17 +532,18 @@ private Dataset getOutputDatasetWithColumnLineage() { SCHEMA_URL, Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name")))) .columnLineage( - new LineageEvent.ColumnLineageFacet( + new 
LineageEvent.ColumnLineageDatasetFacet( PRODUCER_URL, SCHEMA_URL, - Collections.singletonList( - new LineageEvent.ColumnLineageOutputColumn( + new LineageEvent.ColumnLineageDatasetFacetFields( + Collections.singletonMap( OUTPUT_COLUMN, - Collections.singletonList( - new LineageEvent.ColumnLineageInputField( - INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), - TRANSFORMATION_DESCRIPTION, - TRANSFORMATION_TYPE)))) + new LineageEvent.ColumnLineageOutputColumn( + Collections.singletonList( + new LineageEvent.ColumnLineageInputField( + INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), + TRANSFORMATION_DESCRIPTION, + TRANSFORMATION_TYPE))))) .build()); } } diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java new file mode 100644 index 0000000000..78224080ae --- /dev/null +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import static marquez.db.ColumnLineageTestUtils.getDatasetA; +import static marquez.db.ColumnLineageTestUtils.getDatasetB; +import static marquez.db.ColumnLineageTestUtils.getDatasetC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetId; +import marquez.common.models.DatasetName; +import marquez.common.models.NamespaceName; +import marquez.db.ColumnLineageDao; +import marquez.db.ColumnLineageTestUtils; +import marquez.db.DatasetFieldDao; +import marquez.db.LineageTestUtils; +import marquez.db.OpenLineageDao; 
+import marquez.db.models.ColumnLineageNodeData; +import marquez.db.models.InputFieldNodeData; +import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.Lineage; +import marquez.service.models.LineageEvent; +import marquez.service.models.Node; +import marquez.service.models.NodeId; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(MarquezJdbiExternalPostgresExtension.class) +public class ColumnLineageServiceTest { + + private static ColumnLineageDao dao; + private static OpenLineageDao openLineageDao; + private static DatasetFieldDao fieldDao; + private static ColumnLineageService lineageService; + private static LineageEvent.JobFacet jobFacet; + + @BeforeAll + public static void setUpOnce(Jdbi jdbi) { + dao = jdbi.onDemand(ColumnLineageDao.class); + openLineageDao = jdbi.onDemand(OpenLineageDao.class); + fieldDao = jdbi.onDemand(DatasetFieldDao.class); + lineageService = new ColumnLineageService(dao, fieldDao); + jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + } + + @AfterEach + public void tearDown(Jdbi jdbi) { + ColumnLineageTestUtils.tearDown(jdbi); + } + + @Test + public void testLineageByDatasetFieldId() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineage = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + + assertThat(lineage.getGraph()).hasSize(3); + + 
// check dataset_B node + Node col_c = getNode(lineage, "dataset_b", "col_c").get(); + List inputFields = + ((ColumnLineageNodeData) col_c.getData()).getInputFields(); + assertEquals( + "description1", ((ColumnLineageNodeData) col_c.getData()).getTransformationDescription()); + assertEquals("type1", ((ColumnLineageNodeData) col_c.getData()).getTransformationType()); + assertEquals("STRING", ((ColumnLineageNodeData) col_c.getData()).getFieldType()); + assertThat(inputFields).hasSize(2); + assertEquals("dataset_a", inputFields.get(0).getDataset()); + + // check dataset_A node + Node col_a = getNode(lineage, "dataset_a", "col_b").get(); + assertNull((ColumnLineageNodeData) col_a.getData()); + + // verify edges + // assert dataset_B (col_c) -> dataset_A (col_a) + assertThat(col_c.getOutEdges()).isEmpty(); + assertThat( + col_c.getInEdges().stream() + .map(edge -> edge.getDestination().asDatasetFieldId()) + .filter(field -> field.getFieldName().getValue().equals("col_a")) + .filter(field -> field.getDatasetId().getName().getValue().equals("dataset_a")) + .findAny()) + .isPresent(); + + assertThat(col_a.getInEdges()).isEmpty(); + assertThat( + col_a.getOutEdges().stream() + .map(edge -> edge.getDestination().asDatasetFieldId()) + .filter(field -> field.getFieldName().getValue().equals("col_c")) + .filter(field -> field.getDatasetId().getName().getValue().equals("dataset_b")) + .findAny()) + .isPresent(); + + // verify dataset_C not present in the graph + assertThat(getNode(lineage, "dataset_c", "col_d")).isEmpty(); + } + + @Test + public void testLineageByDatasetId() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + 
Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineageByField = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + + Lineage lineageByDataset = + lineageService.lineage( + NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), + 20, + Instant.now()); + + // lineage of dataset and column should be equal + assertThat(lineageByField).isEqualTo(lineageByDataset); + } + + @Test + public void testLineageWhenLineageEmpty() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + assertThrows( + NodeIdNotFoundException.class, + () -> + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), + 20, + Instant.now())); + + assertThrows( + NodeIdNotFoundException.class, + () -> + lineageService.lineage( + NodeId.of( + new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))), + 20, + Instant.now())); + + assertThat( + lineageService + .lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), + 20, + Instant.now()) + .getGraph()) + .hasSize(0); + } + + private Optional getNode(Lineage lineage, String datasetName, String fieldName) { + return lineage.getGraph().stream() + .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) + .filter( + n -> + n.getId() + .asDatasetFieldId() + .getDatasetId() + .getName() + .getValue() + .equals(datasetName)) + .findAny(); + } +} diff --git a/api/src/test/java/marquez/service/models/NodeIdTest.java 
b/api/src/test/java/marquez/service/models/NodeIdTest.java index 054508e0b9..f7b15b9300 100644 --- a/api/src/test/java/marquez/service/models/NodeIdTest.java +++ b/api/src/test/java/marquez/service/models/NodeIdTest.java @@ -10,8 +10,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; +import marquez.common.models.FieldName; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; @@ -111,4 +113,32 @@ public void testDatasetWithVersion(String namespace, String dataset) { assertEquals( dataset.split(VERSION_DELIM)[1], nodeId.asDatasetVersionId().getVersion().toString()); } + + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA", + "gs://bucket$/path/to/data$colA", + "gs://bucket$/path/to/data$col_A" + }, + delimiter = '$') + public void testDatasetField(String namespace, String dataset, String field) { + NamespaceName namespaceName = NamespaceName.of(namespace); + FieldName fieldName = FieldName.of(field); + DatasetName datasetName = DatasetName.of(dataset); + DatasetId dsId = new DatasetId(namespaceName, datasetName); + DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName); + NodeId nodeId = NodeId.of(dsfId); + assertFalse(nodeId.isRunType()); + assertFalse(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertFalse(nodeId.hasVersion()); + assertTrue(nodeId.isDatasetFieldType()); + + assertEquals(dsfId, nodeId.asDatasetFieldId()); + assertEquals(nodeId, NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue()); + assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue()); + assertEquals(field, 
nodeId.asDatasetFieldId().getFieldName().getValue()); + } } diff --git a/api/src/test/resources/column_lineage/node.json b/api/src/test/resources/column_lineage/node.json new file mode 100644 index 0000000000..0ad713d4ec --- /dev/null +++ b/api/src/test/resources/column_lineage/node.json @@ -0,0 +1,28 @@ +{ + "id": "datasetField:namespace:commonDataset:columnA", + "type": "DATASET_FIELD", + "data": { + "type": "DATASET_FIELD", + "namespace": "namespace", + "dataset": "otherDataset", + "field": "columnA", + "fieldType": "integer", + "transformationDescription": "identical", + "transformationType": "IDENTITY", + "inputFields": [ + { "namespace": "namespace" , "dataset": "otherDataset", "field": "columnB"} + ] + }, + "inEdges": [ + { + "origin": "datasetField:namespace:otherDataset:columnB", + "destination": "datasetField:namespace:commonDataset:columnA" + } + ], + "outEdges": [ + { + "origin": "datasetField:namespace:commonDataset:columnA", + "destination": "datasetField:namespace:otherDataset:columnC" + } + ] +}