tests: hdfs: use hadoop/docker fixtures
Fixed #4054
efiop committed Jul 10, 2020
1 parent fe1d7e9 commit e039c06
Showing 10 changed files with 117 additions and 115 deletions.

dvc/remote/hdfs.py: 1 addition, 1 deletion

@@ -165,7 +165,7 @@ def get_file_hash(self, path_info):
         # NOTE: pyarrow doesn't support checksum, so we need to use hadoop
         regex = r".*\t.*\t(?P<checksum>.*)"
         stdout = self.hadoop_fs(
-            f"checksum {path_info.path}", user=path_info.user
+            f"checksum {path_info.url}", user=path_info.user
         )
         return self._group(regex, stdout, "checksum")
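
For reference, the regex above pulls the last tab-separated field out of `hadoop fs -checksum` output. A minimal sketch of what it matches (the sample output line below is assumed for illustration, not taken from this commit):

import re

regex = r".*\t.*\t(?P<checksum>.*)"
# Assumed output shape: <path> TAB <algorithm> TAB <checksum-hex>
sample = "/data/foo\tMD5-of-0MD5-of-512CRC32C\t00000200000000000000abcd"
print(re.match(regex, sample).group("checksum"))  # -> 00000200000000000000abcd
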
scripts/ci/before_install.sh: 4 deletions

@@ -31,10 +31,6 @@ if [[ "$TRAVIS_BUILD_STAGE_NAME" == "test" ]]; then
     ssh 0.0.0.0 ls &>/dev/null
 fi
 
-if [ "$TRAVIS_OS_NAME" == "linux" ]; then
-    bash "$scriptdir/install_hadoop.sh"
-fi
-
 if [[ "$TRAVIS_OS_NAME" == "osx" && "$TRAVIS_PULL_REQUEST" == "false" ]]; then
     brew install openssl
     $scriptdir/retry.sh brew cask install google-cloud-sdk

scripts/ci/core-site.xml: deleted (6 lines)

scripts/ci/hdfs-site.xml: deleted (6 lines)

scripts/ci/install_hadoop.sh: deleted (48 lines)

scripts/ci/remove_hadoop.sh: deleted (7 lines)

setup.py: 2 additions

@@ -131,6 +131,8 @@ def run(self):
     "pylint",
     "pylint-pytest",
     "pylint-plugin-utils",
+    "wget",
+    "filelock",
 ]
 
 if (sys.version_info) >= (3, 6):

tests/docker-compose.yml: 8 additions

@@ -9,3 +9,11 @@ services:
     image: rkuprieiev/oss-emulator
     ports:
       - "8880"
+  hdfs:
+    image: rkuprieiev/docker-hdfs
+    ports:
+      - "8020"
+      # NOTE: having this port as dynamic one will require modifying
+      # `dfs.datanode.address` in `hdfs-site.xml` and probably something
+      # else, so using default one for now.
+      - "50010:50010"
tests/remotes/__init__.py: 30 additions, 1 deletion

@@ -3,7 +3,7 @@
 import pytest
 
 from .azure import Azure, azure, azure_server  # noqa: F401
-from .hdfs import HDFS, hdfs  # noqa: F401
+from .hdfs import HDFS, hadoop, hdfs, hdfs_server  # noqa: F401
 from .http import HTTP, http, http_server  # noqa: F401
 from .local import Local, local_cloud, local_remote  # noqa: F401
 from .oss import OSS, TEST_OSS_REPO_BUCKET, oss, oss_server  # noqa: F401

@@ -45,6 +45,35 @@ def docker_compose():
         pytest.skip("no docker-compose installed")
 
 
+@pytest.fixture(scope="session")
+def docker_compose_project_name():
+    return "pytest-dvc-test"
+
+
+@pytest.fixture(scope="session")
+def docker_services(
+    docker_compose_file, docker_compose_project_name, tmp_path_factory
+):
+    # overriding `docker_services` fixture from `pytest_docker` plugin to
+    # only launch docker images once.
+
+    from filelock import FileLock
+    from pytest_docker.plugin import DockerComposeExecutor, Services
+
+    executor = DockerComposeExecutor(
+        docker_compose_file, docker_compose_project_name,
+    )
+
+    # making sure we don't accidentally launch docker-compose in parallel,
+    # as it might result in network conflicts. Inspired by:
+    # https://github.com/pytest-dev/pytest-xdist#making-session-scoped-fixtures-execute-only-once
+    lockfile = tmp_path_factory.getbasetemp().parent / "docker-compose.lock"
+    with FileLock(str(lockfile)):
+        executor.execute("up --build -d")
+
+    return Services(executor)
+
+
 @pytest.fixture
 def remote(tmp_dir, dvc, request):
     cloud = request.param
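
The override above relies on pytest-docker's `docker_compose_file` fixture to locate the compose file; this commit doesn't show that definition (it presumably already exists elsewhere in the test suite), but a minimal version could look roughly like this:

import os

import pytest


@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
    # Point pytest-docker at the compose file shipped with the tests.
    return os.path.join(str(pytestconfig.rootdir), "tests", "docker-compose.yml")
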
tests/remotes/hdfs.py: 76 additions, 42 deletions

@@ -1,57 +1,22 @@
-import getpass
 import locale
 import os
 import platform
 import uuid
 from contextlib import contextmanager
-from subprocess import CalledProcessError, Popen, check_output
 
 import pytest
 
 from dvc.path_info import URLInfo
 
 from .base import Base
-from .local import Local
 
 
-class HDFS(Base, URLInfo):
-    @staticmethod
-    def should_test():
-        if platform.system() != "Linux":
-            return False
-
-        try:
-            # pylint: disable=unexpected-keyword-arg
-            # see: https://github.com/PyCQA/pylint/issues/3645
-            check_output(
-                ["hadoop", "version"],
-                shell=True,
-                executable=os.getenv("SHELL"),
-            )
-        except (CalledProcessError, OSError):
-            return False
-
-        p = Popen(
-            "hadoop fs -ls hdfs://127.0.0.1/",
-            shell=True,
-            executable=os.getenv("SHELL"),
-        )
-        p.communicate()
-        if p.returncode != 0:
-            return False
-
-        return True
-
-    @staticmethod
-    def get_url():
-        return "hdfs://{}@127.0.0.1{}".format(
-            getpass.getuser(), Local.get_storagepath()
-        )
-
+class HDFS(Base, URLInfo):  # pylint: disable=abstract-method
     @contextmanager
     def _hdfs(self):
         import pyarrow
 
-        conn = pyarrow.hdfs.connect()
+        conn = pyarrow.hdfs.connect(self.host, self.port)
         try:
             yield conn
         finally:

@@ -103,8 +68,77 @@ def read_text(self, encoding=None, errors=None):
         return self.read_bytes().decode(encoding)
 
 
+@pytest.fixture(scope="session")
+def hadoop():
+    import wget
+    import tarfile
+    from appdirs import user_cache_dir
+
+    if platform.system() != "Linux":
+        pytest.skip("only supported on Linux")
+
+    hadoop_name = "hadoop-2.7.2.tar.gz"
+    java_name = "openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz"
+
+    base_url = "https://s3-us-east-2.amazonaws.com/dvc-public/dvc-test/"
+    hadoop_url = base_url + hadoop_name
+    java_url = base_url + java_name
+
+    (cache_dir,) = (user_cache_dir("dvc-test", "iterative"),)
+    dname = os.path.join(cache_dir, "hdfs")
+
+    java_tar = os.path.join(dname, java_name)
+    hadoop_tar = os.path.join(dname, hadoop_name)
+
+    java_home = os.path.join(dname, "java-se-7u75-ri")
+    hadoop_home = os.path.join(dname, "hadoop-2.7.2")
+
+    def _get(url, tar, target):
+        if os.path.isdir(target):
+            return
+
+        if not os.path.exists(tar):
+            wget.download(url, out=tar)
+        tar = tarfile.open(tar)
+        tar.extractall(dname)
+        assert os.path.isdir(target)
+
+    os.makedirs(dname, exist_ok=True)
+    _get(hadoop_url, hadoop_tar, hadoop_home)
+    _get(java_url, java_tar, java_home)
+
+    os.environ["JAVA_HOME"] = java_home
+    os.environ["HADOOP_HOME"] = hadoop_home
+    os.environ["PATH"] += f":{hadoop_home}/bin:{hadoop_home}/sbin"
+
+
+@pytest.fixture(scope="session")
+def hdfs_server(hadoop, docker_compose, docker_services):
+    import pyarrow
+
+    port = docker_services.port_for("hdfs", 8020)
+
+    def _check():
+        try:
+            # NOTE: just connecting or even opening something is not enough,
+            # we need to make sure that we are able to write something.
+            conn = pyarrow.hdfs.connect("127.0.0.1", port)
+            try:
+                with conn.open(str(uuid.uuid4()), "wb") as fobj:
+                    fobj.write(b"test")
+            finally:
+                conn.close()
+            return True
+        except (pyarrow.ArrowException, OSError):
+            return False
+
+    docker_services.wait_until_responsive(timeout=30.0, pause=5, check=_check)
+
+    return port
+
+
 @pytest.fixture
-def hdfs():
-    if not HDFS.should_test():
-        pytest.skip("no hadoop running")
-    yield HDFS(HDFS.get_url())
+def hdfs(hdfs_server):
+    port = hdfs_server
+    url = f"hdfs://127.0.0.1:{port}/{uuid.uuid4()}"
+    yield HDFS(url)
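
With this change the `hdfs` fixture no longer probes a locally installed Hadoop; it yields an HDFS path object pointing at a unique location inside the dockerized namenode. A hypothetical test using it (the `write_bytes`/`read_text` helpers are assumed from the HDFS wrapper above, which this diff only shows in part):

def test_hdfs_roundtrip(hdfs):
    path = hdfs / "foo.txt"       # HDFS is a URLInfo, so `/` builds child paths
    path.write_bytes(b"hello")    # assumed helper, symmetric to read_bytes above
    assert path.read_text() == "hello"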
