Increase row limit when doing count() for HostColumnarToGpu (#1868)
* Increase row limit when doing count() for HostColumnarToGpu

Signed-off-by: Thomas Graves <[email protected]>

* put test back in

* comment

* update test comment

* update comment

Signed-off-by: Thomas Graves <[email protected]>

* Update count tests function, fix missed calls, and review comments
tgravescs authored Mar 5, 2021
1 parent f66c3ef commit dc66f03
Showing 3 changed files with 40 additions and 18 deletions.
38 changes: 24 additions & 14 deletions integration_tests/src/main/python/asserts.py
@@ -154,7 +154,7 @@ def __ge__(self, other):
     def __ne__(self, other):
         return self.cmp(other) != 0
 
-def _prep_func_for_compare(func, should_collect):
+def _prep_func_for_compare(func, mode):
     sort_locally = should_sort_locally()
     if should_sort_on_spark():
         def with_sorted(spark):
@@ -174,9 +174,12 @@ def with_limit(spark):
     else:
         limit_func = sorted_func
 
-    if should_collect:
+    if mode == 'COLLECT':
         bring_back = lambda spark: limit_func(spark).collect()
         collect_type = 'COLLECT'
+    elif mode == 'COUNT':
+        bring_back = lambda spark: limit_func(spark).count()
+        collect_type = 'COUNT'
     else:
         bring_back = lambda spark: limit_func(spark).toLocalIterator()
         collect_type = 'ITERATOR'
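
For reference, here is what each of the three modes materializes on the driver, as a runnable illustration against a local Spark session (not part of the diff):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
df = spark.range(10)

rows = df.collect()        # COLLECT: a list of Rows, so keep the data small
n = df.count()             # COUNT: a single int; no row data is transferred
it = df.toLocalIterator()  # ITERATOR: Rows streamed back partition by partition
```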
@@ -196,7 +199,7 @@ def _assert_gpu_and_cpu_writes_are_equal(
         write_func,
         read_func,
         base_path,
-        should_collect,
+        mode,
         conf={}):
     conf = _prep_incompat_conf(conf)
 
@@ -214,9 +217,9 @@ def _assert_gpu_and_cpu_writes_are_equal(
         gpu_end - gpu_start, cpu_end - cpu_start))
 
     (cpu_bring_back, cpu_collect_type) = _prep_func_for_compare(
-            lambda spark: read_func(spark, cpu_path), should_collect)
+            lambda spark: read_func(spark, cpu_path), mode)
     (gpu_bring_back, gpu_collect_type) = _prep_func_for_compare(
-            lambda spark: read_func(spark, gpu_path), should_collect)
+            lambda spark: read_func(spark, gpu_path), mode)
 
     from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
     from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
@@ -233,7 +236,7 @@ def assert_gpu_and_cpu_writes_are_equal_collect(write_func, read_func, base_path
     In this case the data is collected back to the driver and compared here, so be
     careful about the amount of data returned.
     """
-    _assert_gpu_and_cpu_writes_are_equal(write_func, read_func, base_path, True, conf=conf)
+    _assert_gpu_and_cpu_writes_are_equal(write_func, read_func, base_path, 'COLLECT', conf=conf)
 
 def assert_gpu_and_cpu_writes_are_equal_iterator(write_func, read_func, base_path, conf={}):
     """
@@ -242,7 +245,7 @@ def assert_gpu_and_cpu_writes_are_equal_iterator(write_func, read_func, base_pat
     In this case the data is pulled back to the driver in chunks and compared here
     so any amount of data can work, just be careful about how long it might take.
     """
-    _assert_gpu_and_cpu_writes_are_equal(write_func, read_func, base_path, False, conf=conf)
+    _assert_gpu_and_cpu_writes_are_equal(write_func, read_func, base_path, 'ITERATOR', conf=conf)
 
 def assert_gpu_fallback_write(write_func,
         read_func,
@@ -268,9 +271,9 @@ def assert_gpu_fallback_write(write_func,
         gpu_end - gpu_start, cpu_end - cpu_start))
 
     (cpu_bring_back, cpu_collect_type) = _prep_func_for_compare(
-            lambda spark: read_func(spark, cpu_path), True)
+            lambda spark: read_func(spark, cpu_path), 'COLLECT')
     (gpu_bring_back, gpu_collect_type) = _prep_func_for_compare(
-            lambda spark: read_func(spark, gpu_path), True)
+            lambda spark: read_func(spark, gpu_path), 'COLLECT')
 
     from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
     from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
@@ -283,7 +286,7 @@ def assert_gpu_fallback_write(write_func,
 def assert_gpu_fallback_collect(func,
         cpu_fallback_class_name,
         conf={}):
-    (bring_back, collect_type) = _prep_func_for_compare(func, True)
+    (bring_back, collect_type) = _prep_func_for_compare(func, 'COLLECT')
     conf = _prep_incompat_conf(conf)
 
     print('### CPU RUN ###')
@@ -307,9 +310,9 @@ def assert_gpu_fallback_collect(func,
     assert_equal(from_cpu, from_gpu)
 
 def _assert_gpu_and_cpu_are_equal(func,
-        should_collect,
+        mode,
         conf={}):
-    (bring_back, collect_type) = _prep_func_for_compare(func, should_collect)
+    (bring_back, collect_type) = _prep_func_for_compare(func, mode)
     conf = _prep_incompat_conf(conf)
 
     print('### CPU RUN ###')
@@ -335,16 +338,23 @@ def assert_gpu_and_cpu_are_equal_collect(func, conf={}):
     In this case the data is collected back to the driver and compared here, so be
     careful about the amount of data returned.
     """
-    _assert_gpu_and_cpu_are_equal(func, True, conf=conf)
+    _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf)
 
 def assert_gpu_and_cpu_are_equal_iterator(func, conf={}):
     """
     Assert when running func on both the CPU and the GPU that the results are equal.
     In this case the data is pulled back to the driver in chunks and compared here
     so any amount of data can work, just be careful about how long it might take.
     """
-    _assert_gpu_and_cpu_are_equal(func, False, conf=conf)
+    _assert_gpu_and_cpu_are_equal(func, 'ITERATOR', conf=conf)
 
+def assert_gpu_and_cpu_row_counts_equal(func, conf={}):
+    """
+    Assert that the row counts from running the func are the same on both the CPU and GPU.
+    This function runs count() to only get the number of rows and compares that count
+    between the CPU and GPU. It does NOT compare any underlying data.
+    """
+    _assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf)
+
 def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None):
     """
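A minimal usage sketch for the new assert_gpu_and_cpu_row_counts_equal helper; the reader function and path below are hypothetical, since any callable that takes a Spark session and returns a DataFrame works:

```python
from asserts import assert_gpu_and_cpu_row_counts_equal

def read_example(spark):
    # hypothetical input path, standing in for any DataFrame-producing read
    return spark.read.parquet('/tmp/example_parquet')

# Runs read_example(spark).count() under both CPU and GPU sessions and
# compares only the counts; no underlying data is collected or compared.
assert_gpu_and_cpu_row_counts_equal(read_example)
```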
10 changes: 8 additions & 2 deletions integration_tests/src/main/python/datasourcev2_read_test.py
@@ -14,8 +14,8 @@
 
 import pytest
 
-from asserts import assert_gpu_and_cpu_are_equal_collect
-from marks import validate_execs_in_gpu_plan
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_row_counts_equal
+from marks import *
 
 columnarClass = 'com.nvidia.spark.rapids.tests.datasourcev2.parquet.ArrowColumnarDataSourceV2'
 
@@ -39,6 +39,12 @@ def test_read_all_types():
         readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
         conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})
 
+@validate_execs_in_gpu_plan('HostColumnarToGpu')
+def test_read_all_types_count():
+    assert_gpu_and_cpu_row_counts_equal(
+        readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
+        conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})
+
 @validate_execs_in_gpu_plan('HostColumnarToGpu')
 def test_read_arrow_off():
     assert_gpu_and_cpu_are_equal_collect(
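The new test relies on @validate_execs_in_gpu_plan to confirm that HostColumnarToGpu actually appears in the executed plan. A crude stand-in for that kind of check, assuming only the mark's intent rather than its real implementation:

```python
from pyspark.sql import SparkSession

def plan_contains(df, exec_name):
    # grep the formatted physical plan for the exec name (illustrative only)
    return exec_name in df._jdf.queryExecution().executedPlan().toString()

spark = SparkSession.builder.master('local[1]').getOrCreate()
print(plan_contains(spark.range(5), 'Range'))  # True: the plan includes a Range exec
```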
@@ -277,8 +277,14 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
       // when reading host batches it is essential to read the data immediately and pass to a
       // builder and we need to determine how many rows to allocate in the builder based on the
       // schema and desired batch size
-      batchRowLimit = GpuBatchUtils.estimateRowCount(goal.targetSizeBytes,
-        GpuBatchUtils.estimateGpuMemory(schema, 512), 512)
+      batchRowLimit = if (batch.numCols() > 0) {
+        GpuBatchUtils.estimateRowCount(goal.targetSizeBytes,
+          GpuBatchUtils.estimateGpuMemory(schema, 512), 512)
+      } else {
+        // when there aren't any columns, it generally means user is doing a count() and we don't
+        // need to limit batch size because there isn't any actual data
+        Integer.MAX_VALUE
+      }
 
       // if no columns then probably a count operation so doesn't matter which builder we use
       // as we won't actually copy any data and we can't tell what type of data it is without
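The gist of the Scala change, rendered as a Python sketch; batch_row_limit and its estimate math are stand-ins for the GpuBatchUtils calls, with the 512-row sample size mirroring the diff:

```python
INT_MAX = 2**31 - 1  # Integer.MAX_VALUE

def batch_row_limit(num_cols, target_size_bytes, sample_bytes_for_512_rows):
    if num_cols > 0:
        # scale the 512-row memory estimate up to the desired batch size
        bytes_per_row = sample_bytes_for_512_rows / 512
        return max(1, int(target_size_bytes // bytes_per_row))
    # zero columns usually means a count(): no data is copied, so the old
    # schema-based estimate needlessly capped rows; don't limit at all
    return INT_MAX

print(batch_row_limit(10, 1 << 30, 512 * 8))  # ~134M rows for a 1 GiB target
print(batch_row_limit(0, 1 << 30, 0))         # count() path: 2147483647
```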
