Commit ac0fcf7

[SPARK-40360] *_ALREADY_EXISTS and *_NOT_FOUND errors

srielau committed Sep 14, 2022
1 parent 801faba commit ac0fcf7
Showing 86 changed files with 1,232 additions and 448 deletions.
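The change applies one pattern throughout: assertions that grepped for rendered English error text now call `checkError`, which matches on a stable error class and a map of named message parameters. A minimal before/after sketch of the migration as it would appear inside one of these suites (the table name is a placeholder; `checkError` is the SparkFunSuite helper amended near the end of this diff):

    // Before: brittle substring match on the rendered message.
    val msg = intercept[AnalysisException] {
      sql("ALTER TABLE not_existing_table ADD COLUMNS (c4 STRING)")
    }.getMessage
    assert(msg.contains("Table not found"))

    // After: assert on the error class and its named parameters.
    val e = intercept[AnalysisException] {
      sql("ALTER TABLE not_existing_table ADD COLUMNS (c4 STRING)")
    }
    checkError(e,
      errorClass = "TABLE_OR_VIEW_NOT_FOUND",
      parameters = Map("relation_name" -> "`not_existing_table`"))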
17 changes: 8 additions & 9 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -722,7 +722,7 @@ test_that("test tableExists, cache, uncache and clearCache", {
   clearCache()
 
   expect_error(uncacheTable("zxwtyswklpf"),
-               "Error in uncacheTable : analysis error - Table or view not found: zxwtyswklpf")
+               "[TABLE_OR_VIEW_NOT_FOUND] The table or view `zxwtyswklpf` cannot be found.")
 
   expect_true(tableExists("table1"))
   expect_true(tableExists("default.table1"))
@@ -3367,8 +3367,8 @@ test_that("approxQuantile() on a DataFrame", {

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql("select * from blah"), error = function(e) e)
expect_equal(grepl("Table or view not found", retError), TRUE)
expect_equal(grepl("blah", retError), TRUE)
expect_equal(grepl("[TABLE_OR_VIEW_NOT_FOUND]", retError), TRUE)
expect_equal(grepl("`blah`", retError), TRUE)
})

irisDF <- suppressWarnings(createDataFrame(iris))
@@ -4076,8 +4076,7 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, get
   expect_equal(currentDatabase(), "default")
   expect_error(setCurrentDatabase("default"), NA)
   expect_error(setCurrentDatabase("zxwtyswklpf"),
-               paste0("Error in setCurrentDatabase : no such database - Database ",
-                      "'zxwtyswklpf' not found"))
+               "[SCHEMA_NOT_FOUND] The schema `zxwtyswklpf` cannot be found.")
 
   expect_true(databaseExists("default"))
   expect_true(databaseExists("spark_catalog.default"))
@@ -4109,15 +4108,16 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
   tbs <- collect(tb)
   expect_true(nrow(tbs[tbs$name == "cars", ]) > 0)
   expect_error(listTables("bar"),
-               "Error in listTables : no such database - Database 'bar' not found")
+               "[SCHEMA_NOT_FOUND] The schema `bar` cannot be found.")
 
   c <- listColumns("cars")
   expect_equal(nrow(c), 2)
   expect_equal(colnames(c),
                c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
   expect_equal(collect(c)[[1]][[1]], "speed")
   expect_error(listColumns("zxwtyswklpf", "default"),
-               paste("Table or view not found: spark_catalog.default.zxwtyswklpf"))
+               paste("[TABLE_OR_VIEW_NOT_FOUND]",
+                     " The table or view `spark_catalog`.`default`.`zxwtyswklpf` cannot be found."))
 
   f <- listFunctions()
   expect_true(nrow(f) >= 200) # 250
@@ -4126,8 +4126,7 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
   expect_equal(take(orderBy(filter(f, "className IS NOT NULL"), "className"), 1)$className,
                "org.apache.spark.sql.catalyst.expressions.Abs")
   expect_error(listFunctions("zxwtyswklpf_db"),
-               paste("Error in listFunctions : no such database - Database",
-                     "'zxwtyswklpf_db' not found"))
+               paste("[SCHEMA_NOT_FOUND] The schema `zxwtyswklpf_db` cannot be found."))
 
   expect_true(functionExists("abs"))
   expect_false(functionExists("aabbss"))
@@ -87,10 +87,12 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     }
     assert(catalog.namespaceExists(Array("foo")) === false)
     assert(catalog.listNamespaces() === builtinNamespaces)
-    val msg = intercept[AnalysisException] {
+    val e = intercept[AnalysisException] {
       catalog.listNamespaces(Array("foo"))
-    }.getMessage
-    assert(msg.contains("Namespace 'foo' not found"))
+    }
+    checkError(e,
+      errorClass = "SCHEMA_NOT_FOUND",
+      parameters = Map("schema_name" -> "`foo`"))
   }
 }

@@ -20,8 +20,9 @@ package org.apache.spark.sql.jdbc.v2
 import org.apache.logging.log4j.Level
 
 import org.apache.spark.sql.{AnalysisException, DataFrame}
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
@@ -99,10 +100,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(msg.contains("Cannot add column, because C3 already exists"))
}
// Add a column to not existing table
val msg = intercept[AnalysisException] {
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}.getMessage
assert(msg.contains("Table not found"))
}
checkError(e,
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
parameters = Map("relation_name" -> s"`$catalogName`.`not_existing_table`"))
}

test("SPARK-33034: ALTER TABLE ... drop column") {
@@ -120,10 +123,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(msg.contains(s"Missing field bad_column in table $catalogName.alt_table"))
}
// Drop a column from a not existing table
val msg = intercept[AnalysisException] {
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}.getMessage
assert(msg.contains("Table not found"))
}
checkError(e,
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
parameters = Map("relation_name" -> s"`$catalogName`.`not_existing_table`"))
}

test("SPARK-33034: ALTER TABLE ... update column type") {
@@ -136,10 +141,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       assert(msg2.contains("Missing field bad_column"))
     }
     // Update column type in not existing table
-    val msg = intercept[AnalysisException] {
+    val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
-    }.getMessage
-    assert(msg.contains("Table not found"))
+    }
+    checkError(e,
+      errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+      parameters = Map("relation_name" -> s"`$catalogName`.`not_existing_table`"))
   }
 
   test("SPARK-33034: ALTER TABLE ... rename column") {
@@ -154,21 +161,27 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(msg.contains("Cannot rename column, because ID2 already exists"))
}
// Rename a column in a not existing table
val msg = intercept[AnalysisException] {
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}.getMessage
assert(msg.contains("Table not found"))
}
checkError(e,
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
parameters = Map("relation_name" ->
UnresolvedAttribute.parseAttributeName(s"$catalogName.not_existing_table")
.map(part => quoteIdentifier(part)).mkString(".")))
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
withTable(s"$catalogName.alt_table") {
testUpdateColumnNullability(s"$catalogName.alt_table")
}
// Update column nullability in not existing table
val msg = intercept[AnalysisException] {
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}.getMessage
assert(msg.contains("Table not found"))
}
checkError(e,
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
parameters = Map("relation_name" -> s"`$catalogName`.`not_existing_table`"))
}

test("CREATE TABLE with table comment") {
@@ -187,10 +200,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

test("CREATE TABLE with table property") {
withTable(s"$catalogName.new_table") {
val m = intercept[AnalysisException] {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}.message
assert(m.contains("Failed table creation"))
}
checkError(e,
errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
parameters = Map("relation_name" -> s"`$catalogName`.`new_table`"))
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
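The rename-column hunk above builds the expected `relation_name` by splitting the dotted name into its parts and backtick-quoting each one. A self-contained sketch of what that expression evaluates to, with simplified stand-ins for `UnresolvedAttribute.parseAttributeName` and `quoteIdentifier` (the real parser also honors backtick-escaped dots, which this sketch ignores; `mycatalog` is a hypothetical catalog name):

    object QuoteSketch {
      // Simplified stand-in: split a dotted name into its parts.
      def parseAttributeName(name: String): Seq[String] = name.split('.').toSeq

      // Stand-in for org.apache.spark.sql.catalyst.util.quoteIdentifier:
      // wrap a part in backticks, doubling any embedded backtick.
      def quoteIdentifier(part: String): String =
        "`" + part.replace("`", "``") + "`"

      def main(args: Array[String]): Unit = {
        val relationName = parseAttributeName("mycatalog.not_existing_table")
          .map(part => quoteIdentifier(part)).mkString(".")
        println(relationName) // prints: `mycatalog`.`not_existing_table`
      }
    }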
92 changes: 92 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -239,6 +239,24 @@
       }
     }
   },
+  "INDEX_ALREADY_EXISTS" : {
+    "message" : [
+      "Cannot create the index because it already exists. <message>."
+    ],
+    "sqlState" : "42000"
+  },
+  "INDEX_NOT_FOUND" : {
+    "message" : [
+      "Cannot find the index. <message>."
+    ],
+    "sqlState" : "42000"
+  },
+  "INDEX_OUT_OF_BOUNDS" : {
+    "message" : [
+      "Index <indexValue> must be between 0 and the length of the ArrayData."
+    ],
+    "sqlState" : "22023"
+  },
   "INTERNAL_ERROR" : {
     "message" : [
       "<message>"
@@ -400,6 +418,21 @@
     ],
     "sqlState" : "42000"
   },
+  "PARTITIONS_ALREADY_EXIST" : {
+    "message" : [
+      "Cannot ADD or RENAME TO partition(s) <partition_list> in table <table_name> because they already exist.",
+      "Choose a different name, drop the existing partition, or add the IF NOT EXISTS clause to tolerate a pre-existing partition."
+    ],
+    "sqlState" : "42000"
+  },
+  "PARTITIONS_NOT_FOUND" : {
+    "message" : [
+      "The partition(s) <partition_list> cannot be found in table <table_name>.",
+      "Verify the partition specification and table name.",
+      "To tolerate the error on drop use ALTER TABLE … DROP IF EXISTS PARTITION."
+    ],
+    "sqlState" : "42000"
+  },
   "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
     "message" : [
       "Invalid pivot value '<value>': value data type <valueType> does not match pivot column data type <pivotType>"
Expand All @@ -417,12 +450,71 @@
"Failed to set original permission <permission> back to the created path: <path>. Exception: <message>"
]
},
"ROUTINE_ALREADY_EXISTS" : {
"message" : [
"Cannot create the function <routine_name> because it already exists.",
"Choose a different name, drop or replace the existing function, or add the IF NOT EXISTS clause to tolerate a pre-existing function."
],
"sqlState" : "42000"
},
"ROUTINE_NOT_FOUND" : {
"message" : [
"The function <routine_name> cannot be found. Verify the spelling and correctness of the schema and catalog.",
"If you did not qualify the name with a schema and catalog, verify the current_schema() output, or qualify the name with the correct schema and catalog.",
"To tolerate the error on drop use DROP FUNCTION IF EXISTS."
],
"sqlState" : "42000"
},
"SCHEMA_ALREADY_EXISTS" : {
"message" : [
"Cannot create schema <schema_name> because it already exists.",
"Choose a different name, drop the existing schema, or add the IF NOT EXISTS clause to tolerate pre-existing schema."
],
"sqlState" : "42000"
},
"SCHEMA_NOT_EMPTY" : {
"message" : [
"Cannot drop a schema <schema_name> because it contains objects.",
"Use DROP SCHEMA ... CASCADE to drop the schema and all its objects."
],
"sqlState" : "42000"
},
"SCHEMA_NOT_FOUND" : {
"message" : [
"The schema <schema_name> cannot be found. Verify the spelling and correctness of the schema and catalog.",
"If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.",
"To tolerate the error on drop use DROP SCHEMA IF EXISTS."
],
"sqlState" : "42000"
},
"SECOND_FUNCTION_ARGUMENT_NOT_INTEGER" : {
"message" : [
"The second argument of <functionName> function needs to be an integer."
],
"sqlState" : "22023"
},
"TABLE_OR_VIEW_ALREADY_EXISTS" : {
"message" : [
"Cannot create table or view <relation_name> because it already exists.",
"Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects."
],
"sqlState" : "42000"
},
"TABLE_OR_VIEW_NOT_FOUND" : {
"message" : [
"The table or view <relation_name> cannot be found. Verify the spelling and correctness of the schema and catalog.",
"If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.",
"To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS."
],
"sqlState" : "42000"
},
"TEMP_TABLE_OR_VIEW_ALREADY_EXISTS" : {
"message" : [
"Cannot create the temporary view <relation_name> because it already exists.",
"Choose a different name, drop or replace the existing view, or add the IF NOT EXISTS clause to tolerate pre-existing views."
],
"sqlState" : "42000"
},
"TOO_MANY_ARRAY_ELEMENTS" : {
"message" : [
"Cannot initialize array with <numElements> elements of size <size>"
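Each entry above pairs an error class with a message template whose `<placeholder>` tokens are filled from the same named-parameter map the tests assert against. A minimal sketch of that substitution, assuming plain string replacement (the real rendering lives in Spark's error framework, not in this file):

    object ErrorClassSketch {
      // Two message templates copied from the entries added above.
      val templates: Map[String, String] = Map(
        "SCHEMA_NOT_FOUND" -> "The schema <schema_name> cannot be found.",
        "TABLE_OR_VIEW_NOT_FOUND" -> "The table or view <relation_name> cannot be found.")

      // Fill each <key> placeholder with its value from the parameter map.
      def render(errorClass: String, parameters: Map[String, String]): String = {
        val body = parameters.foldLeft(templates(errorClass)) {
          case (msg, (key, value)) => msg.replace(s"<$key>", value)
        }
        s"[$errorClass] $body"
      }

      def main(args: Array[String]): Unit = {
        // Prints: [SCHEMA_NOT_FOUND] The schema `zxwtyswklpf` cannot be found.
        println(render("SCHEMA_NOT_FOUND", Map("schema_name" -> "`zxwtyswklpf`")))
      }
    }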
24 changes: 16 additions & 8 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -321,14 +321,22 @@ abstract class SparkFunSuite
     } else {
       assert(expectedParameters === parameters)
     }
-    val actualQueryContext = exception.getQueryContext()
-    assert(actualQueryContext.length === queryContext.length, "Invalid length of the query context")
-    actualQueryContext.zip(queryContext).foreach { case (actual, expected) =>
-      assert(actual.objectType() === expected.objectType(), "Invalid objectType of a query context")
-      assert(actual.objectName() === expected.objectName(), "Invalid objectName of a query context")
-      assert(actual.startIndex() === expected.startIndex(), "Invalid startIndex of a query context")
-      assert(actual.stopIndex() === expected.stopIndex(), "Invalid stopIndex of a query context")
-      assert(actual.fragment() === expected.fragment(), "Invalid fragment of a query context")
+    if (!queryContext.isEmpty) {
+      val actualQueryContext = exception.getQueryContext()
+      assert(actualQueryContext.length === queryContext.length,
+        "Invalid length of the query context")
+      actualQueryContext.zip(queryContext).foreach { case (actual, expected) =>
+        assert(actual.objectType() === expected.objectType(),
+          "Invalid objectType of a query context")
+        assert(actual.objectName() === expected.objectName(),
+          "Invalid objectName of a query context")
+        assert(actual.startIndex() === expected.startIndex(),
+          "Invalid startIndex of a query context")
+        assert(actual.stopIndex() === expected.stopIndex(),
+          "Invalid stopIndex of a query context")
+        assert(actual.fragment() === expected.fragment(),
+          "Invalid fragment of a query context")
+      }
     }
   }
 
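With this guard in place, a `checkError` call that supplies no expected query context, as at every call site added above, no longer forces the thrown exception's context to be empty; the context fields are compared only when the test passes an expected context.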
@@ -54,7 +54,8 @@ default void createPartition(
     try {
       createPartitions(new InternalRow[]{ident}, new Map[]{properties});
     } catch (PartitionsAlreadyExistException e) {
-      throw new PartitionAlreadyExistsException(e.getMessage());
+      throw new PartitionAlreadyExistsException("PARTITIONS_ALREADY_EXIST",
+        e.messageParameters());
     }
   }
 
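The default `createPartition` wrapper now preserves the structured error: rather than flattening the caught `PartitionsAlreadyExistException` into its rendered message, it rethrows with the `PARTITIONS_ALREADY_EXIST` error class and the original message parameters, so the error class defined in error-classes.json above survives the wrapping.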