diff --git a/api/local.env b/api/local.env index db0f89123..2bfd4fca1 100644 --- a/api/local.env +++ b/api/local.env @@ -65,10 +65,10 @@ HIDE_SQL_PARAMETER_LOGS=TRUE # Opensearch Environment Variables ############################ -OPENSEARCH_HOST=opensearch-node -OPENSEARCH_PORT=9200 -OPENSEARCH_USE_SSL=FALSE -OPENSEARCH_VERIFY_CERTS=FALSE +SEARCH_ENDPOINT=opensearch-node +SEARCH_PORT=9200 +SEARCH_USE_SSL=FALSE +SEARCH_VERIFY_CERTS=FALSE ############################ # AWS Defaults @@ -126,4 +126,4 @@ IS_LOCAL_FOREIGN_TABLE=true ############################ # File path for the export_opportunity_data task -EXPORT_OPP_DATA_FILE_PATH=/tmp \ No newline at end of file +EXPORT_OPP_DATA_FILE_PATH=/tmp diff --git a/api/src/adapters/search/opensearch_client.py b/api/src/adapters/search/opensearch_client.py index 3ce90abe9..e3d186a9f 100644 --- a/api/src/adapters/search/opensearch_client.py +++ b/api/src/adapters/search/opensearch_client.py @@ -1,7 +1,6 @@ import logging from typing import Any, Generator, Iterable -import boto3 import opensearchpy from src.adapters.search.opensearch_config import OpensearchConfig, get_opensearch_config @@ -253,24 +252,25 @@ def scroll( def _get_connection_parameters(opensearch_config: OpensearchConfig) -> dict[str, Any]: - # See: https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-amazon-opensearch-serverless + # See: https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch # for further details on configuring the connection to OpenSearch - params = dict( - hosts=[{"host": opensearch_config.host, "port": opensearch_config.port}], + hosts=[{"host": opensearch_config.search_endpoint, "port": opensearch_config.search_port}], http_compress=True, - use_ssl=opensearch_config.use_ssl, - verify_certs=opensearch_config.verify_certs, + use_ssl=opensearch_config.search_use_ssl, + verify_certs=opensearch_config.search_verify_certs, connection_class=opensearchpy.RequestsHttpConnection, - pool_maxsize=opensearch_config.connection_pool_size, + pool_maxsize=opensearch_config.search_connection_pool_size, ) - # If an AWS region is set, we assume we're running non-locally - # and will attempt to authenticate with AOSS - if opensearch_config.aws_region is not None: - # Get credentials and authorize with AWS Opensearch Serverless (aoss) - credentials = boto3.Session().get_credentials() - auth = opensearchpy.AWSV4SignerAuth(credentials, opensearch_config.aws_region, "aoss") + # We'll assume if the aws_region is set, we're running in AWS + # and should connect using the session credentials + if opensearch_config.aws_region: + # Get credentials and authorize with AWS Opensearch Serverless (es) + # TODO - once we have the user setup in Opensearch, we want to change to this approach + # credentials = boto3.Session().get_credentials() + # auth = opensearchpy.AWSV4SignerAuth(credentials, opensearch_config.aws_region, "es") + auth = (opensearch_config.search_username, opensearch_config.search_password) params["http_auth"] = auth return params diff --git a/api/src/adapters/search/opensearch_config.py b/api/src/adapters/search/opensearch_config.py index c96e73cb7..a6bd34e1f 100644 --- a/api/src/adapters/search/opensearch_config.py +++ b/api/src/adapters/search/opensearch_config.py @@ -1,7 +1,6 @@ import logging from pydantic import Field -from pydantic_settings import SettingsConfigDict from src.util.env_config import PydanticBaseEnvConfig @@ -9,18 +8,18 @@ class OpensearchConfig(PydanticBaseEnvConfig): - model_config = SettingsConfigDict(env_prefix="OPENSEARCH_") - # TODO - hacky fix to get the API working again, host/port should - # be defined in terraform env vars - host: str = Field(default="NOT_DEFINED") # OPENSEARCH_HOST - port: int = Field(default=1) # OPENSEARCH_PORT - use_ssl: bool = Field(default=True) # OPENSEARCH_USE_SSL - verify_certs: bool = Field(default=True) # OPENSEARCH_VERIFY_CERTS - connection_pool_size: int = Field(default=10) # OPENSEARCH_CONNECTION_POOL_SIZE + search_endpoint: str = Field(default="NOT_DEFINED") # SEARCH_ENDPOINT + search_port: int = Field(default=443) # SEARCH_PORT - # AWS configuration - aws_region: str | None = Field(default=None) # OPENSEARCH_AWS_REGION + search_username: str | None = Field(default=None) # SEARCH_USERNAME + search_password: str | None = Field(default=None) # SEARCH_PASSWORD + + search_use_ssl: bool = Field(default=True) # SEARCH_USE_SSL + search_verify_certs: bool = Field(default=True) # SEARCH_VERIFY_CERTS + search_connection_pool_size: int = Field(default=10) # SEARCH_CONNECTION_POOL_SIZE + + aws_region: str | None = Field(default=None) def get_opensearch_config() -> OpensearchConfig: @@ -29,11 +28,11 @@ def get_opensearch_config() -> OpensearchConfig: logger.info( "Constructed opensearch configuration", extra={ - "host": opensearch_config.host, - "port": opensearch_config.port, - "use_ssl": opensearch_config.use_ssl, - "verify_certs": opensearch_config.verify_certs, - "connection_pool_size": opensearch_config.connection_pool_size, + "search_endpoint": opensearch_config.search_endpoint, + "search_port": opensearch_config.search_port, + "search_use_ssl": opensearch_config.search_use_ssl, + "search_verify_certs": opensearch_config.search_verify_certs, + "search_connection_pool_size": opensearch_config.search_connection_pool_size, }, ) diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py index dcf778037..61deb895f 100644 --- a/api/src/search/backend/load_opportunities_to_index.py +++ b/api/src/search/backend/load_opportunities_to_index.py @@ -118,7 +118,7 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]: CurrentOpportunitySummary.opportunity_status.isnot(None), ) .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries)) - .execution_options(yield_per=5000) + .execution_options(yield_per=1000) ) .scalars() .partitions() diff --git a/api/src/search/backend/load_search_data.py b/api/src/search/backend/load_search_data.py index 38f26a7f4..f7119d4b3 100644 --- a/api/src/search/backend/load_search_data.py +++ b/api/src/search/backend/load_search_data.py @@ -3,6 +3,7 @@ import src.adapters.db as db import src.adapters.search as search from src.adapters.db import flask_db +from src.adapters.search import flask_opensearch from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex from src.search.backend.load_search_data_blueprint import load_search_data_blueprint from src.task.ecs_background_task import ecs_background_task @@ -17,8 +18,9 @@ help="Whether to run a full refresh, or only incrementally update oppportunities", ) @flask_db.with_db_session() +@flask_opensearch.with_search_client() @ecs_background_task(task_name="load-opportunity-data-opensearch") -def load_opportunity_data(db_session: db.Session, full_refresh: bool) -> None: - search_client = search.SearchClient() - +def load_opportunity_data( + search_client: search.SearchClient, db_session: db.Session, full_refresh: bool +) -> None: LoadOpportunitiesToIndex(db_session, search_client, full_refresh).run() diff --git a/api/tests/src/adapters/search/test_opensearch_client.py b/api/tests/src/adapters/search/test_opensearch_client.py index b60653142..ef201eeb4 100644 --- a/api/tests/src/adapters/search/test_opensearch_client.py +++ b/api/tests/src/adapters/search/test_opensearch_client.py @@ -206,7 +206,7 @@ def test_get_connection_parameters(): # Mostly validating defaults get used assert params == { - "hosts": [{"host": config.host, "port": 9200}], + "hosts": [{"host": config.search_endpoint, "port": 9200}], "http_compress": True, "use_ssl": False, "verify_certs": False,