Skip to content

Commit

Permalink
.client.get_client instead of .elastic.get_ingest_client
Browse files Browse the repository at this point in the history
  • Loading branch information
jensens committed Nov 16, 2023
1 parent 3271369 commit 66d46b8
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 82 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Changelog
- rename `ELASTIC_*` environment variables to have a consistent naming scheme, see README for details. [jensens]
- Add tox, Github Actions, CI and CD. [jensens]
- Refactor field-map loading to not happen on startup. [jensens]
- Remove Support for OpensSearch 1.x and ElasticSearch < 8 [jensens]
- Remove Support for OpenSearch 1.x and ElasticSearch < 8 [jensens]
- rename .elastic.get_ingest_client to .client.get_client [jensens]


1.4 (2023-08-17)
Expand Down
21 changes: 21 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,27 @@ Or with debug information::
The number is the concurrency of the worker.
For production use, it should be set to the number of Plone backends available for indexing load.

---------
OCI Image
---------

For use in Docker, Podman, Kubernetes, ..., an OCI image is provided at ...

The environment variables above are used as configuration.

Additionally, the following environment variables are used:

CELERY_CONCURENCY
The number of concurrent tasks to run.

Default: 1

CELERY_LOGLEVEL
The log level for celery.

Default: info


--------
Examples
--------
Expand Down
13 changes: 6 additions & 7 deletions src/collective/elastic/ingest/analysis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .elastic import get_ingest_client
from .client import get_client
from .logging import logger
from collective.elastic.ingest import OPENSEARCH

import json
import os
Expand All @@ -21,7 +20,7 @@


def update_analysis(index_name):
"""Provide elasticsearch with analyzers to be used in mapping.
"""Provide index with analyzers to be used in mapping.
Sample is found in analysis.json.example.
Overwrite with your analyzers by creating an ANALYSIS_FILE `analysis.json`.
Expand All @@ -38,11 +37,11 @@ def update_analysis(index_name):
if not analysis_settings:
logger.warning("No analyzer settings found in configuration.")
return
es = get_ingest_client()
if es is None:
client = get_client()
if client is None:
logger.warning("No ElasticSearch client available.")
return
if es.indices.exists(index_name):
if client.indices.exists(index_name):
logger.debug(
f"Analysis for index '{index_name}' already exists, skip creation."
)
Expand All @@ -51,4 +50,4 @@ def update_analysis(index_name):
f"Create index '{index_name}' with analysis settings "
f"from '{_analysis_file}', but without mapping."
)
es.indices.create(index_name, body=ANALYSISMAP)
client.indices.create(index_name, body=ANALYSISMAP)
58 changes: 58 additions & 0 deletions src/collective/elastic/ingest/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from . import OPENSEARCH
from .logging import logger

import os


if OPENSEARCH:
from opensearchpy import OpenSearch
else:
from elasticsearch import Elasticsearch


def get_client(index_server_baseurl: str = ""):
    """Create an index client for query or ingest.

    Returns either an OpenSearch or an Elasticsearch client, depending on the
    package-level OPENSEARCH flag (derived from the environment at import time).

    index_server_baseurl: comma separated "host:port" addresses.  Falls back to
    the INDEX_SERVER environment variable, then to "127.0.0.1:9200".
    """
    raw_addr = index_server_baseurl or os.environ.get("INDEX_SERVER", "")
    use_ssl = bool(int(os.environ.get("INDEX_USE_SSL", "0")))
    addresses = [x.strip() for x in raw_addr.split(",") if x.strip()]
    if not addresses:
        addresses.append("127.0.0.1:9200")

    # TODO: more auth options (cert, bearer token, api-key, etc)
    auth = (
        os.environ.get("INDEX_LOGIN", "admin"),
        os.environ.get("INDEX_PASSWORD", "admin"),
    )

    if OPENSEARCH:
        logger.info(f"Use OpenSearch client at {addresses}")
        hosts = []
        for address in addresses:
            host, port = address.rsplit(":", 1)
            # opensearch-py expects the port as an integer; rsplit yields str
            hosts.append({"host": host, "port": int(port)})
        client = OpenSearch(
            hosts=hosts,
            http_auth=auth,
            use_ssl=use_ssl,
            verify_certs=False,
        )
        info = client.info()
        logger.info(f"OpenSearch client info: {info}")
    else:
        logger.info(f"Use ElasticSearch client at {addresses}")
        # elasticsearch-py 8.x dropped the ``use_ssl`` keyword and requires a
        # scheme in every node URL, so encode the scheme into the addresses.
        scheme = "https" if use_ssl else "http"
        nodes = [
            addr if addr.startswith(("http://", "https://")) else f"{scheme}://{addr}"
            for addr in addresses
        ]
        client = Elasticsearch(
            nodes,
            basic_auth=auth,
            verify_certs=False,
        )
    return client


