[SPARK-37496][SQL] Migrate ReplaceTableAsSelectStatement to v2 command
### What changes were proposed in this pull request?
This PR migrates `ReplaceTableAsSelectStatement` to the v2 command framework.

### Why are the changes needed?
Migrate to the standard V2 command framework.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #34754 from huaxingao/replace_table.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
a0x8o committed Dec 2, 2021
1 parent d852b72 commit 2ab523c
Showing 232 changed files with 5,000 additions and 1,821 deletions.
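For context, here is a hedged, standalone sketch (not part of this diff) of the kind of user operation that ends up as a replace-table-as-select plan once the statement is handled by the v2 command framework. The catalog and table names are placeholders and assume a v2 catalog is already configured in the session.

```scala
import org.apache.spark.sql.SparkSession

object ReplaceTableAsSelectSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a catalog named "testcat" is registered via
    // spark.sql.catalog.testcat=<some TableCatalog implementation>;
    // catalog, namespace, and table names below are illustrative only.
    val spark = SparkSession.builder().appName("rtas-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "data")
    df.createOrReplaceTempView("source_view")

    // DataFrameWriterV2 path: create the table, or replace it if it already exists.
    df.writeTo("testcat.ns.tbl").createOrReplace()

    // SQL path: the parser produces the replace-table-as-select plan for a v2 catalog.
    spark.sql("REPLACE TABLE testcat.ns.tbl USING parquet AS SELECT id, data FROM source_view")

    spark.stop()
  }
}
```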
@@ -463,14 +463,14 @@ object ResourceProfile extends Logging {
case ResourceProfile.CORES =>
cores = execReq.amount.toInt
case rName =>
-val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+val nameToUse = resourceMappings.getOrElse(rName, rName)
customResources(nameToUse) = execReq
}
}
customResources.toMap
} else {
defaultResources.customResources.map { case (rName, execReq) =>
-val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+val nameToUse = resourceMappings.getOrElse(rName, rName)
(nameToUse, execReq)
}
}
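Several hunks in this commit apply the same mechanical cleanup: `map.get(key).getOrElse(default)` becomes the single call `map.getOrElse(key, default)`. A minimal plain-Scala illustration of the equivalence (the map contents here are made up):

```scala
object GetOrElseIdiom {
  def main(args: Array[String]): Unit = {
    val resourceMappings = Map("gpu" -> "nvidia.com/gpu")  // illustrative only

    // Two-step form: Option lookup, then getOrElse on the Option.
    val viaOption = resourceMappings.get("fpga").getOrElse("fpga")

    // Single-call form: Map.getOrElse takes the key and a default directly.
    val direct = resourceMappings.getOrElse("fpga", "fpga")

    assert(viaOption == direct)  // both fall back to "fpga"
    println(direct)
  }
}
```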
@@ -57,8 +57,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
private val notRunningUnitTests = !isTesting
private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)

-// If we use anything except the default profile, its only supported on YARN right now.
-// Throw an exception if not supported.
+/**
+ * If we use anything except the default profile, it's only supported on YARN and Kubernetes
+ * with dynamic allocation enabled. Throw an exception if not supported.
+ */
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
@@ -103,7 +105,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
def resourceProfileFromId(rpId: Int): ResourceProfile = {
readLock.lock()
try {
-resourceProfileIdToResourceProfile.get(rpId).getOrElse(
+resourceProfileIdToResourceProfile.getOrElse(rpId,
throw new SparkException(s"ResourceProfileId $rpId not found!")
)
} finally {
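The `resourceProfileFromId` hunk leans on the fact that `Map.getOrElse` takes its default by name, so the rewritten code still only constructs and throws the exception when the id is absent. A small standalone sketch of that behaviour (using a plain `IllegalArgumentException` and a made-up map to stay self-contained):

```scala
object LazyDefaultSketch {
  def lookup(profiles: Map[Int, String], rpId: Int): String =
    // The default argument is by-name, so nothing is thrown on a successful lookup.
    profiles.getOrElse(rpId,
      throw new IllegalArgumentException(s"ResourceProfileId $rpId not found!"))

  def main(args: Array[String]): Unit = {
    val profiles = Map(0 -> "default")   // illustrative contents
    println(lookup(profiles, 0))         // prints "default", no exception built
    // lookup(profiles, 7)               // would throw: ResourceProfileId 7 not found!
  }
}
```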
@@ -56,7 +56,7 @@ private[spark] class FetchFailedException(
// which intercepts this exception (possibly wrapping it), the Executor can still tell there was
// a fetch failure, and send the correct error msg back to the driver. We wrap with an Option
// because the TaskContext is not defined in some test cases.
-Option(TaskContext.get()).map(_.setFetchFailed(this))
+Option(TaskContext.get()).foreach(_.setFetchFailed(this))

def toTaskFailedReason: TaskFailedReason = FetchFailed(
bmAddress, shuffleId, mapId, mapIndex, reduceId, Utils.exceptionString(this))
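The `FetchFailedException` hunk swaps `map` for `foreach` because the call is performed purely for its side effect; `map` builds an `Option[Unit]` that is immediately discarded and suggests a value is being produced. A tiny standalone illustration:

```scala
object OptionSideEffect {
  def main(args: Array[String]): Unit = {
    val maybeUser: Option[String] = sys.env.get("USER")

    // map: returns an Option[Unit] nobody uses -- misleading for a pure side effect.
    val discarded: Option[Unit] = maybeUser.map(u => println(s"hello, $u"))

    // foreach: runs the effect if the value is present and returns Unit,
    // which states the intent directly.
    maybeUser.foreach(u => println(s"hello again, $u"))
  }
}
```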
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -60,15 +61,17 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
val indexFile = r.getIndexFile(shuffleId, mapId)

if (indexFile.exists()) {
+val hash = JavaUtils.nonNegativeHash(indexFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(indexFile.getAbsolutePath),
-new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))

val dataFile = r.getDataFile(shuffleId, mapId)
if (dataFile.exists()) {
+val hash = JavaUtils.nonNegativeHash(dataFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(dataFile.getAbsolutePath),
-new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
}

// Report block statuses
@@ -86,7 +89,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}

def exists(shuffleId: Int, filename: String): Boolean = {
-fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$filename"))
+val hash = JavaUtils.nonNegativeHash(filename)
+fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
}
}

@@ -168,7 +172,8 @@ private[spark] object FallbackStorage extends Logging {
}

val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+val hash = JavaUtils.nonNegativeHash(name)
+val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val start = startReduceId * 8L
val end = endReduceId * 8L
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
@@ -178,7 +183,8 @@
index.skip(end - (start + 8L))
val nextOffset = index.readLong()
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+val hash = JavaUtils.nonNegativeHash(name)
+val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val f = fallbackFileSystem.open(dataFile)
val size = nextOffset - offset
logDebug(s"To byte array $size")
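The `FallbackStorage` hunks insert one hash-named directory level so that migrated shuffle files are spread across `$appId/$shuffleId/$hash/` subdirectories rather than accumulating in a single flat directory; the writer, `exists`, and the read path must all derive the hash from the same file name. A rough standalone sketch of that layout — the real `JavaUtils.nonNegativeHash` may differ in detail, this only illustrates the idea:

```scala
object FallbackLayoutSketch {
  // Assumed behaviour: a non-negative bucket derived from the file name.
  def nonNegativeHash(name: String): Int = {
    val h = name.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  def fallbackPath(appId: String, shuffleId: Int, fileName: String): String =
    s"$appId/$shuffleId/${nonNegativeHash(fileName)}/$fileName"

  def main(args: Array[String]): Unit = {
    // Writer and reader compute the same path because the hash depends only on the name.
    println(fallbackPath("app-123", 0, "shuffle_0_5_0.index"))
    println(fallbackPath("app-123", 0, "shuffle_0_5_0.data"))
  }
}
```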
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -33,11 +33,9 @@ import org.apache.spark.util.collection.OpenHashSet

/**
* A trait that allows a class to give [[SizeEstimator]] more accurate size estimation.
- * When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first.
- * If `estimatedSize` does not return `None`, [[SizeEstimator]] will use the returned size
- * as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work.
- * The difference between a [[KnownSizeEstimation]] and
- * [[org.apache.spark.util.collection.SizeTracker]] is that, a
+ * When a class extends it, [[SizeEstimator]] will query the `estimatedSize`, and use
+ * the returned size as the size of the object. The difference between a [[KnownSizeEstimation]]
+ * and [[org.apache.spark.util.collection.SizeTracker]] is that, a
* [[org.apache.spark.util.collection.SizeTracker]] still uses [[SizeEstimator]] to
* estimate the size. However, a [[KnownSizeEstimation]] can provide a better estimation without
* using [[SizeEstimator]].
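The reworded scaladoc reflects that `estimatedSize` now returns a size directly rather than an `Option`. A standalone analogue of the contract it describes (the real `KnownSizeEstimation` trait is Spark-internal; the names and size figures below are illustrative):

```scala
// An object that knows its own size short-circuits the generic estimator.
trait KnownSize {
  def estimatedSize: Long
}

object SizeSketch {
  def estimate(obj: AnyRef): Long = obj match {
    case k: KnownSize => k.estimatedSize        // trust the object's own figure
    case s: String    => 40L + 2L * s.length    // crude stand-in for generic traversal
    case _            => 16L                    // placeholder default
  }

  def main(args: Array[String]): Unit = {
    class CachedRelation extends KnownSize { val estimatedSize: Long = 1024L }
    println(estimate(new CachedRelation))  // 1024, no traversal needed
    println(estimate("hello"))             // falls back to the generic path
  }
}
```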
2 changes: 1 addition & 1 deletion dev/create-release/release-build.sh
@@ -192,7 +192,7 @@ SCALA_2_12_PROFILES="-Pscala-2.12"
HIVE_PROFILES="-Phive -Phive-thriftserver"
# Profiles for publishing snapshots and release to Maven Central
# We use Apache Hive 2.3 for publishing
PUBLISH_PROFILES="$BASE_PROFILES $HIVE_PROFILES -Phive-2.3 -Pspark-ganglia-lgpl -Pkinesis-asl -Phadoop-cloud"
PUBLISH_PROFILES="$BASE_PROFILES $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl -Phadoop-cloud"
# Profiles for building binary releases
BASE_RELEASE_PROFILES="$BASE_PROFILES -Psparkr"

2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -35,7 +35,7 @@ cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
chill-java/0.10.0//chill-java-0.10.0.jar
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar
-commons-cli/1.2//commons-cli-1.2.jar
+commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-collections/3.2.2//commons-collections-3.2.2.jar
commons-compiler/3.0.16//commons-compiler-3.0.16.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -37,7 +37,7 @@ breeze_2.12/1.2//breeze_2.12-1.2.jar
cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
chill-java/0.10.0//chill-java-0.10.0.jar
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
-commons-cli/1.2//commons-cli-1.2.jar
+commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-collections/3.2.2//commons-collections-3.2.2.jar
commons-compiler/3.0.16//commons-compiler-3.0.16.jar
6 changes: 3 additions & 3 deletions dev/github_jira_sync.py
@@ -77,9 +77,9 @@ def get_jira_prs():
page_json = get_json(page)

for pull in page_json:
-jiras = re.findall(JIRA_PROJECT_NAME + "-[0-9]{4,5}", pull['title'])
-for jira in jiras:
-    result = result + [(jira, pull)]
+jira_issues = re.findall(JIRA_PROJECT_NAME + "-[0-9]{4,5}", pull['title'])
+for jira_issue in jira_issues:
+    result = result + [(jira_issue, pull)]

# Check if there is another page
link_headers = list(filter(lambda k: k.startswith("Link"), page.headers))
66 changes: 1 addition & 65 deletions dev/lint-python
@@ -20,8 +20,6 @@ FLAKE8_BUILD="flake8"
MINIMUM_FLAKE8="3.9.0"
MINIMUM_MYPY="0.910"
MYPY_BUILD="mypy"
-PYCODESTYLE_BUILD="pycodestyle"
-MINIMUM_PYCODESTYLE="2.7.0"
PYTEST_BUILD="pytest"

PYTHON_EXECUTABLE="${PYTHON_EXECUTABLE:-python3}"
@@ -64,66 +62,6 @@ function compile_python_test {
fi
}

-function pycodestyle_test {
-    local PYCODESTYLE_STATUS=
-    local PYCODESTYLE_REPORT=
-    local RUN_LOCAL_PYCODESTYLE=
-    local PYCODESTYLE_VERSION=
-    local EXPECTED_PYCODESTYLE=
-    local PYCODESTYLE_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pycodestyle-$MINIMUM_PYCODESTYLE.py"
-    local PYCODESTYLE_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/PyCQA/pycodestyle/$MINIMUM_PYCODESTYLE/pycodestyle.py"
-
-    if [[ ! "$1" ]]; then
-        echo "No python files found! Something is very wrong -- exiting."
-        exit 1;
-    fi
-
-    # check for locally installed pycodestyle & version
-    RUN_LOCAL_PYCODESTYLE="False"
-    if hash "$PYCODESTYLE_BUILD" 2> /dev/null; then
-        PYCODESTYLE_VERSION="$($PYCODESTYLE_BUILD --version)"
-        EXPECTED_PYCODESTYLE="$(satisfies_min_version $PYCODESTYLE_VERSION $MINIMUM_PYCODESTYLE)"
-        if [ "$EXPECTED_PYCODESTYLE" == "True" ]; then
-            RUN_LOCAL_PYCODESTYLE="True"
-        fi
-    fi
-
-    # download the right version or run locally
-    if [ $RUN_LOCAL_PYCODESTYLE == "False" ]; then
-        # Get pycodestyle at runtime so that we don't rely on it being installed on the build server.
-        # See: https://github.com/apache/spark/pull/1744#issuecomment-50982162
-        # Updated to the latest official version of pep8. pep8 is formally renamed to pycodestyle.
-        echo "downloading pycodestyle from $PYCODESTYLE_SCRIPT_REMOTE_PATH..."
-        if [ ! -e "$PYCODESTYLE_SCRIPT_PATH" ]; then
-            curl --silent -o "$PYCODESTYLE_SCRIPT_PATH" "$PYCODESTYLE_SCRIPT_REMOTE_PATH"
-            local curl_status="$?"
-
-            if [ "$curl_status" -ne 0 ]; then
-                echo "Failed to download pycodestyle.py from $PYCODESTYLE_SCRIPT_REMOTE_PATH"
-                exit "$curl_status"
-            fi
-        fi
-
-        echo "starting pycodestyle test..."
-        PYCODESTYLE_REPORT=$( ("$PYTHON_EXECUTABLE" "$PYCODESTYLE_SCRIPT_PATH" --config=dev/tox.ini $1) 2>&1)
-        PYCODESTYLE_STATUS=$?
-    else
-        # we have the right version installed, so run locally
-        echo "starting pycodestyle test..."
-        PYCODESTYLE_REPORT=$( ($PYCODESTYLE_BUILD --config=dev/tox.ini $1) 2>&1)
-        PYCODESTYLE_STATUS=$?
-    fi
-
-    if [ $PYCODESTYLE_STATUS -ne 0 ]; then
-        echo "pycodestyle checks failed:"
-        echo "$PYCODESTYLE_REPORT"
-        exit "$PYCODESTYLE_STATUS"
-    else
-        echo "pycodestyle checks passed."
-        echo
-    fi
-}


function mypy_annotation_test {
local MYPY_REPORT=
@@ -292,12 +230,10 @@ SPARK_ROOT_DIR="$(dirname "${SCRIPT_DIR}")"

pushd "$SPARK_ROOT_DIR" &> /dev/null

-# skipping local ruby bundle directory from the search
-PYTHON_SOURCE="$(find . -path ./docs/.local_ruby_bundle -prune -false -o -name "*.py")"
+PYTHON_SOURCE="$(git ls-files '*.py')"

compile_python_test "$PYTHON_SOURCE"
black_test
-pycodestyle_test "$PYTHON_SOURCE"
flake8_test
mypy_test

3 changes: 2 additions & 1 deletion dev/requirements.txt
@@ -19,7 +19,8 @@ coverage

# Linter
mypy
-flake8
+git+https://github.com/typeddjango/pytest-mypy-plugins.git@b0020061f48e85743ee3335bd62a3a608d17c6bd
+flake8==3.9.0

# Documentation (SQL)
mkdocs
3 changes: 0 additions & 3 deletions dev/run-tests-jenkins.py
@@ -174,9 +174,6 @@ def main():
os.environ["AMPLAB_JENKINS_BUILD_PROFILE"] = "hadoop2.7"
if "test-hadoop3.2" in ghprb_pull_title:
os.environ["AMPLAB_JENKINS_BUILD_PROFILE"] = "hadoop3.2"
-# Switch the Hive profile based on the PR title:
-if "test-hive2.3" in ghprb_pull_title:
-    os.environ["AMPLAB_JENKINS_BUILD_HIVE_PROFILE"] = "hive2.3"
# Switch the Scala profile based on the PR title:
if "test-scala2.13" in ghprb_pull_title:
os.environ["AMPLAB_JENKINS_BUILD_SCALA_PROFILE"] = "scala2.13"
23 changes: 1 addition & 22 deletions dev/run-tests.py
@@ -345,24 +345,6 @@ def get_hadoop_profiles(hadoop_version):
sys.exit(int(os.environ.get("CURRENT_BLOCK", 255)))


-def get_hive_profiles(hive_version):
-    """
-    For the given Hive version tag, return a list of Maven/SBT profile flags for
-    building and testing against that Hive version.
-    """
-
-    sbt_maven_hive_profiles = {
-        "hive2.3": ["-Phive-2.3"],
-    }
-
-    if hive_version in sbt_maven_hive_profiles:
-        return sbt_maven_hive_profiles[hive_version]
-    else:
-        print("[error] Could not find", hive_version, "in the list. Valid options",
-              " are", sbt_maven_hive_profiles.keys())
-        sys.exit(int(os.environ.get("CURRENT_BLOCK", 255)))


def build_spark_maven(extra_profiles):
# Enable all of the profiles for the build:
build_profiles = extra_profiles + modules.root.build_profile_flags
@@ -616,7 +598,6 @@ def main():
build_tool = os.environ.get("AMPLAB_JENKINS_BUILD_TOOL", "sbt")
scala_version = os.environ.get("AMPLAB_JENKINS_BUILD_SCALA_PROFILE")
hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop3.2")
hive_version = os.environ.get("AMPLAB_JENKINS_BUILD_HIVE_PROFILE", "hive2.3")
test_env = "amplab_jenkins"
# add path for Python3 in Jenkins if we're calling from a Jenkins machine
# TODO(sknapp): after all builds are ported to the ubuntu workers, change this to be:
@@ -627,14 +608,12 @@ def main():
build_tool = "sbt"
scala_version = os.environ.get("SCALA_PROFILE")
hadoop_version = os.environ.get("HADOOP_PROFILE", "hadoop3.2")
hive_version = os.environ.get("HIVE_PROFILE", "hive2.3")
if "GITHUB_ACTIONS" in os.environ:
test_env = "github_actions"
else:
test_env = "local"

-extra_profiles = get_hadoop_profiles(hadoop_version) + get_hive_profiles(hive_version) + \
-    get_scala_profiles(scala_version)
+extra_profiles = get_hadoop_profiles(hadoop_version) + get_scala_profiles(scala_version)

print("[info] Using build tool", build_tool, "with profiles",
*(extra_profiles + ["under environment", test_env]))
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -464,6 +464,7 @@ def __hash__(self):
"pyspark.sql.tests.test_streaming",
"pyspark.sql.tests.test_types",
"pyspark.sql.tests.test_udf",
"pyspark.sql.tests.test_udf_profiler",
"pyspark.sql.tests.test_utils",
]
)
@@ -606,6 +607,7 @@ def __hash__(self):
"pyspark.pandas.namespace",
"pyspark.pandas.numpy_compat",
"pyspark.pandas.sql_processor",
"pyspark.pandas.sql_formatter",
"pyspark.pandas.strings",
"pyspark.pandas.utils",
"pyspark.pandas.window",
8 changes: 3 additions & 5 deletions dev/test-dependencies.sh
@@ -86,20 +86,18 @@ $MVN -q versions:set -DnewVersion=$TEMP_VERSION -DgenerateBackupPoms=false > /de
for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do
if [[ $HADOOP_HIVE_PROFILE == **hadoop-3.2-hive-2.3** ]]; then
HADOOP_PROFILE=hadoop-3.2
-HIVE_PROFILE=hive-2.3
else
HADOOP_PROFILE=hadoop-2.7
-HIVE_PROFILE=hive-2.3
fi
echo "Performing Maven install for $HADOOP_HIVE_PROFILE"
-$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE jar:jar jar:test-jar install:install clean -q
+$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE jar:jar jar:test-jar install:install clean -q

echo "Performing Maven validate for $HADOOP_HIVE_PROFILE"
-$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE validate -q
+$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE validate -q

echo "Generating dependency manifest for $HADOOP_HIVE_PROFILE"
mkdir -p dev/pr-deps
-$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE dependency:build-classpath -pl assembly -am \
+$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE dependency:build-classpath -pl assembly -am \
| grep "Dependencies classpath:" -A 1 \
| tail -n 1 | tr ":" "\n" | awk -F '/' '{
# For each dependency classpath, we fetch the last three parts split by "/": artifact id, version, and jar name.