Skip to content

Commit

Permalink
ci: Update postgres tests to use testcontainers (#2650)
Browse files · Browse the repository at this point in the history
* ci: Add a postgres testcontainer for tests

Signed-off-by: Achal Shah <[email protected]>

* ci: Update postgres tests to use test containers

Signed-off-by: Achal Shah <[email protected]>

* ci: Update postgres tests to use test containers

Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals authored May 9, 2022
1 parent 5b4b07f commit 640ff12
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 38 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ The services with containerized replacements currently implemented are:
- Redis
- Trino
- HBase
- Postgres

You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.

Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
# Run the universal integration suite against the postgres offline/online store.
# -x stops at the first failure; the -k expression excludes tests that are not
# yet supported by the postgres contrib stores.
# Fix: the exclusion list previously repeated
# "not test_historical_retrieval_fails_on_validation" twice; the duplicate is removed.
test-python-universal-postgres:
	FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \
	FEAST_USAGE=False \
	IS_TEST=True \
	python -m pytest -x --integration --universal \
	-k "not test_historical_retrieval_fails_on_validation and \
	not test_historical_retrieval_with_validation and \
	not test_historical_features_persisting and \
	not test_universal_cli and \
	not test_go_feature_server and \
	not test_feature_logging and \
	not test_universal_types" \
	sdk/python/tests

test-python-universal-local:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,10 @@
PostgreSQLDataSourceCreator,
)

# Repo configs exercised by the universal test suite for the postgres contrib
# stores. The creator class provisions a postgres testcontainer and serves as
# both the offline and the online store factory, replacing the old hard-coded
# POSTGRES_ONLINE_CONFIG dict (which pointed at a manually started localhost
# postgres with fixed credentials).
FULL_REPO_CONFIGS = [
    IntegrationTestRepoConfig(
        provider="local",
        offline_store_creator=PostgreSQLDataSourceCreator,
        online_store_creator=PostgreSQLDataSourceCreator,
    ),
]
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def online_write_batch(
created_ts,
)
)
# Controll the batch so that we can update the progress
# Control the batch so that we can update the progress
batch_size = 5000
for i in range(0, len(insert_values), batch_size):
cur_batch = insert_values[i : i + batch_size]
Expand Down
54 changes: 53 additions & 1 deletion sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,68 @@ def teardown():
return TrinoContainerSingleton


class PostgresContainerSingleton:
    """Session-wide singleton wrapping a shared `postgres:latest` test container.

    State is kept at class level so every test in a session reuses one
    container instead of starting its own.
    """

    container = None
    is_running = False

    # Credentials/database injected into the container via POSTGRES_* env vars.
    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    @classmethod
    def get_singleton(cls):
        """Start the shared postgres container on first use and return it."""
        if not cls.is_running:
            cls.container = (
                DockerContainer("postgres:latest")
                .with_exposed_ports(5432)
                .with_env("POSTGRES_USER", cls.postgres_user)
                .with_env("POSTGRES_PASSWORD", cls.postgres_password)
                .with_env("POSTGRES_DB", cls.postgres_db)
            )
            cls.container.start()
            # Block until postgres reports readiness so tests don't race the DB.
            log_string_to_wait_for = "database system is ready to accept connections"
            waited = wait_for_logs(
                container=cls.container,
                predicate=log_string_to_wait_for,
                timeout=30,
                interval=10,
            )
            logger.info("Waited for %s seconds until postgres container was up", waited)
            cls.is_running = True
        return cls.container

    @classmethod
    def teardown(cls):
        """Stop the container (if any) and reset singleton state.

        Fix: the original left `is_running`/`container` set after stopping, so
        a later get_singleton() would return an already-stopped container.
        """
        if cls.container:
            cls.container.stop()
        cls.container = None
        cls.is_running = False


