fix column lineage when multiple jobs write to same dataset #2289

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@
  * Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  * Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
    *In case of JSON processing exceptions Marquez API should return exception message to a client.*
+ * Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
+   *The fix deprecates the way fields `transformationDescription` and `transformationType` are returned. The deprecated way of returning those fields will be removed in 0.30.0.*

  ## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21

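The changelog note above can be illustrated with a small sketch. It assumes two jobs write the same output column from different sources, and it uses a hypothetical `InputField` class and made-up transformation values rather than the Marquez models. A single top-level value can describe only one of the writers, while per-input values keep both.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a column-lineage payload; these are not the Marquez classes. */
final class ColumnLineageMigrationSketch {

  /** One upstream column together with the transformation that used it. */
  static final class InputField {
    final String namespace;
    final String dataset;
    final String field;
    final String transformationDescription;
    final String transformationType;

    InputField(
        String namespace,
        String dataset,
        String field,
        String transformationDescription,
        String transformationType) {
      this.namespace = namespace;
      this.dataset = dataset;
      this.field = field;
      this.transformationDescription = transformationDescription;
      this.transformationType = transformationType;
    }
  }

  /** Deprecated style: one value per output column, taken here from the first input field. */
  static String legacyTransformationType(List<InputField> inputFields) {
    return inputFields.stream().findFirst().map(f -> f.transformationType).orElse(null);
  }

  /** New style: every input field reports its own transformation type. */
  static List<String> perInputTransformationTypes(List<InputField> inputFields) {
    return inputFields.stream()
        .map(f -> f.dataset + "." + f.field + "=" + f.transformationType)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Two jobs write the same output column from different sources (illustrative values).
    List<InputField> inputs =
        Arrays.asList(
            new InputField("ns", "orders", "amount", "copied as-is", "IDENTITY"),
            new InputField("ns", "refunds", "amount", "sum of refunds", "AGGREGATION"));
    System.out.println(legacyTransformationType(inputs));
    System.out.println(perInputTransformationTypes(inputs));
  }
}
```

Clients that currently read the top-level fields would, under this reading of the changelog, switch to the values carried by each input field before 0.30.0.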
43 changes: 20 additions & 23 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
@@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
  output_fields.dataset_name,
  output_fields.field_name,
  output_fields.type,
- ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
- clr.output_dataset_version_uuid as dataset_version_uuid,
- clr.transformation_description,
- clr.transformation_type,
- clr.created_at,
- clr.updated_at
+ ARRAY_AGG(DISTINCT ARRAY[
+   input_fields.namespace_name,
+   input_fields.dataset_name,
+   CAST(clr.input_dataset_version_uuid AS VARCHAR),
+   input_fields.field_name,
+   clr.transformation_description,
+   clr.transformation_type
+ ]) AS inputFields,
+ clr.output_dataset_version_uuid as dataset_version_uuid
  FROM column_lineage_recursive clr
  INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
  LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
@@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
  output_fields.dataset_name,
  output_fields.field_name,
  output_fields.type,
- clr.output_dataset_version_uuid,
- clr.transformation_description,
- clr.transformation_type,
- clr.created_at,
- clr.updated_at
+ clr.output_dataset_version_uuid
  """)
  Set<ColumnLineageNodeData> getLineage(
  int depth,
@@ -193,25 +192,23 @@ dataset_fields_view AS (
  output_fields.dataset_name,
  output_fields.field_name,
  output_fields.type,
- ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
- c.output_dataset_version_uuid as dataset_version_uuid,
- c.transformation_description,
- c.transformation_type,
- c.created_at,
- c.updated_at
+ ARRAY_AGG(DISTINCT ARRAY[
+   input_fields.namespace_name,
+   input_fields.dataset_name,
+   CAST(c.input_dataset_version_uuid AS VARCHAR),
+   input_fields.field_name,
+   c.transformation_description,
+   c.transformation_type
+ ]) AS inputFields,
+ null as dataset_version_uuid
  FROM selected_column_lineage c
  INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
  LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
  GROUP BY
  output_fields.namespace_name,
  output_fields.dataset_name,
  output_fields.field_name,
- output_fields.type,
- c.output_dataset_version_uuid,
- c.transformation_description,
- c.transformation_type,
- c.created_at,
- c.updated_at
+ output_fields.type
  """)
  /**
   * Each dataset is identified by a pair of strings (namespace and name). A query returns column
@@ -5,11 +5,9 @@

  package marquez.db.mappers;

- import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
- import static marquez.db.Columns.TRANSFORMATION_TYPE;
  import static marquez.db.Columns.stringOrNull;
  import static marquez.db.Columns.stringOrThrow;
- import static marquez.db.Columns.uuidOrThrow;
+ import static marquez.db.Columns.uuidOrNull;

  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.google.common.collect.ImmutableList;
@@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
      return new ColumnLineageNodeData(
          stringOrThrow(results, Columns.NAMESPACE_NAME),
          stringOrThrow(results, Columns.DATASET_NAME),
-         uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
+         uuidOrNull(results, Columns.DATASET_VERSION_UUID),
          stringOrThrow(results, Columns.FIELD_NAME),
          stringOrNull(results, Columns.TYPE),
-         stringOrNull(results, TRANSFORMATION_DESCRIPTION),
-         stringOrNull(results, TRANSFORMATION_TYPE),
          toInputFields(results, "inputFields"));
    }

@@ -57,7 +53,10 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
      return ImmutableList.copyOf(
          Arrays.asList(deserializedArray).stream()
              .map(o -> (String[]) o)
-             .map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
+             .map(
+                 arr ->
+                     new InputFieldNodeData(
+                         arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]))
              .collect(Collectors.toList()));
    }
  }
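The mapper change above relies on each aggregated row now carrying six positional elements instead of four. A minimal sketch of that mapping follows, using a hypothetical `InputField` class in place of `InputFieldNodeData`. The length check and the null guard around `UUID.fromString` are additions made for this sketch and are not part of the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical sketch of turning aggregated rows into input-field objects. */
final class InputFieldRowSketch {

  /** Stand-in for the node data class; field order follows the ARRAY[...] in the query. */
  static final class InputField {
    final String namespace;
    final String dataset;
    final UUID datasetVersion;
    final String field;
    final String transformationDescription;
    final String transformationType;

    InputField(
        String namespace,
        String dataset,
        UUID datasetVersion,
        String field,
        String transformationDescription,
        String transformationType) {
      this.namespace = namespace;
      this.dataset = dataset;
      this.datasetVersion = datasetVersion;
      this.field = field;
      this.transformationDescription = transformationDescription;
      this.transformationType = transformationType;
    }
  }

  /** Maps one six-element row; positions 4 and 5 are the transformation columns. */
  static InputField fromRow(String[] row) {
    if (row.length != 6) {
      // Assumption of this sketch: fail fast if the query and the mapper drift apart.
      throw new IllegalArgumentException("expected 6 elements but got " + row.length);
    }
    // Assumption of this sketch: tolerate a missing version instead of throwing.
    UUID version = row[2] == null ? null : UUID.fromString(row[2]);
    return new InputField(row[0], row[1], version, row[3], row[4], row[5]);
  }

  /** Maps every row in order. */
  static List<InputField> fromRows(String[][] rows) {
    List<InputField> result = new ArrayList<>();
    for (String[] row : rows) {
      result.add(fromRow(row));
    }
    return result;
  }
}
```

Because the array is positional, the element order in the SQL `ARRAY[...]` and the constructor argument order have to change together.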
49 changes: 45 additions & 4 deletions api/src/main/java/marquez/db/models/ColumnLineageNodeData.java
@@ -5,22 +5,63 @@

  package marquez.db.models;

+ import com.google.common.collect.ImmutableList;
  import java.util.List;
+ import java.util.Optional;
  import java.util.UUID;
+ import java.util.function.Function;
  import javax.annotation.Nullable;
- import lombok.AllArgsConstructor;
  import lombok.Getter;
  import lombok.NonNull;
+ import marquez.service.models.ColumnLineageInputField;

  @Getter
- @AllArgsConstructor
  public class ColumnLineageNodeData implements NodeData {
    @NonNull String namespace;
    @NonNull String dataset;
    @Nullable UUID datasetVersion;
    @NonNull String field;
    @Nullable String fieldType;
-   String transformationDescription;
-   String transformationType;
+   @Nullable String transformationDescription;
+   @Nullable String transformationType;
    @NonNull List<InputFieldNodeData> inputFields;
+
+   public ColumnLineageNodeData(
+       String namespace,
+       String dataset,
+       UUID datasetVersion,
+       String field,
+       String fieldType,
+       ImmutableList<InputFieldNodeData> inputFields) {
+     this.namespace = namespace;
+     this.dataset = dataset;
+     this.datasetVersion = datasetVersion;
+     this.field = field;
+     this.fieldType = fieldType;
+     this.inputFields = inputFields;
+   }
+
+   /**
+    * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
+    *     single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
+    */
+   public String getTransformationDescription() {
+     return Optional.ofNullable(inputFields).map(List::stream).stream()
+         .flatMap(Function.identity())
+         .findAny()
+         .map(d -> d.getTransformationDescription())
+         .orElse(null);
+   }
+
+   /**
+    * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
+    *     single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
+    */
+   public String getTransformationType() {
+     return Optional.ofNullable(inputFields).map(List::stream).stream()
+         .flatMap(Function.identity())
+         .findAny()
+         .map(d -> d.getTransformationType())
+         .orElse(null);
+   }
  }
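The deprecated getters above chain `Optional` and `Stream` to stay null-safe. The sketch below isolates that idiom with a hypothetical `Input` class and compares it with a plain null check that should behave the same way. Note that `findAny` makes no promise about which element it returns, so when several jobs wrote the column, the legacy value may come from any of the input fields.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical sketch of the null-safe lookup used by the deprecated getters. */
final class DeprecatedGetterSketch {

  /** Stand-in for an input field that carries a transformation description. */
  static final class Input {
    final String description;

    Input(String description) {
      this.description = description;
    }
  }

  /** Same shape as the getter in the diff: null list, empty list, and null value all yield null. */
  static String viaOptionalStream(List<Input> inputs) {
    return Optional.ofNullable(inputs).map(List::stream).stream()
        .flatMap(Function.identity())
        .findAny()
        .map(i -> i.description)
        .orElse(null);
  }

  /** A shorter form with an explicit null check, written for comparison only. */
  static String viaNullCheck(List<Input> inputs) {
    if (inputs == null) {
      return null;
    }
    return inputs.stream().findAny().map(i -> i.description).orElse(null);
  }
}
```

Both variants return `null` for a null list, an empty list, or an element whose description is `null`, because `Optional.map` turns a null result into an empty `Optional`.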
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
@@ -20,4 +20,6 @@ public class InputFieldNodeData {
    @NonNull String dataset;
    @Nullable UUID datasetVersion;
    @NonNull String field;
+   String transformationDescription;
+   String transformationType;
  }
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
@@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List<Dataset> datasets) {
                .add(
                    ColumnLineage.builder()
                        .name(nodeData.getField())
-                       .transformationDescription(nodeData.getTransformationDescription())
-                       .transformationType(nodeData.getTransformationType())
                        .inputFields(
                            nodeData.getInputFields().stream()
                                .map(
                                    f ->
                                        new ColumnLineageInputField(
-                                           f.getNamespace(), f.getDataset(), f.getField()))
+                                           f.getNamespace(),
+                                           f.getDataset(),
+                                           f.getField(),
+                                           f.getTransformationDescription(),
+                                           f.getTransformationType()))
                                .collect(Collectors.toList()))
                        .build());
          });
32 changes: 30 additions & 2 deletions api/src/main/java/marquez/service/models/ColumnLineage.java
@@ -6,6 +6,9 @@
  package marquez.service.models;

  import java.util.List;
+ import java.util.Optional;
+ import java.util.function.Function;
+ import javax.annotation.Nullable;
  import javax.validation.constraints.NotNull;
  import lombok.Builder;
  import lombok.EqualsAndHashCode;
@@ -19,6 +22,31 @@
  public class ColumnLineage {
    @NotNull private String name;
    @NotNull private List<ColumnLineageInputField> inputFields;
-   @NotNull private String transformationDescription;
-   @NotNull private String transformationType;
+
+   @Nullable private String transformationDescription;
+   @Nullable private String transformationType;
+
+   /**
+    * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
+    *     single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
+    */
+   public String getTransformationDescription() {
+     return Optional.ofNullable(inputFields).map(List::stream).stream()
+         .flatMap(Function.identity())
+         .findAny()
+         .map(d -> d.getTransformationDescription())
+         .orElse(null);
+   }
+
+   /**
+    * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
+    *     single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
+    */
+   public String getTransformationType() {
+     return Optional.ofNullable(inputFields).map(List::stream).stream()
+         .flatMap(Function.identity())
+         .findAny()
+         .map(d -> d.getTransformationType())
+         .orElse(null);
+   }
  }
@@ -19,4 +19,6 @@ public class ColumnLineageInputField {
    @NotNull private String namespace;
    @NotNull private String dataset;
    @NotNull private String field;
+   @NotNull private String transformationDescription;
+   @NotNull private String transformationType;
  }