Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf #37972

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
419dd78
spark support for protobuf from_proto
Aug 11, 2022
870dbbf
read .pb fileformat
Aug 18, 2022
077ac9e
read .pb fileformat
Aug 18, 2022
d605bc5
to_proto version
Aug 21, 2022
1ea816a
Working copy of from_proto and to_proto
Aug 27, 2022
ac94e14
Merge remote-tracking branch 'apache-github/master' into SPARK_FROM_P…
Aug 27, 2022
d2710c6
Initial from_proto and to_proto support
Aug 28, 2022
c7fca1c
Abstracted the array operations into newArrayWriter
mposdev21 Aug 30, 2022
4c9bd74
Merge pull request #1 from SandishKumarHN/redo-array
SandishKumarHN Aug 30, 2022
7d60f9e
Added test cases.
mposdev21 Aug 30, 2022
0435dd7
Merge commit 'refs/pull/2/head' of https://github.com/SandishKumarHN/…
Aug 31, 2022
85884f7
Merge pull request #2 from SandishKumarHN/redo-array
SandishKumarHN Aug 31, 2022
0e0a4d5
Merge branch 'SPARK_PROTO_1' of https://github.com/SandishKumarHN/spa…
Aug 31, 2022
5d57331
remove use of proto compiler, ProtoSerdeSuite, ProtoFunctionsSuite tests
Sep 3, 2022
c8ac0d4
Added new test cases for the following:
mposdev21 Sep 3, 2022
6aeb274
Merge branch 'SPARK_PROTO_1' of github.com:SandishKumarHN/spark into …
mposdev21 Sep 3, 2022
63d648d
Fixed formatting problems and removed unused argument (ProtoDeseriali…
mposdev21 Sep 4, 2022
d24f873
Filter push down works now after making sure the sql schema
mposdev21 Sep 4, 2022
9185cc8
Don't handle exceptions locally. Spark tests need to see them.
mposdev21 Sep 7, 2022
8a2f493
serde suite sql type
Sep 7, 2022
75a4e5f
Remove the failing test (multiple_messages.desc not found)
mposdev21 Sep 7, 2022
c86c5d8
Added a test for bad schema. DynamicMessage parseFrom parses
mposdev21 Sep 15, 2022
be82d92
fixing scala style issues
Sep 15, 2022
849213e
Fix the typo in the test case
mposdev21 Sep 19, 2022
adbfaf1
implementing to_proto without descriptor funtion
SandishKumarHN Sep 20, 2022
a418407
Merge branch 'SPARK_PROTO_1' of https://github.com/SandishKumarHN/spa…
SandishKumarHN Sep 20, 2022
f7ca9e7
fixing style check issues
SandishKumarHN Sep 20, 2022
a65c131
adding PROTO_REBASE_MODE_IN_READ support
SandishKumarHN Sep 20, 2022
e0c22d2
CleanMessage to BasicMessage SerdeSuite
SandishKumarHN Sep 21, 2022
780d119
some nit changes
SandishKumarHN Sep 22, 2022
e22b763
nit picks
SandishKumarHN Sep 22, 2022
e1fde67
Merge branch 'master' of https://github.com/apache/spark into SPARK_P…
SandishKumarHN Sep 22, 2022
1901cd4
including proto to SparkBuild
SandishKumarHN Sep 22, 2022
121630c
scala 13 compilation changes
SandishKumarHN Sep 22, 2022
e8a7c2b
fixing build issues
SandishKumarHN Sep 23, 2022
6cfad63
Added a new test case for from_proto->filter->to_proto
mposdev21 Sep 25, 2022
4b824bd
Merge remote-tracking branch 'remote-spark/master' into SPARK_PROTO_1
SandishKumarHN Sep 25, 2022
2d2a822
spark-proto: shading google protobuf jars
SandishKumarHN Sep 28, 2022
7cdf9dd
mapsupport, rename proto-protobuf, schema-evolution, catch-recursive …
SandishKumarHN Sep 29, 2022
a90800c
Nit, cleanup unused
SandishKumarHN Oct 3, 2022
6f447e8
stylecheck, remove positional match, handle null values
SandishKumarHN Oct 5, 2022
7aa678c
Merge remote-tracking branch 'remote-spark/master' into SPARK_PROTO_1
SandishKumarHN Oct 6, 2022
c6d45b7
required field, remove tukaani, github labeler
SandishKumarHN Oct 6, 2022
58151dc
google protobuf shading changes
SandishKumarHN Oct 6, 2022
03d791c
Timestamp support and use latest error msg framework
SandishKumarHN Oct 11, 2022
d7c40ac
DayTimeIntervalType and Duration support
SandishKumarHN Oct 11, 2022
330d01e
move arraywriter mapping to single line
SandishKumarHN Oct 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ CONNECT:
- "connector/connect/**/*"
- "**/sql/sparkconnect/**/*"
- "python/pyspark/sql/**/connect/**/*"
PROTOBUF:
- "connector/protobuf/**/*"
- "python/pyspark/sql/protobuf/**/*"
115 changes: 115 additions & 0 deletions connector/protobuf/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a Jira open for this feature. We can open an EPIC. I am thinking of adding tasks under it.
cc: @mposdev21

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is no JIRA. Please go ahead and create the. EPIC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an Epic with 5 tasks under it: https://issues.apache.org/jira/browse/SPARK-40653
I see that PR title is already updated :). Nice.

