
Incompatibility between enums and Spark SQL #87

Closed
danvk opened this issue Feb 17, 2016 · 8 comments
danvk commented Feb 17, 2016

Spark SQL attempts to infer the schema of your data using reflection. This works for case classes. ScalaPB messages are case classes, so I'd hoped this would just work for my collection of protos.

It's close. Schema discovery seems to work fine unless my message contains enums.

Here's some code:

    case class MiniRide(pickupTime: Option[Int], totalAmount: Option[Float], paymentType: Option[Payment])

    val ridesAsCaseClasses = ridesAsProtos.map(ride => MiniRide(ride.pickupTime, ride.totalAmount, ride.paymentType))

    ridesAsCaseClasses.toDF().registerTempTable("rides")
    sqlContext.sql("SELECT COUNT(*) FROM rides WHERE totalAmount > 200").collect().foreach(println)

I'm working with the NYC Taxi data. See the full Rides proto. Payment is an enum. If I drop that field from the case class, this code works. If I include it, I get the following runtime error:

    Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
    at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:259)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:682)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:659)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:693)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:691)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:691)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:94)
    at com.sidewalklabs.TlcConverter$.main(TlcConverter.scala:49)
    at com.sidewalklabs.TlcConverter.main(TlcConverter.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm honestly not sure if this is more an issue for SparkSQL or for ScalaPB, but it would be nice if I could use SparkSQL with my protos!
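
One interim workaround (a sketch only; MiniRideFlat and the toString mapping below are illustrative, not code from this thread) is to keep the generated enum out of the case class and store its string name instead, so reflection only sees types Spark SQL already understands:

    // Workaround sketch: represent the enum as a String so Spark SQL's
    // reflection-based schema inference only sees primitive-friendly types.
    case class MiniRideFlat(pickupTime: Option[Int], totalAmount: Option[Float], paymentType: Option[String])

    val ridesFlat = ridesAsProtos.map { ride =>
      // toString on a ScalaPB enum case object yields the enum value's name.
      MiniRideFlat(ride.pickupTime, ride.totalAmount, ride.paymentType.map(_.toString))
    }

    ridesFlat.toDF().registerTempTable("rides")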

danvk commented Feb 17, 2016

Maybe relevant? https://issues.apache.org/jira/browse/SPARK-2449

thesamet (Contributor) commented:

Maybe this would work if the enum we generate were annotated as a UserDefinedType, as explained here:
http://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql

Could you try editing the generated code directly, based on the SO example, and see if it helps? Then we can tell whether this can be fixed without needing special support from Spark SQL.
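
For reference, a sketch of what such an annotation could look like with the Spark 1.x UserDefinedType API (PaymentUDT, the IntegerType mapping, and the fromValue lookup are assumptions for illustration, not the generated code):

    import org.apache.spark.sql.types._

    // Hypothetical UDT for the generated Payment enum (Spark 1.x API; the UDT
    // classes became private in Spark 2.x).
    class PaymentUDT extends UserDefinedType[Payment] {
      override def sqlType: DataType = IntegerType
      override def serialize(obj: Any): Any = obj match {
        case p: Payment => p.value                  // assumes the enum exposes its wire value
      }
      override def deserialize(datum: Any): Payment = datum match {
        case i: Int => Payment.fromValue(i)         // assumes a fromValue lookup on the companion
      }
      override def userClass: Class[Payment] = classOf[Payment]
    }

    // The generated enum type would then carry the annotation:
    //   @SQLUserDefinedType(udt = classOf[PaymentUDT])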


danvk commented Feb 18, 2016

Sorry, I think that's beyond my Scala abilities. I'm happy to provide a more detailed repro if it helps.

thesamet (Contributor) commented:

Yes, a small repo I can fork to reproduce this problem would be great. Out of curiosity, where and how do you store the input protocol buffers?


danvk commented Feb 18, 2016

@thesamet I put together a minimal-ish repro here: https://github.com/danvk/scalapb-repro/

The code in that repro builds and runs successfully. If you uncomment the lines that use a message with an enum, however, you'll get the "<none> is not a term" error.

thesamet (Contributor) commented:

I have good news! I have added SparkSQL support for ScalaPB. See docs here: http://trueaccord.github.io/ScalaPB/sparksql.html
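
A minimal usage sketch based on the linked sparksql-scalapb docs (package and method names vary across library versions, so treat this as illustrative rather than the exact API):

    // Build a DataFrame directly from generated messages via the library's
    // ProtoSQL helper instead of relying on case-class reflection.
    import scalapb.spark.ProtoSQL

    val ridesDF = ProtoSQL.createDataFrame(sparkSession, rides)  // rides: a Seq of generated Ride messages
    ridesDF.createOrReplaceTempView("rides")
    sparkSession.sql("SELECT COUNT(*) FROM rides WHERE totalAmount > 200").show()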


zzztimbo commented Nov 3, 2017

@thesamet how did you resolve the enum issue with Spark SQL?


thesamet commented Nov 3, 2017

It's been a while, sorry. If there's a specific issue you're encountering, please let me know.
