merge from branch-22.04
andygrove committed Mar 8, 2022
2 parents 8b6796f + 9f727bd commit 53a4083
Showing 289 changed files with 1,255 additions and 911 deletions.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import scala.reflect.api
import scala.reflect.runtime.universe._

import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.shims.SparkShimImpl

import org.apache.spark.internal.Logging

@@ -70,7 +71,7 @@ object ApiValidation extends Logging {
var printNewline = false

val sparkToShimMap = Map("3.0.1" -> "spark301", "3.1.1" -> "spark311")
-val sparkVersion = ShimLoader.getSparkShims.getSparkShimVersion.toString
+val sparkVersion = SparkShimImpl.getSparkShimVersion.toString
val shimVersion = sparkToShimMap(sparkVersion)

gpuKeys.foreach { e =>
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -39,7 +39,7 @@ Name | Description | Default Value
<a name="memory.gpu.maxAllocFraction"></a>spark.rapids.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool. The value must be greater than or equal to the setting for spark.rapids.memory.gpu.allocFraction. Note that this limit will be reduced by the reserve memory configured in spark.rapids.memory.gpu.reserve.|1.0
<a name="memory.gpu.minAllocFraction"></a>spark.rapids.memory.gpu.minAllocFraction|The fraction of total GPU memory that limits the minimum size of the RMM pool. The value must be less than or equal to the setting for spark.rapids.memory.gpu.allocFraction.|0.25
<a name="memory.gpu.oomDumpDir"></a>spark.rapids.memory.gpu.oomDumpDir|The path to a local directory where a heap dump will be created if the GPU encounters an unrecoverable out-of-memory (OOM) error. The filename will be of the form: "gpu-oom-<pid>.hprof" where <pid> is the process ID.|None
<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", "ASYNC", and "NONE". With "DEFAULT", the RMM pool allocator is used; with "ARENA", the RMM arena allocator is used; with "ASYNC", the new CUDA stream-ordered memory allocator in CUDA 11.2+ is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly.|ASYNC
<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", "ASYNC", and "NONE". With "DEFAULT", the RMM pool allocator is used; with "ARENA", the RMM arena allocator is used; with "ASYNC", the new CUDA stream-ordered memory allocator in CUDA 11.2+ is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly.|ARENA
<a name="memory.gpu.pooling.enabled"></a>spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels and kernel launches.|671088640
<a name="memory.gpu.unspill.enabled"></a>spark.rapids.memory.gpu.unspill.enabled|When a spilled GPU buffer is needed again, should it be unspilled, or only copied back into GPU memory temporarily. Unspilling may be useful for GPU buffers that are needed frequently, for example, broadcast variables; however, it may also increase GPU memory usage|false
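For orientation only (not part of this commit's diff): a minimal sketch of how the pool settings documented above might be supplied when building a PySpark session. It assumes the RAPIDS Accelerator jar is already on the classpath; the app name and chosen values are illustrative.

```python
# Illustrative only: selecting the RMM pooling allocator and allocation fraction
# for the RAPIDS Accelerator. App name and values are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("rapids-memory-config-sketch")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.memory.gpu.pool", "ARENA")          # "DEFAULT", "ARENA", "ASYNC", or "NONE"
    .config("spark.rapids.memory.gpu.allocFraction", "0.9")   # fraction of GPU memory given to the pool
    .getOrCreate()
)
```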
6 changes: 3 additions & 3 deletions docs/dev/shims.md
@@ -26,9 +26,9 @@ In the following we provide recipes for typical scenarios addressed by the Shim
It's among the easiest issues to resolve. We define a method in the SparkShims
trait covering a superset of parameters from all versions and call it
```
-ShimLoader.getSparkShims.methodWithDiscrepancies(p_1, ..., p_n)
+SparkShimImpl.methodWithDiscrepancies(p_1, ..., p_n)
```
-instead of referencing it directly. Shim implementations are in charge of dispatching it further
+instead of referencing it directly. Shim implementations (SparkShimImpl) are in charge of dispatching it further
to the correct version-dependent methods. Moreover, unlike in the sections below,
conflicts between versions are easily avoided by using different package or class names
for conflicting Shim implementations.
@@ -40,7 +40,7 @@ Upstream base classes we derive from might be incompatible in the sense that one
requires us to implement/override the method `M` whereas the other prohibits it by marking
the base implementation `final`; e.g., `org.apache.spark.sql.catalyst.trees.TreeNode` changes
between Spark 3.1.x and Spark 3.2.x. So instead of deriving from such classes directly we
-inject an intermediate trait e.g. `com.nvidia.spark.rapids.shims.v2.ShimExpression` that
+inject an intermediate trait e.g. `com.nvidia.spark.rapids.shims.ShimExpression` that
has a varying source code depending on the Spark version we compile against to overcome this
issue as you can see e.g., comparing TreeNode:
1. [ShimExpression For 3.0.x and 3.1.x](https://github.com/NVIDIA/spark-rapids/blob/main/sql-plugin/src/main/post320-treenode/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala#L23)
7 changes: 5 additions & 2 deletions integration_tests/src/main/python/asserts.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
# limitations under the License.

from conftest import is_incompat, should_sort_on_spark, should_sort_locally, get_float_check, get_limit, spark_jvm
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
from decimal import Decimal
import math
from pyspark.sql import Row
@@ -92,6 +92,9 @@ def _assert_equal(cpu, gpu, float_check, path):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif isinstance(cpu, bytearray):
assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path)
elif isinstance(cpu, timedelta):
# Used by interval type DayTimeInterval for Pyspark 3.3.0+
assert cpu == gpu, "GPU and CPU timedelta values are different at {}".format(path)
elif (cpu == None):
assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
else:
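As background for the new `timedelta` branch above: Python's `timedelta` normalizes its fields, so two equal durations compare equal with plain `==` no matter how they were constructed. A tiny standalone illustration (not part of the test suite):

```python
from datetime import timedelta

# Equal durations are equal values, regardless of how the fields were specified.
assert timedelta(hours=24) == timedelta(days=1)
assert timedelta(minutes=1, seconds=30) == timedelta(seconds=90)
```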
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/cache_test.py
@@ -24,8 +24,7 @@
enable_vectorized_confs = [{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"},
{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "false"}]

-# cache does not work with 128-bit decimals, see https://github.com/NVIDIA/spark-rapids/issues/4826
-_cache_decimal_gens = [decimal_gen_32bit, decimal_gen_64bit]
+_cache_decimal_gens = [decimal_gen_32bit, decimal_gen_64bit, decimal_gen_128bit]
_cache_single_array_gens_no_null = [ArrayGen(gen) for gen in all_basic_gens_no_null + _cache_decimal_gens]

decimal_struct_gen= StructGen([['child0', sub_gen] for ind, sub_gen in enumerate(_cache_decimal_gens)])
27 changes: 27 additions & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -612,6 +612,33 @@ def make_null():
            return None
        self._start(rand, make_null)

# DayTimeIntervalGen is for Spark 3.3.0+
# DayTimeIntervalType(startField, endField): Represents a day-time interval which is made up of a contiguous subset of the following fields:
# SECOND, seconds within minutes and possibly fractions of a second [0..59.999999],
# MINUTE, minutes within hours [0..59],
# HOUR, hours within days [0..23],
# DAY, days in the range [0..106751991].
# For more details: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
# Note: 106751991 / 365 ≈ 292471 years, far beyond the 9999-year datetime limit, so this upper bound looks suspicious.
class DayTimeIntervalGen(DataGen):
    """Generate DayTimeIntervalType values"""
    def __init__(self, max_days=None, nullable=True, special_cases=[timedelta(seconds=0)]):
        super().__init__(DayTimeIntervalType(), nullable=nullable, special_cases=special_cases)
        if max_days is None:
            self._max_days = 106751991
        else:
            self._max_days = max_days
    def start(self, rand):
        self._start(rand,
            lambda: timedelta(
                microseconds=rand.randint(0, 999999),
                seconds=rand.randint(0, 59),
                minutes=rand.randint(0, 59),
                hours=rand.randint(0, 23),
                days=rand.randint(0, self._max_days),
            )
        )

def skip_if_not_utc():
    if (not is_tz_utc()):
        skip_unless_precommit_tests('The java system time zone is not set to UTC')
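The values this generator produces are plain `datetime.timedelta` objects, which map to `DayTimeIntervalType` columns when a DataFrame is created. A standalone sketch of that mapping, assuming plain PySpark 3.3.0+ outside this test harness (names and values are illustrative):

```python
from datetime import timedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("daytime-interval-sketch").getOrCreate()

# timedelta values are inferred as DayTimeIntervalType on Spark 3.3.0+.
df = spark.createDataFrame(
    [(timedelta(days=1, hours=2, minutes=3, seconds=4),),
     (timedelta(seconds=0),)],
    schema=["d"])
df.printSchema()            # d: interval day to second
df.show(truncate=False)
```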
14 changes: 12 additions & 2 deletions integration_tests/src/main/python/date_time_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
from datetime import date, datetime, timezone
from marks import incompat, allow_non_gpu
from pyspark.sql.types import *
-from spark_session import with_spark_session, is_before_spark_311
+from spark_session import with_spark_session, is_before_spark_311, is_before_spark_330
import pyspark.sql.functions as f

# We only support literal intervals for TimeSub
@@ -41,6 +41,16 @@ def test_timeadd(data_gen):
        lambda spark: unary_op_df(spark, TimestampGen(start=datetime(5, 1, 1, tzinfo=timezone.utc), end=datetime(15, 1, 1, tzinfo=timezone.utc)), seed=1)
            .selectExpr("a + (interval {} days {} seconds)".format(days, seconds)))

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
def test_timeadd_daytime_column():
    gen_list = [
        # timestamp column max year is 1000
        ('t', TimestampGen(end=datetime(1000, 1, 1, tzinfo=timezone.utc))),
        # interval is at most about 8000 years of days, so the added result stays within the timestamp range
        ('d', DayTimeIntervalGen(max_days=8000 * 365))]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, gen_list).selectExpr("t + d", "t + INTERVAL '1 02:03:04' DAY TO SECOND"))

@pytest.mark.parametrize('data_gen', vals, ids=idfn)
def test_dateaddinterval(data_gen):
    days, seconds = data_gen
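For reference, the arithmetic exercised by `test_timeadd_daytime_column` can be reproduced with plain PySpark on Spark 3.3.0+ outside the harness (the session name and values below are illustrative, not the harness's `gen_df`/`TimestampGen` path):

```python
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("timeadd-sketch").getOrCreate()

df = spark.createDataFrame(
    [(datetime(2000, 1, 1, 0, 0, 0),
      timedelta(days=1, hours=2, minutes=3, seconds=4))],
    schema=["t", "d"])

# Timestamp + interval column, and timestamp + DAY TO SECOND interval literal,
# mirroring the selectExpr used in the test above.
df.selectExpr("t + d", "t + INTERVAL '1 02:03:04' DAY TO SECOND").show(truncate=False)
```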
18 changes: 18 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
@@ -789,3 +789,21 @@ def test_parquet_read_field_id(spark_tmp_path):
        lambda spark: spark.read.schema(readSchema).parquet(data_path),
        'FileSourceScanExec',
        {"spark.sql.parquet.fieldId.read.enabled": "true"}) # default is false

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
def test_parquet_read_daytime_interval_cpu_file(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    gen_list = [('_c1', DayTimeIntervalGen())]
    # write DayTimeInterval with CPU
    with_cpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.mode("overwrite").parquet(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path))

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
def test_parquet_read_daytime_interval_gpu_file(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    gen_list = [('_c1', DayTimeIntervalGen())]
    # write DayTimeInterval with GPU
    with_gpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.mode("overwrite").parquet(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path))
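Outside the harness, the round trip these two tests cover is simply a Parquet write and read of a `DayTimeIntervalType` column. A sketch assuming plain PySpark 3.3.0+ and a writable placeholder path:

```python
from datetime import timedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("interval-parquet-sketch").getOrCreate()
data_path = "/tmp/PARQUET_DATA_daytime_interval"   # placeholder path

df = spark.createDataFrame(
    [(timedelta(hours=1),), (timedelta(days=2, seconds=30),)],
    schema=["_c1"])
df.coalesce(1).write.mode("overwrite").parquet(data_path)

spark.read.parquet(data_path).show(truncate=False)
```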
11 changes: 11 additions & 0 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -418,3 +418,14 @@ def test_parquet_write_field_id(spark_tmp_path):
        data_path,
        'DataWritingCommandExec',
        conf = {"spark.sql.parquet.fieldId.write.enabled" : "true"}) # default is true

@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
def test_write_daytime_interval(spark_tmp_path):
    gen_list = [('_c1', DayTimeIntervalGen())]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    assert_gpu_and_cpu_writes_are_equal_collect(
        lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path),
        lambda spark, path: spark.read.parquet(path),
        data_path,
        conf=writer_confs)