[SPARK-24811][SQL] Avro: add new function from_avro and to_avro #21774
Conversation
Test build #93032 has finished for PR 21774 at commit
Force-pushed from fd7ad1d to 204a59d.
This is ready for review. @cloud-fan
Test build #93230 has finished for PR 21774 at commit
Are these functions also supposed to be used from textual SQL, Python, etc.?
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
I would add ExpressionDescription and javadoc here.
This is not a function expression like the ones in SQL core, so ExpressionDescription can't apply here. I think we can leave it for now.
override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val dataType: DataType =
@transient?
The dataType is needed on the executor side to build AvroDeserializer, so it's better to serialize it than to recompute it on the executor.
Since the schema could be quite complex, I am wondering about sending the same schema to executors twice (dataType, avroType). But yeah, transmitting the extra payload and deserializing dataType might be faster than the schema conversion.
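For context, a minimal sketch of the shape under discussion — assuming the PR's SerializableSchema wrapper exposes the underlying Avro Schema as `value`, with the module's SchemaConverters doing the type mapping:

```scala
// Computed lazily on the driver and then serialized with the expression, so
// executors get the Catalyst type without re-running the schema conversion.
override lazy val dataType: DataType =
  SchemaConverters.toSqlType(avroType.value).dataType
```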
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema)
  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
see the comment about CodegenFallback
good point. Since the implementation is short, I think it should be easy to codegen it.
case class CatalystDataToAvro(child: Expression) extends UnaryExpression with CodegenFallback {

override lazy val dataType: DataType = BinaryType
Just def?
+1
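The suggestion, sketched — the result type is a constant, so there is nothing worth caching:

```scala
// A plain def: no state to serialize, trivial to recompute.
override def dataType: DataType = BinaryType
```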
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BinaryType, DataType}
ExpressionDescription and javadoc?
@@ -36,4 +40,27 @@ package object avro {
  @scala.annotation.varargs
  def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
}
Why are these two functions not part of org.apache.spark.sql.functions?
Because the Avro data source is an external package, like the Kafka data source, it's not available in org.apache.spark.sql.functions.
Yes, having a function that depends on an external package in org.apache.spark.sql.functions would be weird.
Thanks guys for your explanation!
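For illustration, a hypothetical usage sketch of the package-level functions (the column names and the trivial schema are made up, and this assumes the string-schema variant settled on later in this thread):

```scala
import org.apache.spark.sql.avro._
import spark.implicits._  // assumes a SparkSession named `spark` is in scope

// An Avro schema in JSON form; a real schema would describe a record.
val jsonFormatSchema = """{"type": "string"}"""

// `df` is assumed to be a DataFrame with a binary `value` column.
// Round-trip the binary Avro column through its Catalyst representation.
val roundTripped = df
  .select(from_avro($"value", jsonFormatSchema).as("parsed"))
  .select(to_avro($"parsed").as("value"))
```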
""".stripMargin | ||
|
||
// When read float data as double, avro reader fails(trying to read 8 bytes while the data have | ||
// only 4 bytes), `AvroDataToCatalyst` catches the exception and returns null. |
Remove "`AvroDataToCatalyst` catches the exception and returns null".
Since Spark doesn't have a persistent UDF API like Hive UDFs, I think this is the best we can do now. In the future we should migrate this to a UDF API so that we can register it with a name and use it in SQL.
override def sql: String = simpleString

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
I tried more complicated code like:
```scala
nullSafeCodeGen(ctx, ev, binary => {
  val avroReader = ctx.addReferenceObj("reader", reader)
  val avroDeserializer = ctx.addReferenceObj("deserializer", deserializer)
  val avroDecoder = ctx.addMutableState(classOf[BinaryDecoder].getName, "avroDecoder")
  val catalystResult = ctx.addMutableState(classOf[Any].getName, "catalystResult")
  val decoderFactory = classOf[DecoderFactory].getName
  s"""
    $avroDecoder = $decoderFactory.get().binaryDecoder($binary, $avroDecoder);
    try {
      $catalystResult = $avroReader.read($catalystResult, $avroDecoder);
    } catch (java.io.IOException e) {
      org.apache.spark.unsafe.Platform.throwException(e);
    }
    ${ev.value} = (${CodeGenerator.boxedType(dataType)})
      $avroDeserializer.deserialize($catalystResult);
  """
})
```
But I eventually found it hard to avoid serializing some objects.
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  val expr = ctx.addReferenceObj("this", this)
  defineCodeGen(ctx, ev, input =>
    s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
We know the output is binary type, so we can just write (byte[]).
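Sketched, that tweak for CatalystDataToAvro — the output type is known to be BinaryType, so the generic boxed-type lookup can become a plain cast:

```scala
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  val expr = ctx.addReferenceObj("this", this)
  // The expression always produces binary, so cast the result directly to byte[].
  defineCodeGen(ctx, ev, input => s"(byte[]) $expr.nullSafeEval($input)")
}
```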
val avroTypeLong = new Schema.Parser().parse(
  s"""
    |{
    |  "type": "long",
the test name says int and string?
LGTM
Test build #93288 has finished for PR 21774 at commit
Test build #93292 has finished for PR 21774 at commit
}

override def simpleString: String = {
  s"from_avro(${child.sql}, ${dataType.simpleString})"
Shall we use catalogString for the data type?
IIRC simpleString will be used in the plan string and should not be too long.
But this is being used in sql though. Do we prefer the truncated string form in sql too?
It's not used in sql; we can override sql here and use the untruncated version.
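A sketch of that resolution — keep simpleString short for plan strings, and let sql carry the untruncated type:

```scala
// Truncated form, safe for plan strings.
override def simpleString: String =
  s"from_avro(${child.sql}, ${dataType.simpleString})"

// Full form; sql output is not subject to plan-string length limits.
override def sql: String =
  s"from_avro(${child.sql}, ${dataType.catalogString})"
```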
}

override def simpleString: String = {
  s"to_avro(${child.sql}, ${child.dataType.simpleString})"
ditto for catalogString
 * @param avroType the avro type.
 */
@Experimental
def from_avro(data: Column, avroType: Schema): Column = {
Why don't we just expose the String (JSON) version, in case we expose it in SQL syntax (and maybe also in R or Python)? I am sure we won't want to have many variants in the future either.
Column can support arbitrary expressions. Personally I don't like these string-based overloads... Since it's in an external module, I think we can only add the Python/R/SQL versions once we have a persistent UDF API.
Yup, I was just worried that we might end up with many overloaded versions of this again, like the JSON function family. We ended up with many overloaded versions of those, which seems like an actual concern (roughly, given my few interactions with other folks like Reynold).
Ah sorry, I thought you were talking about the data parameter. Yes, for the avroType parameter, we should have a string version.
I am actually +1 with @HyukjinKwon. As the Avro schema requires serialization, I decided to make the API accept a JSON-format string only, and then parse the string schema on the executor side.
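A sketch of the string-based API settled on here, assuming AvroDataToCatalyst is reworked to take the JSON schema string directly:

```scala
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column = {
  // The schema travels as a plain JSON string (cheap to serialize); it is
  // parsed back into an Avro Schema on the executor side where it is used.
  new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
}
```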
 * Converts a binary column of avro format into its corresponding catalyst value. The specified
 * schema must match the read data, otherwise the behavior is undefined: it may fail or return
 * arbitrary result.
 *
Shall we add @since?
+1
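For example (the version number is an assumption about the release this PR targets):

```scala
/**
 * Converts a binary column of avro format into its corresponding catalyst value. ...
 *
 * @since 2.4.0
 */
```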
LGTM too otherwise
I will create another separate PR to totally remove SerializableSchema.
Test build #93331 has finished for PR 21774 at commit
Test build #93328 has finished for PR 21774 at commit
retest this please
Test build #93345 has finished for PR 21774 at commit
Thanks! Merged to master.
Need to revert this PR since it breaks the build. https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.
2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created #21774 for this, but it failed the build: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes in this PR:

1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test.

Compile with different commands:

```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
```

Author: Gengliang Wang <[email protected]>

Closes #21838 from gengliangwang/from_and_to_avro.
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.
2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

## How was this patch tested?

Unit test.