Skip to content

Commit

Permalink
Upgrade ES dependencies to match cluster version (#3029)
Browse files Browse the repository at this point in the history
* Upgrade ES dependencies to match cluster version

* Fix ingestion server test ES connection instantiation

* Remove unnecessary error catching

* Reduce redundant variable names

* Replace `http_auth` with `basic_auth`

* Work around Elasticsearch client's lack of AWS auth support

* Remove IAM based authentication

We do not use this in production, and the Elasticsearch client no longer supports it directly. We will try removing it entirely and see whether everything still works; if not, we can employ the nasty/hacky workaround.

* Remove unnecessary transport node specification
  • Loading branch information
sarayourfriend authored Sep 19, 2023
1 parent aa005ce commit 95b5911
Show file tree
Hide file tree
Showing 16 changed files with 762 additions and 875 deletions.
3 changes: 2 additions & 1 deletion api/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ django-tqdm = "~=1.3"
django-uuslug = "~=2.0"
djangorestframework = "~=3.14"
drf-spectacular = "*"
elasticsearch-dsl = "~=7.4"
elasticsearch = "==8.8.2"
elasticsearch-dsl = "~=8.9"
future = "~=0.18"
gunicorn = "~=21.2"
limit = "~=0.2"
Expand Down
484 changes: 226 additions & 258 deletions api/Pipfile.lock

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions api/api/models/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from django.db import models
from django.utils.html import format_html

from elasticsearch import Elasticsearch, TransportError
from elasticsearch import Elasticsearch, NotFoundError

from api.models.base import OpenLedgerModel
from api.models.mixins import ForeignIdentifierMixin, IdentifierMixin, MediaMixin
Expand Down Expand Up @@ -275,16 +275,14 @@ def _perform_index_update(self, method: str, raise_errors: bool, **es_method_arg
refresh=True,
**es_method_args,
)
except TransportError as e:
if e.status_code == 404:
# This is expected for the filtered index, but we should still
# log, just in case.
logger.warning(
f"Document with _id {document_id} not found "
f"in {index} index. No update performed."
)
else:
raise e
except NotFoundError:
# This is expected for the filtered index, but we should still
# log, just in case.
logger.warning(
f"Document with _id {document_id} not found "
f"in {index} index. No update performed."
)
continue


class AbstractDeletedMedia(PerformIndexUpdateMixin, OpenLedgerModel):
Expand Down
35 changes: 12 additions & 23 deletions api/conf/settings/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,44 @@
"""This file contains configuration pertaining to Elasticsearch."""

from aws_requests_auth.aws_auth import AWSRequestsAuth
from decouple import config
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections

from api.constants.media_types import MEDIA_TYPES
from conf.settings.aws import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY


def _elasticsearch_connect():
def _elasticsearch_connect() -> tuple[Elasticsearch, str]:
"""
Connect to configured Elasticsearch domain.
:return: A tuple of the Elasticsearch client and the endpoint URL it connects to.
"""

es_scheme = config("ELASTICSEARCH_SCHEME", default="http://")
es_url = config("ELASTICSEARCH_URL", default="localhost")
es_port = config("ELASTICSEARCH_PORT", default=9200, cast=int)
es_aws_region = config("ELASTICSEARCH_AWS_REGION", default="us-east-1")

auth = AWSRequestsAuth(
aws_access_key=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_host=es_url,
aws_region=es_aws_region,
aws_service="es",
)
auth.encode = lambda x: bytes(x.encode("utf-8"))

es_endpoint = f"{es_scheme}{es_url}:{es_port}"

_es = Elasticsearch(
host=es_url,
port=es_port,
connection_class=RequestsHttpConnection,
timeout=10,
es_endpoint,
request_timeout=10,
max_retries=1,
retry_on_timeout=True,
http_auth=auth,
wait_for_status="yellow",
)
_es.info()
return _es
_es.cluster.health(wait_for_status="yellow")
return _es, es_endpoint


SETUP_ES = config("SETUP_ES", default=True, cast=bool)
if SETUP_ES:
ES = _elasticsearch_connect()
ES, ES_ENDPOINT = _elasticsearch_connect()
#: Elasticsearch client, also aliased to connection 'default'

connections.add_connection("default", ES)
else:
ES = None
ES, ES_ENDPOINT = None, None

MEDIA_INDEX_MAPPING = {
media_type: config(f"{media_type.upper()}_INDEX_NAME", default=media_type)
Expand Down
7 changes: 6 additions & 1 deletion api/test/factory/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from test.factory.models.audio import (
AudioAddOnFactory,
AudioFactory,
AudioReportFactory,
MatureAudioFactory,
)
from test.factory.models.image import ImageFactory, MatureImageFactory
from test.factory.models.image import (
ImageFactory,
ImageReportFactory,
MatureImageFactory,
)
from test.factory.models.oauth2 import (
AccessTokenFactory,
OAuth2RegistrationFactory,
Expand Down
9 changes: 8 additions & 1 deletion api/test/factory/models/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import factory
from factory.django import DjangoModelFactory

from api.models.audio import Audio, AudioAddOn, MatureAudio
from api.models.audio import Audio, AudioAddOn, AudioReport, MatureAudio


class MatureAudioFactory(DjangoModelFactory):
Expand All @@ -28,3 +28,10 @@ class Meta:
audio_identifier = IdentifierFactory(AudioFactory)

waveform_peaks = Faker("waveform")


class AudioReportFactory(DjangoModelFactory):
class Meta:
model = AudioReport

media_obj = factory.SubFactory(AudioFactory)
11 changes: 9 additions & 2 deletions api/test/factory/models/image.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from test.factory.models.media import MediaFactory
from test.factory.models.media import MediaFactory, MediaReportFactory

import factory
from factory.django import DjangoModelFactory

from api.models.image import Image, MatureImage
from api.models.image import Image, ImageReport, MatureImage


class MatureImageFactory(DjangoModelFactory):
Expand All @@ -18,3 +18,10 @@ class ImageFactory(MediaFactory):

class Meta:
model = Image


class ImageReportFactory(MediaReportFactory):
class Meta:
model = ImageReport

media_obj = factory.SubFactory(ImageFactory)
17 changes: 17 additions & 0 deletions api/test/factory/models/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.conf import settings

import factory
import pook
from elasticsearch import Elasticsearch
from elasticsearch_dsl.response import Hit
from factory.django import DjangoModelFactory
Expand Down Expand Up @@ -95,6 +96,12 @@ def create(cls, *args, **kwargs) -> AbstractMedia | tuple[AbstractMedia, Hit]:
skip_es = kwargs.pop("skip_es", False)
with_hit = kwargs.pop("with_hit", False)

pook_active = pook.isactive()
if pook_active:
# Temporarily disable pook so that the calls to ES to create
# the factory document don't fail
pook.disable()

model_class = cls._meta.get_model_class()
if cls._highest_pre_existing_pk is None:
response = settings.ES.search(
Expand All @@ -121,6 +128,10 @@ def create(cls, *args, **kwargs) -> AbstractMedia | tuple[AbstractMedia, Hit]:
if mature_reported:
cls._mature_factory.create(media_obj=model)

if pook_active:
# Reactivate pook if it was active
pook.activate()

if with_hit:
return model, hit

Expand Down Expand Up @@ -154,6 +165,7 @@ def _save_model_to_es(

origin_index = media._meta.db_table
source_document = cls._create_es_source_document(media, mature)

es.create(
index=origin_index,
id=str(media.pk),
Expand Down Expand Up @@ -190,3 +202,8 @@ class IdentifierFactory(factory.SubFactory):
def evaluate(self, instance, step, extra):
model = super().evaluate(instance, step, extra)
return model.identifier


class MediaReportFactory(DjangoModelFactory):
class Meta:
abstract = True
42 changes: 40 additions & 2 deletions api/test/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
from dataclasses import dataclass
from test.factory import models as model_factories
from test.factory.models.media import CREATED_BY_FIXTURE_MARKER, MediaFactory
from test.factory.models.media import (
CREATED_BY_FIXTURE_MARKER,
MediaFactory,
MediaReportFactory,
)
from unittest.mock import MagicMock

from rest_framework.test import APIClient, APIRequestFactory

import pook
import pytest
from elasticsearch import Elasticsearch
from fakeredis import FakeRedis

from api.models import (
Audio,
DeletedAudio,
DeletedImage,
Image,
MatureAudio,
MatureImage,
)
from api.models.media import AbstractDeletedMedia, AbstractMatureMedia, AbstractMedia
from api.serializers.audio_serializers import (
AudioReportRequestSerializer,
AudioSearchRequestSerializer,
Expand Down Expand Up @@ -66,10 +80,18 @@ class MediaTypeConfig:
origin_index: str
filtered_index: str
model_factory: MediaFactory
model_class: AbstractMedia
mature_factory: MediaFactory
mature_class: AbstractMatureMedia
search_request_serializer: MediaSearchRequestSerializer
model_serializer: MediaSerializer
report_serializer: MediaReportRequestSerializer
report_factory: MediaReportFactory
deleted_class: AbstractDeletedMedia

@property
def indexes(self):
return (self.origin_index, self.filtered_index)


MEDIA_TYPE_CONFIGS = {
Expand All @@ -79,21 +101,29 @@ class MediaTypeConfig:
origin_index="image",
filtered_index="image-filtered",
model_factory=model_factories.ImageFactory,
model_class=Image,
mature_factory=model_factories.MatureImageFactory,
search_request_serializer=ImageSearchRequestSerializer,
model_serializer=ImageSerializer,
report_serializer=ImageReportRequestSerializer,
report_factory=model_factories.ImageReportFactory,
mature_class=MatureImage,
deleted_class=DeletedImage,
),
"audio": MediaTypeConfig(
media_type="audio",
url_prefix="audio",
origin_index="audio",
filtered_index="audio-filtered",
model_factory=model_factories.AudioFactory,
model_class=Audio,
mature_factory=model_factories.MatureAudioFactory,
search_request_serializer=AudioSearchRequestSerializer,
model_serializer=AudioSerializer,
report_serializer=AudioReportRequestSerializer,
report_factory=model_factories.AudioReportFactory,
mature_class=MatureAudio,
deleted_class=DeletedAudio,
),
}

Expand Down Expand Up @@ -129,8 +159,16 @@ def cleanup_elasticsearch_test_documents(request, settings):

es: Elasticsearch = settings.ES

# If pook was activated by a test and not deactivated
# (usually because the test failed and something prevented
# pook from cleaning up after itself), disable here so that
# the ES request on the next line doesn't get intercepted,
# causing pook to raise an exception about the request not
# matching and the fixture documents not getting cleaned.
pook.disable()

es.delete_by_query(
index="*",
body={"query": {"match": {"tags.name": CREATED_BY_FIXTURE_MARKER}}},
query={"match": {"tags.name": CREATED_BY_FIXTURE_MARKER}},
refresh=True,
)
21 changes: 12 additions & 9 deletions api/test/unit/controllers/test_search_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,19 @@ def test_no_post_process_results_recursion(
hit_count=hit_count,
)

es_host = settings.ES.transport.kwargs["host"]
es_port = settings.ES.transport.kwargs["port"]

# `origin_index` enforced by passing `exact_index=True` below.
es_endpoint = (
f"http://{es_host}:{es_port}/{image_media_type_config.origin_index}/_search"
f"{settings.ES_ENDPOINT}/{image_media_type_config.origin_index}/_search"
)

mock_search = pook.post(es_endpoint).times(1).reply(200).json(mock_es_response).mock
mock_search = (
pook.post(es_endpoint)
.times(1)
.reply(200)
.header("x-elastic-product", "Elasticsearch")
.json(mock_es_response)
.mock
)

# Ensure dead link filtering does not remove any results
pook.head(
Expand Down Expand Up @@ -682,12 +686,9 @@ def test_post_process_results_recurses_as_needed(
base_hits=mock_es_response_1["hits"]["hits"],
)

es_host = settings.ES.transport.kwargs["host"]
es_port = settings.ES.transport.kwargs["port"]

# `origin_index` enforced by passing `exact_index=True` below.
es_endpoint = (
f"http://{es_host}:{es_port}/{image_media_type_config.origin_index}/_search"
f"{settings.ES_ENDPOINT}/{image_media_type_config.origin_index}/_search"
)

# `from` is always 0 if there is no query mask
Expand All @@ -703,6 +704,7 @@ def test_post_process_results_recurses_as_needed(
.body(re.compile('from":0'))
.times(1)
.reply(200)
.header("x-elastic-product", "Elasticsearch")
.json(mock_es_response_1)
.mock
)
Expand All @@ -714,6 +716,7 @@ def test_post_process_results_recurses_as_needed(
.body(re.compile('from":0'))
.times(1)
.reply(200)
.header("x-elastic-product", "Elasticsearch")
.json(mock_es_response_2)
.mock
)
Expand Down
Loading

0 comments on commit 95b5911

Please sign in to comment.