@pytest.fixture(scope="session")
def postgres_fixture(request):
    """Expose the postgres container singleton for the whole test session and
    make sure its container is stopped when the session ends."""
    request.addfinalizer(PostgresContainerSingleton.teardown)
    return PostgresContainerSingleton


@pytest.fixture(
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def environment(request, worker_id: str, trino_fixture):
def environment(request, worker_id: str, trino_fixture, postgres_fixture):
if "TrinoSourceCreator" in request.param.offline_store_creator.__name__:
e = construct_test_environment(
request.param,
worker_id=worker_id,
offline_container=trino_fixture.get_singleton(),
)
elif "PostgresSourceCreator" in request.param.offline_store_creator.__name__:
e = construct_test_environment(
request.param,
worker_id=worker_id,
offline_container=postgres_fixture.get_singleton(),
)
else:
e = construct_test_environment(request.param, worker_id=worker_id)
proc = Process(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,86 @@
from typing import Dict, List, Optional
import logging
from typing import Dict, Optional

import pandas as pd
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from feast.data_source import DataSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
PostgreSQLOfflineStoreConfig,
PostgreSQLSource,
)
from feast.infra.utils.postgres.connection_utils import _get_conn, df_to_postgres_table
from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)

logger = logging.getLogger(__name__)

class PostgresSourceCreatorSingleton:
    """Singleton that provisions (or adopts) a postgres container and exposes
    feast offline/online store configs pointing at it.

    State lives at class level so all tests in a session share one container.
    The old per-instance PostgreSQLDataSourceCreator (pre-image residue in the
    diff) created fresh connections/tables per test and is superseded by this.
    """

    # Credentials/database baked into a container we start ourselves.
    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    # True once a container started by this class is up.
    running = False

    project_name = None
    container = None
    # True when the container was handed to us (session fixture) rather than
    # created here; we must not stop a container we do not own.
    provided_container = None

    offline_store_config = None

    @classmethod
    def initialize(cls, project_name: str, *args, **kwargs):
        """Bind to a provided container or start a fresh one, then derive the
        offline store config from the container's exposed port and env vars."""
        cls.project_name = project_name

        # `not kwargs.get(...)` already covers the missing-key case; the
        # original `"offline_container" not in kwargs or not kwargs.get(...)`
        # was redundant.
        if not kwargs.get("offline_container"):
            # If we don't get an offline container provided, we create one on
            # the fly. The problem here is that each test then creates its own
            # container, which basically browns out developer laptops.
            cls.container = (
                DockerContainer("postgres:latest")
                .with_exposed_ports(5432)
                .with_env("POSTGRES_USER", cls.postgres_user)
                .with_env("POSTGRES_PASSWORD", cls.postgres_password)
                .with_env("POSTGRES_DB", cls.postgres_db)
            )
            cls.container.start()
            cls.provided_container = False
            # Block until postgres reports readiness so tests don't race it.
            log_string_to_wait_for = "database system is ready to accept connections"
            waited = wait_for_logs(
                container=cls.container,
                predicate=log_string_to_wait_for,
                timeout=30,
                interval=10,
            )
            logger.info("Waited for %s seconds until postgres container was up", waited)
            cls.running = True
        else:
            cls.provided_container = True
            cls.container = kwargs["offline_container"]

        cls.offline_store_config = PostgreSQLOfflineStoreConfig(
            type="postgres",
            host="localhost",
            port=cls.container.get_exposed_port(5432),
            database=cls.container.env["POSTGRES_DB"],
            db_schema="public",
            user=cls.container.env["POSTGRES_USER"],
            password=cls.container.env["POSTGRES_PASSWORD"],
        )

    @classmethod
    def create_data_source(
        cls,
        df: pd.DataFrame,
        destination_name: str,
        suffix: Optional[str] = None,
        timestamp_field="ts",
        created_timestamp_column="created_ts",
        field_mapping: Dict[str, str] = None,
    ) -> DataSource:
        """Upload *df* into a project-prefixed postgres table and return a
        PostgreSQLSource reading from it."""
        destination_name = cls.get_prefixed_table_name(destination_name)

        if cls.offline_store_config:
            df_to_postgres_table(cls.offline_store_config, df, destination_name)

        return PostgreSQLSource(
            name=destination_name,
            # NOTE(review): the middle of this call was collapsed in the diff;
            # the query/timestamp arguments below are reconstructed — confirm
            # against the repository.
            query=f"SELECT * FROM {destination_name}",
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            field_mapping=field_mapping or {"ts_1": "ts"},
        )

    @classmethod
    def create_offline_store_config(cls) -> PostgreSQLOfflineStoreConfig:
        # initialize() must have run first.
        assert cls.offline_store_config
        return cls.offline_store_config

    @classmethod
    def get_prefixed_table_name(cls, suffix: str) -> str:
        """Namespace table names by project to avoid cross-test collisions."""
        return f"{cls.project_name}_{suffix}"

    @classmethod
    def create_online_store(cls) -> Dict[str, str]:
        """Online store config targeting the same container as the offline store."""
        assert cls.container
        return {
            "type": "postgres",
            "host": "localhost",
            "port": cls.container.get_exposed_port(5432),
            "database": cls.postgres_db,
            "db_schema": "feature_store",
            "user": cls.postgres_user,
            "password": cls.postgres_password,
        }

    @classmethod
    def create_saved_dataset_destination(cls):
        # FIXME: saved-dataset destinations are not implemented for postgres yet.
        return None

    @classmethod
    def teardown(cls):
        """Stop the container only if this class started it, then reset state."""
        if not cls.provided_container and cls.running:
            cls.container.stop()
            cls.running = False
            cls.container = None
            # Fix: the original reset `cls.project`, which does not exist —
            # the attribute is `project_name`.
            cls.project_name = None


