From bb8110d58619428b152e48d28669065ced1e2374 Mon Sep 17 00:00:00 2001
From: ssmith
Date: Thu, 31 Oct 2024 15:41:42 -0500
Subject: [PATCH 1/5] update hive proxy client to use impyla instead of pyhive

---
 apollo/integrations/db/hive_proxy_client.py | 88 ++++-----------------
 requirements.in                             |  8 +-
 requirements.txt                            | 20 +++--
 tests/test_hive_client.py                   | 13 ++-
 4 files changed, 35 insertions(+), 94 deletions(-)

diff --git a/apollo/integrations/db/hive_proxy_client.py b/apollo/integrations/db/hive_proxy_client.py
index 0b5db12..505be28 100644
--- a/apollo/integrations/db/hive_proxy_client.py
+++ b/apollo/integrations/db/hive_proxy_client.py
@@ -1,65 +1,37 @@
-import time
-from base64 import standard_b64encode
 from typing import (
     Any,
     Dict,
     Optional,
 )
 
-from pyhive import hive
-from thrift.transport import THttpClient
-from TCLIService.ttypes import TOperationState
+from impala import dbapi
+from impala.hiveserver2 import HiveServer2Connection
 
-from apollo.agent.models import AgentError
 from apollo.integrations.db.base_db_proxy_client import BaseDbProxyClient
 
 _ATTR_CONNECT_ARGS = "connect_args"
 
 
-class HiveProxyCursor(hive.Cursor):
-    def async_execute(self, query: str, timeout: int, **kwargs: Any) -> None:  # noqa
-        start_time = time.time()
-
-        self.execute(query, async_=True)
-
-        pending_states = (
-            TOperationState.INITIALIZED_STATE,
-            TOperationState.PENDING_STATE,
-            TOperationState.RUNNING_STATE,
-        )
-        time_passed = 0
-        while self.poll().operationState in pending_states:
-            time_passed = time.time() - start_time
-            if time_passed > timeout:
-                self.cancel()
-                break
-            time.sleep(10)
-
-        resp = self.poll()
-        if resp.operationState == TOperationState.ERROR_STATE:
-            msg = "Query failed, see cluster logs for details"
-            if time_passed > 0:
-                msg += f" (runtime: {time_passed}s)"
-            raise AgentError(msg, query, resp)
-        elif resp.operationState == TOperationState.CANCELED_STATE:
-            raise AgentError(f"Time out executing query: {time_passed}s", query, resp)
-
-
-class HiveProxyConnection(hive.Connection):
-    def cursor(self, *args: Any, **kwargs: Any):
-        return HiveProxyCursor(self, *args, **kwargs)
+class HiveProxyConnection(HiveServer2Connection):
+    def cursor(self):
+        # If close_finished_queries is true, impyla closes every query once a DDL/DML execution
+        # finishes or all rows are fetched. It also calls GetLog() before closing the query to get
+        # query metadata from Hive. GetLog() is not available on Spark/Databricks, which causes this to break.
+        #
+        # Setting close_finished_queries to false closes queries only when execute() is called again
+        # or the cursor is closed. GetLog() is not called automatically, so Spark/Databricks works.
+        # With False the cursor will not have a rowcount for DML statements; this is fine for MC.
+        # https://github.com/cloudera/impyla/blob/e4c76169f7e5765c09b11c92fceb862dbb9b72be/impala/hiveserver2.py#L122
+        return super().cursor(self, close_finished_queries=False)
 
 
 class HiveProxyClient(BaseDbProxyClient):
     """
     Proxy client for Hive. Credentials are expected to be supplied under "connect_args" and
-    will be passed directly to `hive.Connection`, so only attributes supported as parameters by
-    `hive.Connection` should be passed. If "mode" is not set to "binary", then the "connect_args"
-    will be used to create a new thrift transport that will be passed to `hive.Connection`.
+    will be passed directly to `impala.dbapi.connect`, so only attributes supported as
+    parameters by `impala.dbapi.connect` should be passed.
""" - _MODE_BINARY = "binary" - def __init__(self, credentials: Optional[Dict], **kwargs: Any): # noqa super().__init__(connection_type="hive") if not credentials or _ATTR_CONNECT_ARGS not in credentials: @@ -67,35 +39,9 @@ def __init__(self, credentials: Optional[Dict], **kwargs: Any): # noqa f"Hive agent client requires {_ATTR_CONNECT_ARGS} in credentials" ) - if credentials.get("mode") != self._MODE_BINARY: - self._connection = self._create_http_connection( - **credentials[_ATTR_CONNECT_ARGS] - ) - else: - self._connection = HiveProxyConnection(**credentials[_ATTR_CONNECT_ARGS]) - - @classmethod - def _create_http_connection( - cls, - url: str, - username: str, - password: str, - user_agent: Optional[str] = None, - **kwargs: Any, # noqa - ) -> hive.Connection: - transport = THttpClient.THttpClient(url) - - auth = standard_b64encode(f"{username}:{password}".encode()).decode() - headers = dict(Authorization=f"Basic {auth}") - if user_agent: - headers["User-Agent"] = user_agent - - transport.setCustomHeaders(headers) - - try: - return HiveProxyConnection(thrift_transport=transport) - except EOFError or MemoryError: - raise AgentError("Error creating connection - credentials might be invalid") + self._connection = HiveProxyConnection( + dbapi.connect(**credentials[_ATTR_CONNECT_ARGS]) + ) @property def wrapped_client(self): diff --git a/requirements.in b/requirements.in index cb27ae6..56db4a8 100644 --- a/requirements.in +++ b/requirements.in @@ -39,9 +39,7 @@ tableauserverclient @ git+https://github.com/tableau/server-client-python.git@ma teradatasql>=20.0.0.15 oscrypto @ git+https://github.com/wbond/oscrypto@master -# Note: 'pyhive[hive]' extras uses sasl that does not support Python 3.11, -# See https://github.com/cloudera/python-sasl/issues/30. Hence PyHive also supports -# pure-sasl via additional extras 'pyhive[hive_pure_sasl]' which support Python 3.11. -pyhive[hive_pure_sasl]==0.7.0 ; python_version >= "3.11" -pyhive[hive]==0.6.5 ; python_version < "3.11" +# Note this is a beta version of impyla that is needed in order to support HTTPS connections on python 3.12. +# It should be updated to stable version 0.20.0 once that is released. 
+impyla==0.20a1
 werkzeug==3.0.3
diff --git a/requirements.txt b/requirements.txt
index 0d0a15a..c73f7cd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with Python 3.11
+# This file is autogenerated by pip-compile with Python 3.12
 # by the following command:
 #
 #    pip-compile requirements.in
@@ -29,6 +29,8 @@ azure-mgmt-storage==21.2.1
     # via -r requirements.in
 azure-storage-blob==12.23.0
     # via -r requirements.in
+bitarray==3.0.0
+    # via impyla
 blinker==1.8.2
     # via flask
 boto3==1.34.151
@@ -89,8 +91,6 @@ flask==2.3.3
     #   flask-compress
 flask-compress==1.14
     # via -r requirements.in
-future==1.0.0
-    # via pyhive
 google-api-core==2.19.1
     # via
     #   google-api-python-client
@@ -129,6 +129,8 @@ idna==3.7
     # via
     #   requests
     #   snowflake-connector-python
+impyla==0.20a1
+    # via -r requirements.in
 isodate==0.6.1
     # via
     #   azure-mgmt-storage
@@ -206,9 +208,7 @@ protobuf==4.25.3
 psycopg2-binary==2.9.9
     # via -r requirements.in
 pure-sasl==0.6.2
-    # via
-    #   pyhive
-    #   thrift-sasl
+    # via thrift-sasl
 pyarrow==14.0.1
     # via
     #   -r requirements.in
@@ -225,8 +225,6 @@ pycryptodome==3.20.0
     # via
     #   -r requirements.in
     #   teradatasql
-pyhive[hive-pure-sasl]==0.7.0 ; python_version >= "3.11"
-    # via -r requirements.in
 pyjwt[crypto]==2.8.0
     # via
     #   -r requirements.in
@@ -246,7 +244,6 @@ python-dateutil==2.9.0.post0
     # via
     #   botocore
     #   pandas
-    #   pyhive
 pytz==2024.1
     # via
     #   pandas
@@ -274,6 +271,7 @@ s3transfer==0.10.2
 six==1.16.0
     # via
     #   azure-core
+    #   impyla
     #   isodate
     #   presto-python-client
     #   python-dateutil
@@ -294,10 +292,10 @@ teradatasql==20.0.0.15
 thrift==0.16.0
     # via
     #   databricks-sql-connector
-    #   pyhive
+    #   impyla
     #   thrift-sasl
 thrift-sasl==0.4.3
-    # via pyhive
+    # via impyla
 tomlkit==0.12.5
     # via snowflake-connector-python
 typing-extensions==4.12.2
diff --git a/tests/test_hive_client.py b/tests/test_hive_client.py
index 898d672..2071fa7 100644
--- a/tests/test_hive_client.py
+++ b/tests/test_hive_client.py
@@ -22,9 +22,11 @@
 _HIVE_CREDENTIALS = {
     "host": "localhost",
     "port": "10000",
-    "username": "foo",
+    "user": "foo",
     "database": "fizz",
-    "auth": None,
+    "auth_mechanism": "PLAIN",
+    "timeout": 870,
+    "use_ssl": False,
 }
 
 
@@ -70,7 +72,7 @@ def _test_run_query(
         {"method": "cursor", "store": "_cursor"},
         {
             "target": "_cursor",
-            "method": "async_execute",
+            "method": "execute",
             "args": [
                 query,
                 None,
@@ -104,10 +106,7 @@ def _test_run_query(
             "hive",
             "run_query",
             operation_dict,
-            {
-                "connect_args": _HIVE_CREDENTIALS,
-                "mode": "binary",
-            },
+            {"connect_args": _HIVE_CREDENTIALS},
         )
 
         if raise_exception:

From 4f8b838d4d64c5617ea732aeda6efd7f19552272 Mon Sep 17 00:00:00 2001
From: ssmith
Date: Thu, 31 Oct 2024 15:58:11 -0500
Subject: [PATCH 2/5] add dbapi mock

---
 tests/test_hive_client.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/test_hive_client.py b/tests/test_hive_client.py
index 2071fa7..5028a36 100644
--- a/tests/test_hive_client.py
+++ b/tests/test_hive_client.py
@@ -8,7 +8,7 @@
 from unittest.mock import (
     Mock,
     call,
-    patch,
+    patch, MagicMock,
 )
 
 from apollo.agent.agent import Agent
@@ -38,7 +38,8 @@ def setUp(self) -> None:
         self._mock_connection.cursor.return_value = self._mock_cursor
 
     @patch("apollo.integrations.db.hive_proxy_client.HiveProxyConnection")
-    def test_query(self, mock_connect):
+    @patch("apollo.integrations.db.hive_proxy_client.dbapi.connect")
+    def test_query(self, mock_dbapi_connect, mock_connect):
         query = "SELECT idx, value FROM table"  # noqa
         expected_data = [
             [

From f510c4f5f991d914115a4bac8531922d17742e00 Mon Sep 17 00:00:00 2001
From: ssmith
Date: Thu, 31 Oct 2024 16:07:39 -0500
Subject: [PATCH 3/5] formatting and test fix

---
 tests/test_hive_client.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/tests/test_hive_client.py b/tests/test_hive_client.py
index 5028a36..9a851ec 100644
--- a/tests/test_hive_client.py
+++ b/tests/test_hive_client.py
@@ -8,7 +8,8 @@
 from unittest.mock import (
     Mock,
     call,
-    patch, MagicMock,
+    patch,
+    MagicMock,
 )
 
 from apollo.agent.agent import Agent
@@ -55,11 +56,14 @@ def test_query(self, mock_dbapi_connect, mock_connect):
             ["idx", "integer", None, None, None, None, None],
             ["value", "float", None, None, None, None, None],
         ]
-        self._test_run_query(mock_connect, query, expected_data, expected_description)
+        self._test_run_query(
+            mock_connect, mock_dbapi_connect, query, expected_data, expected_description
+        )
 
     def _test_run_query(
         self,
         mock_connect: Mock,
+        mock_dbapi_connect: Mock,
         query: str,
         data: List,
         description: List,
@@ -123,8 +127,8 @@ def _test_run_query(
         self.assertTrue(ATTRIBUTE_NAME_RESULT in response.result)
         result = response.result.get(ATTRIBUTE_NAME_RESULT)
 
-        mock_connect.assert_called_with(**_HIVE_CREDENTIALS)
-        self._mock_cursor.async_execute.assert_has_calls(
+        mock_dbapi_connect.assert_called_with(**_HIVE_CREDENTIALS)
+        self._mock_cursor.execute.assert_has_calls(
             [
                 call(query, None),
             ]

From 0e11e9f0c6f6b64e14397e32018e296642b3174c Mon Sep 17 00:00:00 2001
From: ssmith
Date: Mon, 4 Nov 2024 11:08:41 -0600
Subject: [PATCH 4/5] fix hive proxy cursor method

---
 apollo/integrations/db/hive_proxy_client.py | 29 +++++++++------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/apollo/integrations/db/hive_proxy_client.py b/apollo/integrations/db/hive_proxy_client.py
index 505be28..3464fb3 100644
--- a/apollo/integrations/db/hive_proxy_client.py
+++ b/apollo/integrations/db/hive_proxy_client.py
@@ -5,26 +5,12 @@
 )
 
 from impala import dbapi
-from impala.hiveserver2 import HiveServer2Connection
 
 from apollo.integrations.db.base_db_proxy_client import BaseDbProxyClient
 
 _ATTR_CONNECT_ARGS = "connect_args"
 
 
-class HiveProxyConnection(HiveServer2Connection):
-    def cursor(self):
-        # If close_finished_queries is true, impyla closes every query once a DDL/DML execution
-        # finishes or all rows are fetched. It also calls GetLog() before closing the query to get
-        # query metadata from Hive. GetLog() is not available on Spark/Databricks, which causes this to break.
-        #
-        # Setting close_finished_queries to false closes queries only when execute() is called again
-        # or the cursor is closed. GetLog() is not called automatically, so Spark/Databricks works.
-        # With False the cursor will not have a rowcount for DML statements; this is fine for MC.
-        # https://github.com/cloudera/impyla/blob/e4c76169f7e5765c09b11c92fceb862dbb9b72be/impala/hiveserver2.py#L122
-        return super().cursor(self, close_finished_queries=False)
-
-
 class HiveProxyClient(BaseDbProxyClient):
     """
     Proxy client for Hive. Credentials are expected to be supplied under "connect_args" and
@@ -39,9 +25,18 @@ def __init__(self, credentials: Optional[Dict], **kwargs: Any):  # noqa
             f"Hive agent client requires {_ATTR_CONNECT_ARGS} in credentials"
         )
 
-        self._connection = HiveProxyConnection(
-            dbapi.connect(**credentials[_ATTR_CONNECT_ARGS])
-        )
+        self._connection = dbapi.connect(**credentials[_ATTR_CONNECT_ARGS])
+
+    def cursor(self):
+        # If close_finished_queries is true, impyla closes every query once a DDL/DML execution
+        # finishes or all rows are fetched. It also calls GetLog() before closing the query to get
+        # query metadata from Hive. GetLog() is not available on Spark/Databricks, which causes this to break.
+        #
+        # Setting close_finished_queries to false closes queries only when execute() is called again
+        # or the cursor is closed. GetLog() is not called automatically, so Spark/Databricks works.
+        # With False the cursor will not have a rowcount for DML statements; this is fine for MC.
+        # https://github.com/cloudera/impyla/blob/e4c76169f7e5765c09b11c92fceb862dbb9b72be/impala/hiveserver2.py#L122
+        return self._connection.cursor(close_finished_queries=False)
 
     @property
     def wrapped_client(self):

From 49d3ef36439b003862c0acde79e91d3e3c71cd64 Mon Sep 17 00:00:00 2001
From: ssmith
Date: Mon, 4 Nov 2024 11:22:49 -0600
Subject: [PATCH 5/5] update hive client unit tests

---
 tests/test_hive_client.py | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/tests/test_hive_client.py b/tests/test_hive_client.py
index 9a851ec..cfa6742 100644
--- a/tests/test_hive_client.py
+++ b/tests/test_hive_client.py
@@ -5,12 +5,7 @@
     Optional,
 )
 from unittest import TestCase
-from unittest.mock import (
-    Mock,
-    call,
-    patch,
-    MagicMock,
-)
+from unittest.mock import Mock, call, patch
 
 from apollo.agent.agent import Agent
 from apollo.agent.constants import (
@@ -38,9 +33,8 @@ def setUp(self) -> None:
         self._mock_cursor = Mock()
         self._mock_connection.cursor.return_value = self._mock_cursor
 
-    @patch("apollo.integrations.db.hive_proxy_client.HiveProxyConnection")
     @patch("apollo.integrations.db.hive_proxy_client.dbapi.connect")
-    def test_query(self, mock_dbapi_connect, mock_connect):
+    def test_query(self, mock_connect):
         query = "SELECT idx, value FROM table"  # noqa
         expected_data = [
             [
@@ -56,14 +50,11 @@ def test_query(self, mock_connect):
             ["idx", "integer", None, None, None, None, None],
             ["value", "float", None, None, None, None, None],
         ]
-        self._test_run_query(
-            mock_connect, mock_dbapi_connect, query, expected_data, expected_description
-        )
+        self._test_run_query(mock_connect, query, expected_data, expected_description)
 
     def _test_run_query(
         self,
         mock_connect: Mock,
-        mock_dbapi_connect: Mock,
         query: str,
         data: List,
         description: List,
@@ -127,7 +118,7 @@ def _test_run_query(
         self.assertTrue(ATTRIBUTE_NAME_RESULT in response.result)
         result = response.result.get(ATTRIBUTE_NAME_RESULT)
 
-        mock_dbapi_connect.assert_called_with(**_HIVE_CREDENTIALS)
+        mock_connect.assert_called_with(**_HIVE_CREDENTIALS)
         self._mock_cursor.execute.assert_has_calls(
             [
                 call(query, None),
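
For reference, a minimal usage sketch of the client as it stands after PATCH 4/5 and 5/5, assuming the module path used in this series; the host, user, and table names below are hypothetical placeholders, not values taken from this change:

    # Illustrative sketch only: all connection values below are hypothetical.
    from apollo.integrations.db.hive_proxy_client import HiveProxyClient

    credentials = {
        "connect_args": {
            "host": "hive.example.com",  # placeholder HiveServer2 host
            "port": 10000,
            "user": "reader",
            "database": "fizz",
            "auth_mechanism": "PLAIN",  # impyla's replacement for pyhive's auth/mode settings
            "timeout": 870,
            "use_ssl": False,
        }
    }

    # connect_args are passed straight through to impala.dbapi.connect
    client = HiveProxyClient(credentials=credentials)

    # cursor() requests close_finished_queries=False, so impyla does not call
    # GetLog() when a query finishes, keeping Spark/Databricks connections working.
    cursor = client.cursor()
    cursor.execute("SELECT idx, value FROM some_table")  # hypothetical table
    rows = cursor.fetchall()
    print(rows)
    print(cursor.description)  # DB-API column metadata
    cursor.close()

The trade-off noted in the patch applies here: with close_finished_queries=False the cursor reports no rowcount for DML statements, and each query is closed only on the next execute() or on cursor.close().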