diff --git a/integration/spark/README.md b/integration/spark/README.md
index 47faba202657f..efd0431d73366 100644
--- a/integration/spark/README.md
+++ b/integration/spark/README.md
@@ -1,153 +1,154 @@
-# OpenLineage Spark Agent
-
-The OpenLineage Spark Agent uses jvm instrumentation to emit OpenLineage metadata.
-
-## Installation
-
-Maven:
-
-```xml
-<dependency>
-    <groupId>io.openlineage</groupId>
-    <artifactId>openlineage-spark</artifactId>
-    <version>0.3.1</version>
-</dependency>
-```
-
-or Gradle:
-
-```groovy
-implementation 'io.openlineage:openlineage-spark:0.3.1'
-```
-
-## Getting started
-
-### Quickstart
-The fastest way to get started testing Spark and OpenLineage is to use the docker-compose files included
-in the project. From the spark integration directory ($OPENLINEAGE_ROOT/integration/spark) execute
-```bash
-docker-compose up
-```
-This will start Marquez as an Openlineage client and Jupyter Spark notebook on localhost:8888. On startup, the notebook container logs will show a list of URLs
-including an access token, such as
-```bash
-notebook_1 | To access the notebook, open this file in a browser:
-notebook_1 | file:///home/jovyan/.local/share/jupyter/runtime/nbserver-9-open.html
-notebook_1 | Or copy and paste one of these URLs:
-notebook_1 | http://abc12345d6e:8888/?token=XXXXXX
-notebook_1 | or http://127.0.0.1:8888/?token=XXXXXX
-```
-Copy the URL with the localhost IP and paste into your browser window to begin creating a new Jupyter
-Spark notebook (see the [https://jupyter-docker-stacks.readthedocs.io/en/latest/](docs) for info on
-using the Jupyter docker image).
-
-# OpenLineageSparkListener as a plain Spark Listener
-The SparkListener can be referenced as a plain Spark Listener implementation.
-
-Create a new notebook and paste the following into the first cell:
-```python
-from pyspark.sql import SparkSession
-
-spark = (SparkSession.builder.master('local')
-         .appName('sample_spark')
-         .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.3.1')
-         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
-         .config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
-         .getOrCreate())
-```
-To use the local jar, you can build it with
-```bash
-gradle shadowJar
-```
-then reference it in the Jupyter notebook with the following (note that the jar should be built
-*before* running the `docker-compose up` step or docker will just mount a dummy folder; once the
-`build/libs` directory exists, you can repeatedly build the jar without restarting the jupyter
-container):
-```python
-from pyspark.sql import SparkSession
-
-file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"
-
-spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
-         .config('spark.jars', file)
-         .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
-         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
-         .config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
-         .getOrCreate())
-```
-
-# OpenLineageSparkListener as a java agent
-Configuring SparkListener as a java agent that needs to be added to
-the JVM startup parameters. Setup in a pyspark notebook looks like the following:
-
-```python
-from pyspark.sql import SparkSession
-
-file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"
-
-spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
-         .config('spark.driver.extraJavaOptions',
-                 f"-javaagent:{file}=http://{openlineage.client.host}/api/v1/namespaces/spark_integration/")
-         .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
-         .config('spark.sql.repl.eagerEval.enabled', 'true')
-         .getOrCreate())
-```
-When running on a real cluster, the openlineage-spark jar has to be in a known location on the master
-node of the cluster and its location referenced in the `spark.driver.extraJavaOptions` parameter.
-## Arguments
-
-### Spark Listener
-The SparkListener reads its configuration from SparkConf parameters. These can be specified on the
-command line (e.g., `--conf "spark.openlineage.url=http://{openlineage.client.host}/api/v1/namespaces/my_namespace/job/the_job"`)
-or from the `conf/spark-defaults.conf` file.
-
-The following parameters can be specified
-| Parameter | Definition | Example |
-------------|------------|---------
-| spark.openlineage.host | The hostname of the OpenLineage API server where events should be reported | http://localhost:5000 |
-| spark.openlineage.version | The API version of the OpenLineage API server | 1|
-| spark.openlineage.namespace | The default namespace to be applied for any jobs submitted | MyNamespace|
-| spark.openlineage.parentJobName | The job name to be used for the parent job facet | ParentJobName |
-| spark.openlineage.parentRunId | The RunId of the parent job that initiated this Spark job | xxxx-xxxx-xxxx-xxxx |
-| spark.openlineage.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
-
-### Java Agent
-The java agent accepts an argument in the form of a uri. It includes the location of OpenLineage client, the
-namespace name, the parent job name, and a parent run id. The run id will be emitted as a parent run
-facet.
-```
-{openlineage.client.host}/api/v1/namespaces/{namespace}/job/{job_name}/runs/{run_uuid}?api_key={api_key}"
-
-```
-For example:
-```
-https://openlineage.client.host/api/v1/namespaces/foo/job/spark.submit_job/runs/a95858ad-f9b5-46d7-8f1c-ca9f58f68978"
-```
-
-# Build
-
-## Java 8
-
-Testing requires a Java 8 JVM to test the scala spark components.
-
-`export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
-
-## Testing
-
-To run the tests, from the current directory run:
-
-```sh
-./gradlew test
-```
-
-To run the integration tests, from the current directory run:
-
-```sh
-./gradlew integrationTest
-```
-
-## Build spark agent jar
-
-```sh
-./gradlew shadowJar
-```
+# OpenLineage Spark Agent
+
+The OpenLineage Spark Agent uses JVM instrumentation to emit OpenLineage metadata.
+
+## Installation
+
+Maven:
+
+```xml
+<dependency>
+    <groupId>io.openlineage</groupId>
+    <artifactId>openlineage-spark</artifactId>
+    <version>0.3.1</version>
+</dependency>
+```
+
+or Gradle:
+
+```groovy
+implementation 'io.openlineage:openlineage-spark:0.3.1'
+```
+
+## Getting started
+
+### Quickstart
+The fastest way to get started testing Spark and OpenLineage is to use the docker-compose files included
+in the project. From the spark integration directory ($OPENLINEAGE_ROOT/integration/spark) execute
+```bash
+docker-compose up
+```
+This will start Marquez as an OpenLineage client and a Jupyter Spark notebook on localhost:8888. On startup, the notebook container logs will show a list of URLs
+including an access token, such as
+```bash
+notebook_1 | To access the notebook, open this file in a browser:
+notebook_1 | file:///home/jovyan/.local/share/jupyter/runtime/nbserver-9-open.html
+notebook_1 | Or copy and paste one of these URLs:
+notebook_1 | http://abc12345d6e:8888/?token=XXXXXX
+notebook_1 | or http://127.0.0.1:8888/?token=XXXXXX
+```
+Copy the URL with the localhost IP and paste it into your browser window to begin creating a new Jupyter
+Spark notebook (see the [docs](https://jupyter-docker-stacks.readthedocs.io/en/latest/) for info on
+using the Jupyter docker image).
+
+# OpenLineageSparkListener as a plain Spark Listener
+The SparkListener can be referenced as a plain Spark Listener implementation.
+
+Create a new notebook and paste the following into the first cell:
+```python
+from pyspark.sql import SparkSession
+
+spark = (SparkSession.builder.master('local')
+         .appName('sample_spark')
+         .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.3.1')
+         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
+         .config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
+         .getOrCreate())
+```
+To use the local jar, you can build it with
+```bash
+gradle shadowJar
+```
+then reference it in the Jupyter notebook with the following (note that the jar should be built
+*before* running the `docker-compose up` step or docker will just mount a dummy folder; once the
+`build/libs` directory exists, you can repeatedly build the jar without restarting the jupyter
+container):
+```python
+from pyspark.sql import SparkSession
+
+file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"
+
+spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
+         .config('spark.jars', file)
+         .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
+         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
+         .config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
+         .getOrCreate())
+```
+
+# OpenLineageSparkListener as a java agent
+The listener can also be configured as a java agent that is added to
+the JVM startup parameters. Setup in a pyspark notebook looks like the following:
+
+```python
+from pyspark.sql import SparkSession
+
+file = "/home/jovyan/openlineage/libs/openlineage-spark-0.3.1.jar"
+
+spark = (SparkSession.builder.master('local').appName('rdd_to_dataframe')
+         .config('spark.driver.extraJavaOptions',
+                 f"-javaagent:{file}=http://{openlineage.client.host}/api/v1/namespaces/spark_integration/")
+         .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.+')
+         .config('spark.sql.repl.eagerEval.enabled', 'true')
+         .getOrCreate())
+```
+When running on a real cluster, the openlineage-spark jar has to be in a known location on the master
+node of the cluster, and its location must be referenced in the `spark.driver.extraJavaOptions` parameter.
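+
+For example, a job submitted to a cluster might reference a jar that was copied to the driver node
+ahead of time. A minimal sketch (the `yarn` master, `/opt/openlineage` path, and `localhost:5000`
+host below are illustrative, not defaults):
+```python
+from pyspark.sql import SparkSession
+
+# Assumes the shadow jar was copied to /opt/openlineage on the driver node
+jar = "/opt/openlineage/openlineage-spark-0.3.1.jar"
+
+spark = (SparkSession.builder.master('yarn').appName('cluster_job')
+         .config('spark.driver.extraJavaOptions',
+                 f"-javaagent:{jar}=http://localhost:5000/api/v1/namespaces/spark_integration/")
+         .getOrCreate())
+```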
+
+## Arguments
+
+### Spark Listener
+The SparkListener reads its configuration from SparkConf parameters. These can be specified on the
+command line (e.g., `--conf "spark.openlineage.url=http://{openlineage.client.host}/api/v1/namespaces/my_namespace/job/the_job"`)
+or from the `conf/spark-defaults.conf` file.
+
+The following parameters can be specified:
+| Parameter | Definition | Example |
+------------|------------|---------
+| spark.openlineage.host | The hostname of the OpenLineage API server where events should be reported | http://localhost:5000 |
+| spark.openlineage.version | The API version of the OpenLineage API server | 1 |
+| spark.openlineage.namespace | The default namespace to be applied for any jobs submitted | MyNamespace |
+| spark.openlineage.parentJobName | The job name to be used for the parent job facet | ParentJobName |
+| spark.openlineage.parentRunId | The RunId of the parent job that initiated this Spark job | xxxx-xxxx-xxxx-xxxx |
+| spark.openlineage.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
+| spark.openlineage.url.param.xyz | A url parameter (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
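+
+For example, the following configuration (using an arbitrary `code` parameter for illustration)
+makes the listener append `code=123` as a query string to every lineage event it posts:
+```python
+from pyspark.sql import SparkSession
+
+spark = (SparkSession.builder.master('local').appName('sample_spark')
+         .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.3.1')
+         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
+         .config('spark.openlineage.url', 'http://{openlineage.client.host}/api/v1/namespaces/spark_integration/')
+         # events are then posted to .../api/v1/lineage?code=123
+         .config('spark.openlineage.url.param.code', '123')
+         .getOrCreate())
+```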
+
+### Java Agent
+The java agent accepts an argument in the form of a URI. It includes the location of the OpenLineage
+client, the namespace name, the parent job name, and a parent run id. The run id will be emitted as a
+parent run facet.
+```
+{openlineage.client.host}/api/v1/namespaces/{namespace}/job/{job_name}/runs/{run_uuid}?api_key={api_key}
+```
+For example:
+```
+https://openlineage.client.host/api/v1/namespaces/foo/job/spark.submit_job/runs/a95858ad-f9b5-46d7-8f1c-ca9f58f68978
+```
+
+# Build
+
+## Java 8
+
+Testing requires a Java 8 JVM to test the scala spark components.
+
+`export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)`
+
+## Testing
+
+To run the tests, from the current directory run:
+
+```sh
+./gradlew test
+```
+
+To run the integration tests, from the current directory run:
+
+```sh
+./gradlew integrationTest
+```
+
+## Build spark agent jar
+
+```sh
+./gradlew shadowJar
+```
diff --git a/integration/spark/src/main/common/java/io/openlineage/spark/agent/ArgumentParser.java b/integration/spark/src/main/common/java/io/openlineage/spark/agent/ArgumentParser.java
index 4b78abc92ca45..b7fa5aa3d8fd1 100644
--- a/integration/spark/src/main/common/java/io/openlineage/spark/agent/ArgumentParser.java
+++ b/integration/spark/src/main/common/java/io/openlineage/spark/agent/ArgumentParser.java
@@ -3,7 +3,9 @@
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import lombok.AllArgsConstructor;
@@ -27,6 +29,7 @@ public class ArgumentParser {
   private final String jobName;
   private final String parentRunId;
   private final Optional<String> apiKey;
+  private final Optional<Map<String, String>> urlParams;

   public static ArgumentParser parse(String clientUrl) {
     URI uri = URI.create(clientUrl);
@@ -41,12 +44,13 @@ public static ArgumentParser parse(String clientUrl) {

     List<NameValuePair> nameValuePairList = URLEncodedUtils.parse(uri, StandardCharsets.UTF_8);
     Optional<String> apiKey = getApiKey(nameValuePairList);
+    Optional<Map<String, String>> urlParams = getUrlParams(nameValuePairList);

     log.info(
         String.format(
             "%s/api/%s/namespaces/%s/jobs/%s/runs/%s", host, version, namespace, jobName, runId));

-    return new ArgumentParser(host, version, namespace, jobName, runId, apiKey);
+    return new ArgumentParser(host, version, namespace, jobName, runId, apiKey, urlParams);
   }

   public static UUID getRandomUuid() {
@@ -58,6 +62,23 @@ private static Optional<String> getApiKey(List<NameValuePair> nameValuePairList)
         .filter(StringUtils::isNoneBlank);
   }

+  public String getUrlParam(String urlParamName) {
+    String param = null;
+    if (urlParams.isPresent()) {
+      param = urlParams.get().get(urlParamName);
+    }
+    return param;
+  }
+
+  private static Optional<Map<String, String>> getUrlParams(List<NameValuePair> nameValuePairList) {
+    final Map<String, String> urlParams = new HashMap<>();
+    nameValuePairList.stream()
+        .filter(pair -> !(pair.getName().equals("api_key")))
+        .forEach(pair -> urlParams.put(pair.getName(), pair.getValue()));
+
+    return urlParams.isEmpty() ? Optional.empty() : Optional.ofNullable(urlParams);
+  }
+
   protected static String getNamedParameter(List<NameValuePair> nameValuePairList, String param) {
     for (NameValuePair nameValuePair : nameValuePairList) {
       if (nameValuePair.getName().equalsIgnoreCase(param)) {
@@ -78,6 +99,7 @@ private static String get(String[] elements, String name, int index, String defa
   }

   private static ArgumentParser getDefaultArguments() {
-    return new ArgumentParser("", "v1", "default", "default", null, Optional.empty());
+    return new ArgumentParser(
+        "", "v1", "default", "default", null, Optional.empty(), Optional.empty());
   }
 }
diff --git a/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageContext.java b/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageContext.java
index 2e95fd5dca062..1a4f295484d14 100644
--- a/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageContext.java
+++ b/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageContext.java
@@ -8,7 +8,9 @@
 import io.openlineage.spark.agent.client.ResponseMessage;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.Optional;
+import java.util.StringJoiner;
 import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
 import lombok.Getter;
@@ -26,8 +28,23 @@ public class OpenLineageContext {

   public OpenLineageContext(ArgumentParser argument) throws URISyntaxException {
     this.client = OpenLineageClient.create(argument.getApiKey(), ForkJoinPool.commonPool());
+    // Extract url parameters other than api_key to append to lineageURI
+    String queryParams = null;
+    if (argument.getUrlParams().isPresent()) {
+      Map<String, String> urlParams = argument.getUrlParams().get();
+
+      StringJoiner query = new StringJoiner("&");
+      urlParams.forEach((k, v) -> query.add(k + "=" + v));
+
+      queryParams = query.toString();
+    }
+
+    // Convert host to a URI to extract scheme and authority
+    URI hostURI = new URI(argument.getHost());
+    String uriPath = String.format("/api/%s/lineage", argument.getVersion());
+
     this.lineageURI =
-        new URI(String.format("%s/api/%s/lineage", argument.getHost(), argument.getVersion()));
+        new URI(hostURI.getScheme(), hostURI.getAuthority(), uriPath, queryParams, null);
     this.jobNamespace = argument.getNamespace();
     this.parentJobName = argument.getJobName();
     this.parentRunId = convertToUUID(argument.getParentRunId());
diff --git a/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageSparkListener.java b/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageSparkListener.java
index 67e4b481baa05..45db7103486af 100644
--- a/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageSparkListener.java
+++ b/integration/spark/src/main/common/java/io/openlineage/spark/agent/OpenLineageSparkListener.java
@@ -2,6 +2,7 @@

 import static io.openlineage.spark.agent.ArgumentParser.DEFAULTS;
 import static io.openlineage.spark.agent.util.SparkConfUtils.findSparkConfigKey;
+import static io.openlineage.spark.agent.util.SparkConfUtils.findSparkUrlParams;

 import io.openlineage.client.OpenLineage;
 import io.openlineage.spark.agent.client.OpenLineageClient;
@@ -54,6 +55,7 @@ public class OpenLineageSparkListener extends org.apache.spark.scheduler.SparkLi
   public static final String SPARK_CONF_JOB_NAME_KEY = "openlineage.parentJobName";
   public static final String SPARK_CONF_PARENT_RUN_ID_KEY = "openlineage.parentRunId";
   public static final String SPARK_CONF_API_KEY = "openlineage.apiKey";
+  public static final String SPARK_CONF_URL_PARAM_PREFIX = "openlineage.url.param";
   private static WeakHashMap<RDD<?>, Configuration> outputs = new WeakHashMap<>();
   private static ContextFactory contextFactory;
   private static JobMetricsHolder jobMetrics = JobMetricsHolder.getInstance();
@@ -287,7 +289,9 @@ private ArgumentParser parseConf(SparkConf conf) {
         findSparkConfigKey(conf, SPARK_CONF_PARENT_RUN_ID_KEY, DEFAULTS.getParentRunId());
     Optional<String> apiKey =
         findSparkConfigKey(conf, SPARK_CONF_API_KEY).filter(str -> !str.isEmpty());
-    return new ArgumentParser(host, version, namespace, jobName, runId, apiKey);
+    Optional<Map<String, String>> urlParams =
+        findSparkUrlParams(conf, SPARK_CONF_URL_PARAM_PREFIX);
+    return new ArgumentParser(host, version, namespace, jobName, runId, apiKey, urlParams);
     }
   }
 }
diff --git a/integration/spark/src/main/common/java/io/openlineage/spark/agent/util/SparkConfUtils.java b/integration/spark/src/main/common/java/io/openlineage/spark/agent/util/SparkConfUtils.java
index 5c8aaa1559da7..e68c428945f86 100644
--- a/integration/spark/src/main/common/java/io/openlineage/spark/agent/util/SparkConfUtils.java
+++ b/integration/spark/src/main/common/java/io/openlineage/spark/agent/util/SparkConfUtils.java
@@ -1,5 +1,7 @@
 package io.openlineage.spark.agent.util;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.spark.SparkConf;

@@ -13,4 +15,15 @@ public static Optional<String> findSparkConfigKey(SparkConf conf, String name) {
         conf.getOption(name)
             .getOrElse(ScalaConversionUtils.toScalaFn(() -> conf.getOption("spark." + name))));
   }
+
+  public static Optional<Map<String, String>> findSparkUrlParams(SparkConf conf, String prefix) {
+    Map<String, String> urlParams = new HashMap<>();
+    // getAllWithPrefix strips the prefix, leaving the bare url parameter name as the key
+    scala.Tuple2<String, String>[] urlConfigs = conf.getAllWithPrefix("spark." + prefix + ".");
+    for (scala.Tuple2<String, String> param : urlConfigs) {
+      urlParams.put(param._1, param._2);
+    }
+
+    return urlParams.isEmpty() ? Optional.empty() : Optional.of(urlParams);
+  }
 }
diff --git a/integration/spark/src/test/common/java/io/openlineage/spark/agent/ArgumentParserTest.java b/integration/spark/src/test/common/java/io/openlineage/spark/agent/ArgumentParserTest.java
index 831b10f70b655..a9153689ca809 100644
--- a/integration/spark/src/test/common/java/io/openlineage/spark/agent/ArgumentParserTest.java
+++ b/integration/spark/src/test/common/java/io/openlineage/spark/agent/ArgumentParserTest.java
@@ -5,7 +5,9 @@

 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -22,7 +24,8 @@ public static Collection<Object[]> data() {
           "job_name",
           "ea445b5c-22eb-457a-8007-01c7c52b6e54",
           false,
-          Optional.of("abc")
+          Optional.of("abc"),
+          Optional.empty()
         });
     pass.add(
         new Object[] {
@@ -33,6 +36,7 @@ public static Collection<Object[]> data() {
           "job_name",
           "ea445b5c-22eb-457a-8007-01c7c52b6e54",
           false,
+          Optional.empty(),
           Optional.empty()
         });
     pass.add(
@@ -44,6 +48,7 @@ public static Collection<Object[]> data() {
           "job_name",
           "ea445b5c-22eb-457a-8007-01c7c52b6e54",
           false,
+          Optional.empty(),
           Optional.empty()
         });
     pass.add(
@@ -55,8 +60,43 @@ public static Collection<Object[]> data() {
           "job_name",
           null,
           true,
+          Optional.empty(),
           Optional.empty()
         });
+    pass.add(
+        new Object[] {
+          "http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/ea445b5c-22eb-457a-8007-01c7c52b6e54?api_key=abc&myParam=xyz",
+          "http://localhost:5000",
+          "v1",
+          "ns_name",
+          "job_name",
+          "ea445b5c-22eb-457a-8007-01c7c52b6e54",
+          false,
+          Optional.of("abc"),
+          Optional.of(
+              new HashMap<String, String>() {
+                {
+                  put("myParam", "xyz");
+                }
+              })
+        });
+    pass.add(
+        new Object[] {
+          "http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/ea445b5c-22eb-457a-8007-01c7c52b6e54?api_key=&myParam=xyz",
+          "http://localhost:5000",
+          "v1",
+          "ns_name",
+          "job_name",
+          "ea445b5c-22eb-457a-8007-01c7c52b6e54",
+          false,
+          Optional.empty(),
+          Optional.of(
+              new HashMap<String, String>() {
+                {
+                  put("myParam", "xyz");
+                }
+              })
+        });
     return pass;
   }

@@ -70,7 +110,8 @@ public void testArgument(
       String jobName,
       String runId,
       boolean defaultRunId,
-      Optional<String> apiKey) {
+      Optional<String> apiKey,
+      Optional<Map<String, String>> urlParams) {
     ArgumentParser parser = ArgumentParser.parse(input);
     assertEquals(host, parser.getHost());
     assertEquals(version, parser.getVersion());
@@ -82,5 +123,11 @@ public void testArgument(
       assertEquals(runId, parser.getParentRunId());
     }
     assertEquals(apiKey, parser.getApiKey());
+    assertEquals(urlParams, parser.getUrlParams());
+    if (urlParams.isPresent()) {
+      urlParams
+          .get()
+          .forEach((k, v) -> assertEquals(urlParams.get().get(k), parser.getUrlParam(k)));
+    }
   }
 }
diff --git a/integration/spark/src/test/common/java/io/openlineage/spark/agent/OpenLineageContextTest.java b/integration/spark/src/test/common/java/io/openlineage/spark/agent/OpenLineageContextTest.java
index 86f810ac87d70..20d20c041ca80 100644
--- a/integration/spark/src/test/common/java/io/openlineage/spark/agent/OpenLineageContextTest.java
+++ b/integration/spark/src/test/common/java/io/openlineage/spark/agent/OpenLineageContextTest.java
@@ -17,4 +17,15 @@ public void testLineageUri() throws URISyntaxException {
                 + "ea445b5c-22eb-457a-8007-01c7c52b6e54?api_key=abc"));
     assertEquals(URI.create("https://localhost:5000/api/v1/lineage"), ctx.getLineageURI());
   }
+
+  @Test
+  public void testLineageUriWithExtraParams() throws URISyntaxException {
+    OpenLineageContext ctx =
+        new OpenLineageContext(
+            ArgumentParser.parse(
+                "https://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/"
+                    + "ea445b5c-22eb-457a-8007-01c7c52b6e54?api_key=abc&code=123&foo=bar"));
+    assertEquals(
+        URI.create("https://localhost:5000/api/v1/lineage?code=123&foo=bar"), ctx.getLineageURI());
+  }
 }