Skip to content

Commit

Permalink
add column lineage graph endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Oct 7, 2022
1 parent b6544ec commit b79d712
Show file tree
Hide file tree
Showing 31 changed files with 1,405 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Added
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import lombok.Getter;
import lombok.NonNull;
import marquez.api.ColumnLineageResource;
import marquez.api.DatasetResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
Expand All @@ -22,6 +23,7 @@
import marquez.api.TagResource;
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
import marquez.db.BaseDao;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
Expand All @@ -39,6 +41,7 @@
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand Down Expand Up @@ -70,6 +73,7 @@ public final class MarquezContext {
@Getter private final TagDao tagDao;
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final ColumnLineageDao columnLineageDao;
@Getter private final SearchDao searchDao;
@Getter private final List<RunTransitionListener> runTransitionListeners;

Expand All @@ -81,9 +85,11 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final ColumnLineageResource columnLineageResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
Expand Down Expand Up @@ -115,6 +121,7 @@ private MarquezContext(
this.tagDao = jdbi.onDemand(TagDao.class);
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.runTransitionListeners = runTransitionListeners;

Expand All @@ -128,6 +135,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -139,12 +147,14 @@ private MarquezContext(
.openLineageService(openLineageService)
.sourceService(sourceService)
.lineageService(lineageService)
.columnLineageService(columnLineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
this.datasetResource = new DatasetResource(serviceFactory);
this.columnLineageResource = new ColumnLineageResource(serviceFactory);
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
Expand All @@ -155,6 +165,7 @@ private MarquezContext(
namespaceResource,
sourceResource,
datasetResource,
columnLineageResource,
jobResource,
tagResource,
jdbiException,
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.SourceName;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand All @@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected ColumnLineageService columnLineageService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.columnLineageService = serviceFactory.getColumnLineageService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.service.ServiceFactory;
import marquez.service.models.NodeId;

@Slf4j
@Path("/api/v1/column-lineage")
public class ColumnLineageResource extends BaseResource {

private static final String DEFAULT_DEPTH = "20";

public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
super(serviceFactory);
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
}
}
27 changes: 27 additions & 0 deletions api/src/main/java/marquez/common/models/DatasetFieldId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldId {

@Getter private final DatasetId datasetId;
@Getter private final FieldName fieldName;

public static DatasetFieldId of(String namespace, String datasetName, String field) {
return new DatasetFieldId(
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
FieldName.of(field));
}
}
62 changes: 62 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@

package marquez.db;

import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLineageNodeDataMapper;
import marquez.db.mappers.ColumnLineageRowMapper;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.ColumnLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLineageRowMapper.class)
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
public interface ColumnLineageDao extends BaseDao {

default List<ColumnLineageRow> upsertColumnLineageRow(
Expand Down Expand Up @@ -88,4 +95,59 @@ void doUpsertColumnLineageRow(
},
value = "values")
List<ColumnLineageRow> rows);

@SqlQuery(
"""
WITH RECURSIVE
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
SELECT *, 0 as depth
FROM column_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
node.depth + 1 as depth
FROM column_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
AND node.depth < :depth
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);
}
19 changes: 19 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ default Dataset updateTags(
+ "WHERE dataset_uuid = :datasetUuid AND name = :name")
Optional<UUID> findUuid(UUID datasetUuid, String name);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
""")
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
WHERE df.name = :name
""")
Optional<UUID> findUuid(String namespace, String datasetName, String name);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
Expand Down
34 changes: 19 additions & 15 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,36 +731,40 @@ private List<ColumnLineageRow> upsertColumnLineage(

return Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getColumnLineage)
.map(LineageEvent.ColumnLineageFacet::getOutputColumnsList)
.map(LineageEvent.ColumnLineageDatasetFacet::getFields)
.map(LineageEvent.ColumnLineageDatasetFacetFields::getAdditional)
.stream()
.flatMap(list -> list.stream())
.flatMap(map -> map.keySet().stream())
.filter(
columnName ->
ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName)
instanceof LineageEvent.ColumnLineageOutputColumn)
.flatMap(
outputColumn -> {
columnName -> {
LineageEvent.ColumnLineageOutputColumn columnLineage =
ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName);
Optional<DatasetFieldRow> outputField =
datasetFields.stream()
.filter(dfr -> dfr.getName().equals(outputColumn.getName()))
.findAny();
datasetFields.stream().filter(dfr -> dfr.getName().equals(columnName)).findAny();

if (outputField.isEmpty()) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
log.error(
"Cannot produce column lineage for missing output field in output dataset: {}",
outputColumn.getName());
return Stream.empty();
columnName);
return Stream.<ColumnLineageRow>empty();
}

// get field uuids of input columns related to this run
List<Pair<UUID, UUID>> inputFields =
runFields.stream()
.filter(
fieldData ->
outputColumn.getInputFields().stream()
columnLineage.getInputFields().stream()
.filter(
of ->
of.getDatasetNamespace().equals(fieldData.getNamespace())
&& of.getDatasetName()
.equals(fieldData.getDatasetName())
&& of.getFieldName().equals(fieldData.getField()))
of.getNamespace().equals(fieldData.getNamespace())
&& of.getName().equals(fieldData.getDatasetName())
&& of.getField().equals(fieldData.getField()))
.findAny()
.isPresent())
.map(
Expand All @@ -775,8 +779,8 @@ private List<ColumnLineageRow> upsertColumnLineage(
datasetVersionRow.getUuid(),
outputField.get().getUuid(),
inputFields,
outputColumn.getTransformationDescription(),
outputColumn.getTransformationType(),
columnLineage.getTransformationDescription(),
columnLineage.getTransformationType(),
now)
.stream();
})
Expand Down
Loading

0 comments on commit b79d712

Please sign in to comment.