Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: Update postgres tests to use testcontainers #2650

Merged
merged 3 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ The services with containerized replacements currently implemented are:
- Redis
- Trino
- HBase
- Postgres

You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.

Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,15 @@ test-python-universal-postgres:
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \
FEAST_USAGE=False \
IS_TEST=True \
python -m pytest --integration --universal \
python -m pytest -x --integration --universal \
-k "not test_historical_retrieval_fails_on_validation and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not test_universal_cli" \
not test_universal_cli and \
not test_go_feature_server and \
not test_feature_logging and \
not test_universal_types" \
sdk/python/tests

test-python-universal-local:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,10 @@
PostgreSQLDataSourceCreator,
)

POSTGRES_ONLINE_CONFIG = {
"type": "postgres",
"host": "localhost",
"port": "5432",
"database": "postgres",
"db_schema": "feature_store",
"user": "postgres",
"password": "docker",
}

# Run the universal test suite against Postgres for both the offline and
# online stores; PostgreSQLDataSourceCreator manages the backing container.
# (Removed the stale `online_store=POSTGRES_ONLINE_CONFIG` entry left over
# from the pre-testcontainers configuration — the creator replaces it.)
FULL_REPO_CONFIGS = [
    IntegrationTestRepoConfig(
        provider="local",
        offline_store_creator=PostgreSQLDataSourceCreator,
        online_store_creator=PostgreSQLDataSourceCreator,
    ),
]
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def online_write_batch(
created_ts,
)
)
# Controll the batch so that we can update the progress
# Control the batch so that we can update the progress
batch_size = 5000
for i in range(0, len(insert_values), batch_size):
cur_batch = insert_values[i : i + batch_size]
Expand Down
54 changes: 53 additions & 1 deletion sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,68 @@ def teardown():
return TrinoContainerSingleton


class PostgresContainerSingleton:
    """Lazily-started, session-wide Postgres test container.

    The container is created on first use and shared by every test in the
    session. ``teardown`` stops it and resets class state so a later
    ``get_singleton`` call can start a fresh container instead of handing
    back a stopped one.
    """

    container = None  # the running DockerContainer, if any
    is_running = False  # True once the container has been started

    # Credentials/database injected into the container via POSTGRES_* env vars.
    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    @classmethod
    def get_singleton(cls):
        """Start the shared Postgres container on first call and return it."""
        if not cls.is_running:
            cls.container = (
                DockerContainer("postgres:latest")
                .with_exposed_ports(5432)
                .with_env("POSTGRES_USER", cls.postgres_user)
                .with_env("POSTGRES_PASSWORD", cls.postgres_password)
                .with_env("POSTGRES_DB", cls.postgres_db)
            )

            cls.container.start()
            # Block until Postgres reports readiness so tests don't race the DB.
            log_string_to_wait_for = "database system is ready to accept connections"
            waited = wait_for_logs(
                container=cls.container,
                predicate=log_string_to_wait_for,
                timeout=30,
                interval=10,
            )
            logger.info("Waited for %s seconds until postgres container was up", waited)
            cls.is_running = True
        return cls.container

    @classmethod
    def teardown(cls):
        """Stop the container (if any) and reset state for potential reuse.

        Bug fix: the original left ``is_running`` True and kept the stopped
        container object around, so a subsequent ``get_singleton`` would
        return a dead container.
        """
        if cls.container:
            cls.container.stop()
        cls.container = None
        cls.is_running = False


@pytest.fixture(scope="session")
def postgres_fixture(request):
    """Session-scoped fixture exposing the Postgres container singleton.

    The singleton class itself is handed to tests; the container is torn
    down once at the end of the session.
    """
    yield PostgresContainerSingleton
    # Equivalent to the addfinalizer-based cleanup: runs at session end.
    PostgresContainerSingleton.teardown()