<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning to work on Python support. @mposdev21 @SandishKumarHN let me know if you have already changes for it, otherwise, I can get started.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rangadi we already have the changes ready. should we create a separate PR now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please go ahead with the PR if you can. We can start reviewing it even as this PR is being merged.

~ contributor license agreements. See the NOTICE file distributed with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about schema-registry support? I can look into that. Let me know @mposdev21, @SandishKumarHN

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, Raghu. Go ahead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will take that.

~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-protobuf_2.12</artifactId>
<properties>
<sbt.project.name>protobuf</sbt.project.name>
<protobuf.version>3.21.1</protobuf.version>
</properties>
<packaging>jar</packaging>
<name>Spark Protobuf</name>
<url>https://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<!-- #if scala-2.13 --><!--
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
--><!-- #endif scala-2.13 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
Comment on lines +80 to +85
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we shade this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk I have already added shading for "com.google.protobuf" below in the plugins section. I'm I missing something?

Copy link
Contributor

@rangadi rangadi Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the import statements be using shaded packages? Btw, what is the reason for shading?
What blocks us from using Spark's version of Protobuf version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rangadi shading and relocation is needed to avoid potential conflicts with other third-party libraries, such as hadoop(uses proto2+).
more discussions here #37075 (comment) , we don't need to change imports to shaded ones, I have tested the same on the cluster. without shading was getting below error.

Exception in thread "main" java.lang.NoSuchMethodError: 'boolean com.google.protobuf.DescriptorProtos$MessageOptions.getMapEntry()'
        at org.apache.spark.sql.protobuf.utils.SchemaConverters$.structFieldFor(SchemaConverters.scala:65)

module spark/connector/connect also doing shading & relocation of google-profobuf, I'm following the same here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. wonder how it works without importing the shaded library. When we shade, it is like a new package. If we don't import it, then we are using the default one.

Yeah, MapEntry does not exist in v2 library I guess.

Copy link
Contributor Author

@SandishKumarHN SandishKumarHN Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rangadi after packaging if spark apps imports "spark-protobuf" jar, it won't find a "spark-protobuf" specific version of "com.google.protobuf:", instead it will have relocated one "org.sparkproject.spark-protobuf.protobuf:" and no one will be using the "spark-protobuf" protobuf version other than the "spark-protobuf" module. shading is just making an uber jar with the relocated classes. The shaded classes will be available for import only after the shaded jar has been added as a dependency in pom.xml, correct?

found this online

If the uber JAR is reused as a dependency of some other project, directly including classes
from the artifact's dependencies in the uber JAR can cause class loading conflicts due to duplicate
classes on the class path. To address this issue, one can relocate the classes which get included 
in the shaded artifact in order to create a private copy of their bytecode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let go with this. Not exactly clear what is happening. Spark experts might comment as required.


</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.protobuf:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>${spark.shade.packageName}.spark-protobuf.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.protobuf

import com.google.protobuf.DynamicMessage

import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.types.{BinaryType, DataType}

