Merge pull request #363 from python-jsonschema/improve-file-handling

Improve file handling with lazy reads

sirosen authored Dec 6, 2023
2 parents bb2be1c + 03d433b commit 16a6062

Showing 8 changed files with 167 additions and 46 deletions.
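The gist of the change: click.File opens every argument up front at CLI-parse time, so a run against thousands of instance files holds thousands of file descriptors at once. This commit switches the instancefiles argument to a lazy param type built on click.utils.LazyFile, which records the filename and defers the real open() until first use. A minimal standalone sketch of that click primitive (the temp file is only for illustration):

import tempfile

import click.utils

# create a real file to point at, since LazyFile probe-opens readable paths
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp:
    tmp.write(b"{}")

lf = click.utils.LazyFile(tmp.name, mode="rb")  # no descriptor held after this
fp = lf.open()                                  # the real open happens here
print(fp.read())                                # b'{}'
lf.close_intelligently()                        # closes only what LazyFile opened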
6 changes: 4 additions & 2 deletions src/check_jsonschema/cli/main_command.py
@@ -20,7 +20,7 @@
     SchemaLoaderBase,
 )
 from ..transforms import TRANSFORM_LIBRARY
-from .param_types import CommaDelimitedList, ValidatorClassName
+from .param_types import CommaDelimitedList, LazyBinaryReadFile, ValidatorClassName
 from .parse_result import ParseResult, SchemaLoadingMode
 
 BUILTIN_SCHEMA_NAMES = [f"vendor.{k}" for k in SCHEMA_CATALOG.keys()] + [
@@ -220,7 +220,9 @@ def pretty_helptext_list(values: list[str] | tuple[str, ...]) -> str:
     help="Reduce output verbosity",
     count=True,
 )
-@click.argument("instancefiles", required=True, nargs=-1, type=click.File("rb"))
+@click.argument(
+    "instancefiles", required=True, nargs=-1, type=LazyBinaryReadFile("rb", lazy=True)
+)
 def main(
     *,
     schemafile: str | None,
50 changes: 50 additions & 0 deletions src/check_jsonschema/cli/param_types.py
@@ -1,11 +1,14 @@
 from __future__ import annotations
 
 import importlib
+import os
 import re
+import stat
 import typing as t
 
 import click
 import jsonschema
+from click._compat import open_stream
 
 
 class CommaDelimitedList(click.ParamType):
@@ -104,3 +107,50 @@ def convert(
             self.fail(f"'{classname}' in '{pkg}' is not a class", param, ctx)
 
         return t.cast(t.Type[jsonschema.protocols.Validator], result)
+
+
+class CustomLazyFile(click.utils.LazyFile):
+    def __init__(
+        self,
+        filename: str | os.PathLike[str],
+        mode: str = "r",
+        encoding: str | None = None,
+        errors: str | None = "strict",
+        atomic: bool = False,
+    ):
+        self.name: str = os.fspath(filename)
+        self.mode = mode
+        self.encoding = encoding
+        self.errors = errors
+        self.atomic = atomic
+        self._f: t.IO[t.Any] | None
+        self.should_close: bool
+
+        if self.name == "-":
+            self._f, self.should_close = open_stream(filename, mode, encoding, errors)
+        else:
+            if "r" in mode and not stat.S_ISFIFO(os.stat(filename).st_mode):
+                # Open and close the file in case we're opening it for
+                # reading so that we can catch at least some errors in
+                # some cases early.
+                open(filename, mode).close()
+            self._f = None
+            self.should_close = True
+
+
+class LazyBinaryReadFile(click.File):
+    def convert(
+        self,
+        value: str | os.PathLike[str] | t.IO[t.Any],
+        param: click.Parameter | None,
+        ctx: click.Context | None,
+    ) -> t.IO[bytes]:
+        if hasattr(value, "read") or hasattr(value, "write"):
+            return t.cast(t.IO[bytes], value)
+
+        value_: str | os.PathLike[str] = t.cast("str | os.PathLike[str]", value)
+
+        lf = CustomLazyFile(value_, mode="rb")
+        if ctx is not None:
+            ctx.call_on_close(lf.close_intelligently)
+        return t.cast(t.IO[bytes], lf)
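Two details worth noting. Stock click LazyFile probe-opens readable files at construction to surface missing-file errors early; on a FIFO that probe would block until a writer appears, so CustomLazyFile skips the probe when stat() reports a FIFO. LazyBinaryReadFile then overrides convert() to always return an unopened lazy file for real paths, registering close_intelligently() with the click context for cleanup. A hedged usage sketch; this toy command is not part of the PR:

import click

from check_jsonschema.cli.param_types import LazyBinaryReadFile


@click.command()
@click.argument("files", nargs=-1, type=LazyBinaryReadFile("rb", lazy=True))
def sizes(files):
    # each element is a CustomLazyFile; no descriptor is open yet
    for f in files:
        data = f.open().read()  # the real open happens here, one file at a time
        f.close()               # release the descriptor before the next file
        click.echo(f"{f.name}: {len(data)} bytes")


if __name__ == "__main__":
    sizes()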
27 changes: 19 additions & 8 deletions src/check_jsonschema/instance_loader.py
@@ -3,14 +3,16 @@
 import io
 import typing as t
 
+from check_jsonschema.cli.param_types import CustomLazyFile
+
 from .parsers import ParseError, ParserSet
 from .transforms import Transform
 
 
 class InstanceLoader:
     def __init__(
         self,
-        files: t.Sequence[t.BinaryIO],
+        files: t.Sequence[t.BinaryIO | CustomLazyFile],
         default_filetype: str = "json",
         data_transform: Transform | None = None,
     ) -> None:
@@ -35,12 +37,21 @@ def iter_files(self) -> t.Iterator[tuple[str, ParseError | t.Any]]:
                 name = "<stdin>"
             else:
                 raise ValueError(f"File {file} has no name attribute")
 
-            try:
-                data: t.Any = self._parsers.parse_data_with_path(
-                    file, name, self._default_filetype
-                )
-            except ParseError as err:
-                data = err
-            else:
-                data = self._data_transform(data)
+            if isinstance(file, CustomLazyFile):
+                stream: t.BinaryIO = t.cast(t.BinaryIO, file.open())
+            else:
+                stream = file
+
+            try:
+                data: t.Any = self._parsers.parse_data_with_path(
+                    stream, name, self._default_filetype
+                )
+            except ParseError as err:
+                data = err
+            else:
+                data = self._data_transform(data)
+            finally:
+                file.close()
             yield (name, data)
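This reshaped loop is the heart of the fix: a lazy input is opened just before parsing and closed in the new finally: block, so at most one instance file holds a descriptor at any moment, and files are closed even when parsing fails. The same control flow as a self-contained sketch, with plain click.utils.LazyFile and json standing in for the project's parser machinery:

import json
import tempfile
from pathlib import Path

import click.utils


def iter_parsed(paths):
    # mirrors the new iter_files() shape: open late, close early
    for path in paths:
        lazy = click.utils.LazyFile(path, mode="rb")
        stream = lazy.open()  # a descriptor is acquired only here...
        try:
            data = json.load(stream)
        finally:
            lazy.close()  # ...and released before the next file is touched
        yield lazy.name, data


tmpdir = Path(tempfile.mkdtemp())
for i in range(3):
    (tmpdir / f"f{i}.json").write_text("{}")
print(list(iter_parsed(sorted(tmpdir.glob("*.json")))))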
37 changes: 25 additions & 12 deletions src/check_jsonschema/schema_loader/readers.py
@@ -15,6 +15,13 @@
 yaml = ruamel.yaml.YAML(typ="safe")
 
 
+class _UnsetType:
+    pass
+
+
+_UNSET = _UnsetType()
+
+
 def _run_load_callback(schema_location: str, callback: t.Callable) -> dict:
     try:
         schema = callback()
@@ -31,6 +38,7 @@ def __init__(self, filename: str) -> None:
         self.path = filename2path(filename)
         self.filename = str(self.path)
         self.parsers = ParserSet()
+        self._parsed_schema: dict | _UnsetType = _UNSET
 
     def get_retrieval_uri(self) -> str | None:
         return self.path.as_uri()
@@ -39,21 +47,26 @@ def _read_impl(self) -> t.Any:
         return self.parsers.parse_file(self.path, default_filetype="json")
 
     def read_schema(self) -> dict:
-        return _run_load_callback(self.filename, self._read_impl)
+        if self._parsed_schema is _UNSET:
+            self._parsed_schema = _run_load_callback(self.filename, self._read_impl)
+        return t.cast(dict, self._parsed_schema)
 
 
 class StdinSchemaReader:
     def __init__(self) -> None:
         self.parsers = ParserSet()
+        self._parsed_schema: dict | _UnsetType = _UNSET
 
     def get_retrieval_uri(self) -> str | None:
         return None
 
     def read_schema(self) -> dict:
-        try:
-            return json.load(sys.stdin)
-        except ValueError as e:
-            raise ParseError("Failed to parse JSON from stdin") from e
+        if self._parsed_schema is _UNSET:
+            try:
+                self._parsed_schema = json.load(sys.stdin)
+            except ValueError as e:
+                raise ParseError("Failed to parse JSON from stdin") from e
+        return t.cast(dict, self._parsed_schema)
 
 
 class HttpSchemaReader:
@@ -71,14 +84,12 @@ def __init__(
             disable_cache=disable_cache,
             validation_callback=self._parse,
         )
-        self._parsed_schema: t.Any | None = None
+        self._parsed_schema: dict | _UnsetType = _UNSET
 
     def _parse(self, schema_bytes: bytes) -> t.Any:
-        if self._parsed_schema is None:
-            self._parsed_schema = self.parsers.parse_data_with_path(
-                io.BytesIO(schema_bytes), self.url, default_filetype="json"
-            )
-        return self._parsed_schema
+        return self.parsers.parse_data_with_path(
+            io.BytesIO(schema_bytes), self.url, default_filetype="json"
+        )
 
     def get_retrieval_uri(self) -> str | None:
         return self.url
@@ -88,4 +99,6 @@ def _read_impl(self) -> t.Any:
         return self._parse(fp.read())
 
     def read_schema(self) -> dict:
-        return _run_load_callback(self.url, self._read_impl)
+        if self._parsed_schema is _UNSET:
+            self._parsed_schema = _run_load_callback(self.url, self._read_impl)
+        return t.cast(dict, self._parsed_schema)
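The _UNSET sentinel makes read_schema() memoizing on all three readers, and HttpSchemaReader._parse drops its ad-hoc None-based cache in favor of the same mechanism. A dedicated sentinel matters because a legitimate parse result can be falsy or even None (stdin could carry the JSON document null), and a None-based cache would re-read stdin, which cannot be read twice. The pattern in isolation, as a minimal sketch:

class _UnsetType:
    pass


_UNSET = _UnsetType()


class OneShotReader:
    def __init__(self, read_once):
        self._read_once = read_once  # e.g. reads stdin; only safe to call once
        self._cached = _UNSET

    def read(self):
        if self._cached is _UNSET:  # "never computed", distinct from None
            self._cached = self._read_once()
        return self._cached


reader = OneShotReader(lambda: None)  # even a None result gets cached
assert reader.read() is None
assert reader.read() is None  # the second call does not recompute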
40 changes: 21 additions & 19 deletions tests/acceptance/test_special_filetypes.py
@@ -1,7 +1,7 @@
+import multiprocessing
 import os
 import platform
 import sys
-import threading
 
 import pytest
 import responses
@@ -33,6 +33,16 @@ def test_schema_and_instance_in_memfds(run_line_simple):
         os.close(instancefd)
 
 
+# helper (in global scope) for multiprocessing "spawn" to be able to use to launch
+# background writers
+def _fifo_write(path, data):
+    fd = os.open(path, os.O_WRONLY)
+    try:
+        os.write(fd, data)
+    finally:
+        os.close(fd)
+
+
 @pytest.mark.skipif(os.name != "posix", reason="test requires mkfifo")
 @pytest.mark.parametrize("check_succeeds", (True, False))
 def test_schema_and_instance_in_fifos(tmp_path, run_line, check_succeeds):
@@ -45,25 +55,17 @@ def test_schema_and_instance_in_fifos(tmp_path, run_line, check_succeeds):
     os.mkfifo(schema_path)
     os.mkfifo(instance_path)
 
-    # execute FIFO writes as blocking writes in background threads
-    # nonblocking writes fail file existence if there's no reader, so using a FIFO
-    # requires some level of concurrency
-    def fifo_write(path, data):
-        fd = os.open(path, os.O_WRONLY)
-        try:
-            os.write(fd, data)
-        finally:
-            os.close(fd)
-
-    schema_thread = threading.Thread(
-        target=fifo_write, args=[schema_path, b'{"type": "integer"}']
+    spawn_ctx = multiprocessing.get_context("spawn")
+
+    schema_proc = spawn_ctx.Process(
+        target=_fifo_write, args=(schema_path, b'{"type": "integer"}')
     )
-    schema_thread.start()
+    schema_proc.start()
     instance_data = b"42" if check_succeeds else b'"foo"'
-    instance_thread = threading.Thread(
-        target=fifo_write, args=[instance_path, instance_data]
+    instance_proc = spawn_ctx.Process(
+        target=_fifo_write, args=(instance_path, instance_data)
     )
-    instance_thread.start()
+    instance_proc.start()
 
     try:
         result = run_line(
@@ -74,8 +76,8 @@ def fifo_write(path, data):
     else:
         assert result.exit_code == 1
     finally:
-        schema_thread.join(timeout=0.1)
-        instance_thread.join(timeout=0.1)
+        schema_proc.terminate()
+        instance_proc.terminate()
 
 
 @pytest.mark.parametrize("check_passes", (True, False))
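The FIFO writers move from background threads to spawned processes, and the helper moves to module scope because the "spawn" start method must be able to import the target by name. The payoff is robustness: opening a FIFO for writing blocks until a reader appears, and a blocked thread can never be killed (one failure would hang the whole test run), but a stuck process can always be terminated. A standalone, POSIX-only sketch of that property (paths and timeouts are illustrative):

import multiprocessing
import os
import tempfile


def open_for_write(path):
    os.open(path, os.O_WRONLY)  # blocks until a reader shows up


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "pipe")
    os.mkfifo(path)  # POSIX only

    proc = multiprocessing.get_context("spawn").Process(
        target=open_for_write, args=(path,)
    )
    proc.start()
    proc.join(timeout=0.5)
    assert proc.is_alive()  # the child is stuck in open()...
    proc.terminate()  # ...but unlike a thread it can be reclaimed
    proc.join()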
3 changes: 1 addition & 2 deletions tests/unit/test_cli_parse.py
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import io
 from unittest import mock
 
 import click
@@ -86,7 +85,7 @@ def test_schemafile_and_instancefile(runner, mock_parse_result, in_tmp_dir, tmp_
     assert mock_parse_result.schema_path == "schema.json"
     assert isinstance(mock_parse_result.instancefiles, tuple)
     for f in mock_parse_result.instancefiles:
-        assert isinstance(f, (io.BytesIO, io.BufferedReader))
+        assert isinstance(f, click.utils.LazyFile)
     assert tuple(f.name for f in mock_parse_result.instancefiles) == ("foo.json",)
46 changes: 46 additions & 0 deletions tests/unit/test_lazy_file_handling.py
@@ -0,0 +1,46 @@
+import os
+import platform
+
+import pytest
+from click.testing import CliRunner
+
+from check_jsonschema.cli.main_command import build_checker
+from check_jsonschema.cli.main_command import main as cli_main
+
+
+@pytest.fixture
+def runner() -> CliRunner:
+    return CliRunner(mix_stderr=False)
+
+
+@pytest.mark.skipif(
+    platform.system() != "Linux", reason="test requires /proc/self/ mechanism"
+)
+def test_open_file_usage_never_exceeds_1000(runner, monkeypatch, tmp_path):
+    schema_path = tmp_path / "schema.json"
+    schema_path.write_text("{}")
+
+    args = [
+        "--schemafile",
+        str(schema_path),
+    ]
+
+    for i in range(2000):
+        instance_path = tmp_path / f"file{i}.json"
+        instance_path.write_text("{}")
+        args.append(str(instance_path))
+
+    checker = None
+
+    def fake_execute(argv):
+        nonlocal checker
+        checker = build_checker(argv)
+
+    monkeypatch.setattr("check_jsonschema.cli.main_command.execute", fake_execute)
+    res = runner.invoke(cli_main, args)
+    assert res.exit_code == 0, res.stderr
+
+    assert checker is not None
+    assert len(os.listdir("/proc/self/fd")) < 2000
+    for _fname, _data in checker._instance_loader.iter_files():
+        assert len(os.listdir("/proc/self/fd")) < 2000
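The regression test relies on a Linux-ism: each entry in /proc/self/fd is one open descriptor in the current process, so listing that directory gives a live file-descriptor count. With 2000 lazily-opened instance files, the count stays below 2000 both before and during iteration. A tiny standalone illustration of the measurement (/etc/hostname is just an arbitrary readable file):

import os


def open_fd_count() -> int:
    # each entry under /proc/self/fd is one open descriptor (Linux only)
    return len(os.listdir("/proc/self/fd"))


before = open_fd_count()
fp = open("/etc/hostname", "rb")
assert open_fd_count() == before + 1  # one new descriptor
fp.close()
assert open_fd_count() == before  # released again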
4 changes: 1 addition & 3 deletions tox.ini
@@ -46,12 +46,10 @@ commands = coverage report --skip-covered
 
 [testenv:mypy]
 description = "check type annotations with mypy"
-# temporarily pin back click until either click 8.1.5 releases or mypy fixes the issue
-# with referential integrity of type aliases
 deps = mypy
     types-jsonschema
     types-requests
-    click==8.1.3
+    click
 commands = mypy src/ {posargs}
 
 [testenv:pyright]
