diff --git a/pyodk/_endpoints/form_draft_attachments.py b/pyodk/_endpoints/form_draft_attachments.py index 47b18da..cc6ae9a 100644 --- a/pyodk/_endpoints/form_draft_attachments.py +++ b/pyodk/_endpoints/form_draft_attachments.py @@ -1,5 +1,5 @@ import logging -from pathlib import Path +from os import PathLike from pyodk._endpoints import bases from pyodk._utils import validators as pv @@ -34,7 +34,7 @@ def __init__( def upload( self, - file_path: str, + file_path: PathLike | str, file_name: str | None = None, form_id: str | None = None, project_id: int | None = None, @@ -50,7 +50,7 @@ def upload( try: pid = pv.validate_project_id(project_id, self.default_project_id) fid = pv.validate_form_id(form_id, self.default_form_id) - file_path = Path(pv.validate_file_path(file_path)) + file_path = pv.validate_file_path(file_path) if file_name is None: file_name = pv.validate_str(file_path.name, key="file_name") except PyODKError as err: diff --git a/pyodk/_endpoints/form_drafts.py b/pyodk/_endpoints/form_drafts.py index 3066b3e..2d918dd 100644 --- a/pyodk/_endpoints/form_drafts.py +++ b/pyodk/_endpoints/form_drafts.py @@ -1,6 +1,7 @@ import logging -from contextlib import nullcontext -from pathlib import Path +from io import BytesIO +from os import PathLike +from zipfile import is_zipfile from pyodk._endpoints import bases from pyodk._utils import validators as pv @@ -8,6 +9,76 @@ from pyodk.errors import PyODKError log = logging.getLogger(__name__) +CONTENT_TYPES = { + ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ".xls": "application/vnd.ms-excel", + ".xml": "application/xml", +} + + +def is_xls_file(buf: bytes) -> bool: + """ + Implements the Microsoft Excel (Office 97-2003) document type matcher. + + From h2non/filetype v1.2.0, MIT License, Copyright (c) 2016 Tomás Aparicio + + :param buf: buffer to match against. 
+ """ + if len(buf) > 520 and buf[0:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1": + if buf[512:516] == b"\xfd\xff\xff\xff" and (buf[518] == 0x00 or buf[518] == 0x02): + return True + if buf[512:520] == b"\x09\x08\x10\x00\x00\x06\x05\x00": + return True + if ( + len(buf) > 2095 + and b"\xe2\x00\x00\x00\x5c\x00\x70\x00\x04\x00\x00Calc" in buf[1568:2095] + ): + return True + + return False + + +def get_definition_data( + definition: PathLike | str | bytes | None, +) -> (bytes, str, str | None): + """ + Get the form definition data from a path or bytes. + + :param definition: The path to the file to upload (string or PathLike), or the + form definition in memory (string (XML) or bytes (XLS/XLSX)). + :return: definition_data, content_type, file_path_stem (if any). + """ + definition_data = None + content_type = None + file_path_stem = None + if ( + isinstance(definition, str) + and """http://www.w3.org/2002/xforms""" in definition[:1000] + ): + content_type = CONTENT_TYPES[".xml"] + definition_data = definition.encode("utf-8") + elif isinstance(definition, str | PathLike): + file_path = pv.validate_file_path(definition) + file_path_stem = file_path.stem + definition_data = file_path.read_bytes() + if file_path.suffix not in CONTENT_TYPES: + raise PyODKError( + "Parameter 'definition' file name has an unexpected file extension, " + "expected one of '.xlsx', '.xls', '.xml'." + ) + content_type = CONTENT_TYPES[file_path.suffix] + elif isinstance(definition, bytes): + definition_data = definition + if is_zipfile(BytesIO(definition)): + content_type = CONTENT_TYPES[".xlsx"] + elif is_xls_file(definition): + content_type = CONTENT_TYPES[".xls"] + if definition_data is None or content_type is None: + raise PyODKError( + "Parameter 'definition' has an unexpected file type, " + "expected one of '.xlsx', '.xls', '.xml'." 
+ ) + return definition_data, content_type, file_path_stem class URLs(bases.Model): @@ -36,15 +107,16 @@ def __init__( def _prep_form_post( self, - file_path: Path | str | None = None, + definition: PathLike | str | bytes | None = None, ignore_warnings: bool | None = True, form_id: str | None = None, project_id: int | None = None, - ) -> (str, str, dict, dict): + ) -> (str, str, dict, dict, bytes | None): """ Prepare / validate input arguments for POSTing a new form definition or version. - :param file_path: The path to the file to upload. + :param definition: The path to the file to upload (string or PathLike), or the + form definition in memory (string (XML) or bytes (XLS/XLSX)). :param form_id: The xmlFormId of the Form being referenced. :param project_id: The id of the project this form belongs to. :param ignore_warnings: If True, create the form if there are XLSForm warnings. @@ -54,47 +126,33 @@ def _prep_form_post( pid = pv.validate_project_id(project_id, self.default_project_id) headers = {} params = {} + definition_data = None file_path_stem = None - if file_path is not None: - file_path = Path(pv.validate_file_path(file_path)) - file_path_stem = file_path.stem + if definition is not None: + definition_data, content_type, file_path_stem = get_definition_data( + definition=definition + ) + headers["Content-Type"] = content_type fid = pv.validate_form_id( form_id, self.default_form_id, file_path_stem, self.session.get_xform_uuid(), ) - if file_path is not None: + if definition is not None: if ignore_warnings is not None: key = "ignore_warnings" params["ignoreWarnings"] = pv.validate_bool(ignore_warnings, key=key) - if file_path.suffix == ".xlsx": - content_type = ( - "application/vnd.openxmlformats-" - "officedocument.spreadsheetml.sheet" - ) - elif file_path.suffix == ".xls": - content_type = "application/vnd.ms-excel" - elif file_path.suffix == ".xml": - content_type = "application/xml" - else: - raise PyODKError( # noqa: TRY301 - "Parameter 'file_path' 
file name has an unexpected extension, " - "expected one of '.xlsx', '.xls', '.xml'." - ) - headers = { - "Content-Type": content_type, - "X-XlsForm-FormId-Fallback": self.session.urlquote(fid), - } + headers["X-XlsForm-FormId-Fallback"] = self.session.urlquote(fid) except PyODKError as err: log.error(err, exc_info=True) raise - return pid, fid, headers, params + return pid, fid, headers, params, definition_data def create( self, - file_path: Path | str | None = None, + definition: PathLike | str | bytes | None = None, ignore_warnings: bool | None = True, form_id: str | None = None, project_id: int | None = None, @@ -102,28 +160,26 @@ def create( """ Create a Form Draft. - :param file_path: The path to the file to upload. + :param definition: The path to the file to upload (string or PathLike), or the + form definition in memory (string (XML) or bytes (XLS/XLSX)). :param form_id: The xmlFormId of the Form being referenced. :param project_id: The id of the project this form belongs to. :param ignore_warnings: If True, create the form if there are XLSForm warnings. 
""" - pid, fid, headers, params = self._prep_form_post( - file_path=file_path, + pid, fid, headers, params, form_def = self._prep_form_post( + definition=definition, ignore_warnings=ignore_warnings, form_id=form_id, project_id=project_id, ) - - with open(file_path, "rb") if file_path is not None else nullcontext() as fd: - response = self.session.response_or_error( - method="POST", - url=self.session.urlformat(self.urls.post, project_id=pid, form_id=fid), - logger=log, - headers=headers, - params=params, - data=fd, - ) - + response = self.session.response_or_error( + method="POST", + url=self.session.urlformat(self.urls.post, project_id=pid, form_id=fid), + logger=log, + headers=headers, + params=params, + data=form_def, + ) data = response.json() return data["success"] diff --git a/pyodk/_endpoints/forms.py b/pyodk/_endpoints/forms.py index 807a150..74caec8 100644 --- a/pyodk/_endpoints/forms.py +++ b/pyodk/_endpoints/forms.py @@ -1,6 +1,7 @@ import logging from collections.abc import Callable, Iterable from datetime import datetime +from os import PathLike from typing import Any from pyodk._endpoints import bases @@ -122,7 +123,7 @@ def get( def create( self, - definition: str, + definition: PathLike | str | bytes, ignore_warnings: bool | None = True, form_id: str | None = None, project_id: int | None = None, @@ -130,29 +131,29 @@ def create( """ Create a form. - :param definition: The path to a form definition file to upload. + :param definition: The path to the file to upload (string or PathLike), or the + form definition in memory (string (XML) or bytes (XLS/XLSX)). :param ignore_warnings: If True, create the form if there are XLSForm warnings. :param form_id: The xmlFormId of the Form being referenced. :param project_id: The id of the project this form belongs to. :return: An object representation of the Form's metadata. 
""" fd = FormDraftService(session=self.session, **self._default_kw()) - pid, fid, headers, params = fd._prep_form_post( - file_path=definition, + pid, fid, headers, params, form_def = fd._prep_form_post( + definition=definition, ignore_warnings=ignore_warnings, form_id=form_id, project_id=project_id, ) params["publish"] = True - with open(definition, "rb") as fd: - response = self.session.response_or_error( - method="POST", - url=self.session.urlformat(self.urls.forms, project_id=pid), - logger=log, - headers=headers, - params=params, - data=fd, - ) + response = self.session.response_or_error( + method="POST", + url=self.session.urlformat(self.urls.forms, project_id=pid), + logger=log, + headers=headers, + params=params, + data=form_def, + ) data = response.json() return Form(**data) @@ -160,8 +161,8 @@ def update( self, form_id: str, project_id: int | None = None, - definition: str | None = None, - attachments: Iterable[str] | None = None, + definition: PathLike | str | bytes | None = None, + attachments: Iterable[PathLike | str] | None = None, version_updater: Callable[[str], str] | None = None, ) -> None: """ @@ -187,7 +188,8 @@ def update( :param form_id: The xmlFormId of the Form being referenced. :param project_id: The id of the project this form belongs to. - :param definition: The path to a form definition file to upload. The form + :param definition: The path to the file to upload (string or PathLike), or the + form definition in memory (string (XML) or bytes (XLS/XLSX)). The form definition must include an updated version string. :param attachments: The paths of the form attachment file(s) to upload. :param version_updater: A function that accepts a version name string and returns @@ -203,7 +205,7 @@ def update( # Start a new draft - with a new definition, if provided. 
fp_ids = {"form_id": form_id, "project_id": project_id} fd = FormDraftService(session=self.session, **self._default_kw()) - if not fd.create(file_path=definition, **fp_ids): + if not fd.create(definition=definition, **fp_ids): raise PyODKError("Form update (form draft create) failed.") # Upload the attachments, if any. diff --git a/pyodk/_utils/validators.py b/pyodk/_utils/validators.py index 3b3281a..cf1b9e6 100644 --- a/pyodk/_utils/validators.py +++ b/pyodk/_utils/validators.py @@ -1,8 +1,10 @@ from collections.abc import Callable +from os import PathLike from pathlib import Path from typing import Any from pydantic.v1 import validators as v +from pydantic.v1.errors import PydanticValueError from pydantic_core._pydantic_core import ValidationError from pyodk._utils.utils import coalesce @@ -20,7 +22,7 @@ def wrap_error(validator: Callable, key: str, value: Any) -> Any: """ try: return validator(value) - except ValidationError as err: + except (ValidationError, PydanticValueError) as err: msg = f"{key}: {err!s}" raise PyODKError(msg) from err @@ -97,7 +99,7 @@ def validate_dict(*args: dict, key: str) -> int: ) -def validate_file_path(*args: str) -> Path: +def validate_file_path(*args: PathLike | str) -> Path: def validate_fp(f): p = v.path_validator(f) return v.path_exists_validator(p) diff --git a/pyproject.toml b/pyproject.toml index 80622e7..7b16aac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,8 @@ dependencies = [ # Install with `pip install pyodk[dev]`. 
dev = [ "ruff==0.3.4", # Format and lint - "openpyxl==3.1.2" # Create test XLSX files + "openpyxl==3.1.2", # Create test XLSX files + "xlwt==1.3.0", # Create test XLS files ] docs = [ "mkdocs==1.5.3", diff --git a/tests/endpoints/test_forms.py b/tests/endpoints/test_forms.py index a46e1a1..119c343 100644 --- a/tests/endpoints/test_forms.py +++ b/tests/endpoints/test_forms.py @@ -3,11 +3,17 @@ from datetime import datetime from functools import wraps from unittest import TestCase -from unittest.mock import MagicMock, mock_open, patch +from unittest.mock import MagicMock, patch from pyodk._endpoints.form_draft_attachments import FormDraftAttachmentService -from pyodk._endpoints.form_drafts import FormDraftService -from pyodk._endpoints.form_drafts import log as form_drafts_log +from pyodk._endpoints.form_drafts import ( + CONTENT_TYPES, + FormDraftService, + get_definition_data, +) +from pyodk._endpoints.form_drafts import ( + log as form_drafts_log, +) from pyodk._endpoints.forms import Form, FormService from pyodk._utils.session import Session from pyodk.client import Client @@ -15,6 +21,7 @@ from tests.resources import CONFIG_DATA, forms_data from tests.utils import utils +from tests.utils.md_table import md_table_to_bytes, md_table_to_bytes_xls @dataclass @@ -135,7 +142,7 @@ def test_update__def_only__create_publish_no_upload(self, ctx: MockContext): client = Client() client.forms.update("foo", definition="/some/path/file.xlsx") ctx.fd_create.assert_called_once_with( - file_path="/some/path/file.xlsx", + definition="/some/path/file.xlsx", form_id="foo", project_id=None, ) @@ -150,7 +157,7 @@ def test_update__attach_only__create_upload_publish(self, ctx: MockContext): client = Client() client.forms.update("foo", attachments=["/some/path/a.jpg", "/some/path/b.jpg"]) ctx.fd_create.assert_called_once_with( - file_path=None, + definition=None, form_id="foo", project_id=None, ) @@ -188,7 +195,7 @@ def test_update__def_and_attach__create_upload_publish(self, ctx: 
class TestGetDefinitionData(TestCase):
    """Checks `get_definition_data` against file paths and in-memory definitions."""

    def test_get_definition_data__xml_file(self):
        """An XML file path gives the file content, XML content type, and file stem."""
        xml = forms_data.get_xml__range_draft()
        with utils.get_temp_file(suffix=".xml") as temp_path:
            temp_path.write_text(xml, newline="\n")
            stem = temp_path.stem
            result = get_definition_data(definition=temp_path)
        data, mime, observed_stem = result
        self.assertEqual(xml, data.decode("utf-8"))
        self.assertEqual(CONTENT_TYPES[".xml"], mime)
        self.assertEqual(stem, observed_stem)

    def test_get_definition_data__xml_str(self):
        """An XML string gives the encoded string, XML content type, and no stem."""
        xml = forms_data.get_xml__range_draft()
        data, mime, observed_stem = get_definition_data(definition=xml)
        self.assertEqual(xml, data.decode("utf-8"))
        self.assertEqual(CONTENT_TYPES[".xml"], mime)
        self.assertEqual(None, observed_stem)

    def test_get_definition_data__xls_file(self):
        """An XLS file path gives the file content, XLS content type, and file stem."""
        workbook_bytes = md_table_to_bytes_xls(forms_data.get_md__pull_data())
        with utils.get_temp_file(suffix=".xls") as temp_path:
            temp_path.write_bytes(workbook_bytes)
            stem = temp_path.stem
            result = get_definition_data(definition=temp_path)
        data, mime, observed_stem = result
        self.assertEqual(workbook_bytes, data)
        self.assertEqual(CONTENT_TYPES[".xls"], mime)
        self.assertEqual(stem, observed_stem)

    def test_get_definition_data__xls_bytes(self):
        """XLS bytes give the same bytes back, XLS content type, and no stem."""
        workbook_bytes = md_table_to_bytes_xls(forms_data.get_md__pull_data())
        data, mime, observed_stem = get_definition_data(definition=workbook_bytes)
        self.assertEqual(workbook_bytes, data)
        self.assertEqual(CONTENT_TYPES[".xls"], mime)
        self.assertEqual(None, observed_stem)

    def test_get_definition_data__xlsx_file(self):
        """An XLSX file path gives the file content, XLSX content type, and file stem."""
        workbook_bytes = md_table_to_bytes(forms_data.get_md__pull_data())
        with utils.get_temp_file(suffix=".xlsx") as temp_path:
            temp_path.write_bytes(workbook_bytes)
            stem = temp_path.stem
            result = get_definition_data(definition=temp_path)
        data, mime, observed_stem = result
        self.assertEqual(workbook_bytes, data)
        self.assertEqual(CONTENT_TYPES[".xlsx"], mime)
        self.assertEqual(stem, observed_stem)

    def test_get_definition_data__xlsx_bytes(self):
        """XLSX bytes give the same bytes back, XLSX content type, and no stem."""
        workbook_bytes = md_table_to_bytes(forms_data.get_md__pull_data())
        data, mime, observed_stem = get_definition_data(definition=workbook_bytes)
        self.assertEqual(workbook_bytes, data)
        self.assertEqual(CONTENT_TYPES[".xlsx"], mime)
        self.assertEqual(None, observed_stem)

    def test_get_definition_data__unknown_file(self):
        """A file path with an unsupported extension raises a PyODKError."""
        xml = forms_data.get_xml__range_draft()
        with utils.get_temp_file(suffix=".docx") as temp_path:
            temp_path.write_text(xml, newline="\n")
            with self.assertRaises(PyODKError) as caught:
                get_definition_data(definition=temp_path)
        message = caught.exception.args[0]
        self.assertIn("unexpected file extension", message)

    def test_get_definition_data__unknown_bytes(self):
        """Bytes that are not a recognised file type raise a PyODKError."""
        with self.assertRaises(PyODKError) as caught:
            get_definition_data(definition=b"hello world")
        message = caught.exception.args[0]
        self.assertIn("unexpected file type", message)
def md_table_to_bytes(mdstr: str) -> bytes:
    """
    Build an XLSX workbook from a MarkDown table string and return it as bytes.

    :param mdstr: The MarkDown table string.
    :return: The saved workbook content.
    """
    buffer = BytesIO()
    md_table_to_workbook(mdstr=mdstr).save(buffer)
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()


def md_table_to_bytes_xls(mdstr: str) -> bytes:
    """
    Build an XLS workbook from a MarkDown table string and return it as bytes.

    :param mdstr: The MarkDown table string.
    :return: The saved workbook content.
    """
    sheets = md_table_to_ss_structure(mdstr=mdstr)
    workbook = XLSWorkbook()
    for sheet_name, sheet_rows in sheets:
        worksheet = workbook.add_sheet(sheetname=sheet_name)
        for row_index, row_cells in enumerate(sheet_rows):
            for col_index, value in enumerate(row_cells):
                worksheet.write(row_index, col_index, value)
    buffer = BytesIO()
    workbook.save(buffer)
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()