Skip to content

Commit

Permalink
[SPARK-37013][CORE][SQL][FOLLOWUP] Use the new error framework to thr…
Browse files Browse the repository at this point in the history
…ow error in `FormatString`

### What changes were proposed in this pull request?
This is a followup of apache/spark#34313. The main change of this pr is  change to use the new error framework to throw error when `pattern.contains("%0$")` is true.

### Why are the changes needed?
Use the new error framework to throw error

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes #34454 from LuciferYang/SPARK-37013-FOLLOWUP.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
a0x8o committed Nov 23, 2021
1 parent 2f810a7 commit d852b72
Show file tree
Hide file tree
Showing 75 changed files with 1,668 additions and 1,061 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ setMethod("values",
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' makePairs <- lapply(rdd, function(x) { list(x, x) })
#' collectRDD(mapValues(makePairs, function(x) { x * 2) })
#' collectRDD(mapValues(makePairs, function(x) { x * 2 }))
#' Output: list(list(1,2), list(2,4), list(3,6), ...)
#'}
#' @rdname mapValues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void cleanup() throws Exception {

@Override
protected KVStore createStore() throws Exception {
assumeFalse(SystemUtils.IS_OS_MAC_OSX && System.getProperty("os.arch").equals("aarch64"));
assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));
dbpath = File.createTempFile("test.", ".ldb");
dbpath.delete();
db = new LevelDB(dbpath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void cleanup() throws Exception {

@Before
public void setup() throws Exception {
assumeFalse(SystemUtils.IS_OS_MAC_OSX && System.getProperty("os.arch").equals("aarch64"));
assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));
dbpath = File.createTempFile("test.", ".ldb");
dbpath.delete();
db = new LevelDB(dbpath);
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
"IF_PARTITION_NOT_EXISTS_UNSUPPORTED" : {
"message" : [ "Cannot write, IF NOT EXISTS is not supported for table: %s" ]
},
"ILLEGAL_SUBSTRING" : {
"message" : [ "%s cannot contain %s." ]
},
"INCOMPARABLE_PIVOT_COLUMN" : {
"message" : [ "Invalid pivot column '%s'. Pivot columns must be comparable." ],
"sqlState" : "42000"
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,11 @@ private[spark] object Utils extends Logging {
*/
val isMac = SystemUtils.IS_OS_MAC_OSX

/**
* Whether the underlying operating system is Mac OS X and processor is Apple Silicon.
*/
val isMacOnAppleSilicon = SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64")

/**
* Pattern for matching a Windows drive, which contains only a single alphabet character.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

if (!inMemory) {
// LevelDB doesn't support Apple Silicon yet
assume(!(Utils.isMac && System.getProperty("os.arch").equals("aarch64")))
assume(!Utils.isMacOnAppleSilicon)
conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
}
conf.set(HYBRID_STORE_ENABLED, useHybridStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true)
conf.setAll(extraConf)
// Since LevelDB doesn't support Apple Silicon yet, fallback to in-memory provider
if (Utils.isMac && System.getProperty("os.arch").equals("aarch64")) {
if (Utils.isMacOnAppleSilicon) {
conf.remove(LOCAL_STORE_DIR)
}
provider = new FsHistoryProvider(conf)
Expand Down Expand Up @@ -389,7 +389,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove(IS_TESTING)
// Since LevelDB doesn't support Apple Silicon yet, fallback to in-memory provider
if (Utils.isMac && System.getProperty("os.arch").equals("aarch64")) {
if (Utils.isMacOnAppleSilicon) {
myConf.remove(LOCAL_STORE_DIR)
}
val provider = new FsHistoryProvider(myConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class AppStatusStoreSuite extends SparkFunSuite {
return AppStatusStore.createLiveStore(conf)
}
// LevelDB doesn't support Apple Silicon yet
if (Utils.isMac && System.getProperty("os.arch").equals("aarch64") && disk) {
if (Utils.isMacOnAppleSilicon && disk) {
return null
}

Expand Down
35 changes: 26 additions & 9 deletions dev/lint-python
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,40 @@ function mypy_data_test {
fi
}

function mypy_examples_test {
local MYPY_REPORT=
local MYPY_STATUS=

function mypy_test {
if ! hash "$MYPY_BUILD" 2> /dev/null; then
echo "The $MYPY_BUILD command was not found. Skipping for now."
return
echo "starting mypy examples test..."

MYPY_REPORT=$( (MYPYPATH=python $MYPY_BUILD \
--allow-untyped-defs \
--config-file python/mypy.ini \
--exclude "mllib/*" \
examples/src/main/python/) 2>&1)

MYPY_STATUS=$?

if [ "$MYPY_STATUS" -ne 0 ]; then
echo "examples failed mypy checks:"
echo "$MYPY_REPORT"
echo "$MYPY_STATUS"
exit "$MYPY_STATUS"
else
echo "examples passed mypy checks."
echo
fi
}

_MYPY_VERSION=($($MYPY_BUILD --version))
MYPY_VERSION="${_MYPY_VERSION[1]}"
EXPECTED_MYPY="$(satisfies_min_version $MYPY_VERSION $MINIMUM_MYPY)"

if [[ "$EXPECTED_MYPY" == "False" ]]; then
echo "The minimum mypy version needs to be $MINIMUM_MYPY. Your current version is $MYPY_VERSION. Skipping for now."
function mypy_test {
if ! hash "$MYPY_BUILD" 2> /dev/null; then
echo "The $MYPY_BUILD command was not found. Skipping for now."
return
fi

mypy_annotation_test
mypy_examples_test
mypy_data_test
}

Expand Down
40 changes: 30 additions & 10 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ This is a graphical depiction of the precedence list as a directed tree:
The least common type from a set of types is the narrowest type reachable from the precedence list by all elements of the set of types.

The least common type resolution is used to:
- Decide whether a function expecting a parameter of a type can be invoked using an argument of a narrower type.
- Derive the argument type for functions which expect a shared argument type for multiple parameters, such as coalesce, least, or greatest.
- Derive the operand types for operators such as arithmetic operations or comparisons.
- Derive the result type for expressions such as the case expression.
Expand All @@ -246,19 +245,40 @@ DOUBLE
> SELECT (typeof(coalesce(1BD, 1F)));
DOUBLE

-- The substring function expects arguments of type INT for the start and length parameters.
> SELECT substring('hello', 1Y, 2);
he
> SELECT substring('hello', '1', 2);
he
> SELECT substring('hello', 1L, 2);
Error: Argument 2 requires an INT type.
> SELECT substring('hello', str, 2) FROM VALUES(CAST('1' AS STRING)) AS T(str);
Error: Argument 2 requires an INT type.
```

### SQL Functions
#### Function invocation
Under ANSI mode(spark.sql.ansi.enabled=true), the function invocation of Spark SQL:
- In general, it follows the `Store assignment` rules as storing the input values as the declared parameter type of the SQL functions
- Special rules apply for string literals and untyped NULL. A NULL can be promoted to any other type, while a string literal can be promoted to any simple data type.

```sql
> SET spark.sql.ansi.enabled=true;
-- implicitly cast Int to String type
> SELECT concat('total number: ', 1);
total number: 1
-- implicitly cast Timestamp to Date type
> select datediff(now(), current_date);
0

-- specialrule: implicitly cast String literal to Double type
> SELECT ceil('0.1');
1
-- specialrule: implicitly cast NULL to Date type
> SELECT year(null);
NULL

> CREATE TABLE t(s string);
-- Can't store String column as Numeric types.
> SELECT ceil(s) from t;
Error in query: cannot resolve 'CEIL(spark_catalog.default.t.s)' due to data type mismatch
-- Can't store String column as Date type.
> select year(s) from t;
Error in query: cannot resolve 'year(spark_catalog.default.t.s)' due to data type mismatch
```

#### Functions with different behaviors
The behavior of some SQL functions can be different under ANSI mode (`spark.sql.ansi.enabled=true`).
- `size`: This function returns null for null input.
- `element_at`:
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/python/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
20 changes: 10 additions & 10 deletions examples/src/main/python/als.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,26 @@ def update(i, mat, ratings):
(M, U, F, ITERATIONS, partitions))

R = matrix(rand(M, F)) * matrix(rand(U, F).T)
ms = matrix(rand(M, F))
us = matrix(rand(U, F))
ms: matrix = matrix(rand(M, F))
us: matrix = matrix(rand(U, F))

Rb = sc.broadcast(R)
msb = sc.broadcast(ms)
usb = sc.broadcast(us)

for i in range(ITERATIONS):
ms = sc.parallelize(range(M), partitions) \
.map(lambda x: update(x, usb.value, Rb.value)) \
.collect()
ms_ = sc.parallelize(range(M), partitions) \
.map(lambda x: update(x, usb.value, Rb.value)) \
.collect()
# collect() returns a list, so array ends up being
# a 3-d array, we take the first 2 dims for the matrix
ms = matrix(np.array(ms)[:, :, 0])
ms = matrix(np.array(ms_)[:, :, 0])
msb = sc.broadcast(ms)

us = sc.parallelize(range(U), partitions) \
.map(lambda x: update(x, msb.value, Rb.value.T)) \
.collect()
us = matrix(np.array(us)[:, :, 0])
us_ = sc.parallelize(range(U), partitions) \
.map(lambda x: update(x, msb.value, Rb.value.T)) \
.collect()
us = matrix(np.array(us_)[:, :, 0])
usb = sc.broadcast(us)

error = rmse(R, ms, us)
Expand Down
4 changes: 3 additions & 1 deletion examples/src/main/python/avro_inputformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
{u'favorite_color': u'red', u'name': u'Ben'}
"""
import sys
from typing import Any, Tuple

from functools import reduce
from pyspark.rdd import RDD
from pyspark.sql import SparkSession

if __name__ == "__main__":
Expand Down Expand Up @@ -75,7 +77,7 @@
schema_rdd = sc.textFile(sys.argv[2], 1).collect()
conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, schema_rdd)}

avro_rdd = sc.newAPIHadoopFile(
avro_rdd: RDD[Tuple[Any, None]] = sc.newAPIHadoopFile(
path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/python/ml/__init__,py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
5 changes: 5 additions & 0 deletions examples/src/main/python/ml/chi_square_test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
df = spark.createDataFrame(data, ["label", "features"])

r = ChiSquareTest.test(df, "features", "label").head()

# $example off$
assert r is not None
# $example on$

print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))
Expand Down
8 changes: 8 additions & 0 deletions examples/src/main/python/ml/correlation_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,17 @@
df = spark.createDataFrame(data, ["features"])

r1 = Correlation.corr(df, "features").head()

# $example off$
assert r1 is not None
# $example on$
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()

# $example off$
assert r2 is not None
# $example on$
print("Spearman correlation matrix:\n" + str(r2[0]))
# $example off$

Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/python/mllib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
4 changes: 3 additions & 1 deletion examples/src/main/python/parquet_inputformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
<...more log output...>
"""
import sys
from typing import Any, Tuple

from pyspark.rdd import RDD
from pyspark.sql import SparkSession

if __name__ == "__main__":
Expand All @@ -54,7 +56,7 @@

sc = spark.sparkContext

parquet_rdd = sc.newAPIHadoopFile(
parquet_rdd: RDD[Tuple[None, Any]] = sc.newAPIHadoopFile(
path,
'org.apache.parquet.avro.AvroParquetInputFormat',
'java.lang.Void',
Expand Down
4 changes: 3 additions & 1 deletion examples/src/main/python/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#

import sys
from typing import Tuple

from pyspark.rdd import RDD
from pyspark.sql import SparkSession


Expand All @@ -31,7 +33,7 @@
.getOrCreate()

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
sortedCount = lines.flatMap(lambda x: x.split(' ')) \
sortedCount: RDD[Tuple[int, int]] = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (int(x), 1)) \
.sortByKey()
# This is just a demo on how to bring all the sorted data back to a single node.
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/python/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Loading

0 comments on commit d852b72

Please sign in to comment.