class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator):
    """Instance facade over PostgresSourceCreatorSingleton.

    The test harness instantiates a creator per environment; every call is
    delegated to the singleton so all environments share a single postgres
    container.
    """

    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    running = False

    def __init__(self, project_name: str, *args, **kwargs):
        super().__init__(project_name)
        # Fix: the original called `initialize(project_name, args, kwargs)`,
        # passing the tuple and dict as two positional values. Keyword
        # arguments such as `offline_container` were therefore never visible
        # in the singleton's **kwargs, so it always started its own container.
        PostgresSourceCreatorSingleton.initialize(project_name, *args, **kwargs)

    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        suffix: Optional[str] = None,
        timestamp_field="ts",
        created_timestamp_column="created_ts",
        field_mapping: Dict[str, str] = None,
    ) -> DataSource:
        """Delegate table upload and source construction to the singleton."""
        return PostgresSourceCreatorSingleton.create_data_source(
            df,
            destination_name,
            suffix,
            timestamp_field,
            created_timestamp_column,
            field_mapping,
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        return PostgresSourceCreatorSingleton.create_offline_store_config()

    def get_prefixed_table_name(self, suffix: str) -> str:
        return PostgresSourceCreatorSingleton.get_prefixed_table_name(suffix)

    def create_online_store(self) -> Dict[str, str]:
        return PostgresSourceCreatorSingleton.create_online_store()

    def create_saved_dataset_destination(self):
        # FIXME: saved-dataset destinations are not implemented for postgres yet.
        return None

    def teardown(self):
        # Container lifecycle is owned by the singleton (and the session
        # fixture); the old per-instance DROP TABLE loop is gone.
        PostgresSourceCreatorSingleton.teardown()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
"pytest-mock==1.10.4",
"Sphinx!=4.0.0,<4.4.0",
"sphinx-rtd-theme",
"testcontainers>=3.5",
"testcontainers[postgresql]>=3.5",
"adlfs==0.5.9",
"firebase-admin==4.5.2",
"pre-commit",
Expand Down

0 comments on commit 640ff12

Please sign in to comment.