From 6b5586ff46e3daed0dc9a85c15d96ab184fe36be Mon Sep 17 00:00:00 2001
From: Robert Yi
Date: Sat, 9 Jan 2021 00:21:41 -0800
Subject: [PATCH] feat: add splice machine support. (#128)

* feat: add splice machine support.
---
 pipelines/requirements.txt                    |   1 +
 pipelines/setup.py                            |   2 +-
 .../test_splice_machine_metadata_extractor.py |  82 +++++++++++
 .../splice_machine_metadata_extractor.py      | 134 ++++++++++++++++++
 .../splice_machine_metadata_extractor.sql     |  11 ++
 pipelines/whale/utils/extractor_wrappers.py   |  21 +++
 pipelines/whale/utils/task_wrappers.py        |   2 +
 7 files changed, 252 insertions(+), 1 deletion(-)
 create mode 100644 pipelines/tests/unit/extractor/test_splice_machine_metadata_extractor.py
 create mode 100644 pipelines/whale/extractor/splice_machine_metadata_extractor.py
 create mode 100644 pipelines/whale/extractor/splice_machine_metadata_extractor.sql

diff --git a/pipelines/requirements.txt b/pipelines/requirements.txt
index 0a51bf46..cb72d44d 100644
--- a/pipelines/requirements.txt
+++ b/pipelines/requirements.txt
@@ -73,6 +73,7 @@ six==1.15.0
 slack-sdk==3.0.0
 snowflake-connector-python==2.3.2
 snowflake-sqlalchemy==1.2.3
+splicemachinesa==0.3.1
 SQLAlchemy==1.3.19
 statsd==3.3.0
 termcolor==1.1.0
diff --git a/pipelines/setup.py b/pipelines/setup.py
index d3c27a88..cddeedd0 100644
--- a/pipelines/setup.py
+++ b/pipelines/setup.py
@@ -8,7 +8,7 @@
 
 setuptools.setup(
     name="whale-pipelines",
-    version="1.3.3",
+    version="1.4.0",
     author="Robert Yi",
     author_email="robert@ryi.me",
     description="A pared-down metadata scraper + SQL runner.",
diff --git a/pipelines/tests/unit/extractor/test_splice_machine_metadata_extractor.py b/pipelines/tests/unit/extractor/test_splice_machine_metadata_extractor.py
new file mode 100644
index 00000000..9ab3a958
--- /dev/null
+++ b/pipelines/tests/unit/extractor/test_splice_machine_metadata_extractor.py
@@ -0,0 +1,82 @@
+import logging
+import unittest
+
+from mock import patch, MagicMock
+from pyhocon import ConfigFactory
+
+from whale.extractor.splice_machine_metadata_extractor import SpliceMachineMetadataExtractor
+from whale.extractor import splice_machine_metadata_extractor
+from whale.models.table_metadata import TableMetadata, ColumnMetadata
+
+
+class TestSpliceMachineMetadataExtractor(unittest.TestCase):
+    def setUp(self):
+        logging.basicConfig(level=logging.INFO)
+        self.Extractor = SpliceMachineMetadataExtractor
+
+        self.DATABASE = "TEST_DATABASE"
+        self.CLUSTER = None
+
+        config_dict = {
+            self.Extractor.HOST_KEY: "TEST_CONNECTION",
+            self.Extractor.DATABASE_KEY: self.DATABASE,
+            self.Extractor.CLUSTER_KEY: self.CLUSTER,
+            self.Extractor.USERNAME_KEY: "TEST_USERNAME",
+            self.Extractor.PASSWORD_KEY: "TEST_PASSWORD",
+        }
+        self.conf = ConfigFactory.from_dict(config_dict)
+
+    def test_extraction_with_empty_result(self):
+        with patch.object(splice_machine_metadata_extractor, "splice_connect"):
+            extractor = self.Extractor()
+            extractor.init(self.conf)
+            results = extractor.extract()
+            self.assertEqual(results, None)
+
+    def test_extraction_with_single_result(self):
+        with patch.object(splice_machine_metadata_extractor, "splice_connect") as mock_connect:
+            column = ColumnMetadata("column1", None, "int", 0)
+            table = TableMetadata(
+                self.DATABASE,
+                self.CLUSTER,
+                "test_schema",
+                "test_table",
+                None,
+                [column],
+            )
+
+            # Connection returns a cursor
+            mock_cursor = MagicMock()
+            mock_execute = MagicMock()
+            mock_fetchall = MagicMock()
+
+            # self.connection = splice_connect(...)
+            mock_connection = MagicMock()
+            mock_connect.return_value = mock_connection
+            # self.cursor = self.connection.cursor()
+            mock_connection.cursor.return_value = mock_cursor
+
+            # self.cursor.execute(...)
+            mock_cursor.execute = mock_execute
+
+            # for row in self.cursor.fetchall()
+            mock_cursor.fetchall = mock_fetchall
+
+            mock_fetchall.return_value = [
+                [
+                    table.schema,
+                    table.name,
+                    "not-a-view",
+                    column.name,
+                    column.sort_order,
+                    column.type,
+                ]
+            ]
+
+            extractor = self.Extractor()
+            extractor.init(self.conf)
+            actual = extractor.extract()
+            expected = table
+
+            self.assertEqual(expected.__repr__(), actual.__repr__())
+            self.assertIsNone(extractor.extract())
diff --git a/pipelines/whale/extractor/splice_machine_metadata_extractor.py b/pipelines/whale/extractor/splice_machine_metadata_extractor.py
new file mode 100644
index 00000000..4fe64f73
--- /dev/null
+++ b/pipelines/whale/extractor/splice_machine_metadata_extractor.py
@@ -0,0 +1,134 @@
+import logging
+import os
+from collections import namedtuple
+from jinja2 import Environment, FileSystemLoader
+
+from pyhocon import ConfigFactory, ConfigTree
+from typing import Any, Dict, Iterator, Optional
+
+from databuilder.extractor.base_extractor import Extractor
+from whale.models.table_metadata import TableMetadata, ColumnMetadata
+from itertools import groupby
+from splicemachinesa.pyodbc import splice_connect
+
+
+TableKey = namedtuple("TableKey", ["schema_name", "table_name"])
+
+LOGGER = logging.getLogger(__name__)
+
+
+class SpliceMachineMetadataExtractor(Extractor):
+    """
+    Extracts Splice Machine table and column metadata from the underlying
+    metastore database, over a pyodbc connection obtained via
+    splicemachinesa.pyodbc.splice_connect.
+    Requirements:
+        splicemachinesa
+    """
+
+    WHERE_CLAUSE_SUFFIX_KEY = "where_clause_suffix"
+    DATABASE_KEY = "database"
+    CLUSTER_KEY = "cluster"
+    USERNAME_KEY = "username"
+    PASSWORD_KEY = "password"
+    HOST_KEY = "host"
+
+    DEFAULT_CONFIG = ConfigFactory.from_dict(
+        {
+            WHERE_CLAUSE_SUFFIX_KEY: "",
+            DATABASE_KEY: "sm",
+            CLUSTER_KEY: "master",
+        }
+    )
+
+    def init(self, conf: ConfigTree) -> None:
+        self.conf = conf.with_fallback(SpliceMachineMetadataExtractor.DEFAULT_CONFIG)
+        self._database = self.conf.get_string(SpliceMachineMetadataExtractor.DATABASE_KEY)
+        self._cluster = self.conf.get_string(SpliceMachineMetadataExtractor.CLUSTER_KEY)
+        self._where_clause_suffix = self.conf.get_string(SpliceMachineMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)
+        self._username = self.conf.get_string(SpliceMachineMetadataExtractor.USERNAME_KEY)
+        self._password = self.conf.get_string(SpliceMachineMetadataExtractor.PASSWORD_KEY)
+        self._host = self.conf.get_string(SpliceMachineMetadataExtractor.HOST_KEY)
+
+        context = {
+            "where_clause_suffix": self._where_clause_suffix,
+        }
+
+        j2_env = Environment(loader=FileSystemLoader(os.path.dirname(os.path.abspath(__file__))), trim_blocks=True)
+        self.sql_statement = j2_env.get_template("splice_machine_metadata_extractor.sql").render(context)
+
+        LOGGER.info("SQL for splicemachine: {}".format(self.sql_statement))
+        self._extract_iter = None
+        self.connection = splice_connect(self._username, self._password, self._host)
+        self.cursor = self.connection.cursor()
+
+    def extract(self) -> Optional[TableMetadata]:
+        if not self._extract_iter:
+            self._extract_iter = self._get_extract_iter()
+        try:
+            return next(self._extract_iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self) -> str:
+        return "extractor.splicemachine"
+
+    def _get_extract_iter(self) -> Iterator[TableMetadata]:
+        """
+        Groups rows from the raw-level iterator by table, using itertools.groupby,
+        and yields one TableMetadata per table
+        :return:
+        """
+
+        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
+            columns = []
+            for row in group:
+                last_row = row
+                columns.append(
+                    ColumnMetadata(
+                        name=row["column_name"],
+                        description=None,
+                        col_type=row["column_type"],
+                        sort_order=row["column_sort_order"],
+                    )
+                )
+
+            yield TableMetadata(
+                database=self._database,
+                cluster=None,
+                schema=last_row["schema_name"],
+                name=last_row["table_name"],
+                description=None,
+                columns=columns,
+                is_view=last_row["table_type"] == "V",
+            )
+
+    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
+        """
+        Provides an iterator of result rows from the Splice Machine cursor
+        :return:
+        """
+        self.cursor.execute(self.sql_statement)
+        keys = [
+            "schema_name",
+            "table_name",
+            "table_type",
+            "column_name",
+            "column_sort_order",
+            "column_type",
+        ]
+
+        for row in self.cursor.fetchall():
+            yield dict(zip(keys, row))
+
+    def _get_table_key(self, row: Dict[str, Any]) -> Optional[TableKey]:
+        """
+        Table key consists of schema and table name
+        :param row:
+        :return:
+        """
+        if row:
+            return TableKey(schema_name=row["schema_name"], table_name=row["table_name"])
+
+        return None
+
diff --git a/pipelines/whale/extractor/splice_machine_metadata_extractor.sql b/pipelines/whale/extractor/splice_machine_metadata_extractor.sql
new file mode 100644
index 00000000..b08f6666
--- /dev/null
+++ b/pipelines/whale/extractor/splice_machine_metadata_extractor.sql
@@ -0,0 +1,11 @@
+SELECT
+    c.SCHEMANAME,
+    c.TABLENAME,
+    t.TABLETYPE,
+    c.COLUMNNAME,
+    c.COLUMNNUMBER,
+    varchar(COLUMNDATATYPE,5000)
+FROM SYSVW.SYSCOLUMNSVIEW c
+LEFT JOIN SYSVW.SYSTABLESVIEW t
+    ON c.SCHEMANAME = t.SCHEMANAME
+    AND c.TABLENAME = t.TABLENAME
diff --git a/pipelines/whale/utils/extractor_wrappers.py b/pipelines/whale/utils/extractor_wrappers.py
index 16961d87..8037254a 100644
--- a/pipelines/whale/utils/extractor_wrappers.py
+++ b/pipelines/whale/utils/extractor_wrappers.py
@@ -12,6 +12,7 @@
 from whale.extractor.bigquery_watermark_extractor import BigQueryWatermarkExtractor
 from whale.extractor.glue_extractor import GlueExtractor
 from whale.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
+from whale.extractor.splice_machine_metadata_extractor import SpliceMachineMetadataExtractor
 from whale.extractor.postgres_metadata_extractor import PostgresMetadataExtractor
 from whale.extractor.metric_runner import MetricRunner
 from whale.engine.sql_alchemy_engine import SQLAlchemyEngine
@@ -252,6 +253,26 @@ def configure_snowflake_extractors(connection: ConnectionConfigSchema):
     return extractors, conf
 
 
+def configure_splice_machine_extractors(connection: ConnectionConfigSchema):
+    Extractor = SpliceMachineMetadataExtractor
+    extractor = Extractor()
+    scope = extractor.get_scope()
+
+    conf = ConfigFactory.from_dict(
+        {
+            f"{scope}.{Extractor.HOST_KEY}": connection.uri,
+            f"{scope}.{Extractor.USERNAME_KEY}": connection.username,
+            f"{scope}.{Extractor.PASSWORD_KEY}": connection.password,
+            f"{scope}.{Extractor.WHERE_CLAUSE_SUFFIX_KEY}": connection.where_clause_suffix,
+        }
+    )
+
+    extractors = [extractor]
+    # extractors, conf = add_metrics(extractors, conf, connection)
+
+    return extractors, conf
+
+
 def configure_unscoped_sqlalchemy_engine(connection: ConnectionConfigSchema):
     Engine = SQLAlchemyEngine
     engine = Engine()
diff --git a/pipelines/whale/utils/task_wrappers.py b/pipelines/whale/utils/task_wrappers.py
index 1f4cfe55..7994c0cb 100644
--- a/pipelines/whale/utils/task_wrappers.py
+++ b/pipelines/whale/utils/task_wrappers.py
@@ -24,6 +24,7 @@
     configure_presto_extractors,
     configure_redshift_extractors,
     configure_snowflake_extractors,
+    configure_splice_machine_extractors,
     configure_unscoped_sqlalchemy_engine,
     run_build_script,
 )
@@ -88,6 +89,7 @@ def pull():
         "presto": configure_presto_extractors,
         "redshift": configure_redshift_extractors,
         "snowflake": configure_snowflake_extractors,
+        "splicemachine": configure_splice_machine_extractors,
     }
 
     if connection.metadata_source == "build_script":
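
For reference, a minimal sketch of driving the new extractor directly, mirroring the configuration keys exercised in the unit test above. The host and credentials are placeholders, a reachable Splice Machine instance is assumed, and DATABASE_KEY / CLUSTER_KEY are left to the extractor's DEFAULT_CONFIG ("sm" / "master").

from pyhocon import ConfigFactory

from whale.extractor.splice_machine_metadata_extractor import (
    SpliceMachineMetadataExtractor,
)

extractor = SpliceMachineMetadataExtractor()

# Placeholder connection values; substitute a real host and credentials.
conf = ConfigFactory.from_dict(
    {
        SpliceMachineMetadataExtractor.HOST_KEY: "splice.example.com",
        SpliceMachineMetadataExtractor.USERNAME_KEY: "splice_user",
        SpliceMachineMetadataExtractor.PASSWORD_KEY: "splice_password",
        SpliceMachineMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY: "",
    }
)

# init() renders the bundled SQL template and opens the connection through
# splicemachinesa's splice_connect.
extractor.init(conf)

# extract() returns one TableMetadata per call, then None once exhausted.
record = extractor.extract()
while record is not None:
    print(record)
    record = extractor.extract()

Within whale itself, the same extractor is reached through configure_splice_machine_extractors once a connection's metadata_source is set to "splicemachine", as wired up in task_wrappers.py above.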