Update locate_parquet_testing_files function to support hdfs input path for dataproc CI #10356

Merged: 3 commits, Feb 6, 2024
Changes from all commits
42 changes: 40 additions & 2 deletions integration_tests/src/main/python/parquet_testing_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
from data_gen import copy_and_update, non_utc_allow
from marks import allow_non_gpu
from pathlib import Path
import subprocess
import pytest
from spark_session import is_before_spark_330, is_spark_350_or_later
import warnings
@@ -72,6 +73,43 @@
_error_files["lz4_raw_compressed.parquet"] = "Exception"
_error_files["lz4_raw_compressed_larger.parquet"] = "Exception"

def hdfs_glob(path, pattern):
"""
Finds hdfs files by checking the input path with glob pattern

:param path: hdfs path to check
:type path: pathlib.Path
:return: generator of matched files
"""
path_str = path.as_posix()
full_pattern = path_str + '/' + pattern
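    # 'hadoop fs -ls -C' prints only the matching paths, one per line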
    cmd = ['hadoop', 'fs', '-ls', '-C', full_pattern]

Collaborator commented:

it should be possible to call FileSystem globStatus directly via PY4J without forking the JVM via hadoop fs

Collaborator Author replied:

Hi Gera, thanks for the review!

I tried the following code to use PY4J to glob files by pattern, and it works as well.
But it doesn't seem as straightforward as the hadoop fs -ls command, and we also need to process the path string further.

Considering readability, I think we could probably keep the current implementation?

Suggested change (replacing the cmd = ['hadoop', 'fs', '-ls', '-C', full_pattern] line):

    path_str = path.as_posix()
    full_pattern = path_str + '/' + pattern
    sc = get_spark_i_know_what_i_am_doing().sparkContext
    config = sc._jsc.hadoopConfiguration()
    fs_path = sc._jvm.org.apache.hadoop.fs.Path(full_pattern)
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config)
    statuses = fs.globStatus(fs_path)
    for status in statuses:
        # status.getPath().toString() returns a string like
        # "hdfs://hostname:8020/src/test/resources/parquet-testing/data/single_nan.parquet",
        # but pathlib.Path would collapse the leading "//" and turn it into
        # "hdfs:/hostname:8020/src/test/resources/parquet-testing/data/single_nan.parquet",
        # which is no longer a valid path, so rebuild it from the URI's path component.
        p = f'hdfs:{status.getPath().toUri().getPath()}'
        yield Path(p)

@gerashegalov (Collaborator) commented on Feb 5, 2024:

I find it readable enough. Not a must-fix but probably would save us trouble-shooting various OOMs and other resource issues down the road. Especially in the xdist case. Keep in mind JVM initialization is slow. Hadoop adds significant XML parsing overhead on top of it. We can file it as a potential improvement and mitigation.

Collaborator Author replied:

I will file it as an improvement. Thanks a lot!

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        raise AssertionError(f'Failed to list files from {path_str}. Error: {stderr}')

    paths = stdout.strip().split('\n')

    for p in paths:
        yield Path(p)

def glob(path, pattern):
"""
Finds files by checking the input path with glob pattern.
Support local file system and hdfs

:param path: input path to check
:type path: pathlib.Path
:return: generator of matched files
"""
path_str = path.as_posix()
if not path_str.startswith('hdfs:'):
return path.glob(pattern)

return hdfs_glob(path, pattern)

def locate_parquet_testing_files():
"""
Finds the input files by first checking the standard input path,
@@ -88,7 +126,7 @@ def locate_parquet_testing_files():
    for p in places:
        files = []
        for pattern in glob_patterns:
            files += p.glob(pattern)
            files += glob(p, pattern)
        if files:
            return files
    locations = ", ".join([ p.joinpath(g).as_posix() for p in places for g in glob_patterns])
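
For reference, below is a minimal sketch of the py4j-based alternative discussed in the review thread, packaged as a drop-in counterpart to hdfs_glob. It is not part of this PR: the function name hdfs_glob_py4j is illustrative, and it assumes get_spark_i_know_what_i_am_doing can be imported from spark_session, as the suggested change implies.

def hdfs_glob_py4j(path, pattern):
    """
    Sketch: glob HDFS paths via FileSystem.globStatus in the already-running
    Spark JVM, avoiding a fork of the hadoop CLI for every pattern.

    :param path: hdfs path to check
    :type path: pathlib.Path
    :param pattern: glob pattern to match under the path
    :return: generator of matched files
    """
    # Assumption: this helper lives in spark_session, as the suggested change implies.
    from spark_session import get_spark_i_know_what_i_am_doing
    full_pattern = path.as_posix() + '/' + pattern
    sc = get_spark_i_know_what_i_am_doing().sparkContext
    config = sc._jsc.hadoopConfiguration()
    fs_path = sc._jvm.org.apache.hadoop.fs.Path(full_pattern)
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config)
    # globStatus may return None when nothing matches, hence the "or []".
    for status in fs.globStatus(fs_path) or []:
        # Rebuild "hdfs:/..." from the URI's path component; pathlib.Path would
        # otherwise collapse the "hdfs://host:port" double slash into an illegal path.
        yield Path(f'hdfs:{status.getPath().toUri().getPath()}')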