Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column lineage graph endpoint #2124

Merged
merged 1 commit into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Added
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import lombok.Getter;
import lombok.NonNull;
import marquez.api.ColumnLineageResource;
import marquez.api.DatasetResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
Expand All @@ -22,6 +23,7 @@
import marquez.api.TagResource;
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
import marquez.db.BaseDao;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
Expand All @@ -39,6 +41,7 @@
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand Down Expand Up @@ -70,6 +73,7 @@ public final class MarquezContext {
@Getter private final TagDao tagDao;
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final ColumnLineageDao columnLineageDao;
@Getter private final SearchDao searchDao;
@Getter private final List<RunTransitionListener> runTransitionListeners;

Expand All @@ -81,9 +85,11 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final ColumnLineageResource columnLineageResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
Expand Down Expand Up @@ -115,6 +121,7 @@ private MarquezContext(
this.tagDao = jdbi.onDemand(TagDao.class);
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.runTransitionListeners = runTransitionListeners;

Expand All @@ -128,6 +135,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -139,12 +147,14 @@ private MarquezContext(
.openLineageService(openLineageService)
.sourceService(sourceService)
.lineageService(lineageService)
.columnLineageService(columnLineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
this.datasetResource = new DatasetResource(serviceFactory);
this.columnLineageResource = new ColumnLineageResource(serviceFactory);
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
Expand All @@ -155,6 +165,7 @@ private MarquezContext(
namespaceResource,
sourceResource,
datasetResource,
columnLineageResource,
jobResource,
tagResource,
jdbiException,
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.SourceName;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand All @@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected ColumnLineageService columnLineageService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.columnLineageService = serviceFactory.getColumnLineageService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.service.ServiceFactory;
import marquez.service.models.NodeId;

@Slf4j
@Path("/api/v1/column-lineage")
public class ColumnLineageResource extends BaseResource {

private static final String DEFAULT_DEPTH = "20";

public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
super(serviceFactory);
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
}
}
27 changes: 27 additions & 0 deletions api/src/main/java/marquez/common/models/DatasetFieldId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldId {

@Getter private final DatasetId datasetId;
@Getter private final FieldName fieldName;

public static DatasetFieldId of(String namespace, String datasetName, String field) {
return new DatasetFieldId(
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
FieldName.of(field));
}
}
62 changes: 62 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@

package marquez.db;

import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLineageNodeDataMapper;
import marquez.db.mappers.ColumnLineageRowMapper;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.ColumnLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLineageRowMapper.class)
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
public interface ColumnLineageDao extends BaseDao {

default List<ColumnLineageRow> upsertColumnLineageRow(
Expand Down Expand Up @@ -88,4 +95,59 @@ void doUpsertColumnLineageRow(
},
value = "values")
List<ColumnLineageRow> rows);

@SqlQuery(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most important piece of the PR: recursive query to extract column-lineage graph.
Only column_lineage table is used and joined to obtained graph nodes.
Other tables are only used to enrich found nodes.

"""
WITH RECURSIVE
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
SELECT *, 0 as depth
FROM column_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
node.depth + 1 as depth
FROM column_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
AND node.depth < :depth
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);
}
19 changes: 19 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ default Dataset updateTags(
+ "WHERE dataset_uuid = :datasetUuid AND name = :name")
Optional<UUID> findUuid(UUID datasetUuid, String name);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
""")
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
WHERE df.name = :name
""")
Optional<UUID> findUuid(String namespace, String datasetName, String name);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
Expand Down
34 changes: 19 additions & 15 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,36 +758,40 @@ private List<ColumnLineageRow> upsertColumnLineage(

return Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getColumnLineage)
.map(LineageEvent.ColumnLineageFacet::getOutputColumnsList)
.map(LineageEvent.ColumnLineageDatasetFacet::getFields)
.map(LineageEvent.ColumnLineageDatasetFacetFields::getAdditional)
.stream()
.flatMap(list -> list.stream())
.flatMap(map -> map.keySet().stream())
.filter(
columnName ->
ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName)
instanceof LineageEvent.ColumnLineageOutputColumn)
.flatMap(
outputColumn -> {
columnName -> {
LineageEvent.ColumnLineageOutputColumn columnLineage =
ds.getFacets().getColumnLineage().getFields().getAdditional().get(columnName);
Optional<DatasetFieldRow> outputField =
datasetFields.stream()
.filter(dfr -> dfr.getName().equals(outputColumn.getName()))
.findAny();
datasetFields.stream().filter(dfr -> dfr.getName().equals(columnName)).findAny();

if (outputField.isEmpty()) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
log.error(
"Cannot produce column lineage for missing output field in output dataset: {}",
outputColumn.getName());
return Stream.empty();
columnName);
return Stream.<ColumnLineageRow>empty();
}

// get field uuids of input columns related to this run
List<Pair<UUID, UUID>> inputFields =
runFields.stream()
.filter(
fieldData ->
outputColumn.getInputFields().stream()
columnLineage.getInputFields().stream()
.filter(
of ->
of.getDatasetNamespace().equals(fieldData.getNamespace())
&& of.getDatasetName()
.equals(fieldData.getDatasetName())
&& of.getFieldName().equals(fieldData.getField()))
of.getNamespace().equals(fieldData.getNamespace())
&& of.getName().equals(fieldData.getDatasetName())
&& of.getField().equals(fieldData.getField()))
.findAny()
.isPresent())
.map(
Expand All @@ -802,8 +806,8 @@ private List<ColumnLineageRow> upsertColumnLineage(
datasetVersionRow.getUuid(),
outputField.get().getUuid(),
inputFields,
outputColumn.getTransformationDescription(),
outputColumn.getTransformationType(),
columnLineage.getTransformationDescription(),
columnLineage.getTransformationType(),
now)
.stream();
})
Expand Down
Loading