Add feature table name & job id to deadletter destination (#1143)
* add feature table name & app id to deadletter destination

Signed-off-by: Oleksii Moskalenko <[email protected]>

* scala library provided

Signed-off-by: Oleksii Moskalenko <[email protected]>
pyalex authored Nov 6, 2020
1 parent b073285 commit 32ddf19
Showing 8 changed files with 41 additions and 6 deletions.
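Taken together, the two sides of this change nest deadletter output two levels deeper: the Python SDK (abc.py below) appends the feature table name to the configured deadletter path, and the Spark job (BatchPipeline/StreamingPipeline below) appends its own application id. A minimal Python sketch of the resulting layout, with a hypothetical bucket, table name, and app id:

    import os

    # Hypothetical values; only the join order comes from this commit.
    deadletter_base = "gs://feast-bucket/deadletter"
    feature_table = "driver_hourly_stats"       # appended by the SDK
    app_id = "application_1604652000000_0001"   # appended by the Spark job

    # The SDK passes base/feature_table as --deadletter-path;
    # the job then writes parquet under --deadletter-path/app_id.
    print(os.path.join(deadletter_base, feature_table, app_id))
    # gs://feast-bucket/deadletter/driver_hourly_stats/application_1604652000000_0001

This makes failed records traceable to both the feature table and the exact job run that produced them.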
7 changes: 6 additions & 1 deletion sdk/python/feast/pyspark/abc.py

@@ -348,7 +348,12 @@ def get_arguments(self) -> List[str]:
         args.extend(["--statsd", json.dumps(self._get_statsd_config())])
 
         if self._deadletter_path:
-            args.extend(["--deadletter-path", self._deadletter_path])
+            args.extend(
+                [
+                    "--deadletter-path",
+                    os.path.join(self._deadletter_path, self.get_feature_table_name()),
+                ]
+            )
 
         if self._stencil_url:
             args.extend(["--stencil-url", self._stencil_url])
1 change: 1 addition & 0 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

@@ -245,6 +245,7 @@ def dataproc_submit(
                     "jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
                     "main_class": job_params.get_class_name(),
                     "args": job_params.get_arguments(),
+                    "properties": {"spark.yarn.user.classpath.first": "true"},
                 }
             }
         )
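Setting spark.yarn.user.classpath.first to true makes YARN search the jars submitted with the job before the cluster's bundled libraries, so classes packaged in the Feast ingestion jar take precedence over Dataproc's copies. A quick sanity check from inside a running session, sketched with PySpark (the "false" fallback is an assumption for illustration, not Feast code):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # On YARN, "true" means user-supplied jars are searched first.
    conf = spark.sparkContext.getConf()
    print(conf.get("spark.yarn.user.classpath.first", "false"))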
1 change: 1 addition & 0 deletions spark/ingestion/pom.xml

@@ -68,6 +68,7 @@
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.fullVersion}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
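Marking scala-library as provided (the "scala library provided" part of the commit message) keeps the Scala standard library out of the assembled ingestion jar: it stays on the compile classpath, but at run time the Spark distribution supplies its own copy, so the job does not ship a second, potentially conflicting scala-library next to the one the cluster already has.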
8 changes: 6 additions & 2 deletions spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala

@@ -16,10 +16,13 @@
  */
 package feast.ingestion
 
+import java.nio.file.Paths
+
 import feast.ingestion.sources.bq.BigQueryReader
 import feast.ingestion.sources.file.FileReader
 import feast.ingestion.validation.{RowValidator, TypeCheck}
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{Column, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.col
 
 /**
@@ -79,7 +82,8 @@ object BatchPipeline extends BasePipeline {
           .filter(!validator.checkAll)
           .write
           .format("parquet")
-          .save(path)
+          .mode(SaveMode.Append)
+          .save(Paths.get(path, SparkEnv.get.conf.getAppId).toString)
       case _ => None
     }
 
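The batch deadletter write now appends into a per-application-id subfolder: SaveMode.Append keeps repeated writes from failing or clobbering earlier files, and the app id isolates each run's rejected rows. A roughly equivalent PySpark sketch, with hypothetical paths and data:

    import posixpath
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # A hypothetical row the RowValidator would reject.
    invalid = spark.createDataFrame([("driver_1", "not_a_number")], ["entity", "feature_value"])

    base = "/tmp/deadletter/driver_hourly_stats"  # hypothetical --deadletter-path value
    run_dir = posixpath.join(base, spark.sparkContext.applicationId)

    # Mirrors the Scala change: parquet appended under a per-run (app id) folder.
    invalid.write.format("parquet").mode("append").save(run_dir)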
5 changes: 4 additions & 1 deletion spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala

@@ -16,11 +16,14 @@
  */
 package feast.ingestion
 
+import java.nio.file.Paths
+
 import feast.ingestion.registry.proto.ProtoRegistryFactory
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.udf
 import feast.ingestion.utils.ProtoReflection
 import feast.ingestion.validation.{RowValidator, TypeCheck}
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.avro._
 
@@ -94,7 +97,7 @@ object StreamingPipeline extends BasePipeline with Serializable {
               .write
               .format("parquet")
               .mode(SaveMode.Append)
-              .save(path)
+              .save(Paths.get(path, SparkEnv.get.conf.getAppId).toString)
           case _ =>
             batchDF
               .filter(!validator.checkAll)
8 changes: 7 additions & 1 deletion spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala

@@ -16,6 +16,8 @@
  */
 package feast.ingestion
 
+import java.nio.file.Paths
+
 import collection.JavaConverters._
 import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
 import feast.proto.types.ValueProto.ValueType
@@ -210,7 +212,11 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
     jedis.keys("*").toArray should be(empty)
 
     sparkSession.read
-      .parquet(deadletterConfig.deadLetterPath.get)
+      .parquet(
+        Paths
+          .get(deadletterConfig.deadLetterPath.get, sparkSession.conf.get("spark.app.id"))
+          .toString
+      )
       .count() should be(rows.length)
   }
 
10 changes: 10 additions & 0 deletions spark/ingestion/src/test/scala/feast/ingestion/SparkSpec.scala

@@ -29,6 +29,16 @@ class SparkSpec extends UnitSpec with BeforeAndAfter {
       .setMaster("local[4]")
       .setAppName("Testing")
       .set("spark.default.parallelism", "8")
+      .set(
+        "spark.metrics.conf.*.source.redis.class",
+        "org.apache.spark.metrics.source.RedisSinkMetricSource"
+      )
+      .set(
+        "spark.metrics.conf.*.sink.statsd.class",
+        "org.apache.spark.metrics.sink.StatsdSinkWithTags"
+      )
+      .set("spark.metrics.conf.*.sink.statsd.host", "localhost")
+      .set("spark.metrics.conf.*.sink.statsd.port", "8125")
 
     sparkSession = SparkSession
       .builder()
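The extra SparkSpec settings configure Spark's metrics system through spark.metrics.conf.* properties so that local test sessions load the same custom metric classes the real ingestion jobs use (Feast's RedisSinkMetricSource and StatsdSinkWithTags), with StatsD output pointed at localhost:8125.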
7 changes: 6 additions & 1 deletion spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala

@@ -16,6 +16,7 @@
  */
 package feast.ingestion
 
+import java.nio.file.Paths
 import java.util.Properties
 
 import com.dimafeng.testcontainers.{
@@ -165,7 +166,11 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
     query.processAllAvailable()
 
     sparkSession.read
-      .parquet(configWithDeadletter.deadLetterPath.get)
+      .parquet(
+        Paths
+          .get(configWithDeadletter.deadLetterPath.get, sparkSession.conf.get("spark.app.id"))
+          .toString
+      )
       .count() should be(2 * rows.length)
   }
 
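Both integration tests resolve the new job-id path segment from the running session rather than hard-coding it. A PySpark equivalent of the read-back, with a hypothetical base path:

    import posixpath
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    base = "/tmp/deadletter/driver_hourly_stats"  # hypothetical deadLetterPath
    run_dir = posixpath.join(base, spark.conf.get("spark.app.id"))

    # Count what this run rejected, as the tests assert.
    print(spark.read.parquet(run_dir).count())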
