overhaul/refactor/simplify preprocessings
jensens committed Nov 20, 2023
1 parent 9e463b0 commit b8727a1
Showing 10 changed files with 353 additions and 163 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,10 @@ Changelog

- Fix ElasticSearch support. [jensens]
- Add examples for a docker-compose setup for both OpenSearch and ElasticSearch. [jensens]
- Add documentation for preprocessings. [jensens]
- Remove 2 of the 4 static preprocessings and use preprocessings file for those. [jensens]
- Refactor and add preprocessings to be more consistent and less verbose.
Attention: the JSON file format changed. [jensens]


2.0.0b6 (2023-11-16)
128 changes: 106 additions & 22 deletions README.rst
@@ -19,9 +19,9 @@ Provides Celery tasks to asynchronously index Plone content.

.. contents:: Table of Contents


Installation
============

We recommend using a Python virtual environment: create one with ``python3 -m venv venv`` and activate it in the current terminal session with ``source venv/bin/activate``.

@@ -35,12 +35,15 @@ Depending on the queue server and index server used, the extra requirements vary:
- queue server: ``redis`` or ``rabbitmq``.
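
For example, an installation with OpenSearch and Redis support might look like this (the extras names are an assumption; check the package metadata for the exact spelling)::

    pip install collective.elastic.ingest[opensearch,redis]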


Configuration
=============

Configuration is done via environment variables and JSON files.

-----------
Environment
-----------

Environment variables are:

INDEX_SERVER
@@ -115,9 +118,93 @@ SENTRY_DSN
Default: disabled


----------
JSON-Files
----------


``mappings.json``
-----------------

The mappings file is a JSON file with the following structure:

First level: ``key: value`` pairs

The key is

- either a Plone behavior base field (starting with ``behaviors/`` followed by the behavior name) or a Plone behavior field (starting with ``behaviors/``, followed by the behavior name, ``/``, and the field name),
- or the fully qualified dotted name of a ``zope.schema``-based field type.
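
For illustration, the first level could look like this sketch (the behavior and field names are arbitrary examples; the values are left empty because their format is not documented yet, see the TODO below)::

    {
        "behaviors/plone.basic/title": {},
        "zope.schema._bootstrapfields.TextLine": {}
    }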

TODO: Document the value.


``preprocessings.json``
-----------------------

Pre-processings are steps done before anything else is processed.
They run on the raw data from the Plone REST API, the full schema fetched from the Plone backend, and the full content object fetched from the Plone backend.
Each preprocessing is a function that takes the data and modifies the full schema or full content.

The preprocessings file consists of a list of processing instruction records.

Each record is a mapping with a ``match``, an ``action`` and a ``configuration``.

The match calls a function that returns a boolean value.
If the value is true, the action is executed; otherwise it is skipped.
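
As a skeleton, a record looks like this (all values are placeholders)::

    {
        "match": {"type": "..."},
        "action": "...",
        "configuration": {}
    }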

There are two matches available:

``always``
Always matches.

Example configuration ``{"match": {"type": "always"}, ...}``

This is the default if no match is given.

``content_exists``
Matches if the field ``configuration["path"]`` is present in the content data.
The path can point to a field ``foo`` or check for its sub-entries like ``foo/bar/baz``.

Example configuration ``{"match": {"type": "content_exists", "path": "foo"}, ...}``

The action is a function that takes the full schema, the content data, and the configuration, and then modifies the full schema or the full content.

These actions are available:

``additional_schema``
Adds an additional schema to the full schema.
The configuration must be a valid schema to add.

``rewrite``
Moves content data from one position in the field-tree to another.
The configuration must be a mapping with ``source`` and ``target`` keys.
The value of ``source`` is the path to the data to move.
The value of ``target`` is the path to the new location of the data (missing containers are created).
The value of ``enforce`` is a boolean value (default: false). If true, the source must exist; if it does not, an error is raised.

Example: ``"configuration": {"source": "@components/collectiveelastic/blocks_plaintext", "target": "blocks_plaintext", "enforce": false}``

``remove``
Deletes a field or sub-field from the content data.
The value of ``target`` is the path to the data to delete.
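
Example (the path is hypothetical): ``"configuration": {"target": "foo/bar"}``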

``field_remove``
Deletes a field from the full schema and its field value from the content.
The value of ``section`` is the section (one of ``behaviors`` or ``types``).
The value of ``name`` is the name of the behavior or type.
The value of ``field`` is the name of the field to delete.
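
Example (behavior and field names are illustrative): ``"configuration": {"section": "behaviors", "name": "plone.basic", "field": "description"}``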

``full_remove``
Deletes a full behavior or type with all its fields from the full schema and its field values from the content.
The value of ``section`` is the section (one of ``behaviors`` or ``types``).
The value of ``name`` is the name of the behavior or type.
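
Example (the behavior name is illustrative): ``"configuration": {"section": "behaviors", "name": "plone.categorization"}``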

``remove_empty``
Deletes all empty fields from the content data.
A field is considered empty if it is ``None``, ``[]``, ``{}``, or ``""``.
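
Putting this together, a complete ``preprocessings.json`` could look like the following sketch (the ``rewrite`` record repeats the example above; the ``remove`` target is hypothetical, and ``remove_empty`` is assumed to need no configuration)::

    [
        {
            "match": {"type": "content_exists", "path": "@components/collectiveelastic/blocks_plaintext"},
            "action": "rewrite",
            "configuration": {
                "source": "@components/collectiveelastic/blocks_plaintext",
                "target": "blocks_plaintext",
                "enforce": false
            }
        },
        {
            "action": "remove",
            "configuration": {"target": "@components"}
        },
        {
            "match": {"type": "always"},
            "action": "remove_empty",
            "configuration": {}
        }
    ]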


Start up
========

Run celery worker::

@@ -130,9 +217,8 @@ Or with debug information::
The number is the concurrency of the worker.
For production use, it should be set to the number of Plone backends available for indexing load.

OCI Image usage
===============

For use in Docker, Podman, Kubernetes, ..., an OCI image is provided at the `Github Container Registry <https://github.com/collective/collective.elastic.ingest/pkgs/container/collective.elastic.ingest>`_.

Expand All @@ -154,12 +240,12 @@ The `MAPPINGS_FILE` variable defaults to `/configuration/mappings.json`.
By default no file is present.
When a mount is provided to ``/configuration``, the mappings file can be placed there.
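
A possible invocation as a sketch (the ``latest`` tag and the ``INDEX_SERVER`` value are assumptions; adjust them to your setup)::

    docker run --rm \
        -e INDEX_SERVER=localhost:9200 \
        -e MAPPINGS_FILE=/configuration/mappings.json \
        -v $(pwd)/configuration:/configuration \
        ghcr.io/collective/collective.elastic.ingest:latest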

Examples
========

Example configuration files are provided in the `./examples <https://github.com/collective/collective.elastic.ingest/tree/main/examples>`_ directory.

------------------------------
OpenSearch with Docker Compose
------------------------------

@@ -197,7 +283,7 @@ In another terminal window `run a Plone backend <https://6.docs.plone.org/instal
There, create an item or modify an existing one.
You should see the indexing task in the celery worker terminal window.


---------------------------------
ElasticSearch with Docker Compose
---------------------------------

@@ -240,7 +326,7 @@ In another terminal window `run a Plone backend <https://6.docs.plone.org/instal
There, create an item or modify an existing one.
You should see the indexing task in the celery worker terminal window.


------------------
Local/Development
------------------

@@ -253,6 +339,7 @@ An environment file ``examples/docker/local/.env`` is provided with the environment variables needed.
Run ``source examples/.env`` to load the environment variables.
Then start the celery worker with ``celery -A collective.elastic.ingest.celery.app worker -l debug``.

-----------------------------------------
Complex Mapping With German Text Analysis
-----------------------------------------

@@ -262,9 +349,9 @@ A complex mappings file with German text analysis configured, ``mappings-german-analysis.json``, is provided as an example.
It comes together with the matching analysis configuration file ``analysis-german.json`` and a stub lexicon file ``elasticsearch-lexicon-german.txt``.
Read the next section for more information about text analysis.


Text Analysis
=============

Text analysis is optional.
Skip this on a first installation.
@@ -327,9 +414,8 @@ The response delivers the tokens for the analyzed text 'Lehrstellenbörse'.
Note: The file ``elasticsearch-lexicon.txt`` with the word list used by the ``decompounder`` of the sample analysis configuration in ``analysis.json.example`` has to be located in the configuration directory of your elasticsearch server.
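
To inspect an analyzer manually, a request of this shape can be sent to the standard ``_analyze`` API (the index and analyzer names are assumptions; they depend on your configuration)::

    POST /plone/_analyze
    {
        "analyzer": "german_analyzer",
        "text": "Lehrstellenbörse"
    }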


Source Code
===========

The sources are in a Git DVCS with its main branches at `GitHub <https://github.com/collective/collective.elastic.ingest>`_.
There you can report issues too.
@@ -341,9 +427,8 @@ We appreciate any contribution and if a release is needed to be done on PyPI, pl
We also offer commercial support if any training, coaching, integration or adaptations are needed.


Installation for development
============================

- clone source code repository,
- enter repository directory
@@ -352,8 +437,7 @@ Installation for development
- load environment configuration ``source examples/.env``.


License
=======

The project is licensed under the GPLv2.
44 changes: 34 additions & 10 deletions src/collective/elastic/ingest/ingest/__init__.py
@@ -3,16 +3,12 @@
from ..client import get_client
from ..logging import logger
from ..mapping import create_or_update_mapping
from ..mapping import EXPANSION_FIELDS
from ..mapping import get_field_map
from ..mapping import iterate_schema
from ..postprocessing import postprocess
from ..preprocessing import preprocess
from .section import enrichWithSection
from .vocabularyfields import stripVocabularyTermTitles
from pprint import pformat

@@ -22,26 +18,48 @@


def _es_pipeline_name(index_name):
    """Return the name of the ingest pipeline for the given index."""
    return "{}_{}".format(PIPELINE_PREFIX, index_name)


def _expand_dict(mapping, **kw):
    """Recursively expand a dictionary's string values with keyword arguments."""
    record = {}
    for key, value in mapping.items():
        if isinstance(value, str):
            value = value.format(**kw)
        elif isinstance(value, dict):
            value = _expand_dict(value, **kw)
        record[key] = value
    return record


def _expanded_processors(processors, source, target):
    """Expand a list of processors with source and target."""
    result = []
    for processor in processors:
        result.append(_expand_dict(processor, source=source, target=target))
    return result
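
# Illustrative example (not from a shipped configuration): a processor template
# such as {"attachment": {"field": "{source}", "target_field": "{target}"}}
# expands via _expanded_processors to
# {"attachment": {"field": "file", "target_field": "file__extracted"}}
# when called with source="file" and target="file__extracted".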


def setup_ingest_pipelines(full_schema, index_name):
    """Set up ingest pipelines for the given index based on the schema."""
    logger.debug("setup ingest pipelines")
    client = get_client()
    pipeline_name = _es_pipeline_name(index_name)
    pipelines = {
        "description": "Extract Plone Binary attachment information",
        "processors": [],
    }
    fieldmap = get_field_map()
    for section_name, schema_name, field in iterate_schema(full_schema):
        fqfieldname = "/".join([section_name, schema_name, field["name"]])
        definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None))
        if not definition or "pipeline" not in definition:
            continue
        source = definition["pipeline"]["source"].format(name=field["name"])
        target = definition["pipeline"]["target"].format(name=field["name"])
        pipelines["processors"] += _expanded_processors(
            definition["pipeline"]["processors"], source, target
        )
    if pipelines["processors"]:
@@ -59,16 +77,22 @@ def setup_ingest_pipelines(full_schema, index_name):


def ingest(content, full_schema, index_name):
    """Process content and schema.

    This brings it together: preprocess, create a mapping (and index/pipelines
    if they do not exist yet), then postprocess and finally index the content.
    """
    logger.debug(f"Process content: {pformat(content)}")

    # special preprocessing logic for section and vocabulary fields
    # TODO: refactor as special preprocessing
    enrichWithSection(content)
    stripVocabularyTermTitles(content)

    # generic preprocessing according to rules in preprocessings.json
    preprocess(content, full_schema)

    if full_schema:
        # first update_analysis, then create_or_update_mapping:
        # mapping can use analyzers from analysis.json
5 changes: 0 additions & 5 deletions src/collective/elastic/ingest/ingest/blocks.py

This file was deleted.

7 changes: 0 additions & 7 deletions src/collective/elastic/ingest/ingest/rid.py

This file was deleted.

5 changes: 0 additions & 5 deletions src/collective/elastic/ingest/ingest/security.py

This file was deleted.