def get_ingest_client(elasticsearch_server_baseurl=None):
    """Deprecated alias for ``get_client``, kept for backward compatibility."""
    # logger.warn is itself a deprecated alias; use logger.warning.
    # The old message named the wrong functions (claimed get_client was
    # deprecated in favor of get_search_client).
    logger.warning("get_ingest_client is deprecated, use get_client instead")
    return get_client(elasticsearch_server_baseurl)
57 changes: 5 additions & 52 deletions src/collective/elastic/ingest/elastic.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,8 @@
from . import OPENSEARCH
from .client import get_client
from .logging import logger

import os


if OPENSEARCH:
from opensearchpy import OpenSearch
else:
from elasticsearch import Elasticsearch


def get_search_client(elasticsearch_server_baseurl: str = ""):
"""search client for query or ingest"""

raw_addr = elasticsearch_server_baseurl or os.environ.get("INDEX_SERVER", "")
use_ssl = bool(int(os.environ.get("INDEX_USE_SSL", "0")))
addresses = [x for x in raw_addr.split(",") if x.strip()]
if not addresses:
addresses.append("127.0.0.1:9200")

# TODO: more auth options (cert, bearer token, api-key, etc)
auth = (
os.environ.get("INDEX_LOGIN", "admin"),
os.environ.get("INDEX_PASSWORD", "admin"),
)

if OPENSEARCH:
logger.info(f"Use OpenSearch client at {addresses}")
hosts = []
for address in addresses:
host, port = address.rsplit(":", 1)
hosts.append({"host": host, "port": port})
client = OpenSearch(
hosts=hosts,
http_auth=auth,
use_ssl=use_ssl,
verify_certs=False,
)
info = client.info()
logger.info(f"OpenSearch client info: {info}")
else:
logger.info(f"Use ElasticSearch client at {addresses}")
client = Elasticsearch(
addresses,
use_ssl=use_ssl,
basic_auth=auth,
verify_certs=False,
)
return client


def get_inDEX_client(elasticsearch_server_baseurl=None):
logger.warn("get_index_client is deprecated, use get_search_client instead")
return get_search_client(elasticsearch_server_baseurl)
def get_ingest_client(elasticsearch_server_baseurl=None):
    """Deprecated alias for ``.client.get_client``.

    To be removed in a 3.x release.
    """
    # logger.warn is a deprecated alias for logger.warning; also the old
    # message named ".elastic.get_client" although this function is
    # .elastic.get_ingest_client.
    logger.warning(
        ".elastic.get_ingest_client is deprecated, use .client.get_client instead"
    )
    return get_client(elasticsearch_server_baseurl)
18 changes: 10 additions & 8 deletions src/collective/elastic/ingest/ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .. import OPENSEARCH
from ..analysis import update_analysis
from ..elastic import get_ingest_client
from ..client import get_client
from ..logging import logger
from ..mapping import create_or_update_mapping
from ..mapping import expanded_processors
Expand All @@ -27,7 +27,7 @@ def _es_pipeline_name(index_name):

