Remove Support for OpenSearch 1.x and ElasticSearch < 8
jensens committed Nov 16, 2023
1 parent 714062b commit 55682bf
Showing 9 changed files with 66 additions and 81 deletions.
4 changes: 2 additions & 2 deletions CHANGES.rst
@@ -15,10 +15,10 @@ Changelog
[jensens]
- Add docker-compose file to start OpenSearch to example directory and move `.env` to example too.
[jensens]
- rename `ELASTIC_*` environemnt variables to have a consistent naming scheme, see README for details. [jensens]
- rename `ELASTIC_*` environment variables to have a consistent naming scheme, see README for details. [jensens]
- Add tox, Github Actions, CI and CD. [jensens]
- Refactor field-map loading to not happen on startup. [jensens]

- Remove Support for OpenSearch 1.x and ElasticSearch < 8 [jensens]


1.4 (2023-08-17)
4 changes: 2 additions & 2 deletions README.rst
@@ -2,7 +2,7 @@
collective.elastic.ingest
=========================

Ingestion service queue runner between Plone RestAPI and ElasticSearch or OpenSearch.
Ingestion service queue runner between Plone RestAPI and ElasticSearch 8+ or OpenSearch 2+.
Provides Celery tasks to asynchronously index Plone content.

- auto-create Open-/ElasticSearch...
@@ -29,7 +29,7 @@ Install ``collective.elastic.ingest`` ready to use with redis and opensearch::

Depending on the queue server and index server used, the extra requirements vary:

- index server: ``opensearch``, ``elasticsearch7``, or ``elasticsearch8``.
- index server: ``opensearch``, ``elasticsearch``.
- queue server: ``redis`` or ``rabbitmq``.


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -44,7 +44,7 @@ Issues = "https://github.com/collective/collective.elastic.ingest/issues"
[project.optional-dependencies]
redis = ["celery[redis]"]
rabbitmq = ["celery[librabbitmq]"]
opensearch = ["opensearch-py"]
opensearch = ["opensearch-py>=2"]
elasticsearch = ["elasticsearch>=8.0"]
test = [
"pytest",
12 changes: 8 additions & 4 deletions src/collective/elastic/ingest/__init__.py
@@ -7,9 +7,13 @@

if OPENSEARCH:
version_opensearchpy = version("opensearch-py")
OPENSEARCH_2 = int(version_opensearchpy[0]) <= 2
ELASTICSEARCH_7 = True
if int(version_opensearchpy[0]) < 2:
raise ValueError(
"opensearch-py 1.x is not supported, use version 1.x of the collective.elastic.ingest package."
)
else:
version_elasticsearch = version("elasticsearch")
ELASTICSEARCH_7 = int(version_elasticsearch[0]) <= 7
OPENSEARCH_2 = False
if int(version_elasticsearch[0]) < 7:
raise ValueError(
"elasticsearch < 7 is not supported, use Version 1.x of the collective.elastic.ingest package."
)
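
The net effect of this hunk: instead of feature flags for old client versions, the package now parses the installed client library's major version once and refuses to run against unsupported releases. A minimal standalone sketch of the same check (the helper name and shortened messages are illustrative, not part of the package)::

    from importlib.metadata import version

    def check_client_version(use_opensearch: bool) -> None:
        """Raise if the installed search client library is no longer supported."""
        if use_opensearch:
            major = int(version("opensearch-py").split(".")[0])
            if major < 2:
                raise ValueError("opensearch-py 1.x is not supported")
        else:
            major = int(version("elasticsearch").split(".")[0])
            if major < 7:
                raise ValueError("elasticsearch < 7 is not supported")

Splitting the version string on dots before converting is a touch more defensive than indexing its first character, which would misread a future two-digit major release.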
48 changes: 21 additions & 27 deletions src/collective/elastic/ingest/analysis.py
@@ -1,8 +1,6 @@
from .elastic import get_ingest_client
from .logging import logger
from collective.elastic.ingest import ELASTICSEARCH_7
from collective.elastic.ingest import OPENSEARCH
from collective.elastic.ingest import OPENSEARCH_2

import json
import os
@@ -33,28 +31,24 @@ def update_analysis(index_name):
Mapping can use analyzers from analysis.json.
"""

if ANALYSISMAP:
analysis_settings = ANALYSISMAP.get("settings", {})
if analysis_settings:
es = get_ingest_client()
if es is None:
logger.warning("No ElasticSearch client available.")
return
if ELASTICSEARCH_7:
index_exists = es.indices.exists(index_name)
else:
index_exists = es.indices.exists(index=index_name)
if index_exists:
return

logger.info(
f"Create index '{index_name}' with analysis settings "
f"from '{_analysis_file}', but without mapping."
)
if not OPENSEARCH and ELASTICSEARCH_7 or OPENSEARCH and OPENSEARCH_2:
es.indices.create(index_name, body=ANALYSISMAP)
else:
es.indices.create(index=index_name, settings=analysis_settings)
return

logger.info("No analyzer configuration found.")
if not ANALYSISMAP:
logger.info("No analyzer configuration given.")
return
analysis_settings = ANALYSISMAP.get("settings", {})
if not analysis_settings:
logger.warning("No analyzer settings found in configuration.")
return
es = get_ingest_client()
if es is None:
logger.warning("No ElasticSearch client available.")
return
if es.indices.exists(index_name):
logger.debug(
f"Analysis for index '{index_name}' already exists, skip creation."
)
return
logger.info(
f"Create index '{index_name}' with analysis settings "
f"from '{_analysis_file}', but without mapping."
)
es.indices.create(index_name, body=ANALYSISMAP)
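
For context, a hypothetical ANALYSISMAP (the parsed content of analysis.json) in the shape the rewritten update_analysis expects; the analyzer name and filter chain are invented for illustration::

    ANALYSISMAP = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "text_folded": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "asciifolding"],
                    }
                }
            }
        }
    }

With such a map loaded, update_analysis(index_name) creates the index with these settings and no mapping, and returns early when no configuration is given, when no client is available, or when the index already exists.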
41 changes: 22 additions & 19 deletions src/collective/elastic/ingest/elastic.py
@@ -1,4 +1,3 @@
from . import ELASTICSEARCH_7
from . import OPENSEARCH
from .logging import logger

@@ -11,27 +10,27 @@
from elasticsearch import Elasticsearch


def get_ingest_client(elasticsearch_server_baseurl=None):
"""return elasticsearch client for.ingest"""
def get_search_client(elasticsearch_server_baseurl: str = ""):
"""search client for query or ingest"""

raw_addr = elasticsearch_server_baseurl or os.environ.get(
"INGEST_SERVER", "http://localhost:9200"
)
use_ssl = os.environ.get("INGEST_USE_SSL", "0")
use_ssl = bool(int(use_ssl))
raw_addr = elasticsearch_server_baseurl or os.environ.get("INGEST_SERVER", "")
use_ssl = bool(int(os.environ.get("INGEST_USE_SSL", "0")))
addresses = [x for x in raw_addr.split(",") if x.strip()]
if not addresses:
addresses.append("127.0.0.1:9200")

# TODO: more auth options (cert, bearer token, api-key, etc)
auth = (
os.environ.get("INGEST_LOGIN", "admin"),
os.environ.get("INGEST_PASSWORD", "admin"),
)

if OPENSEARCH:
logger.info(f"Use OpenSearch client at {addresses}")
hosts = []
for address in addresses:
host, port = address.rsplit(":", 1)
hosts.append({"host": host, "port": port})
auth = (
os.environ.get("INGEST_LOGIN", "admin"),
os.environ.get("INGEST_PASSWORD", "admin"),
)
client = OpenSearch(
hosts=hosts,
http_auth=auth,
@@ -40,13 +39,17 @@ def get_ingest_client(elasticsearch_server_baseurl=None):
)
info = client.info()
logger.info(f"OpenSearch client info: {info}")
return client
elif ELASTICSEARCH_7:
from . import version_elasticsearch

logger.info(f"ElasticSearch version {version_elasticsearch} installed")
return Elasticsearch(
else:
logger.info(f"Use ElasticSearch client at {addresses}")
client = Elasticsearch(
addresses,
use_ssl=use_ssl,
basic_auth=auth,
verify_certs=False,
)
return Elasticsearch(addresses)
return client


def get_ingest_client(elasticsearch_server_baseurl=None):
logger.warn("get_ingest_client is deprecated, use get_search_client instead")
return get_search_client(elasticsearch_server_baseurl)
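
A usage sketch for the renamed helper, driven entirely by the environment variables read in the hunk above (the values shown are examples for a local, non-TLS instance)::

    import os

    from collective.elastic.ingest.elastic import get_search_client

    # Example configuration; adjust to the actual server and credentials.
    os.environ["INGEST_SERVER"] = "localhost:9200"
    os.environ["INGEST_USE_SSL"] = "0"
    os.environ["INGEST_LOGIN"] = "admin"
    os.environ["INGEST_PASSWORD"] = "admin"

    client = get_search_client()
    print(client.info())

Existing callers of get_ingest_client keep working via the deprecation shim at the end of the hunk.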
8 changes: 3 additions & 5 deletions src/collective/elastic/ingest/ingest/__init__.py
@@ -1,3 +1,4 @@
from .. import OPENSEARCH
from ..analysis import update_analysis
from ..elastic import get_ingest_client
from ..logging import logger
@@ -13,9 +14,6 @@
from .section import enrichWithSection
from .security import enrichWithSecurityInfo
from .vocabularyfields import stripVocabularyTermTitles
from collective.elastic.ingest import ELASTICSEARCH_7
from collective.elastic.ingest import OPENSEARCH
from collective.elastic.ingest import OPENSEARCH_2
from pprint import pformat


@@ -49,8 +47,8 @@ def setup_ingest_pipelines(full_schema, index_name):
if pipelines["processors"]:
logger.info(f"update ingest pipelines {pipeline_name}")
logger.debug(f"pipeline definitions:\n{pipelines}")
if (not OPENSEARCH and ELASTICSEARCH_7) or (OPENSEARCH and OPENSEARCH_2):
es.ingest.put_pipeline(pipeline_name, pipelines)
if OPENSEARCH:
es.ingest.put_pipeline(id=pipeline_name, body=pipelines)
else:
es.ingest.put_pipeline(id=pipeline_name, processors=pipelines["processors"])
else:
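
To make the branch above concrete, a hypothetical pipeline payload in the shape setup_ingest_pipelines builds (assuming an ``es`` client and the module's OPENSEARCH flag are in scope); the pipeline name and processor are invented for illustration::

    pipeline_name = "plone_title_attachment"  # hypothetical name
    pipelines = {
        "description": "Extract text from the binary field 'title_data'",
        "processors": [
            {"attachment": {"field": "title_data", "target_field": "title_extracted"}},
        ],
    }

    # opensearch-py keeps the classic body-based signature, while the
    # elasticsearch 8 client takes the processors as a keyword argument:
    if OPENSEARCH:
        es.ingest.put_pipeline(id=pipeline_name, body=pipelines)
    else:
        es.ingest.put_pipeline(id=pipeline_name, processors=pipelines["processors"])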
22 changes: 6 additions & 16 deletions src/collective/elastic/ingest/mapping.py
@@ -1,4 +1,3 @@
from . import ELASTICSEARCH_7
from .elastic import get_ingest_client
from .logging import logger
from copy import deepcopy
@@ -135,10 +134,7 @@ def create_or_update_mapping(full_schema, index_name):
return

# get current mapping
if ELASTICSEARCH_7:
index_exists = es.indices.exists(index_name)
else:
index_exists = es.indices.exists(index=index_name)
index_exists = es.indices.exists(index=index_name)
if index_exists:
original_mapping = es.indices.get_mapping(index=index_name)[index_name]
mapping = deepcopy(original_mapping)
@@ -201,20 +197,14 @@ def create_or_update_mapping(full_schema, index_name):
json.dumps(mapping["mappings"], sort_keys=True, indent=2)
)
)
if ELASTICSEARCH_7:
es.indices.put_mapping(index=index_name, body=mapping["mappings"])
else:
es.indices.put_mapping(
index=[index_name],
body=mapping["mappings"],
)
es.indices.put_mapping(
index=[index_name],
body=mapping["mappings"],
)
else:
logger.debug("No update necessary. Mapping is unchanged.")
else:
# from celery.contrib import rdb; rdb.set_trace()
logger.info("Create index with mapping.")
logger.debug(f"mapping is:\n{json.dumps(mapping, sort_keys=True, indent=2)}")
if ELASTICSEARCH_7:
es.indices.create(index_name, body=mapping)
else:
es.indices.create(index=index_name, mappings=mapping["mappings"])
es.indices.create(index=index_name, mappings=mapping["mappings"])
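
For reference, a hypothetical mappings fragment in the shape these calls receive; the field names are invented for illustration::

    mappings = {
        "properties": {
            "title": {"type": "text"},
            "UID": {"type": "keyword"},
            "modified": {"type": "date"},
        }
    }

    # create_or_update_mapping() passes such a structure either to
    # es.indices.put_mapping(index=[index_name], body=...) for an existing
    # index, or to es.indices.create(index=index_name, mappings=...) for a
    # new one, as in the hunk above.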
6 changes: 1 addition & 5 deletions src/collective/elastic/ingest/removal.py
@@ -1,9 +1,5 @@
from .elastic import get_ingest_client

import logging


logger = logging.getLogger(__name__)
from .logging import logger


def remove(uid, index_name):