@pytest.fixture(
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def environment(request, worker_id: str, trino_fixture):
def environment(request, worker_id: str, trino_fixture, postgres_fixture):
if "TrinoSourceCreator" in request.param.offline_store_creator.__name__:
e = construct_test_environment(
request.param,
worker_id=worker_id,
offline_container=trino_fixture.get_singleton(),
)
elif "PostgresSourceCreator" in request.param.offline_store_creator.__name__:
e = construct_test_environment(
request.param,
worker_id=worker_id,
offline_container=postgres_fixture.get_singleton(),
)
else:
e = construct_test_environment(request.param, worker_id=worker_id)
proc = Process(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,86 @@
from typing import Dict, List, Optional
import logging
from typing import Dict, Optional

import pandas as pd
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from feast.data_source import DataSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
PostgreSQLOfflineStoreConfig,
PostgreSQLSource,
)
from feast.infra.utils.postgres.connection_utils import _get_conn, df_to_postgres_table
from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)

logger = logging.getLogger(__name__)

class PostgreSQLDataSourceCreator(DataSourceCreator):
tables: List[str] = []

def __init__(self, project_name: str, *args, **kwargs):
super().__init__(project_name)
self.project_name = project_name
class PostgresSourceCreatorSingleton:
    """Process-wide holder for the Postgres test container and its config.

    Removed the stale merge/diff artifact line
    ``self.offline_store_config = PostgreSQLOfflineStoreConfig(`` that was
    interleaved among the class attributes (it had an unbalanced paren and
    referenced ``self`` at class scope).
    """

    # Credentials/database used when this class starts its own container.
    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    running = False  # True while a self-started container is up

    project_name = None  # set by initialize()
    container = None  # the DockerContainer in use, if any
    provided_container = None  # True when the container came from the caller

    offline_store_config = None  # PostgreSQLOfflineStoreConfig after initialize()

@classmethod
def initialize(cls, project_name: str, *args, **kwargs):
    """Bind the singleton to *project_name* and ensure a Postgres backend.

    A container passed as ``offline_container`` in *kwargs* is reused;
    otherwise a dedicated container is started on the fly.
    """
    cls.project_name = project_name

    provided = kwargs.get("offline_container")
    if provided:
        # Reuse the session-wide container handed in by the test harness.
        cls.provided_container = True
        cls.container = provided
    else:
        # No container was provided, so create one on the fly. Each test
        # then pays the container startup cost, which is heavy on
        # developer laptops.
        cls.container = (
            DockerContainer("postgres:latest")
            .with_exposed_ports(5432)
            .with_env("POSTGRES_USER", cls.postgres_user)
            .with_env("POSTGRES_PASSWORD", cls.postgres_password)
            .with_env("POSTGRES_DB", cls.postgres_db)
        )

        cls.container.start()
        cls.provided_container = False
        # Wait for Postgres to report readiness before handing out config.
        log_string_to_wait_for = "database system is ready to accept connections"
        waited = wait_for_logs(
            container=cls.container,
            predicate=log_string_to_wait_for,
            timeout=30,
            interval=10,
        )
        logger.info("Waited for %s seconds until postgres container was up", waited)
        cls.running = True

    # Point the offline store at whichever container we ended up with,
    # reading credentials back out of the container's env settings.
    cls.offline_store_config = PostgreSQLOfflineStoreConfig(
        type="postgres",
        host="localhost",
        port=cls.container.get_exposed_port(5432),
        database=cls.container.env["POSTGRES_DB"],
        db_schema="public",
        user=cls.container.env["POSTGRES_USER"],
        password=cls.container.env["POSTGRES_PASSWORD"],
    )