def setup_ingest_pipelines(full_schema, index_name):
logger.debug("setup ingest piplines")
es = get_ingest_client()
client = get_client()
pipeline_name = _es_pipeline_name(index_name)
pipelines = {
"description": "Extract Plone Binary attachment information",
Expand All @@ -48,12 +48,14 @@ def setup_ingest_pipelines(full_schema, index_name):
logger.info(f"update ingest pipelines {pipeline_name}")
logger.debug(f"pipeline definitions:\n{pipelines}")
if OPENSEARCH:
es.ingest.put_pipeline(id=pipeline_name, body=pipelines)
client.ingest.put_pipeline(id=pipeline_name, body=pipelines)
else:
es.ingest.put_pipeline(id=pipeline_name, processors=pipelines["processors"])
client.ingest.put_pipeline(
id=pipeline_name, processors=pipelines["processors"]
)
else:
logger.info(f"delete ingest pipelines {pipeline_name}")
es.ingest.delete_pipeline(pipeline_name)
client.ingest.delete_pipeline(pipeline_name)


def ingest(content, full_schema, index_name):
Expand All @@ -79,11 +81,11 @@ def ingest(content, full_schema, index_name):
postprocess(content, info)

logger.info(f"Index content: {pformat(content)}")
es = get_ingest_client()
es_kwargs = dict(
client = get_client()
kwargs = dict(
index=index_name,
id=content["UID"],
pipeline=_es_pipeline_name(index_name),
body=content,
)
es.index(**es_kwargs)
client.index(**kwargs)
17 changes: 8 additions & 9 deletions src/collective/elastic/ingest/mapping.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .elastic import get_ingest_client
from .client import get_client
from .logging import logger
from copy import deepcopy

Expand Down Expand Up @@ -128,15 +128,15 @@ def _replacement_detector(field, properties, definition, fqfieldname, seen):


def create_or_update_mapping(full_schema, index_name):
es = get_ingest_client()
if es is None:
logger.warning("No ElasticSearch client available.")
client = get_client()
if client is None:
logger.warning("No index client available.")
return

# get current mapping
index_exists = es.indices.exists(index=index_name)
index_exists = client.indices.exists(index=index_name)
if index_exists:
original_mapping = es.indices.get_mapping(index=index_name)[index_name]
original_mapping = client.indices.get_mapping(index=index_name)[index_name]
mapping = deepcopy(original_mapping)
if "properties" not in mapping["mappings"]:
mapping["mappings"]["properties"] = {}
Expand Down Expand Up @@ -197,14 +197,13 @@ def create_or_update_mapping(full_schema, index_name):
json.dumps(mapping["mappings"], sort_keys=True, indent=2)
)
)
es.indices.put_mapping(
client.indices.put_mapping(
index=[index_name],
body=mapping["mappings"],
)
else:
logger.debug("No update necessary. Mapping is unchanged.")
else:
# from celery.contrib import rdb; rdb.set_trace()
logger.info("Create index with mapping.")
logger.debug(f"mapping is:\n{json.dumps(mapping, sort_keys=True, indent=2)}")
es.indices.create(index=index_name, mappings=mapping["mappings"])
client.indices.create(index=index_name, mappings=mapping["mappings"])
10 changes: 5 additions & 5 deletions src/collective/elastic/ingest/removal.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from .elastic import get_ingest_client
from .client import get_client
from .logging import logger


def remove(uid, index_name):
es = get_ingest_client()
if es is None:
logger.warning("No ElasticSearch client available.")
client = get_client()
if client is None:
logger.warning("No index client available.")
return
try:
es.delete(index=index_name, id=uid)
client.delete(index=index_name, id=uid)
except Exception:
logger.exception("unindexing of {} on index {} failed".format(uid, index_name))

0 comments on commit 66d46b8

Please sign in to comment.