diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 28d3d9b8d..23f1ee1a7 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -11,6 +11,7 @@ history and [GitHubs 'Contributors' feature](https://github.com/py-pdf/PyPDF2/gr ## Contributors to the pyPdf / PyPDF2 project +* [JianzhengLuo](https://github.com/JianzhengLuo) * [Karvonen, Harry](https://github.com/Hatell/) * [KourFrost](https://github.com/KourFrost) * [Lightup1](https://github.com/Lightup1) diff --git a/PyPDF2/_merger.py b/PyPDF2/_merger.py index db6bf0480..a63fe76cc 100644 --- a/PyPDF2/_merger.py +++ b/PyPDF2/_merger.py @@ -27,7 +27,18 @@ from io import BytesIO, FileIO, IOBase from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast +from types import TracebackType +from typing import ( + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Type, + Union, + cast, +) from ._encryption import Encryption from ._page import PageObject @@ -84,18 +95,38 @@ class PdfMerger: :param bool strict: Determines whether user should be warned of all problems and also causes some correctable problems to be fatal. Defaults to ``False``. + :param fileobj: Output file. Can be a filename or any kind of + file-like object. """ @deprecate_bookmark(bookmarks="outline") - def __init__(self, strict: bool = False) -> None: + def __init__( + self, strict: bool = False, fileobj: Union[Path, StrByteType] = "" + ) -> None: self.inputs: List[Tuple[Any, PdfReader, bool]] = [] self.pages: List[Any] = [] self.output: Optional[PdfWriter] = PdfWriter() self.outline: OutlineType = [] self.named_dests: List[Any] = [] self.id_count = 0 + self.fileobj = fileobj self.strict = strict + def __enter__(self) -> "PdfMerger": + # There is nothing to do. + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """Write to the fileobj and close the merger.""" + if self.fileobj: + self.write(self.fileobj) + self.close() + @deprecate_bookmark(bookmark="outline_item", import_bookmarks="import_outline") def merge( self, @@ -254,7 +285,7 @@ def append( """ self.merge(len(self.pages), fileobj, outline_item, pages, import_outline) - def write(self, fileobj: StrByteType) -> None: + def write(self, fileobj: Union[Path, StrByteType]) -> None: """ Write all data that has been merged to the given output file. @@ -263,10 +294,6 @@ def write(self, fileobj: StrByteType) -> None: """ if self.output is None: raise RuntimeError(ERR_CLOSED_WRITER) - my_file = False - if isinstance(fileobj, str): - fileobj = FileIO(fileobj, "wb") - my_file = True # Add pages to the PdfWriter # The commented out line below was replaced with the two lines below it @@ -285,10 +312,10 @@ def write(self, fileobj: StrByteType) -> None: self._write_outline() # Write the output to the file - self.output.write(fileobj) + my_file, ret_fileobj = self.output.write(fileobj) if my_file: - fileobj.close() + ret_fileobj.close() def close(self) -> None: """Shut all file descriptors (input and output) and clear all memory usage.""" diff --git a/PyPDF2/_writer.py b/PyPDF2/_writer.py index 6f54df2ff..d9448d1bf 100644 --- a/PyPDF2/_writer.py +++ b/PyPDF2/_writer.py @@ -36,6 +36,9 @@ import time import uuid from hashlib import md5 +from io import BufferedReader, BufferedWriter, BytesIO, FileIO +from pathlib import Path +from types import TracebackType from typing import ( Any, Callable, @@ -44,6 +47,7 @@ List, Optional, Tuple, + Type, Union, cast, ) @@ -52,6 +56,7 @@ from ._reader import PdfReader from ._security import _alg33, _alg34, _alg35 from ._utils import ( + StrByteType, StreamType, _get_max_pdf_version_header, b_, @@ -121,7 +126,7 @@ class PdfWriter: class (typically :class:`PdfReader`). """ - def __init__(self) -> None: + def __init__(self, fileobj: StrByteType = "") -> None: self._header = b"%PDF-1.3" self._objects: List[Optional[PdfObject]] = [] # array of indirect objects self._idnum_hash: Dict[bytes, IndirectObject] = {} @@ -158,6 +163,23 @@ def __init__(self) -> None: ) self._root: Optional[IndirectObject] = None self._root_object = root + self.fileobj = fileobj + self.with_as_usage = False + + def __enter__(self) -> "PdfWriter": + """Store that writer is initialized by 'with'.""" + self.with_as_usage = True + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """Write data to the fileobj.""" + if self.fileobj: + self.write(self.fileobj) @property def pdf_header(self) -> bytes: @@ -771,13 +793,7 @@ def encrypt( self._encrypt = self._add_object(encrypt) self._encrypt_key = key - def write(self, stream: StreamType) -> None: - """ - Write the collection of pages added to this object out as a PDF file. - - :param stream: An object to write the file to. The object must support - the write method and the tell method, similar to a file object. - """ + def write_stream(self, stream: StreamType) -> None: if hasattr(stream, "mode") and "b" not in stream.mode: logger_warning( f"File <{stream.name}> to write to is not in binary mode. " # type: ignore @@ -803,6 +819,33 @@ def write(self, stream: StreamType) -> None: self._write_trailer(stream) stream.write(b_(f"\nstartxref\n{xref_location}\n%%EOF\n")) # eof + def write( + self, stream: Union[Path, StrByteType] + ) -> Tuple[bool, Union[FileIO, BytesIO, BufferedReader, BufferedWriter]]: + """ + Write the collection of pages added to this object out as a PDF file. + + :param stream: An object to write the file to. The object can support + the write method and the tell method, similar to a file object, or + be a file path, just like the fileobj, just named it stream to keep + existing workflow. + """ + my_file = False + + if stream == "": + raise ValueError(f"Output(stream={stream}) is empty.") + + if isinstance(stream, (str, Path)): + stream = FileIO(stream, "wb") + my_file = True + + self.write_stream(stream) + + if self.with_as_usage: + stream.close() + + return my_file, stream + def _write_header(self, stream: StreamType) -> List[int]: object_positions = [] stream.write(self.pdf_header + b"\n") diff --git a/tests/test_generic.py b/tests/test_generic.py index 0ed2ee7d1..350ab8aef 100644 --- a/tests/test_generic.py +++ b/tests/test_generic.py @@ -351,7 +351,6 @@ class Tst: # to replace pdf if length in (6, 10): assert b"BT /F1" in do._StreamObject__data raise PdfReadError("__ALLGOOD__") - print(exc.value) assert should_fail ^ (exc.value.args[0] == "__ALLGOOD__") diff --git a/tests/test_merger.py b/tests/test_merger.py index f4cf78179..7411a7a9d 100644 --- a/tests/test_merger.py +++ b/tests/test_merger.py @@ -18,14 +18,12 @@ sys.path.append(str(PROJECT_ROOT)) -def test_merge(): +def merger_operate(merger): pdf_path = RESOURCE_ROOT / "crazyones.pdf" outline = RESOURCE_ROOT / "pdflatex-outline.pdf" pdf_forms = RESOURCE_ROOT / "pdflatex-forms.pdf" pdf_pw = RESOURCE_ROOT / "libreoffice-writer-password.pdf" - merger = PyPDF2.PdfMerger() - # string path: merger.append(pdf_path) merger.append(outline) @@ -95,10 +93,8 @@ def test_merge(): merger.set_page_layout("/SinglePage") merger.set_page_mode("/UseThumbs") - tmp_path = "dont_commit_merged.pdf" - merger.write(tmp_path) - merger.close() +def check_outline(tmp_path): # Check if outline is correct reader = PyPDF2.PdfReader(tmp_path) assert [el.title for el in reader.outline if isinstance(el, Destination)] == [ @@ -117,8 +113,44 @@ def test_merge(): # TODO: There seem to be no destinations for those links? - # Clean up - os.remove(tmp_path) + +tmp_filename = "dont_commit_merged.pdf" + + +def test_merger_operations_by_traditional_usage(tmp_path): + # Arrange + merger = PdfMerger() + merger_operate(merger) + path = tmp_path / tmp_filename + + # Act + merger.write(path) + merger.close() + + # Assert + check_outline(path) + + +def test_merger_operations_by_semi_traditional_usage(tmp_path): + path = tmp_path / tmp_filename + + with PdfMerger() as merger: + merger_operate(merger) + merger.write(path) # Act + + # Assert + assert os.path.isfile(path) + check_outline(path) + + +def test_merger_operation_by_new_usage(tmp_path): + path = tmp_path / tmp_filename + with PdfMerger(fileobj=path) as merger: + merger_operate(merger) + + # Assert + assert os.path.isfile(path) + check_outline(path) def test_merge_page_exception(): diff --git a/tests/test_writer.py b/tests/test_writer.py index 048bd4ed7..667d6590b 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -26,12 +26,9 @@ def test_writer_clone(): assert len(writer.pages) == 4 -def test_writer_operations(): +def writer_operate(writer): """ - This test just checks if the operation throws an exception. - - This should be done way more thoroughly: It should be checked if the - output is as expected. + To test the writer that initialized by each of the four usages. """ pdf_path = RESOURCE_ROOT / "crazyones.pdf" pdf_outline_path = RESOURCE_ROOT / "pdflatex-outline.pdf" @@ -39,7 +36,6 @@ def test_writer_operations(): reader = PdfReader(pdf_path) reader_outline = PdfReader(pdf_outline_path) - writer = PdfWriter() page = reader.pages[0] with pytest.raises(PageSizeNotDefinedError) as exc: writer.add_blank_page() @@ -91,19 +87,101 @@ def test_writer_operations(): writer.add_attachment("foobar.gif", b"foobarcontent") - # finally, write "output" to PyPDF2-output.pdf - tmp_path = "dont_commit_writer.pdf" - with open(tmp_path, "wb") as output_stream: - writer.write(output_stream) - # Check that every key in _idnum_hash is correct objects_hash = [o.hash_value() for o in writer._objects] for k, v in writer._idnum_hash.items(): assert v.pdf == writer assert k in objects_hash, "Missing %s" % v - # cleanup - os.remove(tmp_path) + +tmp_path = "dont_commit_writer.pdf" + + +@pytest.mark.parametrize( + ("write_data_here", "needs_cleanup"), + [ + ("dont_commit_writer.pdf", True), + (Path("dont_commit_writer.pdf"), True), + (BytesIO(), False), + ], +) +def test_writer_operations_by_traditional_usage(write_data_here, needs_cleanup): + writer = PdfWriter() + + writer_operate(writer) + + # finally, write "output" to PyPDF2-output.pdf + if needs_cleanup: + with open(write_data_here, "wb") as output_stream: + writer.write(output_stream) + else: + output_stream = write_data_here + writer.write(output_stream) + + if needs_cleanup: + os.remove(write_data_here) + + +@pytest.mark.parametrize( + ("write_data_here", "needs_cleanup"), + [ + ("dont_commit_writer.pdf", True), + (Path("dont_commit_writer.pdf"), True), + (BytesIO(), False), + ], +) +def test_writer_operations_by_semi_traditional_usage(write_data_here, needs_cleanup): + with PdfWriter() as writer: + writer_operate(writer) + + # finally, write "output" to PyPDF2-output.pdf + if needs_cleanup: + with open(write_data_here, "wb") as output_stream: + writer.write(output_stream) + else: + output_stream = write_data_here + writer.write(output_stream) + + if needs_cleanup: + os.remove(write_data_here) + + +@pytest.mark.parametrize( + ("write_data_here", "needs_cleanup"), + [ + ("dont_commit_writer.pdf", True), + (Path("dont_commit_writer.pdf"), True), + (BytesIO(), False), + ], +) +def test_writer_operations_by_semi_new_traditional_usage( + write_data_here, needs_cleanup +): + with PdfWriter() as writer: + writer_operate(writer) + + # finally, write "output" to PyPDF2-output.pdf + writer.write(write_data_here) + + if needs_cleanup: + os.remove(write_data_here) + + +@pytest.mark.parametrize( + ("write_data_here", "needs_cleanup"), + [ + ("dont_commit_writer.pdf", True), + (Path("dont_commit_writer.pdf"), True), + (BytesIO(), False), + ], +) +def test_writer_operation_by_new_usage(write_data_here, needs_cleanup): + # This includes write "output" to PyPDF2-output.pdf + with PdfWriter(write_data_here) as writer: + writer_operate(writer) + + if needs_cleanup: + os.remove(write_data_here) @pytest.mark.parametrize( @@ -656,3 +734,13 @@ def test_colors_in_outline_item(): # Cleanup os.remove(target) # remove for testing + + +def test_write_empty_stream(): + reader = PdfReader(EXTERNAL_ROOT / "004-pdflatex-4-pages/pdflatex-4-pages.pdf") + writer = PdfWriter() + writer.clone_document_from_reader(reader) + + with pytest.raises(ValueError) as exc: + writer.write("") + assert exc.value.args[0] == "Output(stream=) is empty."