@classmethod
def create_data_source(
self,
cls,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
Expand All @@ -41,11 +89,10 @@ def create_data_source(
field_mapping: Dict[str, str] = None,
) -> DataSource:

destination_name = self.get_prefixed_table_name(destination_name)
destination_name = cls.get_prefixed_table_name(destination_name)

df_to_postgres_table(self.offline_store_config, df, destination_name)

self.tables.append(destination_name)
if cls.offline_store_config:
df_to_postgres_table(cls.offline_store_config, df, destination_name)

return PostgreSQLSource(
name=destination_name,
Expand All @@ -55,17 +102,85 @@ def create_data_source(
field_mapping=field_mapping or {"ts_1": "ts"},
)

@classmethod
def create_offline_store_config(cls) -> PostgreSQLOfflineStoreConfig:
    """Return the offline store config; ``initialize`` must have run first."""
    config = cls.offline_store_config
    assert config
    return config

@classmethod
def get_prefixed_table_name(cls, suffix: str) -> str:
    """Namespace *suffix* with the current project name."""
    return "{}_{}".format(cls.project_name, suffix)

@classmethod
def create_online_store(cls) -> Dict[str, str]:
    """Build the online-store config dict pointing at the running container."""
    assert cls.container
    return dict(
        type="postgres",
        host="localhost",
        port=cls.container.get_exposed_port(5432),
        database=cls.postgres_db,
        db_schema="feature_store",
        user=cls.postgres_user,
        password=cls.postgres_password,
    )

@classmethod
def create_saved_dataset_destination(cls):
    # FIXME: saved-dataset destinations are not implemented for Postgres yet.
    return None

@classmethod
def teardown(cls):
    """Stop a self-started container and reset the singleton's state."""
    if not cls.provided_container and cls.running:
        # Only stop containers we started ourselves; a provided container
        # is owned (and stopped) by the session-level fixture.
        cls.container.stop()
    cls.running = False
    cls.container = None
    # Bug fix: the original assigned cls.project, but the attribute this
    # class actually uses is project_name — it was never being cleared.
    cls.project_name = None


class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator):
    """Feast test-suite creator delegating to ``PostgresSourceCreatorSingleton``.

    Acts as both the offline (data source) and online store creator so a
    single Postgres container backs the whole integration test environment.

    Fixes relative to the original: ``__init__`` now unpacks ``*args`` /
    ``**kwargs`` when forwarding to ``initialize`` (previously the tuple and
    dict were passed as two positional arguments, so the ``offline_container``
    kwarg never reached the singleton and every test started its own
    container), and stale merge/diff artifact lines (duplicate ``return``
    statements, an old teardown body referencing nonexistent ``self.tables``)
    have been removed.
    """

    postgres_user = "test"
    postgres_password = "test"
    postgres_db = "test"

    running = False

    def __init__(self, project_name: str, *args, **kwargs):
        super().__init__(project_name)
        # Forward the caller's arguments verbatim so kwargs such as
        # offline_container reach the singleton.
        PostgresSourceCreatorSingleton.initialize(project_name, *args, **kwargs)

    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        suffix: Optional[str] = None,
        timestamp_field="ts",
        created_timestamp_column="created_ts",
        field_mapping: Dict[str, str] = None,
    ) -> DataSource:
        """Materialize *df* as a Postgres table and return a source for it."""
        return PostgresSourceCreatorSingleton.create_data_source(
            df,
            destination_name,
            suffix,
            timestamp_field,
            created_timestamp_column,
            field_mapping,
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        return PostgresSourceCreatorSingleton.create_offline_store_config()

    def get_prefixed_table_name(self, suffix: str) -> str:
        return PostgresSourceCreatorSingleton.get_prefixed_table_name(suffix)

    def create_online_store(self) -> Dict[str, str]:
        return PostgresSourceCreatorSingleton.create_online_store()

    def create_saved_dataset_destination(self):
        # FIXME: saved-dataset destinations are not implemented for Postgres yet.
        return None

    def teardown(self):
        PostgresSourceCreatorSingleton.teardown()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
"pytest-mock==1.10.4",
"Sphinx!=4.0.0,<4.4.0",
"sphinx-rtd-theme",
"testcontainers>=3.5",
"testcontainers[postgresql]>=3.5",
"adlfs==0.5.9",
"firebase-admin==4.5.2",
"pre-commit",
Expand Down