From 804adf6e4364b06968039f637cc938ef370facf8 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Wed, 5 Oct 2022 08:50:09 +0200 Subject: [PATCH] marquez java client Signed-off-by: Pawel Leszczynski --- .../java/marquez/client/MarquezClient.java | 50 +++++++ .../java/marquez/client/MarquezPathV1.java | 4 + .../main/java/marquez/client/MarquezUrl.java | 21 +++ .../src/main/java/marquez/client/Utils.java | 23 +++ .../marquez/client/models/ColumnLineage.java | 2 +- .../client/models/ColumnLineageNodeData.java | 31 ++++ .../marquez/client/models/DatasetFieldId.java | 16 ++ .../main/java/marquez/client/models/Edge.java | 19 +++ .../main/java/marquez/client/models/Node.java | 120 +++++++++++++++ .../java/marquez/client/models/NodeData.java | 16 ++ .../java/marquez/client/models/NodeId.java | 140 ++++++++++++++++++ .../java/marquez/client/models/NodeType.java | 13 ++ .../marquez/client/MarquezClientTest.java | 95 ++++++++++++ .../marquez/client/MarquezPathV1Test.java | 5 + .../java/marquez/client/MarquezUrlTest.java | 11 ++ .../marquez/client/models/NodeIdTest.java | 50 +++++++ 16 files changed, 615 insertions(+), 1 deletion(-) create mode 100644 clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java create mode 100644 clients/java/src/main/java/marquez/client/models/DatasetFieldId.java create mode 100644 clients/java/src/main/java/marquez/client/models/Edge.java create mode 100644 clients/java/src/main/java/marquez/client/models/Node.java create mode 100644 clients/java/src/main/java/marquez/client/models/NodeData.java create mode 100644 clients/java/src/main/java/marquez/client/models/NodeId.java create mode 100644 clients/java/src/main/java/marquez/client/models/NodeType.java create mode 100644 clients/java/src/test/java/marquez/client/models/NodeIdTest.java diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index ba2f3d21eb..ee10d2f4a4 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -43,6 +43,7 @@ import marquez.client.models.LineageEvent; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.Node; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -57,6 +58,7 @@ public class MarquezClient { static final URL DEFAULT_BASE_URL = Utils.toUrl("http://localhost:8080"); + static final int DEFAULT_LINEAGE_GRAPH_DEPTH = 20; @VisibleForTesting static final int DEFAULT_LIMIT = 100; @VisibleForTesting static final int DEFAULT_OFFSET = 0; @@ -113,6 +115,36 @@ public enum SortDirection { @Getter public final String value; } + public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) { + return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); + } + + public Lineage getColumnLineage( + @NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) { + return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false); + } + + public Lineage getColumnLineage( + @NonNull String namespaceName, + @NonNull String datasetName, + int depth, + boolean withDownstream) { + final String bodyAsJson = + http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream)); + return Lineage.fromJson(bodyAsJson); + } + + public Lineage getColumnLineage( + @NonNull String namespaceName, + @NonNull String datasetName, + @NonNull String field, + int depth, + boolean withDownstream) { + final String bodyAsJson = + http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream)); + return Lineage.fromJson(bodyAsJson); + } + public Namespace createNamespace( @NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) { final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson()); @@ -660,4 +692,22 @@ String toJson() { return Utils.toJson(this); } } + + @Value + static class Lineage { + @Getter Set graph; + + @JsonCreator + Lineage(@JsonProperty("graph") final Set value) { + this.graph = ImmutableSet.copyOf(value); + } + + static Lineage fromJson(final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + + String toJson() { + return Utils.toJson(this); + } + } } diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index 11f2eaf0e3..298d2a3348 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -177,4 +177,8 @@ static String createTagPath(String name) { static String searchPath() { return path("/search"); } + + static String columnLineagePath() { + return path("/column-lineage/"); + } } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index 51abf56637..a8b5affab3 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -7,6 +7,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.time.format.DateTimeFormatter.ISO_INSTANT; +import static marquez.client.MarquezPathV1.columnLineagePath; import static marquez.client.MarquezPathV1.createRunPath; import static marquez.client.MarquezPathV1.createTagPath; import static marquez.client.MarquezPathV1.datasetPath; @@ -41,6 +42,9 @@ import java.util.Map; import javax.annotation.Nullable; import lombok.NonNull; +import marquez.client.models.DatasetFieldId; +import marquez.client.models.DatasetId; +import marquez.client.models.NodeId; import marquez.client.models.RunState; import marquez.client.models.SearchFilter; import marquez.client.models.SearchSort; @@ -205,4 +209,21 @@ URL toSearchUrl( queryParams.put("limit", limit); return from(searchPath(), queryParams.build()); } + + URL toColumnLineageUrl( + String namespace, String dataset, String field, int depth, boolean withDownstream) { + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue()); + queryParams.put("depth", String.valueOf(depth)); + queryParams.put("withDownstream", String.valueOf(withDownstream)); + return from(columnLineagePath(), queryParams.build()); + } + + URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) { + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue()); + queryParams.put("depth", String.valueOf(depth)); + queryParams.put("withDownstream", String.valueOf(withDownstream)); + return from(columnLineagePath(), queryParams.build()); + } } diff --git a/clients/java/src/main/java/marquez/client/Utils.java b/clients/java/src/main/java/marquez/client/Utils.java index f1b5881138..52fcf4744e 100644 --- a/clients/java/src/main/java/marquez/client/Utils.java +++ b/clients/java/src/main/java/marquez/client/Utils.java @@ -5,6 +5,7 @@ package marquez.client; +import static com.google.common.base.Strings.lenientFormat; import static org.apache.http.HttpHeaders.AUTHORIZATION; import com.fasterxml.jackson.annotation.JsonInclude; @@ -17,6 +18,7 @@ import java.io.UncheckedIOException; import java.net.MalformedURLException; import java.net.URL; +import javax.annotation.Nullable; import lombok.NonNull; import org.apache.http.client.methods.HttpRequestBase; @@ -70,4 +72,25 @@ public static void addAuthTo( @NonNull final HttpRequestBase request, @NonNull final String apiKey) { request.addHeader(AUTHORIZATION, "Bearer " + apiKey); } + + public static String checkNotBlank(@NonNull final String arg) { + if (emptyOrBlank(arg)) { + throw new IllegalArgumentException(); + } + return arg; + } + + public static String checkNotBlank( + @NonNull final String arg, + @Nullable final String errorMessage, + @Nullable final Object... errorMessageArgs) { + if (emptyOrBlank(arg)) { + throw new IllegalArgumentException(lenientFormat(errorMessage, errorMessageArgs)); + } + return arg; + } + + private static boolean emptyOrBlank(final String arg) { + return arg.trim().isEmpty(); + } } diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineage.java b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java index 4b23b65b2a..0668555ea8 100644 --- a/clients/java/src/main/java/marquez/client/models/ColumnLineage.java +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java @@ -18,7 +18,7 @@ @Getter public class ColumnLineage { @NonNull private String name; - @NonNull private List inputFields; + @NonNull private List inputFields; @NonNull private String transformationDescription; @NonNull private String transformationType; } diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java b/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java new file mode 100644 index 0000000000..71a8855a85 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import marquez.client.Utils; + +@Getter +@AllArgsConstructor +@EqualsAndHashCode +public class ColumnLineageNodeData implements NodeData { + @NonNull String namespace; + @NonNull String dataset; + @NonNull String field; + @NonNull String fieldType; + @NonNull String transformationDescription; + @NonNull String transformationType; + @NonNull List inputFields; + + public static ColumnLineageNodeData fromJson(@NonNull final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } +} diff --git a/clients/java/src/main/java/marquez/client/models/DatasetFieldId.java b/clients/java/src/main/java/marquez/client/models/DatasetFieldId.java new file mode 100644 index 0000000000..3f924adee3 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/DatasetFieldId.java @@ -0,0 +1,16 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import lombok.NonNull; +import lombok.Value; + +@Value +public class DatasetFieldId { + @NonNull String namespace; + @NonNull String dataset; + @NonNull String field; +} diff --git a/clients/java/src/main/java/marquez/client/models/Edge.java b/clients/java/src/main/java/marquez/client/models/Edge.java new file mode 100644 index 0000000000..e00e2c93ec --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/Edge.java @@ -0,0 +1,19 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import lombok.NonNull; +import lombok.Value; + +@Value +public class Edge { + @NonNull NodeId origin; + @NonNull NodeId destination; + + public static Edge of(@NonNull final NodeId origin, @NonNull final NodeId destination) { + return new Edge(origin, destination); + } +} diff --git a/clients/java/src/main/java/marquez/client/models/Node.java b/clients/java/src/main/java/marquez/client/models/Node.java new file mode 100644 index 0000000000..7da04f927c --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/Node.java @@ -0,0 +1,120 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static marquez.client.Utils.checkNotBlank; + +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Sets; +import java.util.Set; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@JsonPropertyOrder({"id", "type", "data", "inEdges", "outEdges"}) +public final class Node { + @Getter private final NodeId id; + @Getter private final NodeType type; + @Getter @Setter @Nullable private NodeData data; + @Getter private final Set inEdges; + @Getter private final Set outEdges; + + public Node( + @NonNull final NodeId id, + @NonNull final NodeType type, + @Nullable final NodeData data, + @Nullable final Set inEdges, + @Nullable final Set outEdges) { + this.id = id; + this.type = type; + this.data = data; + this.inEdges = (inEdges == null) ? ImmutableSet.of() : ImmutableSortedSet.copyOf(inEdges); + this.outEdges = (outEdges == null) ? ImmutableSet.of() : ImmutableSortedSet.copyOf(outEdges); + } + + public static Builder dataset() { + return new Builder(NodeType.DATASET); + } + + public static Builder datasetField() { + return new Builder(NodeType.DATASET_FIELD); + } + + public static Builder job() { + return new Builder(NodeType.JOB); + } + + public static Builder run() { + return new Builder(NodeType.RUN); + } + + public boolean hasInEdges() { + return !inEdges.isEmpty(); + } + + public boolean hasOutEdges() { + return !outEdges.isEmpty(); + } + + public static final class Builder { + private NodeId id; + private final NodeType type; + private NodeData data; + private Set inEdges; + private Set outEdges; + + private Builder(@NonNull final NodeType type) { + this.type = type; + this.inEdges = ImmutableSet.of(); + this.outEdges = ImmutableSet.of(); + } + + public Builder id(@NonNull String idString) { + return id(NodeId.of(checkNotBlank(idString))); + } + + public Builder id(@NonNull NodeId id) { + this.id = id; + return this; + } + + public Builder data(@Nullable NodeData data) { + this.data = data; + return this; + } + + public Builder inEdges(@NonNull Edge... inEdges) { + this.inEdges = Sets.newHashSet(inEdges); + return this; + } + + public Builder inEdges(@Nullable Set inEdges) { + this.inEdges = (inEdges == null) ? ImmutableSet.of() : inEdges; + return this; + } + + public Builder outEdges(@NonNull Edge... outEdges) { + this.outEdges = Sets.newHashSet(outEdges); + return this; + } + + public Builder outEdges(@Nullable Set outEdges) { + this.outEdges = (outEdges == null) ? ImmutableSet.of() : outEdges; + return this; + } + + public Node build() { + return new Node(id, type, data, inEdges, outEdges); + } + } +} diff --git a/clients/java/src/main/java/marquez/client/models/NodeData.java b/clients/java/src/main/java/marquez/client/models/NodeData.java new file mode 100644 index 0000000000..e660793654 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/NodeData.java @@ -0,0 +1,16 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.EXTERNAL_PROPERTY, + property = "type") +@JsonSubTypes({@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")}) +public interface NodeData {} diff --git a/clients/java/src/main/java/marquez/client/models/NodeId.java b/clients/java/src/main/java/marquez/client/models/NodeId.java new file mode 100644 index 0000000000..90630b5af5 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/NodeId.java @@ -0,0 +1,140 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.util.StdConverter; +import com.google.common.base.Joiner; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@JsonDeserialize(converter = NodeId.FromValue.class) +@JsonSerialize(converter = NodeId.ToValue.class) +public final class NodeId { + public static final String ID_DELIM = ":"; + public static final Joiner ID_JOINER = Joiner.on(ID_DELIM); + + private static final String ID_PREFX_DATASET = "dataset"; + private static final String ID_PREFX_DATASET_FIELD = "datasetField"; + private static final String ID_PREFX_JOB = "job"; + private static final String ID_PREFX_RUN = "run"; + private static final Pattern ID_PATTERN = + Pattern.compile( + String.format( + "^(%s|%s|%s|%s):.*$", + ID_PREFX_DATASET, ID_PREFX_DATASET_FIELD, ID_PREFX_JOB, ID_PREFX_RUN)); + + public static final String VERSION_DELIM = "#"; + + @Getter private final String value; + + public NodeId(final String value) { + checkArgument( + ID_PATTERN.matcher(value).matches(), + "node ID (%s) must start with '%s', '%s', '%s' or '%s'", + value, + ID_PREFX_DATASET, + ID_PREFX_DATASET_FIELD, + ID_PREFX_JOB, + ID_PREFX_RUN); + this.value = value; + } + + public static NodeId of(final String value) { + return new NodeId(value); + } + + public static NodeId of(@NonNull DatasetId datasetId) { + return of(ID_JOINER.join(ID_PREFX_DATASET, datasetId.getNamespace(), datasetId.getName())); + } + + public static NodeId of(@NonNull DatasetFieldId datasetFieldId) { + return of( + ID_JOINER.join( + ID_PREFX_DATASET_FIELD, + datasetFieldId.getNamespace(), + datasetFieldId.getDataset(), + datasetFieldId.getField())); + } + + @JsonIgnore + public boolean isDatasetFieldType() { + return value.startsWith(ID_PREFX_DATASET_FIELD); + } + + @JsonIgnore + public boolean isDatasetType() { + return value.startsWith(ID_PREFX_DATASET + ID_DELIM); + } + + @JsonIgnore + private String[] parts(int expectedParts, String expectedType) { + + // dead simple splitting by token- matches most ids + String[] parts = value.split(ID_DELIM + "|" + VERSION_DELIM); + if (parts.length < expectedParts) { + throw new UnsupportedOperationException( + String.format( + "Expected NodeId of type %s with %s parts. Got: %s", + expectedType, expectedParts, getValue())); + } else if (parts.length == expectedParts) { + return parts; + } else { + // try to avoid matching colons in URIs- e.g., scheme://authority and host:port patterns + Pattern p = Pattern.compile("(?:" + ID_DELIM + "(?!//|\\d+))"); + Matcher matcher = p.matcher(value); + String[] returnParts = new String[expectedParts]; + + int index; + int prevIndex = 0; + for (int i = 0; i < expectedParts - 1; i++) { + matcher.find(); + index = matcher.start(); + returnParts[i] = value.substring(prevIndex, index); + prevIndex = matcher.end(); + } + returnParts[expectedParts - 1] = value.substring(prevIndex); + + return returnParts; + } + } + + @JsonIgnore + public DatasetId asDatasetId() { + String[] parts = parts(3, ID_PREFX_DATASET); + return new DatasetId(parts[1], parts[2]); + } + + @JsonIgnore + public DatasetFieldId asDatasetFieldId() { + String[] parts = parts(4, ID_PREFX_DATASET); + return new DatasetFieldId(parts[1], parts[2], parts[3]); + } + + public static class FromValue extends StdConverter { + @Override + public NodeId convert(@NonNull String value) { + return NodeId.of(value); + } + } + + public static class ToValue extends StdConverter { + @Override + public String convert(@NonNull NodeId id) { + return id.getValue(); + } + } +} diff --git a/clients/java/src/main/java/marquez/client/models/NodeType.java b/clients/java/src/main/java/marquez/client/models/NodeType.java new file mode 100644 index 0000000000..35ba1ae317 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/NodeType.java @@ -0,0 +1,13 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +public enum NodeType { + DATASET, + DATASET_FIELD, + JOB, + RUN; +} diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index 5898689261..8eef5f6270 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -68,12 +68,15 @@ import marquez.client.MarquezClient.Runs; import marquez.client.MarquezClient.Sources; import marquez.client.MarquezClient.Tags; +import marquez.client.models.ColumnLineageNodeData; import marquez.client.models.Dataset; +import marquez.client.models.DatasetFieldId; import marquez.client.models.DatasetId; import marquez.client.models.DatasetVersion; import marquez.client.models.DbTable; import marquez.client.models.DbTableMeta; import marquez.client.models.DbTableVersion; +import marquez.client.models.Edge; import marquez.client.models.Field; import marquez.client.models.Job; import marquez.client.models.JobId; @@ -83,6 +86,9 @@ import marquez.client.models.LineageEvent; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.Node; +import marquez.client.models.NodeId; +import marquez.client.models.NodeType; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -132,6 +138,7 @@ public class MarquezClientTest { private static final List FIELDS = newFields(4); private static final Set TAGS = newTagNames(4); private static final Map DB_FACETS = newDatasetFacets(4); + private static final String FIELD_NAME = "test_field"; private static final DbTable DB_TABLE = new DbTable( @@ -381,6 +388,8 @@ public class MarquezClientTest { STREAM_DESCRIPTION, CREATED_BY_RUN, DB_FACETS); + private static final DatasetFieldId DATASET_FIELD_ID = + new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME); private final MarquezUrl marquezUrl = MarquezUrl.create(DEFAULT_BASE_URL); @Mock private MarquezHttp http; @@ -942,6 +951,92 @@ public void testCreateTag() throws Exception { assertThat(createdTag.getDescription()).isNotEmpty().contains("description"); } + @Test + public void testGetColumnLineage() throws Exception { + Node node = + new Node( + NodeId.of(DATASET_FIELD_ID), + NodeType.DATASET_FIELD, + new ColumnLineageNodeData( + NAMESPACE_NAME, + DB_TABLE_NAME, + FIELD_NAME, + "String", + "transformationDescription", + "transformationType", + Collections.singletonList( + new DatasetFieldId("namespace", "inDataset", "some-col1"))), + ImmutableSet.of( + Edge.of( + NodeId.of(DATASET_FIELD_ID), + NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), + ImmutableSet.of( + Edge.of( + NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), + NodeId.of(DATASET_FIELD_ID)))); + MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); + String lineageJson = lineage.toJson(); + when(http.get( + buildUrlFor( + "/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=false"))) + .thenReturn(lineageJson); + + Node retrievedNode = + client.getColumnLineage("namespace", "dataset").getGraph().stream().findAny().get(); + assertThat(retrievedNode.getId()).isEqualTo(node.getId()); + assertThat(retrievedNode.getData()).isEqualTo(node.getData()); + assertThat(retrievedNode.getInEdges().stream().findFirst()) + .isEqualTo(node.getInEdges().stream().findFirst()); + assertThat(retrievedNode.getOutEdges().stream().findFirst()) + .isEqualTo(node.getOutEdges().stream().findFirst()); + } + + @Test + public void testGetColumnLineageByField() throws Exception { + Node node = + new Node( + NodeId.of(DATASET_FIELD_ID), + NodeType.DATASET_FIELD, + new ColumnLineageNodeData( + NAMESPACE_NAME, + DB_TABLE_NAME, + FIELD_NAME, + "String", + "transformationDescription", + "transformationType", + Collections.singletonList( + new DatasetFieldId("namespace", "inDataset", "some-col1"))), + ImmutableSet.of( + Edge.of( + NodeId.of(DATASET_FIELD_ID), + NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), + ImmutableSet.of( + Edge.of( + NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), + NodeId.of(DATASET_FIELD_ID)))); + MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); + String lineageJson = lineage.toJson(); + when(http.get( + buildUrlFor( + "/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Asome-col1&depth=20&withDownstream=false"))) + .thenReturn(lineageJson); + + Node retrievedNode = + client.getColumnLineage("namespace", "dataset", "some-col1").getGraph().stream() + .findAny() + .get(); + assertThat(retrievedNode.getId()).isEqualTo(node.getId()); + assertThat(retrievedNode.getData()).isEqualTo(node.getData()); + assertThat(retrievedNode.getInEdges().stream().findFirst()) + .isEqualTo(node.getInEdges().stream().findFirst()); + assertThat(retrievedNode.getOutEdges().stream().findFirst()) + .isEqualTo(node.getOutEdges().stream().findFirst()); + } + + private URL buildUrlFor(String pathTemplate) throws Exception { + return new URL(DEFAULT_BASE_URL + BASE_PATH + pathTemplate); + } + private URL buildUrlFor(String pathTemplate, String... pathArgs) throws Exception { return new URL(DEFAULT_BASE_URL + BASE_PATH + String.format(pathTemplate, (Object[]) pathArgs)); } diff --git a/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java b/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java index f427a36041..392b1d2e9f 100644 --- a/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java +++ b/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java @@ -81,4 +81,9 @@ void testPath_noPlaceholders() { MarquezPathV1.path("/whatever/%s/next/%s"); }); } + + @Test + void testPath_columnLineage() { + Assertions.assertEquals("/api/v1/column-lineage", MarquezPathV1.columnLineagePath()); + } } diff --git a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java index 86e58c9cdc..20889d1167 100644 --- a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java @@ -33,4 +33,15 @@ void testEncodedMarquezUrl() { Assertions.assertEquals( "http://marquez:5000/namespace/s3:%2F%2Fbucket/job/jname", url.toString()); } + + @Test + void testToColumnLineageUrl() { + Assertions.assertEquals( + "http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=true", + marquezUrl.toColumnLineageUrl("namespace", "dataset", 20, true).toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20&withDownstream=true", + marquezUrl.toColumnLineageUrl("namespace", "dataset", "field", 20, true).toString()); + } } diff --git a/clients/java/src/test/java/marquez/client/models/NodeIdTest.java b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java new file mode 100644 index 0000000000..3c67fb644a --- /dev/null +++ b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class NodeIdTest { + + @ParameterizedTest(name = "testDataset-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset", + "gs://bucket$/path/to/data", + "postgresql://hostname:5432/database$my_table", + "my-namespace$my_struct" + }, + delimiter = '$') + public void testDataset(String namespace, String dataset) { + NodeId nodeId = NodeId.of(new DatasetId(namespace, dataset)); + assertTrue(nodeId.isDatasetType()); + assertFalse(nodeId.isDatasetFieldType()); + assertEquals(namespace, nodeId.asDatasetId().getNamespace()); + assertEquals(dataset, nodeId.asDatasetId().getName()); + } + + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA", + "gs://bucket$/path/to/data$colA", + "gs://bucket$/path/to/data$col_A" + }, + delimiter = '$') + public void testDatasetField(String namespace, String dataset, String field) { + NodeId nodeId = NodeId.of(new DatasetFieldId(namespace, dataset, field)); + assertFalse(nodeId.isDatasetType()); + assertTrue(nodeId.isDatasetFieldType()); + assertEquals(namespace, nodeId.asDatasetFieldId().getNamespace()); + assertEquals(dataset, nodeId.asDatasetFieldId().getDataset()); + assertEquals(field, nodeId.asDatasetFieldId().getField()); + } +}