diff --git a/CHANGES.rst b/CHANGES.rst
index a6d1142..50d291c 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -6,6 +6,10 @@ Changelog
 - Fix ElasticSearch support. [jensens]
 - Add examples for a docker-compose setup for both, OpenSearch and ElasticSearch. [jensens]
+- Add documentation for preprocessings. [jensens]
+- Remove 2 of the 4 static preprocessings and use the preprocessings file for those. [jensens]
+- Refactor and add preprocessings to be more consistent and less verbose.
+  Attention: JSON file format changed. [jensens]
 
 
 2.0.0b6 (2023-11-16)
diff --git a/README.rst b/README.rst
index 4c353f1..a3356dc 100644
--- a/README.rst
+++ b/README.rst
@@ -19,9 +19,9 @@ Provides Celery-tasks to asynchronous index Plone content.
 .. contents:: Table of Contents
 
-------------
+
 Installation
-------------
+============
 
 We recommended to use a Python virtual environment, create one with ``python3 -m venv venv``, and activate it in the current terminal session with ``source venv/bin/activate``.
@@ -35,12 +35,15 @@ Depending on the queue server and index server used, the extra requirements vary
 
 - queue server: ``redis`` or ``rabbitmq``.
 
--------------
 Configuration
--------------
+=============
 
 Configuration is done via environment variables and JSON files.
 
+-----------
+Environment
+-----------
+
 Environment variables are:
 
 INDEX_SERVER
@@ -115,9 +118,93 @@ SENTRY_DSN
 
     Default: disabled
 
---------
-Starting
---------
+----------
+JSON-Files
+----------
+
+
+``mappings.json``
+-----------------
+
+The mappings file is a JSON file with the following structure:
+
+First level: ``Key: Value`` pairs
+
+The key is
+
+- either a Plone behavior base field starting with ``behaviors/``, or a Plone behavior field starting with ``behaviors/`` followed by the behavior name, followed by ``/`` and the field name,
+- or the fully qualified dotted name of a zope.schema based field type.
+
+TODO: Document the value.
+
+
+``preprocessings.json``
+-----------------------
+
+Pre-processings are steps done before anything else is processed.
+They run on the raw data from the Plone REST API, the full schema fetched from the Plone backend, and the full content object fetched from the Plone backend.
+Each preprocessing is a function that takes the data and modifies the full schema or the full content.
+
+The pre-processings file consists of a list of processing instruction records.
+
+Each record is a mapping with a ``match``, an ``action`` and a ``configuration``.
+
+The match calls a function that returns a boolean value.
+If the value is true, the action is executed, otherwise it is skipped.
+
+There are two matches available:
+
+``always``
+    Always matches.
+
+    Example configuration ``{"match": {"type": "always"}, ...}``
+
+    This is the default if no match is given.
+
+``content_exists``
+    Matches if the field ``configuration["path"]`` is present in the content data.
+    The path can point to a field ``foo`` or check for its sub-entries like ``foo/bar/baz``.
+
+    Example configuration ``{"match": {"type": "content_exists", "path": "foo"}, ...}``
+
+The action is a function that takes the full schema and content data plus the configuration, and then modifies the full schema or the full content.
+
+These actions are available:
+
+``additional_schema``
+    Adds an additional schema to the full schema.
+    The configuration must be a valid schema to add.
+
+``rewrite``
+    Moves content data from one position in the field-tree to another.
+    The configuration must be a mapping with ``source`` and ``target`` keys.
+    The value of ``source`` is the path to the data to move.
+    The value of ``target`` is the path to the new location of the data (missing containers are created).
+    The value of ``enforce`` is a boolean value (default: False). If True, the source must exist, otherwise an error is raised.
+
+    Example: ``"configuration": {"source": "@components/collectiveelastic/blocks_plaintext", "target": "blocks_plaintext", "enforce": false}``
+
+``remove``
+    Deletes a field or sub-field from the content data.
+    The value of ``target`` is the path to the data to delete.
+
+``field_remove``
+    Deletes a field from the full schema and its field value from the content.
+    The value of ``section`` is the section (one of ``behaviors`` or ``types``).
+    The value of ``name`` is the name of the behavior or type.
+    The value of ``field`` is the name of the field to delete.
+
+``full_remove``
+    Deletes a full behavior or type with all its fields from the full schema and its field values from the content.
+    The value of ``section`` is the section (one of ``behaviors`` or ``types``).
+    The value of ``name`` is the name of the behavior or type.
+
+``remove_empty``
+    Deletes all empty fields from the content data.
+    A field is considered empty if it is ``None``, ``[]``, ``{}`` or ``""``.
+
+
+Start up
+========
 
 Run celery worker::
 
@@ -130,9 +217,8 @@ Or with debug information::
 
 The number is the concurrency of the worker. For production use, it should be set to the number of Plone backends available for indexing load.
 
----------
-OCI Image
----------
+OCI Image usage
+===============
 
 For use in Docker, Podman, Kubernetes, ..., an OCI image is provided at the `Github Container Registry `_.
@@ -154,12 +240,12 @@ The `MAPPINGS_FILE` variable defaults to `/configuration/mappings.json`. By default no file is present.
 When a mount is provided to `/configuration`, the mappings file can be placed there.
 
---------
 Examples
---------
+========
 
 Example configuration files are provided in the `./examples `_ directory.
 
+------------------------------
 OpenSearch with Docker Compose
 ------------------------------
@@ -197,7 +283,7 @@ In another terminal window `run a Plone backend `_.
 There you can report issues too.
@@ -341,9 +427,8 @@ We appreciate any contribution and if a release is needed to be done on PyPI, pl
 We also offer commercial support if any training, coaching, integration or adaptions are needed.
 
----------------------------
 Installation for development
----------------------------
+============================
 
 - clone source code repository,
 - enter repository directory
@@ -352,8 +437,7 @@ Installation for development
 - load environment configuration ``source examples/.env``.
 
--------
 License
--------
+=======
 
 The project is licensed under the GPLv2.
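To make the ``preprocessings.json`` record format documented in the README above concrete, here is a minimal sketch of such an instruction list, written as a Python literal so it can be dumped with ``json.dump``. The behavior name ``example.behavior`` and the field ``internal_notes`` are made-up placeholders and not part of the shipped configuration::

    import json

    # Hypothetical preprocessing instructions mirroring the documented structure:
    # each record holds an optional "match" (default: always), an "action",
    # and a "configuration" for that action.
    EXAMPLE_PREPROCESSINGS = [
        {
            # "always" is the default match type, so "match" could be omitted here
            "match": {"type": "always"},
            "action": "rewrite",
            "configuration": {
                "source": "@components/collectiveelastic/blocks_plaintext",
                "target": "blocks_plaintext",
                "enforce": False,
            },
        },
        {
            # only act when the content actually carries the (made-up) field
            "match": {"type": "content_exists", "path": "internal_notes"},
            "action": "remove",
            "configuration": {"target": "internal_notes"},
        },
        {
            "action": "field_remove",
            "configuration": {
                "section": "behaviors",
                "name": "example.behavior",  # hypothetical behavior name
                "field": "internal_notes",
            },
        },
    ]

    if __name__ == "__main__":
        # emit the records as JSON, ready to be used as a preprocessings.json
        print(json.dumps(EXAMPLE_PREPROCESSINGS, indent=4))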
diff --git a/src/collective/elastic/ingest/ingest/__init__.py b/src/collective/elastic/ingest/ingest/__init__.py
index 0deeed6..4522a7d 100644
--- a/src/collective/elastic/ingest/ingest/__init__.py
+++ b/src/collective/elastic/ingest/ingest/__init__.py
@@ -3,16 +3,12 @@
 from ..client import get_client
 from ..logging import logger
 from ..mapping import create_or_update_mapping
-from ..mapping import expanded_processors
 from ..mapping import EXPANSION_FIELDS
 from ..mapping import get_field_map
 from ..mapping import iterate_schema
 from ..postprocessing import postprocess
 from ..preprocessing import preprocess
-from .blocks import enrichWithBlocksPlainText
-from .rid import enrichWithRid
 from .section import enrichWithSection
-from .security import enrichWithSecurityInfo
 from .vocabularyfields import stripVocabularyTermTitles
 from pprint import pformat
@@ -22,10 +18,32 @@
 def _es_pipeline_name(index_name):
+    """Return the name of the ingest pipeline for the given index."""
     return "{}_{}".format(PIPELINE_PREFIX, index_name)
 
 
+def _expand_dict(mapping, **kw):
+    """Recursively expand a dictionary with keyword arguments."""
+    record = {}
+    for key, value in mapping.items():
+        if isinstance(value, str):
+            value = value.format(**kw)
+        elif isinstance(value, dict):
+            value = _expand_dict(value, **kw)
+        record[key] = value
+    return record
+
+
+def _expanded_processors(processors, source, target):
+    """Expand a list of processors with source and target."""
+    result = []
+    for processor in processors:
+        result.append(_expand_dict(processor, source=source, target=target))
+    return result
+
+
 def setup_ingest_pipelines(full_schema, index_name):
+    """Set up ingest pipelines for the given index based on the schema."""
     logger.debug("setup ingest piplines")
     client = get_client()
     pipeline_name = _es_pipeline_name(index_name)
@@ -33,15 +51,15 @@ def setup_ingest_pipelines(full_schema, index_name):
         "description": "Extract Plone Binary attachment information",
         "processors": [],
     }
+    fieldmap = get_field_map()
     for section_name, schema_name, field in iterate_schema(full_schema):
         fqfieldname = "/".join([section_name, schema_name, field["name"]])
-        fieldmap = get_field_map()
         definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None))
         if not definition or "pipeline" not in definition:
             continue
         source = definition["pipeline"]["source"].format(name=field["name"])
         target = definition["pipeline"]["target"].format(name=field["name"])
-        pipelines["processors"] += expanded_processors(
+        pipelines["processors"] += _expanded_processors(
             definition["pipeline"]["processors"], source, target
         )
     if pipelines["processors"]:
@@ -59,16 +77,22 @@ def setup_ingest_pipelines(full_schema, index_name):
 
 
 def ingest(content, full_schema, index_name):
-    """Preprocess content and schema."""
+    """Process content and schema.
+
+    This brings it all together: preprocess, create a mapping (and index/pipelines if they do not exist yet),
+    then postprocess and finally index the content.
+ """ logger.debug(f"Process content: {pformat(content)}") - enrichWithSecurityInfo(content) - enrichWithRid(content) + # special preprocessing logic for section and vocabulary fields + # TODO: refactor as special preprocessing enrichWithSection(content) - enrichWithBlocksPlainText(content) stripVocabularyTermTitles(content) + + # generic preprocessing accrording to rule in preprocessings.json preprocess(content, full_schema) + if full_schema: # first update_analysis, then create_or_update_mapping: # mapping can use analyzers from analysis.json diff --git a/src/collective/elastic/ingest/ingest/blocks.py b/src/collective/elastic/ingest/ingest/blocks.py deleted file mode 100644 index 4adfc51..0000000 --- a/src/collective/elastic/ingest/ingest/blocks.py +++ /dev/null @@ -1,5 +0,0 @@ -def enrichWithBlocksPlainText(content): - cce = content["@components"]["collectiveelastic"] - if "blocks_plaintext" in cce: - content["blocks_plaintext"] = cce["blocks_plaintext"] - return content diff --git a/src/collective/elastic/ingest/ingest/rid.py b/src/collective/elastic/ingest/ingest/rid.py deleted file mode 100644 index b3874a5..0000000 --- a/src/collective/elastic/ingest/ingest/rid.py +++ /dev/null @@ -1,7 +0,0 @@ -def enrichWithRid(content): - if "collectiveelastic" in content["@components"]: - content["rid"] = content["@components"]["collectiveelastic"]["catalog_rid"] - # BBB backward compatibility - elif "catalog_rid" in content["@components"]: - content["rid"] = content["@components"]["catalog_rid"] - return content diff --git a/src/collective/elastic/ingest/ingest/security.py b/src/collective/elastic/ingest/ingest/security.py deleted file mode 100644 index df49f39..0000000 --- a/src/collective/elastic/ingest/ingest/security.py +++ /dev/null @@ -1,5 +0,0 @@ -def enrichWithSecurityInfo(content): - content["allowedRolesAndUsers"] = content["@components"]["collectiveelastic"][ - "allowedRolesAndUsers" - ] - return content diff --git a/src/collective/elastic/ingest/mapping.py b/src/collective/elastic/ingest/mapping.py index 5ca6334..67ef4b1 100644 --- a/src/collective/elastic/ingest/mapping.py +++ b/src/collective/elastic/ingest/mapping.py @@ -32,6 +32,10 @@ def get_field_map() -> dict: + """The field map from file passed as filename in an env var. + + To not load 1000s of times, this is cached. + """ if STATE["fieldmap"] == {}: _mappings_file = os.environ.get("MAPPINGS_FILE", None) if not _mappings_file: @@ -45,33 +49,24 @@ def get_field_map() -> dict: def iterate_schema( full_schema: dict[str, dict[str, dict[str, dict]]], ) -> typing.Generator[tuple, None, None]: + """Iterate over the 3 Levels of the schema and flattend yield field definitions. + + The full_schema is the dict as received from Plone "@cesp" endpoint. + The endpoint is defined in collective.elastic.plone. + + The yielded tuple containes the section_name, schema_name and field definition. 
+ """ for section_name, section in sorted( full_schema.items(), key=operator.itemgetter(0) ): for schema_name, schema in sorted(section.items(), key=operator.itemgetter(0)): + logger.debug(f"Schema: {section_name}/{schema_name}\n{schema}") for field in sorted(schema, key=operator.itemgetter("name")): yield section_name, schema_name, field -def _expand_dict(mapping, **kw): - record = {} - for key, value in mapping.items(): - if isinstance(value, str): - value = value.format(**kw) - elif isinstance(value, dict): - value = _expand_dict(value, **kw) - record[key] = value - return record - - -def expanded_processors(processors, source, target): - result = [] - for processor in processors: - result.append(_expand_dict(processor, source=source, target=target)) - return result - - def map_field(field, properties, fqfieldname, seen): + """Map a field to a definition and add it to the properties.""" fieldmap = get_field_map() definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None)) if definition is None: @@ -103,6 +98,12 @@ def map_field(field, properties, fqfieldname, seen): def update_expansion_fields(field, fqfieldname): + """Remember expansion fields for later use in post_processors. + + This are fields where content need to be added in a postprocessing step, + i.e. images or files to be fetched from Plone, because the binaries are + not part of the inital information. + """ fieldmap = get_field_map() definition = fieldmap.get(fqfieldname, fieldmap.get(field["field"], None)) if definition is None: @@ -128,9 +129,8 @@ def _replacement_detector(field, properties, definition, fqfieldname, seen): properties[field["name"]] = definition["detection"]["default"] return replacement["name"] = field["name"] - update_expansion_fields( - field, fqfieldname - ) # TODO Needed here? Was part of map_field. + # since we replace here, a new expansion field could be needed + update_expansion_fields(field, fqfieldname) map_field(replacement, properties, fqfieldname, seen) @@ -138,6 +138,18 @@ def _replacement_detector(field, properties, definition, fqfieldname, seen): def create_or_update_mapping(full_schema, index_name: str) -> None: + """Create or update the mapping for the given index. + + Based on the full_schema as provided by the "@cesp" endpoint, + enriched by the preprocessings. + + Based on the MAPPINGS_FILE env var JSON file. + + It globally collects fields for later expansion in postprocessing. + + Finaly it initially creates the index (also index settings are set) + or updates the mapping for the given index. 
+ """ client = get_client() if client is None: logger.warning("No index client available.") @@ -153,7 +165,9 @@ def create_or_update_mapping(full_schema, index_name: str) -> None: else: # ftr: here is the basic structure of a mapping mapping = { - "mappings": {"properties": {},}, + "mappings": { + "properties": {}, + }, "settings": DEFAULT_INDEX_SETTINGS, } # process mapping @@ -183,6 +197,7 @@ def create_or_update_mapping(full_schema, index_name: str) -> None: map_field(field, properties, fqfieldname, seen) # Mapping for blocks_plaintext (not a schema field, but received from api expansion "collectiveelastic") + # TODO: handle this with preprocessings.json map_field( dict(name="blocks_plaintext", field="blocks_plaintext"), properties, diff --git a/src/collective/elastic/ingest/preprocessing.py b/src/collective/elastic/ingest/preprocessing.py index 876f90a..60094ee 100644 --- a/src/collective/elastic/ingest/preprocessing.py +++ b/src/collective/elastic/ingest/preprocessing.py @@ -97,7 +97,8 @@ def action_rewrite(content, full_schema, config): def action_remove(content, full_schema, config): """remove unused entry""" target, target_key = _find_last_container_in_path( - content, config["target"].split("/") + content, + config["target"].split("/"), ) if target and target_key in target: del target[target_key] @@ -106,6 +107,41 @@ def action_remove(content, full_schema, config): ACTION_FUNCTIONS["remove"] = action_remove +def action_field_remove(content, full_schema, config): + """remove full field from content and schema.""" + if config["field"] in content: + del content[config["field"]] + if not full_schema: + # cached schema, not passed, no need to process + return + section = full_schema[config["section"]] + fields = section[config["name"]] + index = [f["name"] for f in fields].index(config["field"]) + del fields[index] + + +ACTION_FUNCTIONS["field_remove"] = action_field_remove + + +def action_full_remove(content, full_schema, config): + """remove full behavior or types fields.""" + section = full_schema[config["section"]] + if full_schema: + # we need to cache the fields, because in subsequent calls there is no schema provided + fields = section[config["name"]] + if "__fields" not in "config": + config["__fields"] = fields + else: + fields = config["__fields"] + for field in fields: + if field["name"] in content: + del content[field["name"]] + del section[config["name"]] + + +ACTION_FUNCTIONS["full_remove"] = action_full_remove + + def action_empty_removal(content, full_schema, key): """remove empty fields""" to_remove = set() @@ -123,8 +159,9 @@ def preprocess(content, full_schema): """run full preprocessing pipeline on content and schema""" for ppcfg in PREPROCESSOR_CONFIGS: logger.debug("Preprocessor configuration:\n{}\n".format(ppcfg)) - matcher = MATCHING_FUNCTIONS[ppcfg["match"]["type"]] - if not matcher(content, full_schema, ppcfg["match"]): + match = ppcfg.get("match", {"type": "always"}) + matcher = MATCHING_FUNCTIONS[match["type"]] + if not matcher(content, full_schema, match): continue action = ACTION_FUNCTIONS[ppcfg["action"]] action(content, full_schema, ppcfg.get("configuration", {})) diff --git a/src/collective/elastic/ingest/preprocessing_test.py b/src/collective/elastic/ingest/preprocessing_test.py index d67013e..4a77c9e 100644 --- a/src/collective/elastic/ingest/preprocessing_test.py +++ b/src/collective/elastic/ingest/preprocessing_test.py @@ -57,3 +57,65 @@ def test_action_rewrite_non_existing_forced(): } with pytest.raises(ValueError): 
         preprocessing.action_rewrite(root, {}, config)
+
+
+def test_action_field_remove():
+    full_schema = {
+        "behaviors": {
+            "plone.basic": [
+                {"field": "zope.schema._bootstrapfields.TextLine", "name": "title"},
+                {"field": "zope.schema._bootstrapfields.Text", "name": "description"},
+            ]
+        }
+    }
+    config = {
+        "section": "behaviors",
+        "name": "plone.basic",
+        "field": "title",
+    }
+    root = {
+        "foo": "bar",
+        "title": "Foo",
+    }
+    from .preprocessing import action_field_remove
+
+    action_field_remove(root, full_schema, config)
+
+    assert root == {"foo": "bar"}
+    assert len(full_schema["behaviors"]["plone.basic"]) == 1
+
+
+def test_action_full_remove():
+    full_schema = {
+        "behaviors": {
+            "plone.basic": [
+                {"field": "zope.schema._bootstrapfields.TextLine", "name": "title"},
+                {"field": "zope.schema._bootstrapfields.Text", "name": "description"},
+            ],
+            "plone.categorization": [
+                {
+                    "field": "zope.schema._field.Tuple",
+                    "name": "subjects",
+                    "value_type": {"field": "zope.schema._bootstrapfields.TextLine"},
+                },
+                {"field": "zope.schema._field.Choice", "name": "language"},
+            ],
+        }
+    }
+    config = {
+        "section": "behaviors",
+        "name": "plone.categorization",
+    }
+    root = {
+        "title": "Foo",
+        "description": "Bar",
+        "subjects": ["Foo", "Bar"],
+        "language": "de",
+        "baz": "Baaz",
+    }
+    from .preprocessing import action_full_remove
+
+    action_full_remove(root, full_schema, config)
+
+    assert root == {"baz": "Baaz", "description": "Bar", "title": "Foo"}
+    assert "plone.categorization" not in full_schema["behaviors"]
diff --git a/src/collective/elastic/ingest/preprocessings.json b/src/collective/elastic/ingest/preprocessings.json
index 472f166..a712474 100644
--- a/src/collective/elastic/ingest/preprocessings.json
+++ b/src/collective/elastic/ingest/preprocessings.json
@@ -1,175 +1,156 @@
 [
     {
-        "match": {
-            "type": "always"
-        },
-        "action": "remove",
+        "comment": "Remove all empty fields.",
+        "action": "remove_empty"
+    },
+    {
+        "comment": "ProxyIndex needs this information, essential rewrite, do not remove",
+        "action": "rewrite",
         "configuration": {
-            "target": "blocks"
+            "source": "@components/collectiveelastic/catalog_rid",
+            "target": "rid",
+            "enforce": true
         }
     },
     {
-        "match": {
-            "type": "always"
-        },
+        "comment": "CESP Restapi endpoint needs this information, essential rewrite, do not remove",
+        "action": "rewrite",
+        "configuration": {
+            "source": "@components/collectiveelastic/allowedRolesAndUsers",
+            "target": "allowedRolesAndUsers",
+            "enforce": true
+        }
+    },
+    {
+        "comment": "If Volto is available, this is important for full text search, do not remove unless in Classic UI only environments",
+        "action": "rewrite",
+        "configuration": {
+            "source": "@components/collectiveelastic/blocks_plaintext",
+            "target": "blocks_plaintext",
+            "enforce": false
+        }
+    },
+    {
+        "comment": "If volto.blocks is available, this removes a field not used in search.",
         "action": "remove",
         "configuration": {
-            "target": "blocks_layout"
+            "target": "blocks"
        }
     },
-
     {
-        "match": {
-            "type": "always"
-        },
-        "action": "additional_schema",
+        "comment": "If volto.blocks is available, this removes a field not used in search.",
+        "action": "remove",
         "configuration": {
-            "name": "portal_type",
-            "field": "zope.schema._field.ASCIILine"
+            "target": "blocks_layout"
         }
     },
     {
-        "match": {
-            "type": "always"
-        },
+        "comment": "We want the field to be called portal_type, not @type, so we rewrite it.",
         "action": "rewrite",
         "configuration": {
             "source": "@type",
             "target": "portal_type"
         }
     },
-
     {
-        "match": {
-            "type": "always"
-        },
}, + "comment": "After all the rewrites, we want to remove the @components field, as it is not needed anymore.", "action": "remove", "configuration": { "target": "@components" } }, { - "match": { - "type": "always" - }, + "comment": "No need for restapi specific folder/collection information.", "action": "remove", "configuration": { "target": "items" } }, { - "match": { - "type": "always" - }, + "comment": "No need for restapi specific information.", "action": "remove", "configuration": { "target": "items_total" } }, { - "match": { - "type": "always" - }, + "comment": "No need for restapi specific information.", "action": "remove", "configuration": { "target": "previous_item" } }, { - "match": { - "type": "always" - }, + "comment": "No need for restapi specific information.", "action": "remove", "configuration": { "target": "next_item" } }, { - "match": { - "type": "always" - }, + "comment": "No need for restapi versioning related information.", "action": "remove", "configuration": { "target": "version" } }, { - "match": { - "type": "always" - }, - "action": "remove", - "configuration": { - "target": "version_enabled" - } - }, - { - "match": { - "type": "always" - }, - "action": "remove_empty" - }, - - { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.versioning related information.", + "action": "full_remove", "configuration": { - "target": "showFields" + "section": "behaviors", + "name": "plone.versioning" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.collection related information.", + "action": "full_remove", "configuration": { - "target": "query" + "section": "behaviors", + "name": "plone.collection" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.thumb_icon related information.", + "action": "full_remove", "configuration": { - "target": "sort_on" + "section": "behaviors", + "name": "plone.thumb_icon" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.tableofcontents related information.", + "action": "full_remove", "configuration": { - "target": "sort_reversed" + "section": "behaviors", + "name": "plone.tableofcontents" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.layoutaware (Mosaic) layout related.", + "action": "field_remove", "configuration": { - "target": "limit" + "section": "behaviors", + "name": "plone.layoutaware", + "field": "contentLayout" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.layoutaware (Mosaic) layout related.", + "action": "field_remove", "configuration": { - "target": "item_count" + "section": "behaviors", + "name": "plone.layoutaware", + "field": "pageSiteLayout" } }, { - "match": { - "type": "always" - }, - "action": "remove", + "comment": "No need for plone.layoutaware (Mosaic) layout related.", + "action": "field_remove", "configuration": { - "target": "customViewFields" + "section": "behaviors", + "name": "plone.layoutaware", + "field": "sectionSiteLayout" } }