[Integration][Spark] Support adding arbitrary parameters to OpenLineage URL (apache#425)

* Support Arbitrary Parameters in Lineage URL

Supports extracting parameters from config: spark.openlineage.url
Supports extracting parameters from config: spark.openlineage.url.param.xxx

Users can now pass additional query/URL parameters along to the
OpenLineage URL that is used when emitting lineage. This is useful for
passing parameters required by a non-Marquez destination for
OpenLineage metadata.

If using spark.openlineage.url.param.xxx, xxx represents the name of
the URL parameter you want to include in the lineage URL. Any
configuration variable beginning with spark.openlineage.url.param.
will be used as a URL parameter in the lineage URL.

These config settings ignore api_key if it is specified as a url.param,
to avoid conflicting with the spark.openlineage.apiKey config.
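
For illustration, a hypothetical PySpark session might look like the following (the parameter name `myParam` and its value are placeholders chosen for this sketch, not settings defined by the commit); the listener would be expected to append `myParam=customValue` to the lineage URL:

```python
from pyspark.sql import SparkSession

# Hypothetical example: "myParam" and "customValue" are illustrative placeholders.
spark = (SparkSession.builder.master('local')
         .appName('url_param_example')
         .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.3.1')
         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
         .config('spark.openlineage.url', 'http://localhost:5000/api/v1/namespaces/spark_integration/')
         # Each spark.openlineage.url.param.<name> setting is added to the lineage URL as <name>=<value>.
         .config('spark.openlineage.url.param.myParam', 'customValue')
         .getOrCreate())
```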

Signed-off-by: Will Johnson <[email protected]>

* Adding more idiomatic Java syntax

Signed-off-by: Will Johnson <[email protected]>

* Even more idiomatic java 8 syntax

Signed-off-by: Will Johnson <[email protected]>

* [spark] support read/write to kafka (apache#387)

* adding initial test for spark kafka integration (apache#279)

* adding initial test for spark kafka integration

Signed-off-by: tomassatka <[email protected]>

* [spark] Resolves: apache#280 kafka support

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#280 kafka support

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#280 remove debug line

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#280 javadoc

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#280 review

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#280 review

Signed-off-by: olek <[email protected]>

* Moved KafkaWriter handling into KafkaRelationVisitor and added support for assign in kafka consumer parsing

Signed-off-by: Michael Collado <[email protected]>

* Add check for hive classes to avoid NoClassDefFoundErrors

Signed-off-by: Michael Collado <[email protected]>

* Fix build for kafka integration (apache#411)

* Fix integration tests step to run on raw machine to enable docker tests

Signed-off-by: Michael Collado <[email protected]>

* Updated kafka tests to use common version and common reference to kafka package

Signed-off-by: Michael Collado <[email protected]>

* Added reset() call to mock server to avoid results across tests

Signed-off-by: Michael Collado <[email protected]>

* Added String only constructor to HttpError

Signed-off-by: Michael Collado <[email protected]>

* Changed build to copy dependencies and use downloaded jars in Spark containers

Signed-off-by: Michael Collado <[email protected]>

* Fix spark integration test build to use explicit 3.1.2 version

Signed-off-by: Michael Collado <[email protected]>

Co-authored-by: Tomas Satka <[email protected]>
Co-authored-by: Michael Collado <[email protected]>
Co-authored-by: Michael Collado <[email protected]>
Signed-off-by: Will Johnson <[email protected]>

* dbt: support dbt build command (apache#398)

Signed-off-by: Maciej Obuchowski <[email protected]>
Signed-off-by: Will Johnson <[email protected]>

* great expectations: pin version to the one supported by airflow operator (apache#420)

Signed-off-by: Maciej Obuchowski <[email protected]>
Signed-off-by: Will Johnson <[email protected]>

* [SPARK] adding output metrics (apache#361)

* [spark] Resolves: apache#304 adding task metrics collector

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 remove OutputDatasetWithMetadataVisitor.java and deprecated "OutputStatisticsFacet.java"

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 add entry in CHANGELOG.md

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 add test and fix test

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 fix test

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 review comments

Signed-off-by: olek <[email protected]>

* [spark] Resolves: apache#304 review comments

Signed-off-by: olek <[email protected]>

* Fix references to JobMetricsHolder to use singleton, default metric values to null if not present

Signed-off-by: Michael Collado <[email protected]>

Co-authored-by: Michael Collado <[email protected]>
Signed-off-by: Will Johnson <[email protected]>

* dbt: filter non-test nodes while processing assertions (apache#422)

Signed-off-by: Maciej Obuchowski <[email protected]>
Signed-off-by: Will Johnson <[email protected]>

* Updating OpenLineage spark integration README with new param

Signed-off-by: Will Johnson <[email protected]>

* Fixing spotless checks

Signed-off-by: Will Johnson <[email protected]>

Co-authored-by: OleksandrDvornik <[email protected]>
Co-authored-by: Tomas Satka <[email protected]>
Co-authored-by: Michael Collado <[email protected]>
Co-authored-by: Michael Collado <[email protected]>
Co-authored-by: Maciej Obuchowski <[email protected]>
6 people authored Dec 7, 2021
1 parent 055df6a commit 41dfc82
Showing 7 changed files with 274 additions and 159 deletions.
307 changes: 154 additions & 153 deletions integration/spark/README.md
@@ -1,153 +1,154 @@
# OpenLineage Spark Agent

The OpenLineage Spark Agent uses JVM instrumentation to emit OpenLineage metadata.

## Installation

Maven:

```xml
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark</artifactId>
<version>0.3.1</version>
</dependency>
```

or Gradle:

```groovy
implementation 'io.openlineage:openlineage-spark:0.3.1'
```

## Getting started

### Quickstart
The fastest way to get started testing Spark and OpenLineage is to use the docker-compose files included
in the project. From the spark integration directory ($OPENLINEAGE_ROOT/integration/spark) execute
```bash
docker-compose up
```
This will start Marquez as an OpenLineage-compatible metadata server and a Jupyter Spark notebook on localhost:8888. On startup, the notebook container logs will show a list of URLs
including an access token, such as:
```bash
notebook_1 | To access the notebook, open this file in a browser:
notebook_1 | file:///home/jovyan/.local/share/jupyter/runtime/nbserver-9-open.html
notebook_1 | Or copy and paste one of these URLs:
notebook_1 | http://abc12345d6e:8888/?token=XXXXXX
notebook_1 | or http://127.0.0.1:8888/?token=XXXXXX
```
Copy the URL with the localhost IP and paste it into your browser window to begin creating a new Jupyter
Spark notebook (see the [docs](https://jupyter-docker-stacks.readthedocs.io/en/latest/) for info on
using the Jupyter Docker image).

# OpenLineageSparkListener as a plain Spark Listener
The SparkListener can be referenced as a plain Spark Listener implementation.

Create a new notebook and paste the following into the first cell:
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local')
.appName('sample_spark')
.config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.3.1')
.config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
.config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
.getOrCreate())
```
To use the local jar, you can build it with
```bash
gradle shadowJar
```
then reference it in the Jupyter notebook with the following (note that the jar should be built
*before* running the `docker-compose up` step or Docker will just mount a dummy folder; once the
`build/libs` directory exists, you can repeatedly rebuild the jar without restarting the Jupyter
container):
```python
from pyspark.sql import SparkSession

file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"

spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
.config('spark.jars', file)
.config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
.config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
.config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
.getOrCreate())
```

# OpenLineageSparkListener as a Java agent
The listener can also be configured as a Java agent that is added to the JVM startup parameters.
Setup in a PySpark notebook looks like the following:

```python
from pyspark.sql import SparkSession

file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"

spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
.config('spark.driver.extraJavaOptions',
f"-javaagent:{file}=http://{openlineage.client.host}/api/v1/namespaces/spark_integration/")
.config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
.config('spark.sql.repl.eagerEval.enabled', 'true')
.getOrCreate())
```
When running on a real cluster, the openlineage-spark jar has to be in a known location on the master
node of the cluster, and its location must be referenced in the `spark.driver.extraJavaOptions` parameter.

## Arguments

### Spark Listener
The SparkListener reads its configuration from SparkConf parameters. These can be specified on the
command line (e.g., `--conf "spark.openlineage.url=http://{openlineage.client.host}/api/v1/namespaces/my_namespace/job/the_job"`)
or from the `conf/spark-defaults.conf` file.

The following parameters can be specified:

| Parameter | Definition | Example |
|-----------|------------|---------|
| spark.openlineage.host | The hostname of the OpenLineage API server where events should be reported | http://localhost:5000 |
| spark.openlineage.version | The API version of the OpenLineage API server | 1 |
| spark.openlineage.namespace | The default namespace to be applied for any jobs submitted | MyNamespace |
| spark.openlineage.parentJobName | The job name to be used for the parent job facet | ParentJobName |
| spark.openlineage.parentRunId | The RunId of the parent job that initiated this Spark job | xxxx-xxxx-xxxx-xxxx |
| spark.openlineage.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
| spark.openlineage.url.param.xyz | A URL parameter (replace xyz with the desired parameter name) and value to be included in requests to the OpenLineage API server | abcdefghijk |

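For a sketch of how these settings fit together (the host, namespace, API key, and the `environment` parameter name below are illustrative values, not defaults), a session configured as follows would be expected to emit lineage events with `environment=dev` appended to the request URL as a query parameter:

```python
from pyspark.sql import SparkSession

# Illustrative values only; replace the host, namespace, API key, and parameter name with your own.
spark = (SparkSession.builder.master('local')
         .appName('openlineage_params_demo')
         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
         .config('spark.openlineage.host', 'http://localhost:5000')
         .config('spark.openlineage.namespace', 'my_namespace')
         .config('spark.openlineage.apiKey', 'abcdefghijk')
         # Appended to the lineage URL as environment=dev; an api_key passed via url.param would be ignored.
         .config('spark.openlineage.url.param.environment', 'dev')
         .getOrCreate())
```
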
### Java Agent
The Java agent accepts an argument in the form of a URI. It includes the location of the OpenLineage client, the
namespace name, the parent job name, and a parent run id. The run id will be emitted as a parent run
facet.
```
{openlineage.client.host}/api/v1/namespaces/{namespace}/job/{job_name}/runs/{run_uuid}?api_key={api_key}
```
For example:
```
https://openlineage.client.host/api/v1/namespaces/foo/job/spark.submit_job/runs/a95858ad-f9b5-46d7-8f1c-ca9f58f68978
```

# Build

## Java 8

Testing requires a Java 8 JVM to test the Scala Spark components.

```sh
export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
```

## Testing

To run the tests, from the current directory run:

```sh
./gradlew test
```

To run the integration tests, from the current directory run:

```sh
./gradlew integrationTest
```

## Build spark agent jar

```sh
./gradlew shadowJar
```
@@ -3,7 +3,9 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.AllArgsConstructor;
@@ -27,6 +29,7 @@ public class ArgumentParser {
private final String jobName;
private final String parentRunId;
private final Optional<String> apiKey;
private final Optional<Map<String, String>> urlParams;

public static ArgumentParser parse(String clientUrl) {
URI uri = URI.create(clientUrl);
@@ -41,12 +44,13 @@ public static ArgumentParser parse(String clientUrl) {

List<NameValuePair> nameValuePairList = URLEncodedUtils.parse(uri, StandardCharsets.UTF_8);
Optional<String> apiKey = getApiKey(nameValuePairList);
Optional<Map<String, String>> urlParams = getUrlParams(nameValuePairList);

log.info(
String.format(
"%s/api/%s/namespaces/%s/jobs/%s/runs/%s", host, version, namespace, jobName, runId));

return new ArgumentParser(host, version, namespace, jobName, runId, apiKey);
return new ArgumentParser(host, version, namespace, jobName, runId, apiKey, urlParams);
}

public static UUID getRandomUuid() {
@@ -58,6 +62,23 @@ private static Optional<String> getApiKey(List<NameValuePair> nameValuePairList)
.filter(StringUtils::isNoneBlank);
}

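/**
 * Returns the value of the given URL parameter, or null when no extra URL parameters were
 * configured or the requested name is not present.
 */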
public String getUrlParam(String urlParamName) {
String param = null;
if (urlParams.isPresent()) {
param = urlParams.get().get(urlParamName);
}
return param;
}

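// Collects every query parameter parsed from the lineage URL except api_key, which is handled
// separately by getApiKey and the spark.openlineage.apiKey setting.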
private static Optional<Map<String, String>> getUrlParams(List<NameValuePair> nameValuePairList) {
final Map<String, String> urlParams = new HashMap<String, String>();
nameValuePairList.stream()
.filter(pair -> !(pair.getName().equals("api_key")))
.forEach(pair -> urlParams.put(pair.getName(), pair.getValue()));

return urlParams.isEmpty() ? Optional.empty() : Optional.ofNullable(urlParams);
}

protected static String getNamedParameter(List<NameValuePair> nameValuePairList, String param) {
for (NameValuePair nameValuePair : nameValuePairList) {
if (nameValuePair.getName().equalsIgnoreCase(param)) {
@@ -78,6 +99,7 @@ private static String get(String[] elements, String name, int index, String defa
}

private static ArgumentParser getDefaultArguments() {
return new ArgumentParser("", "v1", "default", "default", null, Optional.empty());
return new ArgumentParser(
"", "v1", "default", "default", null, Optional.empty(), Optional.empty());
}
}