Add index metadata for PostgreSQL (#144)
* Adding index metadata for Postgres

* Avoid writing indexes multiple times
peterthesling authored Mar 1, 2021
1 parent 8b4917c commit ce95bba
Showing 12 changed files with 909 additions and 25 deletions.
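
The tests added in this commit drive the new `PostgresIndexExtractor` through the same configuration keys and `init`/`extract` calls a whale pipeline would use. For orientation, here is a minimal usage sketch assembled from those keys; the connection string is a placeholder, and the snippet assumes nothing beyond what the tests below exercise:

```python
from pyhocon import ConfigFactory

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from whale.extractor.postgres_index_extractor import PostgresIndexExtractor

# Placeholder connection string; any SQLAlchemy-compatible Postgres URI would do.
conf = ConfigFactory.from_dict(
    {
        f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "postgresql://user:pass@localhost/postgres",
        PostgresIndexExtractor.DATABASE_KEY: "postgres",
        PostgresIndexExtractor.CLUSTER_KEY: "MY_CLUSTER",
        PostgresIndexExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
    }
)

extractor = PostgresIndexExtractor()
extractor.init(conf)

# extract() returns one TableIndexesMetadata record per table, then None once exhausted.
record = extractor.extract()
while record is not None:
    print(record)
    record = extractor.extract()
```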
324 changes: 324 additions & 0 deletions pipelines/tests/unit/extractor/test_postgres_index_extractor.py
@@ -0,0 +1,324 @@
import logging
import unittest
from typing import Any, Dict
from mock import MagicMock, patch
from pyhocon import ConfigFactory

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor

from whale.extractor.postgres_index_extractor import PostgresIndexExtractor
from whale.models.index_metadata import TableIndexesMetadata, IndexMetadata


class TestPostgresIndexExtractor(unittest.TestCase):
    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)

        config_dict = {
            f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "TEST_CONNECTION",
            PostgresIndexExtractor.CLUSTER_KEY: "MY_CLUSTER",
            PostgresIndexExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
            PostgresIndexExtractor.DATABASE_KEY: "postgres",
        }

        self.conf = ConfigFactory.from_dict(config_dict)

    def test_extraction_with_empty_query_result(self) -> None:
        with patch.object(SQLAlchemyExtractor, "_get_connection"):
            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)

            results = extractor.extract()
            self.assertEqual(results, None)

    def test_extraction_with_single_result(self) -> None:
        with patch.object(SQLAlchemyExtractor, "_get_connection") as mock_connection:
            connection = MagicMock()
            mock_connection.return_value = connection
            sql_execute = MagicMock()
            connection.execute = sql_execute

            table = {
                "cluster": self.conf[PostgresIndexExtractor.CLUSTER_KEY],
                "schema": "test_schema",
                "table": "test_table",
            }

            sql_execute.return_value = [
                self._union(
                    {
                        "index_name": "idx_1",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_column_1",
                    },
                    table,
                ),
                self._union(
                    {
                        "index_name": "idx_1",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_column_2",
                    },
                    table,
                ),
            ]

            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)
            actual = extractor.extract()

            expected_index = IndexMetadata(
                name="idx_1",
                description=None,
                index_type=None,
                architecture=None,
                constraint="unique",
                columns=["idx_column_1", "idx_column_2"],
                tags=None,
            )

            expected = TableIndexesMetadata(
                database="postgres",
                cluster="MY_CLUSTER",
                schema="test_schema",
                table="test_table",
                indexes=[expected_index],
            )

            self.assertEqual(expected.__repr__(), actual.__repr__())
            self.assertIsNone(extractor.extract())

    def test_extraction_with_multiple_result(self) -> None:
        with patch.object(SQLAlchemyExtractor, "_get_connection") as mock_connection:
            connection = MagicMock()
            mock_connection.return_value = connection
            sql_execute = MagicMock()
            connection.execute = sql_execute

            table_1 = {
                "cluster": self.conf[PostgresIndexExtractor.CLUSTER_KEY],
                "schema": "test_schema_1",
                "table": "test_table_1",
            }

            table_2 = {
                "cluster": self.conf[PostgresIndexExtractor.CLUSTER_KEY],
                "schema": "test_schema_2",
                "table": "test_table_2",
            }

            sql_execute.return_value = [
                self._union(
                    {
                        "index_name": "idx_1",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_1_column_1",
                    },
                    table_1,
                ),
                self._union(
                    {
                        "index_name": "idx_1",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_1_column_2",
                    },
                    table_1,
                ),
                self._union(
                    {
                        "index_name": "idx_2",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_2_column_1",
                    },
                    table_1,
                ),
                self._union(
                    {
                        "index_name": "idx_3",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_3_column_1",
                    },
                    table_2,
                ),
                self._union(
                    {
                        "index_name": "idx_3",
                        "is_primary": False,
                        "is_clustered": False,
                        "is_unique": True,
                        "column_name": "idx_3_column_2",
                    },
                    table_2,
                ),
            ]

            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)

            expected_index_1 = IndexMetadata(
                name="idx_1",
                description=None,
                index_type=None,
                architecture=None,
                constraint="unique",
                columns=["idx_1_column_1", "idx_1_column_2"],
                tags=None,
            )

            expected_index_2 = IndexMetadata(
                name="idx_2",
                description=None,
                index_type=None,
                architecture=None,
                constraint="unique",
                columns=["idx_2_column_1"],
                tags=None,
            )

            expected = TableIndexesMetadata(
                database="postgres",
                cluster="MY_CLUSTER",
                schema="test_schema_1",
                table="test_table_1",
                indexes=[expected_index_1, expected_index_2],
            )

            self.assertEqual(expected.__repr__(), extractor.extract().__repr__())

            expected_index = IndexMetadata(
                name="idx_3",
                description=None,
                index_type=None,
                architecture=None,
                constraint="unique",
                columns=["idx_3_column_1", "idx_3_column_2"],
                tags=None,
            )

            expected = TableIndexesMetadata(
                database="postgres",
                cluster="MY_CLUSTER",
                schema="test_schema_2",
                table="test_table_2",
                indexes=[expected_index],
            )

            self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
            self.assertIsNone(extractor.extract())

    def _union(self, target: Dict[Any, Any], extra: Dict[Any, Any]) -> Dict[Any, Any]:
        target.update(extra)
        return target


class TestPostgresIndexExtractorWithWhereClause(unittest.TestCase):
    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)
        self.where_clause_suffix = """
        where table_schema in ('public') and table_name = 'movies'
        """

        config_dict = {
            PostgresIndexExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix,
            PostgresIndexExtractor.DATABASE_KEY: "postgres",
            f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "TEST_CONNECTION",
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test that the configured where clause suffix is included in the generated SQL statement
        """
        with patch.object(SQLAlchemyExtractor, "_get_connection"):
            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)
            self.assertTrue(self.where_clause_suffix in extractor.sql_stmt)


class TestPostgresIndexExtractorClusterKeyNoTableCatalog(unittest.TestCase):
    # test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is specified
    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)
        self.cluster_key = "not_master"

        config_dict = {
            PostgresIndexExtractor.CLUSTER_KEY: self.cluster_key,
            PostgresIndexExtractor.DATABASE_KEY: "postgres",
            f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "TEST_CONNECTION",
            PostgresIndexExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test that the configured cluster key appears in the generated SQL statement
        """
        with patch.object(SQLAlchemyExtractor, "_get_connection"):
            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)
            self.assertTrue(self.cluster_key in extractor.sql_stmt)


class TestPostgresIndexExtractorNoClusterKeyNoTableCatalog(unittest.TestCase):
    # test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is NOT specified
    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)

        config_dict = {
            f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "TEST_CONNECTION",
            PostgresIndexExtractor.DATABASE_KEY: "postgres",
            PostgresIndexExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test that the default cluster name appears in the generated SQL statement
        """
        with patch.object(SQLAlchemyExtractor, "_get_connection"):
            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)
            self.assertTrue(
                PostgresIndexExtractor.DEFAULT_CLUSTER_NAME in extractor.sql_stmt
            )


class TestPostgresIndexExtractorTableCatalogEnabled(unittest.TestCase):
    # test when USE_CATALOG_AS_CLUSTER_NAME is true (CLUSTER_KEY should be ignored)
    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)
        self.cluster_key = "not_master"

        config_dict = {
            PostgresIndexExtractor.CLUSTER_KEY: self.cluster_key,
            PostgresIndexExtractor.DATABASE_KEY: "postgres",
            f"extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": "TEST_CONNECTION",
            PostgresIndexExtractor.USE_CATALOG_AS_CLUSTER_NAME: True,
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test that table_catalog is used as the cluster name and the configured cluster key is ignored
        """
        with patch.object(SQLAlchemyExtractor, "_get_connection"):
            extractor = PostgresIndexExtractor()
            extractor.init(self.conf)
            self.assertTrue("table_catalog" in extractor.sql_stmt)
            self.assertFalse(self.cluster_key in extractor.sql_stmt)


if __name__ == "__main__":
    unittest.main()
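
The expectations above also pin down the behaviour behind the "Avoid writing indexes multiple times" bullet in the commit message: the query returns one row per (index, column) pair, and the extractor folds those rows into one IndexMetadata per index name and one TableIndexesMetadata per table, returning None once every table has been emitted. A standalone sketch of that folding step, written against dict rows shaped like the fixtures above (an illustration of the tested behaviour, not the extractor's actual implementation):

```python
from typing import Any, Dict, Iterator, List, Tuple

from whale.models.index_metadata import IndexMetadata, TableIndexesMetadata


def group_index_rows(
    rows: List[Dict[str, Any]], database: str
) -> Iterator[TableIndexesMetadata]:
    """Fold one-row-per-(index, column) query results into one record per table."""
    # (cluster, schema, table) -> index name -> ordered list of column names
    grouped: Dict[Tuple[str, str, str], Dict[str, List[str]]] = {}
    is_unique: Dict[str, bool] = {}

    for row in rows:
        table_key = (row["cluster"], row["schema"], row["table"])
        columns = grouped.setdefault(table_key, {}).setdefault(row["index_name"], [])
        # A repeated index name only contributes another column, never a second index.
        columns.append(row["column_name"])
        is_unique[row["index_name"]] = row["is_unique"]

    for (cluster, schema, table), indexes in grouped.items():
        yield TableIndexesMetadata(
            database=database,
            cluster=cluster,
            schema=schema,
            table=table,
            indexes=[
                IndexMetadata(
                    name=name,
                    # Only the unique-constraint mapping is visible in these fixtures;
                    # the is_primary / is_clustered cases are not pinned down here.
                    constraint="unique" if is_unique[name] else None,
                    columns=columns,
                )
                for name, columns in indexes.items()
            ],
        )
```

Because the grouping is keyed by table and index name, a second row for an existing index extends its column list instead of producing a duplicate record, which is exactly what test_extraction_with_single_result and test_extraction_with_multiple_result assert.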
33 changes: 33 additions & 0 deletions pipelines/tests/unit/loader/test_whale_loader.py
@@ -7,6 +7,7 @@
from typing import Dict, Iterable, Any, Callable # noqa: F401

from whale.models.table_metadata import TableMetadata
from whale.models.index_metadata import TableIndexesMetadata, IndexMetadata
from whale.loader import whale_loader
from whale.utils import paths

@@ -74,3 +75,35 @@ def test_load_catalog_specified(patched_config):
assert "mock_table" in written_record
assert "mock_catalog" in written_record
assert "mock_database" in written_record

def test_load_index_metadata(patched_config):
    index_metadata = IndexMetadata(
        name="mock_index",
        columns=["mock_column_1", "mock_column_2"],
    )

    record = TableIndexesMetadata(
        database="mock_database",
        cluster="mock_catalog",
        schema="mock_schema",
        table="mock_table",
        indexes=[index_metadata],
    )

    loader = whale_loader.WhaleLoader()
    loader.init(patched_config)
    loader.load(record)

    loader.close()
    file_path = os.path.join(
        patched_config.get("base_directory"),
        "mock_database/mock_catalog.mock_schema.mock_table.md",
    )
    with open(file_path, "r") as f:
        written_record = f.read()

    assert "mock_schema" in written_record
    assert "mock_table" in written_record
    assert "mock_catalog" in written_record
    assert "mock_database" in written_record
    assert "mock_index" in written_record
