Skip to content

Commit

Permalink
feat: add splice machine support. (#128)
Browse files Browse the repository at this point in the history
* feat: add splice machine support.
  • Loading branch information
rsyi authored Jan 9, 2021
1 parent a40f9fc commit 6b5586f
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 1 deletion.
1 change: 1 addition & 0 deletions pipelines/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ six==1.15.0
slack-sdk==3.0.0
snowflake-connector-python==2.3.2
snowflake-sqlalchemy==1.2.3
splicemachinesa==0.3.1
SQLAlchemy==1.3.19
statsd==3.3.0
termcolor==1.1.0
Expand Down
2 changes: 1 addition & 1 deletion pipelines/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setuptools.setup(
name="whale-pipelines",
version="1.3.3",
version="1.4.0",
author="Robert Yi",
author_email="[email protected]",
description="A pared-down metadata scraper + SQL runner.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
import unittest

from mock import patch, MagicMock
from pyhocon import ConfigFactory

from whale.extractor.splice_machine_metadata_extractor import SpliceMachineMetadataExtractor
from whale.extractor import splice_machine_metadata_extractor
from whale.models.table_metadata import TableMetadata, ColumnMetadata


class TestSpliceMachineMetadataExtractor(unittest.TestCase):
    """Unit tests for SpliceMachineMetadataExtractor with a mocked connection."""

    def setUp(self):
        logging.basicConfig(level=logging.INFO)
        self.Extractor = SpliceMachineMetadataExtractor

        self.DATABASE = "TEST_DATABASE"
        self.CLUSTER = None

        self.conf = ConfigFactory.from_dict(
            {
                self.Extractor.HOST_KEY: "TEST_CONNECTION",
                self.Extractor.DATABASE_KEY: self.DATABASE,
                self.Extractor.CLUSTER_KEY: self.CLUSTER,
                self.Extractor.USERNAME_KEY: "TEST_USERNAME",
                self.Extractor.PASSWORD_KEY: "TEST_PASSWORD",
            }
        )

    def test_extraction_with_empty_result(self):
        """extract() returns None when the cursor yields no rows."""
        with patch.object(splice_machine_metadata_extractor, "splice_connect"):
            extractor = self.Extractor()
            extractor.init(self.conf)
            self.assertIsNone(extractor.extract())

    def test_extraction_with_single_result(self):
        """A single raw column row is assembled into one TableMetadata."""
        with patch.object(
            splice_machine_metadata_extractor, "splice_connect"
        ) as mock_connect:
            column = ColumnMetadata("column1", None, "int", 0)
            table = TableMetadata(
                self.DATABASE,
                self.CLUSTER,
                "test_schema",
                "test_table",
                None,
                [column],
            )

            # Wire splice_connect(...) -> connection -> cursor so that
            # fetchall() produces one row in the extractor's expected
            # column order: schema, table, type, column, sort order, type.
            mock_cursor = MagicMock()
            mock_cursor.fetchall.return_value = [
                [
                    table.schema,
                    table.name,
                    "not-a-view",
                    column.name,
                    column.sort_order,
                    column.type,
                ]
            ]
            mock_connect.return_value.cursor.return_value = mock_cursor

            extractor = self.Extractor()
            extractor.init(self.conf)

            self.assertEqual(repr(table), repr(extractor.extract()))
            # The iterator is exhausted after the single table.
            self.assertIsNone(extractor.extract())
134 changes: 134 additions & 0 deletions pipelines/whale/extractor/splice_machine_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import os
from collections import namedtuple
from jinja2 import Environment, FileSystemLoader

from pyhocon import ConfigFactory, ConfigTree
from typing import Any, Dict, Iterator, Optional

from databuilder.extractor.base_extractor import Extractor
from whale.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
from splicemachinesa.pyodbc import splice_connect


TableKey = namedtuple("TableKey", ["schema_name", "table_name"])

LOGGER = logging.getLogger(__name__)


class SpliceMachineMetadataExtractor(Extractor):
    """
    Extracts Splice Machine table and column metadata from the SYSVW system
    views over a direct pyodbc connection (``splicemachinesa``).

    Requirements:
        splicemachinesa
    """

    WHERE_CLAUSE_SUFFIX_KEY = "where_clause_suffix"
    DATABASE_KEY = "database"
    CLUSTER_KEY = "cluster"
    USERNAME_KEY = "username"
    PASSWORD_KEY = "password"
    HOST_KEY = "host"

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {
            WHERE_CLAUSE_SUFFIX_KEY: "",
            DATABASE_KEY: "sm",
            CLUSTER_KEY: "master",
        }
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Read connection settings from `conf`, render the metadata SQL
        template, and open a cursor against the Splice Machine database.

        :param conf: scoped config; missing keys fall back to DEFAULT_CONFIG.
        """
        self.conf = conf.with_fallback(SpliceMachineMetadataExtractor.DEFAULT_CONFIG)
        self._database = self.conf.get_string(SpliceMachineMetadataExtractor.DATABASE_KEY)
        self._cluster = self.conf.get_string(SpliceMachineMetadataExtractor.CLUSTER_KEY)
        self._where_clause_suffix = self.conf.get_string(SpliceMachineMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)
        self._username = self.conf.get_string(SpliceMachineMetadataExtractor.USERNAME_KEY)
        self._password = self.conf.get_string(SpliceMachineMetadataExtractor.PASSWORD_KEY)
        self._host = self.conf.get_string(SpliceMachineMetadataExtractor.HOST_KEY)

        context = {
            "where_clause_suffix": self._where_clause_suffix,
        }

        # The SQL lives in a Jinja template next to this module so the
        # where-clause suffix can be injected at render time.
        j2_env = Environment(loader=FileSystemLoader(os.path.dirname(os.path.abspath(__file__))), trim_blocks=True)
        self.sql_statement = j2_env.get_template('splice_machine_metadata_extractor.sql').render(context)

        # Lazy %-formatting: only rendered if INFO is enabled.
        LOGGER.info("SQL for splicemachine: %s", self.sql_statement)
        self._extract_iter = None
        self.connection = splice_connect(self._username, self._password, self._host)
        self.cursor = self.connection.cursor()

    def extract(self) -> Optional[TableMetadata]:
        """Return the next TableMetadata, or None once exhausted."""
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        return "extractor.splicemachine"

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Group raw per-column rows into per-table TableMetadata records.

        NOTE: itertools.groupby only merges *consecutive* rows with the same
        key, so the SQL must order results by schema and table name.
        :return: iterator of TableMetadata
        """
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []
            # groupby never yields an empty group, so last_row is always
            # bound after the loop.
            for row in group:
                last_row = row
                columns.append(
                    ColumnMetadata(
                        name=row["column_name"],
                        description=None,
                        col_type=row["column_type"],
                        sort_order=row["column_sort_order"],
                    )
                )

            yield TableMetadata(
                database=self._database,
                cluster=None,
                schema=last_row["schema_name"],
                name=last_row["table_name"],
                description=None,
                columns=columns,
                # 'V' marks a view in Splice Machine's SYSTABLESVIEW.
                is_view=last_row["table_type"] == "V",
            )

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Execute the rendered SQL and yield each result row as a dict keyed
        by the column names the rest of this extractor expects.
        :return: iterator of row dicts
        """
        self.cursor.execute(self.sql_statement)
        keys = [
            "schema_name",
            "table_name",
            "table_type",
            "column_name",
            "column_sort_order",
            "column_type",
        ]

        for row in self.cursor.fetchall():
            yield dict(zip(keys, row))

    def _get_table_key(self, row: Dict[str, Any]) -> Optional[TableKey]:
        """
        Build the (schema, table) grouping key for a row.
        :param row: raw row dict, or a falsy value
        :return: TableKey, or None for a falsy row
        """
        if row:
            return TableKey(schema_name=row["schema_name"], table_name=row["table_name"])

        return None

11 changes: 11 additions & 0 deletions pipelines/whale/extractor/splice_machine_metadata_extractor.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- One row per column, joined to its table so the table type
-- ('V' = view) is available to the extractor.
SELECT
c.SCHEMANAME,
c.TABLENAME,
t.TABLETYPE,
c.COLUMNNAME,
c.COLUMNNUMBER,
varchar(COLUMNDATATYPE,5000)
FROM SYSVW.SYSCOLUMNSVIEW c
LEFT JOIN SYSVW.SYSTABLESVIEW t
ON c.SCHEMANAME = t.SCHEMANAME
AND c.TABLENAME = t.TABLENAME
-- Injected by the extractor; empty by default. Previously the rendered
-- template ignored the configured where_clause_suffix entirely.
{{ where_clause_suffix }}
-- Required: the extractor groups rows with itertools.groupby, which only
-- merges consecutive rows, so results must arrive ordered by table.
ORDER BY c.SCHEMANAME, c.TABLENAME, c.COLUMNNUMBER
21 changes: 21 additions & 0 deletions pipelines/whale/utils/extractor_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from whale.extractor.bigquery_watermark_extractor import BigQueryWatermarkExtractor
from whale.extractor.glue_extractor import GlueExtractor
from whale.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from whale.extractor.splice_machine_metadata_extractor import SpliceMachineMetadataExtractor
from whale.extractor.postgres_metadata_extractor import PostgresMetadataExtractor
from whale.extractor.metric_runner import MetricRunner
from whale.engine.sql_alchemy_engine import SQLAlchemyEngine
Expand Down Expand Up @@ -252,6 +253,26 @@ def configure_snowflake_extractors(connection: ConnectionConfigSchema):
return extractors, conf


def configure_splice_machine_extractors(connection: ConnectionConfigSchema):
    """Build a scoped config and extractor list for a Splice Machine source."""
    extractor = SpliceMachineMetadataExtractor()
    scope = extractor.get_scope()

    settings = {
        SpliceMachineMetadataExtractor.HOST_KEY: connection.uri,
        SpliceMachineMetadataExtractor.USERNAME_KEY: connection.username,
        SpliceMachineMetadataExtractor.PASSWORD_KEY: connection.password,
        SpliceMachineMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY: connection.where_clause_suffix,
    }
    conf = ConfigFactory.from_dict(
        {f"{scope}.{key}": value for key, value in settings.items()}
    )

    extractors = [extractor]
    # NOTE(review): metric extractors are deliberately not wired in here,
    # matching the other configure_* helpers' disabled add_metrics call.

    return extractors, conf


def configure_unscoped_sqlalchemy_engine(connection: ConnectionConfigSchema):
Engine = SQLAlchemyEngine
engine = Engine()
Expand Down
2 changes: 2 additions & 0 deletions pipelines/whale/utils/task_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
configure_presto_extractors,
configure_redshift_extractors,
configure_snowflake_extractors,
configure_splice_machine_extractors,
configure_unscoped_sqlalchemy_engine,
run_build_script,
)
Expand Down Expand Up @@ -88,6 +89,7 @@ def pull():
"presto": configure_presto_extractors,
"redshift": configure_redshift_extractors,
"snowflake": configure_snowflake_extractors,
"splicemachine": configure_splice_machine_extractors,
}

if connection.metadata_source == "build_script":
Expand Down

0 comments on commit 6b5586f

Please sign in to comment.