diff --git a/src/alpacks/__init__.py b/src/alpacks/__init__.py index 21ee419..8f2b66f 100644 --- a/src/alpacks/__init__.py +++ b/src/alpacks/__init__.py @@ -68,6 +68,9 @@ def __init__(self, **k: Unpack[PackProperties]): self._is_hidden_in_browse_groups = k.get("is_hidden_in_browse_groups", False) self.__context: _Context | None = None + # The `open()` method might return None, so we need a separate + # tracker to check the open status. + self.__has_context: bool = False # Propagate unexpected keys up to `object`, so that errors # will be thrown if appropriate. @@ -114,20 +117,21 @@ async def close(self, context: _Context) -> None: # Which is equivalent to: # # p = PackWriter(**args) - # context = p._open() + # context = p.open() # try: # p.set_file(...) # p.set_preview(...) - # p._commit() + # p.commit() # finally: - # p._close(context) + # p.close(context) # async def __aenter__(self) -> Self: - if self.__context is not None: + if self.__has_context: msg = f"{self} is already open" raise ValueError(msg) self.__context = await self.open() + self.__has_context = True return self async def __aexit__( @@ -136,7 +140,7 @@ async def __aexit__( exc_inst: BaseException | None, exc_tb: TracebackType | None, ) -> None: - if self.__context is None: + if not self.__has_context: msg = f"{self} is not open" raise ValueError(msg) @@ -144,7 +148,12 @@ async def __aexit__( if exc_type is None: await self.commit() finally: - await self.close(self.__context) + await self.close( + # The context type is allowed to be `None`, so we + # can't just assert that this is present. + self.__context, # type: ignore + ) + self.__has_context = False # For synchronous writes, just wrap an async writer. @@ -177,7 +186,7 @@ def close(self, context: _Context) -> None: asyncio.run(self._pack_writer_async.close(context)) def __enter__(self) -> Self: - self._context = self.open() + asyncio.run(self._pack_writer_async.__aenter__()) return self def __exit__( @@ -186,11 +195,7 @@ def __exit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: - try: - if exc_type is None: - self.commit() - finally: - self.close(self._context) + asyncio.run(self._pack_writer_async.__aexit__(exc_type, exc_val, exc_tb)) class DirectoryPackWriterAsync(PackWriterAsync[None]): @@ -201,7 +206,9 @@ def __init__(self, output_dir: str, **k: Unpack[PackProperties]): # If the output dir exists and is non-empty (or is not a # directory), raise an error. - if os.path.exists(output_dir) and (not os.path.isdir(output_dir) or os.listdir(output_dir)): + if os.path.exists(output_dir) and ( + not os.path.isdir(output_dir) or os.listdir(output_dir) + ): msg = f"Output directory '{output_dir}' exists and is not empty." raise ValueError(msg) @@ -262,7 +269,9 @@ async def _write_properties_file(self) -> None: """ ).lstrip() - await self._write_to_path(os.path.join(FOLDER_INFO_DIR, PROPERTIES_FILE), text.encode("utf-8")) + await self._write_to_path( + os.path.join(FOLDER_INFO_DIR, PROPERTIES_FILE), text.encode("utf-8") + ) # Write pack metadata, e.g. tags. async def _write_xmp_file(self) -> None: @@ -273,7 +282,9 @@ async def _write_xmp_file(self) -> None: if len(tag_values) == 0: msg = f"Tag `{tag_name}` is empty for path `{path}`" raise ValueError(msg) - rdf_items.append("|".join(escape(val) for val in [tag_name, *tag_values])) + rdf_items.append( + "|".join(escape(val) for val in [tag_name, *tag_values]) + ) rdf_indent = " " tags_text += textwrap.indent( @@ -285,7 +296,9 @@ async def _write_xmp_file(self) -> None: """ ).lstrip("\n") - + "\n".join([f" {rdf_item}" for rdf_item in rdf_items]) + + "\n".join( + [f" {rdf_item}" for rdf_item in rdf_items] + ) + textwrap.dedent( """ @@ -322,7 +335,9 @@ async def _write_xmp_file(self) -> None: """ ).lstrip() - await self._write_to_path(os.path.join(FOLDER_INFO_DIR, XMP_FILE), xmp_text.encode("utf-8")) + await self._write_to_path( + os.path.join(FOLDER_INFO_DIR, XMP_FILE), xmp_text.encode("utf-8") + ) def _preview_path(self, path: str) -> str: return os.path.join(FOLDER_INFO_DIR, "Previews", f"{path}.ogg") diff --git a/tests/test_alpacks.py b/tests/test_alpacks.py index ee76b6a..6070582 100644 --- a/tests/test_alpacks.py +++ b/tests/test_alpacks.py @@ -1,16 +1,17 @@ import os +import asyncio import tempfile -from alpacks import DirectoryPackWriter +import alpacks -def test_simple_directory() -> None: +def test_directory() -> None: name = "Test" unique_id = "test.id" with tempfile.TemporaryDirectory() as output_dir: path = "test.txt" version = "2.3.4" - with DirectoryPackWriter( + with alpacks.DirectoryPackWriter( output_dir, name=name, unique_id=unique_id, @@ -24,10 +25,14 @@ def test_simple_directory() -> None: with open(os.path.join(output_dir, path)) as f: assert f.read() == "test-content" - with open(os.path.join(output_dir, "Ableton Folder Info", "Previews", f"{path}.ogg")) as f: + with open( + os.path.join(output_dir, "Ableton Folder Info", "Previews", f"{path}.ogg") + ) as f: assert f.read() == "test-preview-content" - with open(os.path.join(output_dir, "Ableton Folder Info", "Properties.cfg")) as f: + with open( + os.path.join(output_dir, "Ableton Folder Info", "Properties.cfg") + ) as f: properties_text = f.read() assert f'String PackUniqueID = "{unique_id}";' in properties_text assert f'String PackDisplayName = "{name}";' in properties_text @@ -39,13 +44,33 @@ def test_simple_directory() -> None: assert f"Int {field} = {value};" in properties_text xmp_files = [ - file for file in os.listdir(os.path.join(output_dir, "Ableton Folder Info")) if file.endswith(".xmp") + file + for file in os.listdir(os.path.join(output_dir, "Ableton Folder Info")) + if file.endswith(".xmp") ] assert len(xmp_files) == 1 xmp_file_path = os.path.join(output_dir, "Ableton Folder Info", xmp_files[0]) with open(xmp_file_path) as xmp_file: xmp_content = xmp_file.read() - assert f"{unique_id}" in xmp_content + assert ( + f"{unique_id}" in xmp_content + ) assert "2.3.4" in xmp_content assert "Tag Name|Tag Value|Subtag Value" in xmp_content + + +def test_simple_directory_async() -> None: + async def run() -> None: + with tempfile.TemporaryDirectory() as output_dir: + async with alpacks.DirectoryPackWriterAsync( + output_dir, name="Test", unique_id="test.id" + ) as pack_writer: + # Simple test, just make sure the write can happen + # without errors. + await pack_writer.set_file_content("path.adg", b"content") + + with open(os.path.join(output_dir, "path.adg")) as f: + assert f.read() == "content" + + asyncio.run(run())