private[protobuf] case class CatalystDataToProtobuf(
child: Expression,
descFilePath: String,
messageName: String)
extends UnaryExpression {

override def dataType: DataType = BinaryType

@transient private lazy val protoType =
ProtobufUtils.buildDescriptor(descFilePath, messageName)

@transient private lazy val serializer =
new ProtobufSerializer(child.dataType, protoType, child.nullable)

override def nullSafeEval(input: Any): Any = {
val dynamicMessage = serializer.serialize(input).asInstanceOf[DynamicMessage]
dynamicMessage.toByteArray
}

override def prettyName: String = "to_protobuf"

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val expr = ctx.addReferenceObj("this", this)
defineCodeGen(ctx, ev, input => s"(byte[]) $expr.nullSafeEval($input)")
}

override protected def withNewChildInternal(newChild: Expression): CatalystDataToProtobuf =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.protobuf

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.protobuf.DynamicMessage

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}

private[protobuf] case class ProtobufDataToCatalyst(
child: Expression,
descFilePath: String,
messageName: String,
options: Map[String, String])
extends UnaryExpression
with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val dataType: DataType = {
val dt = SchemaConverters.toSqlType(messageDescriptor).dataType
parseMode match {
// With PermissiveMode, the output Catalyst row might contain columns of null values for
// corrupt records, even if some of the columns are not nullable in the user-provided schema.
// Therefore we force the schema to be all nullable here.
case PermissiveMode => dt.asNullable
case _ => dt
}
}

override def nullable: Boolean = true

private lazy val protobufOptions = ProtobufOptions(options)

@transient private lazy val messageDescriptor =
ProtobufUtils.buildDescriptor(descFilePath, messageName)

@transient private lazy val fieldsNumbers =
messageDescriptor.getFields.asScala.map(f => f.getNumber)

@transient private lazy val deserializer = new ProtobufDeserializer(messageDescriptor, dataType)

@transient private var result: DynamicMessage = _

@transient private lazy val parseMode: ParseMode = {
val mode = protobufOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
throw new AnalysisException(unacceptableModeMessage(mode.name))
}
mode
}

private def unacceptableModeMessage(name: String): String = {
s"from_protobuf() doesn't support the $name mode. " +
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
}

@transient private lazy val nullResultRow: Any = dataType match {
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
for (i <- 0 until st.length) {
resultRow.setNullAt(i)
}
resultRow

case _ =>
null
}

private def handleException(e: Throwable): Any = {
parseMode match {
case PermissiveMode =>
nullResultRow
case FailFastMode =>
throw new SparkException(
"Malformed records are detected in record parsing. " +
s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
"result, try setting the option 'mode' as 'PERMISSIVE'.",
e)
case _ =>
throw new AnalysisException(unacceptableModeMessage(parseMode.name))
}
}

override def nullSafeEval(input: Any): Any = {
val binary = input.asInstanceOf[Array[Byte]]
try {
result = DynamicMessage.parseFrom(messageDescriptor, binary)
val unknownFields = result.getUnknownFields
if (!unknownFields.asMap().isEmpty) {
unknownFields.asMap().keySet().asScala.map { number =>
{
if (fieldsNumbers.contains(number)) {
return handleException(
new Throwable(s"Type mismatch encountered for field:" +
s" ${messageDescriptor.getFields.get(number)}"))
}
}
}
}
val deserialized = deserializer.deserialize(result)
assert(
deserialized.isDefined,
"Protobuf deserializer cannot return an empty result because filters are not pushed down")
deserialized.get
} catch {
// There could be multiple possible exceptions here, e.g. java.io.IOException,
// ProtoRuntimeException, ArrayIndexOutOfBoundsException, etc.
// To make it simple, catch all the exceptions here.
case NonFatal(e) =>
handleException(e)
}
}

override def prettyName: String = "from_protobuf"

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val expr = ctx.addReferenceObj("this", this)
nullSafeCodeGen(
ctx,
ev,
eval => {
val result = ctx.freshName("result")
val dt = CodeGenerator.boxedType(dataType)
s"""
$dt $result = ($dt) $expr.nullSafeEval($eval);
if ($result == null) {
${ev.isNull} = true;
} else {
${ev.value} = $result;
}
"""
})
}

override protected def withNewChildInternal(newChild: Expression): ProtobufDataToCatalyst =
copy(child = newChild)
}
Loading