-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf #37972
Changes from 44 commits
419dd78
870dbbf
077ac9e
d605bc5
1ea816a
ac94e14
d2710c6
c7fca1c
4c9bd74
7d60f9e
0435dd7
85884f7
0e0a4d5
5d57331
c8ac0d4
6aeb274
63d648d
d24f873
9185cc8
8a2f493
75a4e5f
c86c5d8
be82d92
849213e
adbfaf1
a418407
f7ca9e7
a65c131
e0c22d2
780d119
e22b763
e1fde67
1901cd4
121630c
e8a7c2b
6cfad63
4b824bd
2d2a822
7cdf9dd
a90800c
6f447e8
7aa678c
c6d45b7
58151dc
03d791c
d7c40ac
330d01e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Licensed to the Apache Software Foundation (ASF) under one or more | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am planning to work on Python support. @mposdev21 @SandishKumarHN let me know if you have already changes for it, otherwise, I can get started. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rangadi we already have the changes ready. should we create a separate PR now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, please go ahead with the PR if you can. We can start reviewing it even as this PR is being merged. |
||
~ contributor license agreements. See the NOTICE file distributed with | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about schema-registry support? I can look into that. Let me know @mposdev21, @SandishKumarHN There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, Raghu. Go ahead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I will take that. |
||
~ this work for additional information regarding copyright ownership. | ||
~ The ASF licenses this file to You under the Apache License, Version 2.0 | ||
~ (the "License"); you may not use this file except in compliance with | ||
~ the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-parent_2.12</artifactId> | ||
<version>3.4.0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<artifactId>spark-protobuf_2.12</artifactId> | ||
<properties> | ||
<sbt.project.name>protobuf</sbt.project.name> | ||
<protobuf.version>3.21.1</protobuf.version> | ||
</properties> | ||
<packaging>jar</packaging> | ||
<name>Spark Protobuf</name> | ||
<url>https://spark.apache.org/</url> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-sql_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-core_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-catalyst_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-sql_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.scalacheck</groupId> | ||
<artifactId>scalacheck_${scala.binary.version}</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-tags_${scala.binary.version}</artifactId> | ||
</dependency> | ||
<!-- #if scala-2.13 --><!-- | ||
<dependency> | ||
<groupId>org.scala-lang.modules</groupId> | ||
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId> | ||
</dependency> | ||
--><!-- #endif scala-2.13 --> | ||
<dependency> | ||
<groupId>com.google.protobuf</groupId> | ||
<artifactId>protobuf-java</artifactId> | ||
<version>${protobuf.version}</version> | ||
<scope>compile</scope> | ||
</dependency> | ||
Comment on lines
+80
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we shade this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @holdenk I have already added shading for "com.google.protobuf" below in the plugins section. I'm I missing something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the import statements be using shaded packages? Btw, what is the reason for shading? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rangadi shading and relocation is needed to avoid potential conflicts with other third-party libraries, such as hadoop(uses proto2+).
module spark/connector/connect also doing shading & relocation of google-profobuf, I'm following the same here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm.. wonder how it works without importing the shaded library. When we shade, it is like a new package. If we don't import it, then we are using the default one. Yeah, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rangadi after packaging if spark apps imports "spark-protobuf" jar, it won't find a "spark-protobuf" specific version of "com.google.protobuf:", instead it will have relocated one "org.sparkproject.spark-protobuf.protobuf:" and no one will be using the "spark-protobuf" protobuf version other than the "spark-protobuf" module. shading is just making an uber jar with the relocated classes. The shaded classes will be available for import only after the shaded jar has been added as a dependency in pom.xml, correct? found this online
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, let go with this. Not exactly clear what is happening. Spark experts might comment as required. |
||
|
||
</dependencies> | ||
<build> | ||
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> | ||
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<configuration> | ||
<shadedArtifactAttached>false</shadedArtifactAttached> | ||
<artifactSet> | ||
<includes> | ||
<include>com.google.protobuf:*</include> | ||
</includes> | ||
</artifactSet> | ||
<relocations> | ||
<relocation> | ||
<pattern>com.google.protobuf</pattern> | ||
<shadedPattern>${spark.shade.packageName}.spark-protobuf.protobuf</shadedPattern> | ||
<includes> | ||
<include>com.google.protobuf.**</include> | ||
</includes> | ||
</relocation> | ||
</relocations> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.sql.protobuf | ||
|
||
import com.google.protobuf.DynamicMessage | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} | ||
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} | ||
import org.apache.spark.sql.protobuf.utils.ProtobufUtils | ||
import org.apache.spark.sql.types.{BinaryType, DataType} | ||
|
||
private[protobuf] case class CatalystDataToProtobuf( | ||
child: Expression, | ||
descFilePath: String, | ||
messageName: String) | ||
extends UnaryExpression { | ||
|
||
override def dataType: DataType = BinaryType | ||
|
||
@transient private lazy val protoType = | ||
ProtobufUtils.buildDescriptor(descFilePath, messageName) | ||
|
||
@transient private lazy val serializer = | ||
new ProtobufSerializer(child.dataType, protoType, child.nullable) | ||
|
||
override def nullSafeEval(input: Any): Any = { | ||
val dynamicMessage = serializer.serialize(input).asInstanceOf[DynamicMessage] | ||
dynamicMessage.toByteArray | ||
} | ||
|
||
override def prettyName: String = "to_protobuf" | ||
|
||
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { | ||
val expr = ctx.addReferenceObj("this", this) | ||
defineCodeGen(ctx, ev, input => s"(byte[]) $expr.nullSafeEval($input)") | ||
} | ||
|
||
override protected def withNewChildInternal(newChild: Expression): CatalystDataToProtobuf = | ||
copy(child = newChild) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.sql.protobuf | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.util.control.NonFatal | ||
|
||
import com.google.protobuf.DynamicMessage | ||
|
||
import org.apache.spark.SparkException | ||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} | ||
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} | ||
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} | ||
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters} | ||
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} | ||
|
||
private[protobuf] case class ProtobufDataToCatalyst( | ||
child: Expression, | ||
descFilePath: String, | ||
messageName: String, | ||
options: Map[String, String]) | ||
extends UnaryExpression | ||
with ExpectsInputTypes { | ||
|
||
override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) | ||
|
||
override lazy val dataType: DataType = { | ||
val dt = SchemaConverters.toSqlType(messageDescriptor).dataType | ||
parseMode match { | ||
// With PermissiveMode, the output Catalyst row might contain columns of null values for | ||
// corrupt records, even if some of the columns are not nullable in the user-provided schema. | ||
// Therefore we force the schema to be all nullable here. | ||
case PermissiveMode => dt.asNullable | ||
case _ => dt | ||
} | ||
} | ||
|
||
override def nullable: Boolean = true | ||
|
||
private lazy val protobufOptions = ProtobufOptions(options) | ||
|
||
@transient private lazy val messageDescriptor = | ||
ProtobufUtils.buildDescriptor(descFilePath, messageName) | ||
|
||
@transient private lazy val fieldsNumbers = | ||
messageDescriptor.getFields.asScala.map(f => f.getNumber) | ||
|
||
@transient private lazy val deserializer = new ProtobufDeserializer(messageDescriptor, dataType) | ||
|
||
@transient private var result: DynamicMessage = _ | ||
|
||
@transient private lazy val parseMode: ParseMode = { | ||
val mode = protobufOptions.parseMode | ||
if (mode != PermissiveMode && mode != FailFastMode) { | ||
throw new AnalysisException(unacceptableModeMessage(mode.name)) | ||
} | ||
mode | ||
} | ||
|
||
private def unacceptableModeMessage(name: String): String = { | ||
s"from_protobuf() doesn't support the $name mode. " + | ||
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." | ||
} | ||
|
||
@transient private lazy val nullResultRow: Any = dataType match { | ||
case st: StructType => | ||
val resultRow = new SpecificInternalRow(st.map(_.dataType)) | ||
for (i <- 0 until st.length) { | ||
resultRow.setNullAt(i) | ||
} | ||
resultRow | ||
|
||
case _ => | ||
null | ||
} | ||
|
||
private def handleException(e: Throwable): Any = { | ||
parseMode match { | ||
case PermissiveMode => | ||
nullResultRow | ||
case FailFastMode => | ||
throw new SparkException( | ||
"Malformed records are detected in record parsing. " + | ||
s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + | ||
"result, try setting the option 'mode' as 'PERMISSIVE'.", | ||
e) | ||
case _ => | ||
throw new AnalysisException(unacceptableModeMessage(parseMode.name)) | ||
} | ||
} | ||
|
||
override def nullSafeEval(input: Any): Any = { | ||
val binary = input.asInstanceOf[Array[Byte]] | ||
try { | ||
result = DynamicMessage.parseFrom(messageDescriptor, binary) | ||
val unknownFields = result.getUnknownFields | ||
if (!unknownFields.asMap().isEmpty) { | ||
unknownFields.asMap().keySet().asScala.map { number => | ||
{ | ||
if (fieldsNumbers.contains(number)) { | ||
return handleException( | ||
new Throwable(s"Type mismatch encountered for field:" + | ||
s" ${messageDescriptor.getFields.get(number)}")) | ||
} | ||
} | ||
} | ||
} | ||
val deserialized = deserializer.deserialize(result) | ||
assert( | ||
deserialized.isDefined, | ||
"Protobuf deserializer cannot return an empty result because filters are not pushed down") | ||
deserialized.get | ||
} catch { | ||
// There could be multiple possible exceptions here, e.g. java.io.IOException, | ||
// ProtoRuntimeException, ArrayIndexOutOfBoundsException, etc. | ||
// To make it simple, catch all the exceptions here. | ||
case NonFatal(e) => | ||
handleException(e) | ||
} | ||
} | ||
|
||
override def prettyName: String = "from_protobuf" | ||
|
||
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { | ||
val expr = ctx.addReferenceObj("this", this) | ||
nullSafeCodeGen( | ||
ctx, | ||
ev, | ||
eval => { | ||
val result = ctx.freshName("result") | ||
val dt = CodeGenerator.boxedType(dataType) | ||
s""" | ||
$dt $result = ($dt) $expr.nullSafeEval($eval); | ||
if ($result == null) { | ||
${ev.isNull} = true; | ||
} else { | ||
${ev.value} = $result; | ||
} | ||
""" | ||
}) | ||
} | ||
|
||
override protected def withNewChildInternal(newChild: Expression): ProtobufDataToCatalyst = | ||
copy(child = newChild) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a Jira open for this feature. We can open an EPIC. I am thinking of adding tasks under it.
cc: @mposdev21
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there is no JIRA. Please go ahead and create the. EPIC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created an Epic with 5 tasks under it: https://issues.apache.org/jira/browse/SPARK-40653
I see that PR title is already updated :). Nice.