[SPARK-19548][SQL] Support Hive UDFs which return typed Lists/Maps
This is a backport of apache@226d388

## What changes were proposed in this pull request?
This PR adds support for Hive UDFs that return fully typed Java Lists or Maps, for example `List<String>` or `Map<String, Integer>`. These structures can also be nested, for example `Map<String, List<Integer>>`. Raw collections and collections using wildcards remain unsupported; they cannot be supported because the necessary type information is not available.
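To illustrate the new behaviour, a UDF such as the `UDFToListMapStringListInt` test class added in this patch can now be registered and queried directly. The sketch below is illustrative only: the function name `nested_udf` is made up, and a `SparkSession` with Hive support (`spark`) is assumed.

```scala
// Minimal usage sketch (not part of the patch): register a Hive UDF whose
// generic return type is List<Map<String, List<Integer>>>.
spark.sql(
  """CREATE TEMPORARY FUNCTION nested_udf
    |AS 'org.apache.spark.sql.hive.execution.UDFToListMapStringListInt'""".stripMargin)

// With this change the return type maps to
// ArrayType(MapType(StringType, ArrayType(IntegerType))) instead of failing analysis.
spark.sql("SELECT nested_udf(1)").show(truncate = false)
```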

## How was this patch tested?
Modified existing tests in `HiveUDFSuite` and added test cases for raw collections and collections using wildcards.
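For the unsupported cases, a test might look roughly like the sketch below. The function name and the exact assertion are illustrative rather than the precise code in `HiveUDFSuite`; it assumes the suite's `sql` helper and ScalaTest's `intercept`.

```scala
// Sketch only: raw collections should still be rejected during analysis.
// UDFRawList is one of the test UDFs added by this patch.
sql("CREATE TEMPORARY FUNCTION testUDFRawList AS " +
  "'org.apache.spark.sql.hive.execution.UDFRawList'")
val e = intercept[org.apache.spark.sql.AnalysisException] {
  sql("SELECT testUDFRawList('a')").collect()
}
assert(e.getMessage.contains("Raw list type in java is unsupported"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFRawList")
```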

Author: Herman van Hovell <[email protected]>

Closes apache#218 from hvanhovell/SPARK-19548.
hvanhovell committed Feb 14, 2017
1 parent 20aa6cc commit 34f79ce
Showing 11 changed files with 250 additions and 57 deletions.
HiveInspectors.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.lang.reflect.{ParameterizedType, Type, WildcardType}

import scala.collection.JavaConverters._

import org.apache.hadoop.{io => hadoopIo}
@@ -178,7 +180,7 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[hive] trait HiveInspectors {

def javaClassToDataType(clz: Class[_]): DataType = clz match {
def javaTypeToDataType(clz: Type): DataType = clz match {
// writable
case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
case c: Class[_] if c == classOf[hiveIo.DoubleWritable] => DoubleType
@@ -218,26 +220,42 @@ private[hive] trait HiveInspectors {
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType

case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))

// Hive seems to return this for struct types?
case c: Class[_] if c == classOf[java.lang.Object] => NullType

// java list type unsupported
case c: Class[_] if c == classOf[java.util.List[_]] =>
case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.List[_]]) =>
val Array(elementType) = p.getActualTypeArguments
ArrayType(javaTypeToDataType(elementType))

case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.Map[_, _]]) =>
val Array(keyType, valueType) = p.getActualTypeArguments
MapType(javaTypeToDataType(keyType), javaTypeToDataType(valueType))

// raw java list type unsupported
case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) =>
throw new AnalysisException(
"List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>")
"Raw list type in java is unsupported because Spark cannot infer the element type.")

// java map type unsupported
case c: Class[_] if c == classOf[java.util.Map[_, _]] =>
// raw java map type unsupported
case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) =>
throw new AnalysisException(
"Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>")
"Raw map type in java is unsupported because Spark cannot infer key and value types.")

case _: WildcardType =>
throw new AnalysisException(
"Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
"Spark cannot infer the data type for these type parameters.")

case c => throw new AnalysisException(s"Unsupported java type $c")
}

private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
case cls: Class[_] => parent.isAssignableFrom(cls)
case _ => false
}

private def withNullSafe(f: Any => Any): Any => Any = {
input => if (input == null) null else f(input)
}
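The mechanism `javaTypeToDataType` relies on is that generic method signatures, unlike runtime values, are not erased: `java.lang.reflect` exposes them as `ParameterizedType`, carrying both the raw type and the actual type arguments. A standalone sketch, using a JDK method purely for illustration:

```scala
import java.lang.reflect.ParameterizedType

// Properties.stringPropertyNames() is declared to return java.util.Set<String>.
val tpe = classOf[java.util.Properties]
  .getMethod("stringPropertyNames")
  .getGenericReturnType

tpe match {
  case p: ParameterizedType =>
    println(p.getRawType)                            // interface java.util.Set
    println(p.getActualTypeArguments.mkString(", ")) // class java.lang.String
  case other =>
    println(s"not parameterized: $other")            // e.g. a raw or primitive type
}
```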
hiveUDFs.scala
@@ -70,7 +70,7 @@ case class HiveSimpleUDF(
@transient
private lazy val conversionHelper = new ConversionHelper(method, arguments)

override lazy val dataType = javaClassToDataType(method.getReturnType)
override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)

@transient
private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
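The switch from `method.getReturnType` to `method.getGenericReturnType` is what makes the typed information above reachable; the former is erased to the raw class. A quick sketch of the difference, again using a JDK method for illustration:

```scala
// getReturnType yields the erased class; getGenericReturnType keeps type arguments.
val m = classOf[java.util.Properties].getMethod("stringPropertyNames")
println(m.getReturnType)        // interface java.util.Set
println(m.getGenericReturnType) // java.util.Set<java.lang.String>
```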
UDFRawList.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.Collections;
import java.util.List;

/**
* UDF that returns a raw (non-parameterized) java List.
*/
public class UDFRawList extends UDF {
public List evaluate(Object o) {
return Collections.singletonList("data1");
}
}
UDFRawMap.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.Collections;
import java.util.Map;

/**
* UDF that returns a raw (non-parameterized) java Map.
*/
public class UDFRawMap extends UDF {
public Map evaluate(Object o) {
return Collections.singletonMap("a", "1");
}
}
UDFToIntIntMap.java
@@ -23,13 +23,13 @@
import java.util.Map;

public class UDFToIntIntMap extends UDF {
public Map<Integer, Integer> evaluate(Object o) {
return new HashMap<Integer, Integer>() {
{
put(1, 1);
put(2, 1);
put(3, 1);
}
};
}
public Map<Integer, Integer> evaluate(Object o) {
return new HashMap<Integer, Integer>() {
{
put(1, 1);
put(2, 1);
put(3, 1);
}
};
}
}
UDFToListInt.java
@@ -19,11 +19,11 @@

import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UDFToListInt extends UDF {
public List<Integer> evaluate(Object o) {
return Arrays.asList(1, 2, 3);
}
public ArrayList<Integer> evaluate(Object o) {
return new ArrayList<>(Arrays.asList(1, 2, 3));
}
}
UDFToListMapStringListInt.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.*;

/**
 * UDF that returns a nested structure: a list of maps, each mapping a string key to a list of
 * ints.
*/
public class UDFToListMapStringListInt extends UDF {
public List<Map<String, List<Integer>>> evaluate(Object o) {
final Map<String, List<Integer>> map = new HashMap<>();
map.put("a", Arrays.asList(1, 2));
map.put("b", Arrays.asList(3, 4));
return Collections.singletonList(map);
}
}
UDFToListString.java
@@ -23,7 +23,7 @@
import java.util.List;

public class UDFToListString extends UDF {
public List<String> evaluate(Object o) {
return Arrays.asList("data1", "data2", "data3");
}
public List<String> evaluate(Object o) {
return Arrays.asList("data1", "data2", "data3");
}
}
UDFToStringIntMap.java
@@ -20,16 +20,15 @@
import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.HashMap;
import java.util.Map;

public class UDFToStringIntMap extends UDF {
public Map<String, Integer> evaluate(Object o) {
return new HashMap<String, Integer>() {
{
put("key1", 1);
put("key2", 2);
put("key3", 3);
}
};
}
public HashMap<String, Integer> evaluate(Object o) {
return new HashMap<String, Integer>() {
{
put("key1", 1);
put("key2", 2);
put("key3", 3);
}
};
}
}
UDFWildcardList.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.Collections;
import java.util.List;

/**
 * UDF that returns a java List with a wildcard type parameter.
*/
public class UDFWildcardList extends UDF {
public List<?> evaluate(Object o) {
return Collections.singletonList("data1");
}
}