diff --git a/.env b/.env deleted file mode 100644 index 7eecf9c..0000000 --- a/.env +++ /dev/null @@ -1,7 +0,0 @@ -export CELERY_BROKER=redis://localhost:6379/0 -export ELASTICSEARCH_INGEST_SERVER=localhost:9200 -export ELASTICSEARCH_INGEST_USE_SSL=0 -export PLONE_SERVICE=http://localhost:8080 -export PLONE_PATH=Plone -export PLONE_USER=admin -export PLONE_PASSWORD=admin diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..7ef4f64 --- /dev/null +++ b/.flake8 @@ -0,0 +1,22 @@ +# Generated from: +# https://github.com/plone/meta/tree/master/config/default +# See the inline comments on how to expand/tweak this configuration file +[flake8] +doctests = 1 +ignore = + # black takes care of line length + E501, + # black takes care of where to break lines + W503, + # black takes care of spaces within slicing (list[:]) + E203, + # black takes care of spaces after commas + E231, + +## +# Add extra configuration options in .meta.toml: +# [flake8] +# extra_lines = """ +# _your own configuration lines_ +# """ +## diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..816a2e3 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,25 @@ +name: Python package CI + +on: + - push + - pull_request + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install tox tox-gh-actions + - name: Test with tox + run: tox \ No newline at end of file diff --git a/.gitignore b/.gitignore index 864e9bb..df70903 100644 --- a/.gitignore +++ b/.gitignore @@ -6,18 +6,12 @@ *.swp # dirs bin/ -buildout-cache/ -develop-eggs/ -eggs/ +venv/ htmlcov/ include/ lib/ local/ -node_modules/ -parts/ -src/* dist/* -test.plone_addon/ var/ # files .installed.cfg @@ -28,6 +22,7 @@ output.xml pip-selfcheck.json report.html .vscode/ +.tox .python-version reports/ # excludes diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..02a90f5 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,42 @@ +ci: + autofix_prs: false + autoupdate_schedule: monthly + +repos: +- repo: https://github.com/asottile/pyupgrade + rev: v3.15.0 + hooks: + - id: pyupgrade +- repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort +- repo: https://github.com/psf/black + rev: 23.9.1 + hooks: + - id: black +- repo: https://github.com/PyCQA/flake8 + rev: 6.1.0 + hooks: + - id: flake8 +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.5.1 + hooks: + - id: mypy + additional_dependencies: + - "types-requests" + - "pytest-stub" +# - repo: https://github.com/codespell-project/codespell +# rev: v2.2.5 +# hooks: +# - id: codespell +# additional_dependencies: +# - tomli +- repo: https://github.com/mgedmin/check-manifest + rev: "0.49" + hooks: + - id: check-manifest +- repo: https://github.com/regebro/pyroma + rev: "4.2" + hooks: + - id: pyroma diff --git a/CHANGES.rst b/CHANGES.rst index 070b066..0b4d9df 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,8 +2,8 @@ Changelog ========= -1.4.1 (unreleased) ------------------- +2.0.0b1 (unreleased) +-------------------- - Fix OpenSearch / ElasticSearch switch. 
[ksuess] - Update example mapping for nested field "NamedBlobFile": @@ -11,6 +11,17 @@ Changelog [ksuess] - code-style: black & isort [jensens] - Add support for Plone ClassicUI based sites (no Volto blocks available) [jensens] +- Move mappings.json, analysis.json.example with its lexicon out of code into the examples directory and improve the docs on how to use all this. + [jensens] +- Add docker-compose file to start OpenSearch to the examples directory and move `.env` there too. + [jensens] +- Rename `ELASTIC_*` environment variables to have a consistent naming scheme, see README for details. [jensens] +- Add tox, GitHub Actions, CI and CD. [jensens] +- Refactor field-map loading to not happen on startup. [jensens] +- Remove support for OpenSearch 1.x and ElasticSearch < 8. [jensens] +- Rename .elastic.get_ingest_client to .client.get_client. [jensens] +- Do not initialize a new client for each operation, but use a thread-local cached one. + This speeds up indexing a lot. [jensens] 1.4 (2023-08-17) diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst index aa9e86c..87cff98 100644 --- a/CONTRIBUTORS.rst +++ b/CONTRIBUTORS.rst @@ -1,5 +1,8 @@ Contributors ============ -- Jens W. Klein, jk@kleinundpartner.at -- Katja Süss, Rohberg, @ksuess +- Peter Holzer - Initiative, idea and testing. +- Jens W. Klein, jk@kleinundpartner.at - Concept & code. +- Katja Süss, Rohberg, @ksuess - Text analysis code and configuration. + +Initial implementation was made possible by `Evangelisch-reformierte Landeskirche des Kantons Zürich `_. diff --git a/MANIFEST.in b/MANIFEST.in index 3a7f455..1a6a25f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,5 @@ graft src/collective +graft examples include *.rst +exclude container/* global-exclude *.pyc diff --git a/README.rst b/README.rst index 98e0640..532b5c3 100644 --- a/README.rst +++ b/README.rst @@ -2,71 +2,209 @@ collective.elastic.ingest ========================= -Ingestion service queue runner between Plone RestAPI and ElasticSearch or OpenSearch. +Ingestion service queue runner between Plone RestAPI and ElasticSearch 8+ or OpenSearch 2+. Provides Celery tasks to asynchronously index Plone content. - auto-create Open-/ElasticSearch... - index - - mapping from Plone schema using a flexible conversions file (JSON). + - mapping from Plone schema using a flexible conversions file (JSON), - ingest-attachment pipelines using (same as above) file. - task to - index a content object with all data given plus allowedRolesAndUsers and section (primary path) - unindex a content object - configure from environment variables: - celery, - - elasticsearch, + - elasticsearch or opensearch - sentry logging (optional) - +------------ Installation ------------ -Install ``collective.elastic.ingest`` (redis-ready) using pip:: +We recommend using a Python virtual environment: create one with ``python3 -m venv venv``, and activate it in the current terminal session with ``source venv/bin/activate``. + +Install ``collective.elastic.ingest`` ready to use with Redis and OpenSearch:: + + pip install collective.elastic.ingest[redis,opensearch] + +Depending on the queue server and index server used, the extra requirements vary: + +- index server: ``opensearch``, ``elasticsearch``. +- queue server: ``redis`` or ``rabbitmq``. + + +------------- +Configuration +------------- + +Configuration is done via environment variables and JSON files. + +Environment variables are: + +INDEX_SERVER + The URL of the ElasticSearch or OpenSearch server.
+ + Default: localhost:9200 + +INDEX_USE_SSL + Whether to use a secure connection or not. + + Default: 0 + +INDEX_OPENSEARCH + Whether to use OpenSearch (``1``) or ElasticSearch (``0``). + + Default: 0 + +INDEX_LOGIN + Username for the ElasticSearch 8+ or OpenSearch server. + + Default: admin - pip install collective.elastic.ingest redis +INDEX_PASSWORD + Password for the ElasticSearch 8+ or OpenSearch server. -``collective.elastic.ingest`` requires ``elasticsearch``. -Specify the version according your ElasticSearch app version. -For example:: + Default: admin - pip install 'elasticsearch~=7.0' +CELERY_BROKER + The broker URL for Celery. + See `docs.celeryq.dev `_ for details. + Default: ``redis://localhost:6379/0`` + +PLONE_SERVICE + Base URL of the Plone server. + + Default: http://localhost:8080 + +PLONE_PATH + Path of the site to index at the Plone server. + + Default: ``Plone`` + +PLONE_USER + Username for the Plone server, needs to have at least the Site Administrator role. + + Default: admin + +PLONE_PASSWORD + Password for the Plone server. + + Default: admin + +MAPPINGS_FILE + Absolute path to the mappings configuration file. + Configures field mappings from Plone schema to ElasticSearch. + + No default, must be given. + +PREPROCESSINGS_FILE + Configures preprocessing of field values before indexing. + + Default: the defaults file shipped with this package. + +ANALYSIS_FILE + (optional) Absolute path to the analysis configuration file. + +SENTRY_DSN + (optional) Sentry DSN for error reporting. + + Default: disabled + + +-------- Starting -------- -Define the configuration as environment variables:: +Run the Celery worker:: - CELERY_BROKER=redis://localhost:6379/0 - ELASTICSEARCH_INGEST_SERVER=localhost:9200 - ELASTICSEARCH_INGEST_USE_SSL=0 - PLONE_SERVICE=http://localhost:8080 - PLONE_PATH=Plone - PLONE_USER=admin - PLONE_PASSWORD=admin + celery -A collective.elastic.ingest.celery.app worker -c 1 -l info -Request OpenSearch:: +Or with debug information:: - OPENSEARCH=1 - ELASTICSEARCH_INGEST_LOGIN=admin - ELASTICSEARCH_INGEST_PASSWORD=admin + celery -A collective.elastic.ingest.celery.app worker -c 1 -l debug -Optional (defaults used if not given):: +The ``-c`` option sets the concurrency of the worker. +For production use, it should be set to the number of Plone backends available for indexing load. - ANALYSIS_FILE=/full/path/to/analysis.json - MAPPINGS_FILE=/full/path/to/mappings.json - PREPROCESSINGS_FILE=/full/path/to/preprocessings.json - SENTRY_DSN= (disabled by default) +--------- +OCI Image +--------- -Then run celery:: +For use in Docker, Podman, Kubernetes, ..., an OCI image is provided at ... - celery -A collective.elastic.ingest.celery.app worker -l info +The environment variables above are used as configuration. -Or with debug information:: + +Additionally, the following environment variables are used: + +CELERY_CONCURENCY + The number of concurrent tasks to run. + + Default: 1 + +CELERY_LOGLEVEL + The log level for Celery. + + Default: info + + +-------- +Examples +-------- + +Example configuration files are provided in the ``/examples`` directory. + +OpenSearch with Docker Compose +------------------------------ + +A docker-compose file ``docker-compose.yml`` and a ``Dockerfile`` to start an OpenSearch server are provided. + +Preconditions: + +- Docker and docker-compose are installed. +- The max virtual memory map count needs to be increased to run this: ``sudo sysctl -w vm.max_map_count=262144`` (not permanent, `see StackOverflow post `_); a way to persist it is shown below.
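The ``sysctl`` setting above is lost on reboot. A possible way to persist it, assuming a Linux host that reads drop-ins from ``/etc/sysctl.d``, is::

    # persist the mmap count required by OpenSearch across reboots
    echo 'vm.max_map_count=262144' | sudo tee /etc/sysctl.d/99-opensearch.conf
    sudo sysctl --system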
+ +Steps to start the example OpenSearch server with the ``ingest-attachment`` plugin installed: + +- enter the directory: ``cd examples`` +- build the docker image with:: + + docker buildx use default + docker buildx build --tag opensearch-ingest-attachment:latest . + +- start the server with ``docker-compose up``. + +Now you have an OpenSearch server running at ``http://localhost:9200`` and an OpenSearch Dashboard running at ``http://localhost:5601`` (user/pass: admin/admin). +The OpenSearch server has the ``ingest-attachment`` plugin installed. +The plugin enables OpenSearch to extract text from binary files like PDFs. +Open another terminal. - celery -A collective.elastic.ingest.celery.app worker -l debug +An ``.env`` file is provided with the environment variables ready to use with the docker-compose file. +Run ``source examples/.env`` to load the environment variables. +Then start the Celery worker with ``celery -A collective.elastic.ingest.celery.app worker -l debug``. +In another terminal window `run a Plone backend `_ at ``http://localhost:8080/Plone`` with the add-on ``collective.elastic.plone`` installed. +There, create an item or modify an existing one. +You should see the indexing task in the Celery worker terminal window. + +Basic Mappings +-------------- + +A very basic mappings file ``mappings-basic.json`` is provided. +To use it, set ``MAPPINGS_FILE=examples/mappings-basic.json`` and then start the Celery worker. + +Complex Mapping With German Text Analysis +----------------------------------------- + +A complex mappings file with German text analysis configured, ``mappings-german-analysis.json``, is provided. +It comes together with the matching analysis configuration file ``analysis-german.json`` and a stub lexicon file ``elasticsearch-lexicon-german.txt``. +Read the next section for more information about text analysis. + +------------- Text Analysis ------------- @@ -131,6 +269,7 @@ The response delivers the tokens for the analyzed text 'Lehrstellenbörse'. Note: The file ``elasticsearch-lexicon.txt`` with the word list used by the ``decompounder`` of the sample analysis configuration in ``analysis.json.example`` has to be located in the configuration directory of your elasticsearch server. +----------- Source Code ----------- @@ -144,37 +283,18 @@ We appreciate any contribution and if a release is needed to be done on pypi, please just contact one of us. We also offer commercial support if any training, coaching, integration or adaptions are needed. -Contributions ------------- - -Initial implementation was made possible by `Evangelisch-reformierte Landeskirche des Kantons Zürich `_. - -Idea and testing by Peter Holzer - -Concept & code by Jens W. Klein - -Text analysis code and configuration Katja Süss - - - +---------------------------- Installation for development ---------------------------- - clone source code repository, - enter repository directory - recommended: create a virtualenv ``python -mvenv env`` -- development install ``./bin/env/pip install -e .`` -- add redis support ``./bin/env/pip install redis``. -- load environment configuration ``source .env``. +- development install ``./env/bin/pip install -e .[test,redis,opensearch]`` +- load environment configuration ``source examples/.env``. +- to verify the setup, a task can be enqueued by hand, see the sketch below. -Todo ---- - -- query status of a task -- simple statistics about tasks-count: pending, done, errored -- celery retry on failure, i.e. restart of ElasticSearch, Plone, ...
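For a manual smoke test of the worker, a task can be enqueued by name from a Python shell. This is a minimal sketch, assuming the environment variables are loaded, the worker is running, and a content item exists at the illustrative path ``/example-page``; the task name and argument order follow the task definitions in ``src/collective/elastic/ingest/celery.py``::

    # enqueue an index task by name; path, timestamp and index name are illustrative
    from collective.elastic.ingest.celery import app

    result = app.send_task(
        "collective.elastic.ingest.index",
        args=("/example-page", "2024-01-01T12:00:00", "plone"),
    )
    print(result.id)  # the worker log shows the indexing result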
+------- License ------- diff --git a/constraints.txt b/constraints.txt deleted file mode 100644 index f145db9..0000000 --- a/constraints.txt +++ /dev/null @@ -1,15 +0,0 @@ -amqp==2.6.1 -billiard==3.6.4.0 -CacheControl==0.12.6 -celery==4.4.7 -certifi==2021.5.30 -chardet==4.0.0 -elasticsearch==7.13.1 -idna==2.10 -kombu==4.6.11 -msgpack==1.0.2 -pytz==2021.1 -redis==3.5.3 -requests==2.25.1 -urllib3==1.26.5 -wcwidth==0.2.5 diff --git a/container/Dockerfile b/container/Dockerfile new file mode 100644 index 0000000..6cd90b7 --- /dev/null +++ b/container/Dockerfile @@ -0,0 +1,21 @@ +# syntax=docker/dockerfile:1 +FROM python:3.12-slim + +ARG IMAGE_VERSION +ARG PACKAGE_VERSION + +# TODO: use https://github.com/opencontainers/image-spec/blob/main/annotations.md +LABEL version="${PACKAGE_VERSION}-${IMAGE_VERSION}" +LABEL description="Ingestion service queue runner between Plone RestAPI and ElasticSearch or OpenSearch." +LABEL maintainer="Jens Klein" +LABEL org.label-schema.name="collective.elastic.ingest" +LABEL org.label-schema.description="Ingestion service queue runner between Plone RestAPI and ElasticSearch or OpenSearch." +LABEL org.label-schema.vendor="Klein & Partner KG and Contributors" + +RUN <=2"] +elasticsearch = ["elasticsearch>=8.0"] +test = [ + "pytest", + "requests-mock", + "pdbpp", +] + +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +minversion = "6.0" +testpaths = [ + "src", +] + +[tool.isort] +profile = "plone" + +[tool.black] +include = "src" + +[tool.codespell] +ignore-words-list = "discreet," +skip = './examples/*,./venv/*' + +[tool.check-manifest] +ignore = [ + ".editorconfig", + ".pre-commit-config.yaml", + "tox.ini", + "mypy.ini", + ".flake8", + "mx.ini", + +] + +[zest.releaser] +create-wheel = true \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index e4ff5a0..0000000 --- a/setup.cfg +++ /dev/null @@ -1,11 +0,0 @@ -[mypy] -ignore_missing_imports = True - -[bdist_wheel] -universal=1 - -[isort] -profile = plone - -[zest.releaser] -create-wheel = yes \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index f69f4ba..0000000 --- a/setup.py +++ /dev/null @@ -1,56 +0,0 @@ -# -*- coding: utf-8 -*- -from setuptools import find_packages -from setuptools import setup - - -long_description = "\n\n".join( - [ - open("README.rst").read(), - open("CHANGES.rst").read(), - open("CONTRIBUTORS.rst").read(), - ] -) - - -setup( - name="collective.elastic.ingest", - version="1.4.1.dev0", - project_urls={ - "PyPI": "https://pypi.python.org/pypi/collective.elastic.ingest", - "Source": "https://github.com/collective/collective.elastic.ingest", - "Tracker": "https://github.com/collective/collective.elastic.ingest/issues", - }, - description="Addon for ElasticSearch integration with Plone", - long_description=long_description, - # Get more from https://pypi.org/classifiers/ - classifiers=[ - "Environment :: Web Environment", - "Framework :: Plone", - "Framework :: Plone :: Addon", - "Framework :: Plone :: 5.2", - "Framework :: Plone :: 6.0", - "Programming Language :: Python", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Operating System :: OS Independent", - "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", - ], - 
keywords="Python Plone", - packages=find_packages("src"), - namespace_packages=["collective", "collective.elastic"], - package_dir={"": "src"}, - include_package_data=True, - zip_safe=False, - python_requires=">=3.7", - install_requires=[ - "CacheControl", - "celery", - "elasticsearch>=7", - "opensearch-py", - "requests", - "setuptools", - ], -) diff --git a/src/collective/__init__.py b/src/collective/__init__.py index 03d08ff..e69de29 100644 --- a/src/collective/__init__.py +++ b/src/collective/__init__.py @@ -1,2 +0,0 @@ -# -*- coding: utf-8 -*- -__import__("pkg_resources").declare_namespace(__name__) diff --git a/src/collective/elastic/__init__.py b/src/collective/elastic/__init__.py index 03d08ff..e69de29 100644 --- a/src/collective/elastic/__init__.py +++ b/src/collective/elastic/__init__.py @@ -1,2 +0,0 @@ -# -*- coding: utf-8 -*- -__import__("pkg_resources").declare_namespace(__name__) diff --git a/src/collective/elastic/ingest/__init__.py b/src/collective/elastic/ingest/__init__.py index e806469..2a5833c 100644 --- a/src/collective/elastic/ingest/__init__.py +++ b/src/collective/elastic/ingest/__init__.py @@ -3,10 +3,17 @@ import os -OPENSEARCH = True if os.environ.get("OPENSEARCH") == "1" else False +OPENSEARCH = os.environ.get("INDEX_OPENSEARCH") == "1" -version_elasticsearch = version("elasticsearch") -ELASTICSEARCH_7 = int(version_elasticsearch[0]) <= 7 - -version_opensearchpy = version("opensearch-py") -OPENSEARCH_2 = int(version_opensearchpy[0]) <= 2 +if OPENSEARCH: + version_opensearchpy = version("opensearch-py") + if int(version_opensearchpy[0]) < 2: + raise ValueError( + "opensearch-py 1.x is not supported, use version 1.x of the collective.elastic.ingest package." + ) +else: + version_elasticsearch = version("elasticsearch") + if int(version_elasticsearch[0]) < 7: + raise ValueError( + "elasticsearch < 7 is not supported, use Version 1.x of the collective.elastic.ingest package." + ) diff --git a/src/collective/elastic/ingest/analysis.py b/src/collective/elastic/ingest/analysis.py new file mode 100644 index 0000000..27e4de2 --- /dev/null +++ b/src/collective/elastic/ingest/analysis.py @@ -0,0 +1,53 @@ +from .client import get_client +from .logging import logger + +import json +import os + + +_analysis_file = os.environ.get("ANALYSIS_FILE", None) + + +ANALYSISMAP = None +if _analysis_file: + try: + with open(_analysis_file) as fp: + ANALYSISMAP = json.load(fp) + except FileNotFoundError: + logger.warning(f"Analysis file '{_analysis_file}' not found.") +else: + logger.info("No analysis file configured.") + + +def update_analysis(index_name): + """Provide index with analyzers to be used in mapping. + + Sample is found in analysis.json.example. + Overwrite with your analyzers by creating an ANALYSIS_FILE `analysis.json`. + See README for details. + + First `update_analysis`, then `create_or_update_mapping`: + Mapping can use analyzers from analysis.json. + """ + + if not ANALYSISMAP: + logger.info("No analyzer configuration given.") + return + analysis_settings = ANALYSISMAP.get("settings", {}) + if not analysis_settings: + logger.warning("No analyzer settings found in configuration.") + return + client = get_client() + if client is None: + logger.warning("No ElasticSearch client available.") + return + if client.indices.exists(index_name): + logger.debug( + f"Analysis for index '{index_name}' already exists, skip creation." + ) + return + logger.info( + f"Create index '{index_name}' with analysis settings " + f"from '{_analysis_file}', but without mapping." 
) client.indices.create(index=index_name, body=ANALYSISMAP) diff --git a/src/collective/elastic/ingest/analysis/__init__.py b/src/collective/elastic/ingest/analysis/__init__.py deleted file mode 100644 index 4665f5c..0000000 --- a/src/collective/elastic/ingest/analysis/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .analysis import update_analysis diff --git a/src/collective/elastic/ingest/analysis/analysis.py b/src/collective/elastic/ingest/analysis/analysis.py deleted file mode 100644 index 13df500..0000000 --- a/src/collective/elastic/ingest/analysis/analysis.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- -from ..elastic import get_ingest_client -from ..logging import logger -from collective.elastic.ingest import ELASTICSEARCH_7 -from collective.elastic.ingest import OPENSEARCH -from collective.elastic.ingest import OPENSEARCH_2 - -import json -import os - - -_analysis_file = os.environ.get( - "ANALYSIS_FILE", os.path.join(os.path.dirname(__file__), "analysis.json") -) - -try: - with open(_analysis_file, mode="r") as fp: - ANALYSISMAP = json.load(fp) -except FileNotFoundError: - ANALYSISMAP = None - - -def update_analysis(index_name): - """Provide elasticsearch with analyzers to be used in mapping. - - Sample is found in analysis.json.example. - Overwrite with your analyzers by creating an ANALYSIS_FILE `analysis.json`. - See README for details. - - First `update_analysis`, then `create_or_update_mapping`: - Mapping can use analyzers from analysis.json. - """ - - if ANALYSISMAP: - analysis_settings = ANALYSISMAP.get("settings", {}) - if analysis_settings: - es = get_ingest_client() - if es is None: - logger.warning("No ElasticSearch client available.") - return - if ELASTICSEARCH_7: - index_exists = es.indices.exists(index_name) - else: - index_exists = es.indices.exists(index=index_name) - if index_exists: - return - - logger.info( - f"Create index '{index_name}' with analysis settings " - f"from '{_analysis_file}', but without mapping." - ) - if not OPENSEARCH and ELASTICSEARCH_7 or OPENSEARCH and OPENSEARCH_2: - es.indices.create(index_name, body=ANALYSISMAP) - else: - es.indices.create(index=index_name, settings=analysis_settings) - return - - logger.info("No analyzer configuration found.") diff --git a/src/collective/elastic/ingest/celery.py b/src/collective/elastic/ingest/celery.py index ad09ca8..9ef650a 100644 --- a/src/collective/elastic/ingest/celery.py +++ b/src/collective/elastic/ingest/celery.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import - from .ingest import ingest from .logging import logger from .plone import fetch_content @@ -60,7 +57,7 @@ def index(path, timestamp, index_name): msg = "Error while writing data to ElasticSearch" logger.exception(msg) return msg - return "indexed {0} on timestamp {1}".format(path, timestamp) + return "indexed {} on timestamp {}".format(path, timestamp) @app.task(name="collective.elastic.ingest.unindex") @@ -72,4 +69,4 @@ def unindex(uid, index_name): msg = "Error while removing data from ElasticSearch" logger.exception(msg) return msg - return "unindexed {0}".format(uid) + return "unindexed {}".format(uid) diff --git a/src/collective/elastic/ingest/client.py b/src/collective/elastic/ingest/client.py new file mode 100644 index 0000000..5300bb4 --- /dev/null +++ b/src/collective/elastic/ingest/client.py @@ -0,0 +1,61 @@ +from .
import OPENSEARCH from .logging import logger import os import threading if OPENSEARCH: from opensearchpy import OpenSearch else: from elasticsearch import Elasticsearch _local_storage = threading.local() def get_client(index_server_baseurl: str = ""): """index client for query or ingest either OpenSearch or Elasticsearch client, depending on the INDEX_OPENSEARCH env var """ client = getattr(_local_storage, "client", None) if client is not None: return _local_storage.client raw_addr = index_server_baseurl or os.environ.get("INDEX_SERVER", "") use_ssl = bool(int(os.environ.get("INDEX_USE_SSL", "0"))) addresses = [x for x in raw_addr.split(",") if x.strip()] if not addresses: addresses.append("127.0.0.1:9200") # TODO: more auth options (cert, bearer token, api-key, etc) auth = ( os.environ.get("INDEX_LOGIN", "admin"), os.environ.get("INDEX_PASSWORD", "admin"), ) if OPENSEARCH: logger.info(f"Use OpenSearch client at {addresses}") hosts = [] for address in addresses: host, port = address.rsplit(":", 1) hosts.append({"host": host, "port": int(port)}) client = OpenSearch( hosts=hosts, http_auth=auth, use_ssl=use_ssl, verify_certs=False, ) info = client.info() logger.info(f"OpenSearch client info: {info}") else: logger.info(f"Use ElasticSearch client at {addresses}") # elasticsearch-py 8 expects full URLs including scheme; use_ssl is no longer a parameter scheme = "https" if use_ssl else "http" client = Elasticsearch( [f"{scheme}://{address}" for address in addresses], basic_auth=auth, verify_certs=False, ) setattr(_local_storage, "client", client) return client diff --git a/src/collective/elastic/ingest/elastic.py b/src/collective/elastic/ingest/elastic.py index 1a47158..4e22861 100644 --- a/src/collective/elastic/ingest/elastic.py +++ b/src/collective/elastic/ingest/elastic.py @@ -1,48 +1,8 @@ -# -*- coding: utf-8 -*- +from .client import get_client from .logging import logger -from collective.elastic.ingest import ELASTICSEARCH_7 -from collective.elastic.ingest import OPENSEARCH -from collective.elastic.ingest import version_elasticsearch -from elasticsearch import Elasticsearch -from opensearchpy import OpenSearch - -import os def get_ingest_client(elasticsearch_server_baseurl=None): - """return elasticsearch client for.ingest""" - - raw_addr = elasticsearch_server_baseurl or os.environ.get( - "ELASTICSEARCH_INGEST_SERVER", "http://localhost:9200" - ) - use_ssl = os.environ.get("ELASTICSEARCH_INGEST_USE_SSL", "0") - use_ssl = bool(int(use_ssl)) - addresses = [x for x in raw_addr.split(",") if x.strip()] - if not addresses: - addresses.append("127.0.0.1:9200") - if OPENSEARCH: - """ - TODO Documentation about when to use more than one - ElasticSearch or OpenSearch cluster - """ - (host, port) = addresses[0].rsplit(":", 1) - auth = ( - os.environ.get("ELASTICSEARCH_INGEST_LOGIN", "admin"), - os.environ.get("ELASTICSEARCH_INGEST_PASSWORD", "admin"), - ) - client = OpenSearch( - hosts=[{"host": host, "port": port}], - http_auth=auth, - use_ssl=use_ssl, - verify_certs=False, - ) - info = client.info() - logger.info(f"OpenSearch client info: {info}") - return client - elif ELASTICSEARCH_7: - logger.info(f"ElasticSearch version {version_elasticsearch} installed") - return Elasticsearch( - addresses, - use_ssl=use_ssl, - ) - return Elasticsearch(addresses) + # to be removed in a 3.x release + logger.warning(".elastic.get_ingest_client is deprecated, use .client.get_client instead") + return get_client(elasticsearch_server_baseurl) diff --git a/src/collective/elastic/ingest/ingest/__init__.py b/src/collective/elastic/ingest/ingest/__init__.py index 8a32dd5..0deeed6
100644 --- a/src/collective/elastic/ingest/ingest/__init__.py +++ b/src/collective/elastic/ingest/ingest/__init__.py @@ -1,11 +1,11 @@ -# -*- coding: utf-8 -*- +from .. import OPENSEARCH from ..analysis import update_analysis -from ..elastic import get_ingest_client +from ..client import get_client from ..logging import logger from ..mapping import create_or_update_mapping from ..mapping import expanded_processors from ..mapping import EXPANSION_FIELDS -from ..mapping import FIELDMAP +from ..mapping import get_field_map from ..mapping import iterate_schema from ..postprocessing import postprocess from ..preprocessing import preprocess @@ -14,9 +14,6 @@ from .section import enrichWithSection from .security import enrichWithSecurityInfo from .vocabularyfields import stripVocabularyTermTitles -from collective.elastic.ingest import ELASTICSEARCH_7 -from collective.elastic.ingest import OPENSEARCH -from collective.elastic.ingest import OPENSEARCH_2 from pprint import pformat @@ -25,11 +22,12 @@ def _es_pipeline_name(index_name): - return "{0}_{1}".format(PIPELINE_PREFIX, index_name) + return "{}_{}".format(PIPELINE_PREFIX, index_name) def setup_ingest_pipelines(full_schema, index_name): - es = get_ingest_client() + logger.debug("setup ingest pipelines") + client = get_client() pipeline_name = _es_pipeline_name(index_name) pipelines = { "description": "Extract Plone Binary attachment information", @@ -37,7 +35,8 @@ for section_name, schema_name, field in iterate_schema(full_schema): fqfieldname = "/".join([section_name, schema_name, field["name"]]) - definition = FIELDMAP.get(fqfieldname, FIELDMAP.get(field["field"], None)) + fieldmap = get_field_map() + definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None)) if not definition or "pipeline" not in definition: continue source = definition["pipeline"]["source"].format(name=field["name"]) @@ -48,12 +47,15 @@ if pipelines["processors"]: logger.info(f"update ingest pipelines {pipeline_name}") logger.debug(f"pipeline definitions:\n{pipelines}") - if not OPENSEARCH and ELASTICSEARCH_7 or OPENSEARCH and OPENSEARCH_2: - es.ingest.put_pipeline(pipeline_name, pipelines) + if OPENSEARCH: + client.ingest.put_pipeline(id=pipeline_name, body=pipelines) else: - es.ingest.put_pipeline(id=pipeline_name, processors=pipelines["processors"]) + client.ingest.put_pipeline( + id=pipeline_name, processors=pipelines["processors"] + ) else: + logger.info(f"delete ingest pipelines {pipeline_name}") + client.ingest.delete_pipeline(id=pipeline_name) def ingest(content, full_schema, index_name): @@ -79,11 +81,11 @@ postprocess(content, info) logger.info(f"Index content: {pformat(content)}") - es = get_ingest_client() - es_kwargs = dict( + client = get_client() + kwargs = dict( index=index_name, id=content["UID"], pipeline=_es_pipeline_name(index_name), body=content, ) - es.index(**es_kwargs) + client.index(**kwargs) diff --git a/src/collective/elastic/ingest/ingest/rid.py b/src/collective/elastic/ingest/ingest/rid.py index 56854d6..b3874a5 100644 --- a/src/collective/elastic/ingest/ingest/rid.py +++ b/src/collective/elastic/ingest/ingest/rid.py @@ -4,6 +4,4 @@ def enrichWithRid(content): # BBB backward compatibility elif "catalog_rid" in content["@components"]: content["rid"] = content["@components"]["catalog_rid"] - else: - pass return content diff --git
a/src/collective/elastic/ingest/ingest/section.py b/src/collective/elastic/ingest/ingest/section.py index cebb812..3511a08 100644 --- a/src/collective/elastic/ingest/ingest/section.py +++ b/src/collective/elastic/ingest/ingest/section.py @@ -1,5 +1,3 @@ -from ..logging import logger - import os diff --git a/src/collective/elastic/ingest/ingest/vocabularyfields.py b/src/collective/elastic/ingest/ingest/vocabularyfields.py index 7d2b671..4d596bc 100644 --- a/src/collective/elastic/ingest/ingest/vocabularyfields.py +++ b/src/collective/elastic/ingest/ingest/vocabularyfields.py @@ -1,17 +1,14 @@ -from ..logging import logger - - def stripVocabularyTermTitles(content): """If field with vocabulary: Convert field value to token or list of tokens.""" for fieldname in content.keys(): - if type(content[fieldname]) == dict: + if type(content[fieldname]) is dict: if sorted(list(content[fieldname].keys())) == ["title", "token"]: content[fieldname] = content[fieldname]["token"] - if type(content[fieldname]) == list: + if type(content[fieldname]) is list: if ( len(content[fieldname]) > 0 - and type(content[fieldname][0]) == dict + and type(content[fieldname][0]) is dict and sorted(list(content[fieldname][0].keys())) == ["title", "token"] ): content[fieldname] = [el["token"] for el in content[fieldname]] diff --git a/src/collective/elastic/ingest/logging.py b/src/collective/elastic/ingest/logging.py index c0e2e31..bb800c2 100644 --- a/src/collective/elastic/ingest/logging.py +++ b/src/collective/elastic/ingest/logging.py @@ -1,9 +1,5 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import - - try: - import collective.elastic.plone # noqa: W291 + import collective.elastic.plone # noqa: W291,F401 import logging logger = logging.getLogger("collective.elastic.ingest") diff --git a/src/collective/elastic/ingest/mapping.py b/src/collective/elastic/ingest/mapping.py index c91b78a..6a2798c 100644 --- a/src/collective/elastic/ingest/mapping.py +++ b/src/collective/elastic/ingest/mapping.py @@ -1,13 +1,12 @@ -# -*- coding: utf-8 -*- -from .elastic import get_ingest_client +from .client import get_client from .logging import logger -from collective.elastic.ingest import ELASTICSEARCH_7 from copy import deepcopy import json import operator import os import pprint +import typing pp = pprint.PrettyPrinter(indent=4) @@ -16,16 +15,23 @@ # to be filled as cache and renewed on create_or_update_mapping EXPANSION_FIELDS = {} -STATE = {"initial": True} +STATE = { + "initial": True, + "fieldmap": {}, +} -DETECTOR_METHODS = {} +# typing.Dict (not the bare dict[...]) keeps Python 3.8 compatibility +DETECTOR_METHODS: typing.Dict[str, typing.Callable] = {} -_mappings_file = os.environ.get( - "MAPPINGS_FILE", os.path.join(os.path.dirname(__file__), "mappings.json") -) -with open(_mappings_file, mode="r") as fp: - FIELDMAP = json.load(fp) +def get_field_map() -> dict: + if STATE["fieldmap"] == {}: + _mappings_file = os.environ.get("MAPPINGS_FILE", None) + if not _mappings_file: + raise ValueError("No mappings file configured.") + with open(_mappings_file) as fp: + STATE["fieldmap"] = json.load(fp) + assert isinstance(STATE["fieldmap"], dict) + return STATE["fieldmap"] def iterate_schema(full_schema): @@ -56,10 +62,11 @@ def expanded_processors(processors, source, target): def map_field(field, properties, fqfieldname, seen): - definition = FIELDMAP.get(fqfieldname, FIELDMAP.get(field["field"], None)) + fieldmap = get_field_map() + definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None)) if definition is None: logger.warning( - "Ignore: '{0}' field type nor '{1}' FQFN in
map.".format( + "Ignore: '{}' field type nor '{}' FQFN in map.".format( field["field"], fqfieldname ) ) @@ -67,7 +74,7 @@ def map_field(field, properties, fqfieldname, seen): seen.add(field["name"]) logger.debug(f"Map field name {field['name']} to definition {definition}") if "type" in definition: - # simple defintion + # simple definition properties[field["name"]] = definition return # complex definition @@ -86,10 +93,11 @@ def map_field(field, properties, fqfieldname, seen): def update_expansion_fields(field, fqfieldname): - definition = FIELDMAP.get(fqfieldname, FIELDMAP.get(field["field"], None)) + fieldmap = get_field_map() + definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None)) if definition is None: logger.warning( - "Ignore: '{0}' field type nor '{1}' FQFN in map.".format( + "Ignore: '{}' field type nor '{}' FQFN in map.".format( field["field"], fqfieldname ) ) @@ -120,18 +128,15 @@ def _replacement_detector(field, properties, definition, fqfieldname, seen): def create_or_update_mapping(full_schema, index_name): - es = get_ingest_client() - if es is None: - logger.warning("No ElasticSearch client available.") + client = get_client() + if client is None: + logger.warning("No index client available.") return # get current mapping - if ELASTICSEARCH_7: - index_exists = es.indices.exists(index_name) - else: - index_exists = es.indices.exists(index=index_name) + index_exists = client.indices.exists(index=index_name) if index_exists: - original_mapping = es.indices.get_mapping(index=index_name)[index_name] + original_mapping = client.indices.get_mapping(index=index_name)[index_name] mapping = deepcopy(original_mapping) if "properties" not in mapping["mappings"]: mapping["mappings"]["properties"] = {} @@ -158,14 +163,14 @@ def create_or_update_mapping(full_schema, index_name): if field["name"] in properties: logger.debug( "Skip existing field definition " - "{0} with {1}. Already defined: {2}".format( + "{} with {}. Already defined: {}".format( fqfieldname, value_type, properties[field["name"]] ) ) continue if field["name"] in seen: logger.debug( - "Skip dup field definition {0} with {1}.".format( + "Skip dup field definition {} with {}.".format( fqfieldname, value_type, ) @@ -188,24 +193,17 @@ def create_or_update_mapping(full_schema, index_name): ): logger.info("Update mapping.") logger.debug( - "Mapping is:\n{0}".format( + "Mapping is:\n{}".format( json.dumps(mapping["mappings"], sort_keys=True, indent=2) ) ) - if ELASTICSEARCH_7: - es.indices.put_mapping(index=index_name, body=mapping["mappings"]) - else: - es.indices.put_mapping( - index=[index_name], - body=mapping["mappings"], - ) + client.indices.put_mapping( + index=[index_name], + body=mapping["mappings"], + ) else: logger.debug("No update necessary. 
Mapping is unchanged.") else: - # from celery.contrib import rdb; rdb.set_trace() logger.info("Create index with mapping.") logger.debug(f"mapping is:\n{json.dumps(mapping, sort_keys=True, indent=2)}") - if ELASTICSEARCH_7: - es.indices.create(index_name, body=mapping) - else: - es.indices.create(index=index_name, mappings=mapping["mappings"]) + client.indices.create(index=index_name, mappings=mapping["mappings"]) diff --git a/src/collective/elastic/ingest/plone.py b/src/collective/elastic/ingest/plone.py index 709a1ba..dcb5dfc 100644 --- a/src/collective/elastic/ingest/plone.py +++ b/src/collective/elastic/ingest/plone.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from .logging import logger from cachecontrol import CacheControl @@ -44,7 +43,7 @@ def fetch_content(path, timestamp): delay_status = RETRY_STATUS_BASE while True: logger.info( - "fetch content ({0}) from {1} ".format( + "fetch content ({}) from {} ".format( 1 + retries_timestamp + retries_status, url ) ) @@ -53,7 +52,7 @@ def fetch_content(path, timestamp): if resp.status_code != 200: if retries_status > RETRIES_STATUS_MAX: logger.info( - "-> status {0} - retry no.{1}, wait {2:0.3f}s".format( + "-> status {} - retry no.{}, wait {:0.3f}s".format( resp.status_code, retries_status, delay_status ) ) @@ -72,7 +71,7 @@ def fetch_content(path, timestamp): if retries_timestamp > RETRIES_TIMESTAMP_MAX: break logger.info( - "-> timestamp retry - fetch no.{0}, wait {1:0.3f}s".format( + "-> timestamp retry - fetch no.{}, wait {:0.3f}s".format( retries_timestamp, delay_timestamp ) ) @@ -82,14 +81,14 @@ def fetch_content(path, timestamp): continue return result - logger.error("-> can not fetch content {0}".format(url)) + logger.error("-> can not fetch content {}".format(url)) def fetch_schema(refetch=False): # from celery.contrib import rdb; rdb.set_trace() if refetch or time.time() + MAPPING_TIMEOUT_SEK > STATES["mapping_fetched"]: url = _schema_url() - logger.info("fetch full schema from {0}".format(url)) + logger.info("fetch full schema from {}".format(url)) resp = session.get(url) # xxx: check resp here return resp.json() @@ -97,7 +96,7 @@ def fetch_schema(refetch=False): def fetch_binary(url): - logger.info("fetch binary data from {0}".format(url)) + logger.info("fetch binary data from {}".format(url)) resp = session.get(url) # xxx: check resp here return resp.content diff --git a/src/collective/elastic/ingest/postprocessing.py b/src/collective/elastic/ingest/postprocessing.py index a6385cd..2d4f896 100644 --- a/src/collective/elastic/ingest/postprocessing.py +++ b/src/collective/elastic/ingest/postprocessing.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from .logging import logger from .plone import fetch_binary from collections import OrderedDict diff --git a/src/collective/elastic/ingest/postprocessing_test.py b/src/collective/elastic/ingest/postprocessing_test.py index 8e49e5b..477e220 100644 --- a/src/collective/elastic/ingest/postprocessing_test.py +++ b/src/collective/elastic/ingest/postprocessing_test.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from . 
import postprocessing diff --git a/src/collective/elastic/ingest/preprocessing.py b/src/collective/elastic/ingest/preprocessing.py index 0f757f9..876f90a 100644 --- a/src/collective/elastic/ingest/preprocessing.py +++ b/src/collective/elastic/ingest/preprocessing.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from .logging import logger import json @@ -10,10 +9,10 @@ os.path.join(os.path.dirname(__file__), "preprocessings.json"), ) -with open(_preprocessings_file, mode="r") as fp: +with open(_preprocessings_file) as fp: PREPROCESSOR_CONFIGS = json.load(fp) -### MATCHERS +# MATCHERS MATCHING_FUNCTIONS = {} @@ -37,7 +36,7 @@ def match_content_exists(content, full_schema, config): MATCHING_FUNCTIONS["content_exists"] = match_content_exists -### ACTIONS +# ACTIONS ACTION_FUNCTIONS = {} @@ -45,7 +44,7 @@ def match_content_exists(content, full_schema, config): def action_additional_schema(content, full_schema, config): """add additional fields to a full_schema as fetched from Plone""" if full_schema is None: - # case: in subsequent calls theres no need to modify schema b/c of caching + # case: in subsequent calls there is no need to modify schema b/c of caching return if "additional" not in full_schema: full_schema["additional"] = {} @@ -73,7 +72,7 @@ def action_rewrite(content, full_schema, config): if source_container is None: if enforce: raise ValueError( - "Source container {0} not in content.".format(config["source"]) + "Source container {} not in content.".format(config["source"]) ) return target_container, target_key = _find_last_container_in_path( @@ -82,12 +81,12 @@ if target_container is None: if enforce: raise ValueError( - "Target container {0} not in content.".format(config["source"]) + "Target container {} not in content.".format(config["target"]) ) return if source_key not in source_container: if enforce: - raise ValueError("Source {0} not in content.".format(config["source"])) + raise ValueError("Source {} not in content.".format(config["source"])) return target_container[target_key] = source_container[source_key] @@ -123,7 +122,7 @@ def action_empty_removal(content, full_schema, key): def preprocess(content, full_schema): """run full preprocessing pipeline on content and schema""" for ppcfg in PREPROCESSOR_CONFIGS: - logger.debug("Preprocessor configuration:\n{0}\n".format(ppcfg)) + logger.debug("Preprocessor configuration:\n{}\n".format(ppcfg)) matcher = MATCHING_FUNCTIONS[ppcfg["match"]["type"]] if not matcher(content, full_schema, ppcfg["match"]): continue diff --git a/src/collective/elastic/ingest/preprocessing_test.py b/src/collective/elastic/ingest/preprocessing_test.py index 163f249..d67013e 100644 --- a/src/collective/elastic/ingest/preprocessing_test.py +++ b/src/collective/elastic/ingest/preprocessing_test.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- from .
import preprocessing import pytest @@ -40,7 +39,6 @@ def test_action_rewrite(): preprocessing.action_rewrite(root, {}, config) assert "rid" in root assert root["rid"] == ridvalue - assert "rid" not in root["@components"] def test_action_rewrite_non_existing(): diff --git a/src/collective/elastic/ingest/removal.py b/src/collective/elastic/ingest/removal.py index 7f10afd..3d49643 100644 --- a/src/collective/elastic/ingest/removal.py +++ b/src/collective/elastic/ingest/removal.py @@ -1,22 +1,13 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import - -from .elastic import get_ingest_client - -import logging - - -logger = logging.getLogger(__name__) +from .client import get_client +from .logging import logger def remove(uid, index_name): - es = get_ingest_client() - if es is None: - logger.warning("No ElasticSearch client available.") + client = get_client() + if client is None: + logger.warning("No index client available.") return try: - es.delete(index=index_name, id=uid) + client.delete(index=index_name, id=uid) except Exception: - logger.exception( - "unindexing of {0} on index {1} failed".format(uid, index_name) - ) + logger.exception("unindexing of {} on index {} failed".format(uid, index_name)) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..56e00b1 --- /dev/null +++ b/tox.ini @@ -0,0 +1,53 @@ +# Generated from: +# https://github.com/plone/meta/tree/master/config/default +# See the inline comments on how to expand/tweak this configuration file +[tox] +# We need 4.4.0 for constrain_package_deps. +min_version = 4.4.0 +envlist = + py38 + py39 + py310 + py311 + py312 + test + lint + +[gh-actions] +python = + 3.8: py38 + 3.9: py39 + 3.10: py310 + 3.11: py311 + 3.12: py312, lint + +[testenv] +use_develop = true +skip_install = false +constrain_package_deps = true + +commands = + pytest {posargs} +extras = + test + elasticsearch + opensearch + +[testenv:format] +description = automatically reformats code +skip_install = true +deps = + pre-commit +commands = + pre-commit run -a pyupgrade + pre-commit run -a isort + pre-commit run -a black + +[testenv:lint] +description = run linters that will help improve the code style +skip_install = true +deps = + pre-commit +commands = + pre-commit run -a
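The ``[testenv]`` section above runs ``pytest`` against the in-tree ``*_test.py`` modules. A minimal sketch of an additional test for the lazy field-map loading introduced in this release could look like this, assuming the standard pytest ``tmp_path`` and ``monkeypatch`` fixtures; the mapping content is illustrative::

    import json

    from collective.elastic.ingest import mapping


    def test_get_field_map_is_loaded_lazily(tmp_path, monkeypatch):
        mappings_file = tmp_path / "mappings.json"
        mappings_file.write_text(json.dumps({"text": {"type": "text"}}))
        monkeypatch.setenv("MAPPINGS_FILE", str(mappings_file))
        # reset the module-level cache so the file is re-read on the next call
        monkeypatch.setitem(mapping.STATE, "fieldmap", {})
        assert mapping.get_field_map() == {"text": {"type": "text"}}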