diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py
index 64bf877ffb..d3935ff65b 100644
--- a/sdk/python/feast/pyspark/abc.py
+++ b/sdk/python/feast/pyspark/abc.py
@@ -348,7 +348,12 @@ def get_arguments(self) -> List[str]:
args.extend(["--statsd", json.dumps(self._get_statsd_config())])
if self._deadletter_path:
- args.extend(["--deadletter-path", self._deadletter_path])
+ args.extend(
+ [
+ "--deadletter-path",
+ os.path.join(self._deadletter_path, self.get_feature_table_name()),
+ ]
+ )
if self._stencil_url:
args.extend(["--stencil-url", self._stencil_url])
diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index ea4085c549..1dfce4ce44 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -245,6 +245,7 @@ def dataproc_submit(
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
"main_class": job_params.get_class_name(),
"args": job_params.get_arguments(),
+ "properties": {"spark.yarn.user.classpath.first": "true"},
}
}
)
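The added "properties" entry is a standard Spark configuration key rather than anything Dataproc-specific: it makes YARN prefer the jars shipped with the job over the cluster's own versions. A rough local equivalent, sketched with pyspark's SparkConf for illustration only (this is not how the launcher applies it):

    from pyspark import SparkConf

    # Prefer user-supplied jars over the cluster classpath on YARN.
    conf = SparkConf().set("spark.yarn.user.classpath.first", "true")
    print(conf.get("spark.yarn.user.classpath.first"))  # true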
diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml
index b67c333bd5..d78984837c 100644
--- a/spark/ingestion/pom.xml
+++ b/spark/ingestion/pom.xml
@@ -68,6 +68,7 @@
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.fullVersion}</version>
+            <scope>provided</scope>
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
index c7019a5cd1..b3badc173b 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
@@ -16,10 +16,13 @@
*/
package feast.ingestion
+import java.nio.file.Paths
+
import feast.ingestion.sources.bq.BigQueryReader
import feast.ingestion.sources.file.FileReader
import feast.ingestion.validation.{RowValidator, TypeCheck}
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{Column, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
/**
@@ -79,7 +82,8 @@ object BatchPipeline extends BasePipeline {
          .filter(!validator.checkAll)
          .write
          .format("parquet")
-          .save(path)
+          .mode(SaveMode.Append)
+          .save(Paths.get(path, SparkEnv.get.conf.getAppId).toString)
      case _ => None
    }
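Combined with the SDK change above, failed rows should land under <deadletter-path>/<feature-table>/<spark-app-id>, and SaveMode.Append lets later writes to that per-run directory add files instead of erroring (the default save mode fails if the path already exists). A hedged sketch of reading one run back (paths and names are hypothetical; it reads within the same session, as the integration tests further down do, so spark.app.id matches the writer):

    import os
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    base = "/tmp/deadletter"                   # hypothetical --deadletter-path base
    table = "driver_statistics"                # hypothetical feature table name
    app_id = spark.sparkContext.applicationId  # id the pipeline appends to the path

    # Directory written by BatchPipeline/StreamingPipeline for this run.
    failed = spark.read.parquet(os.path.join(base, table, app_id))
    failed.show()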
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala
index 07fd20f2c1..6f2d68b9a8 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala
@@ -16,11 +16,14 @@
*/
package feast.ingestion
+import java.nio.file.Paths
+
import feast.ingestion.registry.proto.ProtoRegistryFactory
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.udf
import feast.ingestion.utils.ProtoReflection
import feast.ingestion.validation.{RowValidator, TypeCheck}
+import org.apache.spark.SparkEnv
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.avro._
@@ -94,7 +97,7 @@ object StreamingPipeline extends BasePipeline with Serializable {
              .write
              .format("parquet")
              .mode(SaveMode.Append)
-              .save(path)
+              .save(Paths.get(path, SparkEnv.get.conf.getAppId).toString)
          case _ =>
            batchDF
              .filter(!validator.checkAll)
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
index 2e49c95b1f..6ccfe9ee34 100644
--- a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
+++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
@@ -16,6 +16,8 @@
*/
package feast.ingestion
+import java.nio.file.Paths
+
import collection.JavaConverters._
import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
import feast.proto.types.ValueProto.ValueType
@@ -210,7 +212,11 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
jedis.keys("*").toArray should be(empty)
sparkSession.read
- .parquet(deadletterConfig.deadLetterPath.get)
+ .parquet(
+ Paths
+ .get(deadletterConfig.deadLetterPath.get, sparkSession.conf.get("spark.app.id"))
+ .toString
+ )
.count() should be(rows.length)
}
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/SparkSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/SparkSpec.scala
index e0238c1be8..7d2b153c38 100644
--- a/spark/ingestion/src/test/scala/feast/ingestion/SparkSpec.scala
+++ b/spark/ingestion/src/test/scala/feast/ingestion/SparkSpec.scala
@@ -29,6 +29,16 @@ class SparkSpec extends UnitSpec with BeforeAndAfter {
      .setMaster("local[4]")
      .setAppName("Testing")
      .set("spark.default.parallelism", "8")
+      .set(
+        "spark.metrics.conf.*.source.redis.class",
+        "org.apache.spark.metrics.source.RedisSinkMetricSource"
+      )
+      .set(
+        "spark.metrics.conf.*.sink.statsd.class",
+        "org.apache.spark.metrics.sink.StatsdSinkWithTags"
+      )
+      .set("spark.metrics.conf.*.sink.statsd.host", "localhost")
+      .set("spark.metrics.conf.*.sink.statsd.port", "8125")
    sparkSession = SparkSession
      .builder()
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala
index 1bea232548..39c44ad55a 100644
--- a/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala
+++ b/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala
@@ -16,6 +16,7 @@
*/
package feast.ingestion
+import java.nio.file.Paths
import java.util.Properties
import com.dimafeng.testcontainers.{
@@ -165,7 +166,11 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
      query.processAllAvailable()
      sparkSession.read
-        .parquet(configWithDeadletter.deadLetterPath.get)
+        .parquet(
+          Paths
+            .get(configWithDeadletter.deadLetterPath.get, sparkSession.conf.get("spark.app.id"))
+            .toString
+        )
        .count() should be(2 * rows.length)
    }