Skip to content

Commit

Permalink
SnowFlake Integration for EvaDB (#1289)
Browse files Browse the repository at this point in the history
Please suggest if this feature needs more test cases

---------

Co-authored-by: Lohith K S <[email protected]>
  • Loading branch information
kslohith and Lohith K S authored Oct 20, 2023
1 parent a6fdd6a commit c637a71
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ parts:
- file: source/reference/databases/mariadb
- file: source/reference/databases/clickhouse
- file: source/reference/databases/github
- file: source/reference/databases/snowflake

- file: source/reference/vector_databases/index
title: Vector Databases
Expand Down
47 changes: 47 additions & 0 deletions docs/source/reference/databases/snowflake.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Snowflake
==========

The connection to Snowflake is based on the `snowflake-connector-python <https://pypi.org/project/snowflake-connector-python/>`_ library.

Dependency
----------

* snowflake-connector-python

Parameters
----------

Required:

* `user` is the database user.
* `password` is the Snowflake account password.
* `database` is the database name.
* `warehouse` is the Snowflake warehouse name.
* `account` is the Snowflake account identifier (the account number that appears in the Snowflake URL).
* `schema` is the schema name.


.. warning::

Provide the parameters of an already running ``Snowflake`` Data Warehouse. EvaDB only connects to an existing ``Snowflake`` Data Warehouse.

Create Connection
-----------------

.. code-block:: text

   CREATE DATABASE snowflake_data WITH ENGINE = 'snowflake', PARAMETERS = {
       "user": "<username>",
       "password": "<my_password>",
       "account": "<account_number>",
       "database": "EVADB",
       "warehouse": "COMPUTE_WH",
       "schema": "SAMPLE_DATA"
   };

.. warning::

| In Snowflake Terminology, ``Database`` and ``Schema`` refer to the following.
| A database is a logical grouping of schemas. Each database belongs to a single Snowflake account.
| A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database.
2 changes: 2 additions & 0 deletions evadb/third_party/databases/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def _get_database_handler(engine: str, **kwargs):
return mod.MariaDbHandler(engine, **kwargs)
elif engine == "clickhouse":
return mod.ClickHouseHandler(engine, **kwargs)
elif engine == "snowflake":
return mod.SnowFlakeDbHandler(engine, **kwargs)
elif engine == "github":
return mod.GithubHandler(engine, **kwargs)
else:
Expand Down
15 changes: 15 additions & 0 deletions evadb/third_party/databases/snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""snowflake integrations"""
3 changes: 3 additions & 0 deletions evadb/third_party/databases/snowflake/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
snowflake-connector-python
pyarrow
pandas
184 changes: 184 additions & 0 deletions evadb/third_party/databases/snowflake/snowflake_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

import pandas as pd
import snowflake.connector

from evadb.third_party.databases.types import (
DBHandler,
DBHandlerResponse,
DBHandlerStatus,
)


class SnowFlakeDbHandler(DBHandler):
    """
    Class for implementing the SnowFlake DB handler as a backend store for
    EvaDB.

    The handler owns at most one ``snowflake.connector`` connection, stored in
    ``self.connection``. The attribute is ``None`` whenever the handler is not
    connected (before ``connect()``, after a failed ``connect()`` and after
    ``disconnect()``).
    """

    # Snowflake column type name -> Python type used by EvaDB.
    # Keys are upper-case; lookups normalise the incoming name the same way.
    _TYPE_MAPPING = {
        "TEXT": str,
        "VARCHAR": str,
        "STRING": str,
        "CHAR": str,
        "NUMBER": int,
        "INT": int,
        "INTEGER": int,
        "DECIMAL": float,
        "FLOAT": float,
        "BOOLEAN": bool,
        "BINARY": bytes,
        "DATE": datetime.date,
        "TIME": datetime.time,
        "TIMESTAMP": datetime.datetime,
        "TIMESTAMP_NTZ": datetime.datetime,
        "TIMESTAMP_LTZ": datetime.datetime,
        "TIMESTAMP_TZ": datetime.datetime,
        # Add more mappings as needed
    }

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of the DB handler instance
            **kwargs: arbitrary keyword arguments for establishing the connection.
                Recognised keys: ``user``, ``password``, ``database``,
                ``warehouse``, ``account`` and ``schema``. Missing keys are
                stored as ``None``.
        """
        super().__init__(name)
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.database = kwargs.get("database")
        self.warehouse = kwargs.get("warehouse")
        self.account = kwargs.get("account")
        self.schema = kwargs.get("schema")
        # Defined up-front so disconnect()/check_connection() are safe to call
        # even when connect() was never called or failed.
        self.connection = None

    def connect(self):
        """
        Establish connection to the database.
        Returns:
            DBHandlerStatus: ``status=True`` on success, otherwise
            ``status=False`` with the connector's error message.
        """
        try:
            # Autocommit is requested through the connect() argument.
            # ``SnowflakeConnection.autocommit`` is a method, so assigning
            # ``connection.autocommit = True`` would only shadow that method
            # on the instance without changing the session.
            self.connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                database=self.database,
                warehouse=self.warehouse,
                schema=self.schema,
                account=self.account,
                autocommit=True,
            )
            return DBHandlerStatus(status=True)
        except snowflake.connector.errors.Error as e:
            self.connection = None
            return DBHandlerStatus(status=False, error=str(e))

    def disconnect(self):
        """
        Disconnect from the database.

        Safe to call when no connection is open. The connection reference is
        dropped afterwards so that ``check_connection()`` reports the handler
        as disconnected.
        """
        if self.connection:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def check_connection(self) -> DBHandlerStatus:
        """
        Method for checking the status of database connection.
        Returns:
            DBHandlerStatus
        """
        if self.connection:
            return DBHandlerStatus(status=True)
        return DBHandlerStatus(status=False, error="Not connected to the database.")

    def get_tables(self) -> DBHandlerResponse:
        """
        Method to get the list of tables from database.
        Returns:
            DBHandlerResponse: ``data`` is a dataframe with a ``table_name``
            column for the tables of the configured schema, or ``error`` is
            set when the handler is not connected or the query fails.
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        # The schema name is bound as a parameter instead of being
        # interpolated into the SQL text.
        query = (
            "SELECT table_name as table_name FROM information_schema.tables "
            "WHERE table_schema=%s"
        )
        try:
            tables_df = self._run_query(query, (self.schema,))
            return DBHandlerResponse(data=tables_df)
        except snowflake.connector.errors.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def get_columns(self, table_name: str) -> DBHandlerResponse:
        """
        Method to retrieve the columns of the specified table from the database.
        Args:
            table_name (str): name of the table whose columns are to be retrieved.
        Returns:
            DBHandlerResponse: ``data`` is a dataframe with ``name`` and
            ``dtype`` columns (``dtype`` holds Python types), or ``error`` is
            set when the handler is not connected or the query fails.
        Raises:
            Exception: if a column has a Snowflake type without a mapping.
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        # NOTE(review): the lookup is by table name only, so same-named tables
        # in other schemas of the database would contribute rows as well.
        query = (
            "SELECT column_name as name, data_type as dtype "
            "FROM information_schema.columns WHERE table_name=%s"
        )
        try:
            columns_df = self._run_query(query, (table_name,))
            columns_df["dtype"] = columns_df["dtype"].apply(
                self._snowflake_to_python_types
            )
            return DBHandlerResponse(data=columns_df)
        except snowflake.connector.errors.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _run_query(self, query: str, params=None) -> pd.DataFrame:
        """
        Execute ``query`` on a fresh cursor, return the results as a
        dataframe and always close the cursor afterwards.

        ``params`` is forwarded to ``cursor.execute`` only when given, so a
        native query containing ``%`` characters is sent untouched.
        """
        cursor = self.connection.cursor()
        try:
            if params is None:
                cursor.execute(query)
            else:
                cursor.execute(query, params)
            return self._fetch_results_as_df(cursor)
        finally:
            cursor.close()

    def _fetch_results_as_df(self, cursor):
        """
        Fetch results from the cursor for the executed query and return the
        query results as dataframe.

        Column names are lower-cased. When the statement produced no result
        set, a single-row ``status`` dataframe is returned instead.
        """
        try:
            res = cursor.fetchall()
            description = cursor.description
            if description is None:
                # No result-set metadata: nothing to tabulate.
                return pd.DataFrame({"status": ["success"]})
            return pd.DataFrame(res, columns=[desc[0].lower() for desc in description])
        except snowflake.connector.errors.ProgrammingError as e:
            if str(e) == "no results to fetch":
                return pd.DataFrame({"status": ["success"]})
            raise

    def execute_native_query(self, query_string: str) -> DBHandlerResponse:
        """
        Executes the native query on the database.
        Args:
            query_string (str): query in native format
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            return DBHandlerResponse(data=self._run_query(query_string))
        except snowflake.connector.errors.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _snowflake_to_python_types(self, snowflake_type: str):
        """
        Translate a Snowflake column type name into the matching Python type.
        Args:
            snowflake_type (str): type name as reported by Snowflake; the
                lookup is case-insensitive.
        Raises:
            Exception: if the type has no entry in the mapping.
        """
        key = snowflake_type.upper() if isinstance(snowflake_type, str) else snowflake_type
        try:
            return self._TYPE_MAPPING[key]
        except (KeyError, TypeError):
            raise Exception(
                f"Unsupported column {snowflake_type} encountered in the snowflake. Please raise a feature request!"
            ) from None
22 changes: 22 additions & 0 deletions test/third_party_tests/test_native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,28 @@ def test_should_run_query_in_clickhouse(self):
self._execute_native_query()
self._execute_evadb_query()

# Integration test for the Snowflake data source. It is always skipped (see
# the reason string below).
@pytest.mark.skip(
reason="Snowflake does not come with a free version of account, so integration test is not feasible"
)
def test_should_run_query_in_snowflake(self):
# Create database.
# NOTE(review): these values look like placeholders rather than working
# credentials; replace them before un-skipping the test.
params = {
"user": "eva",
"password": "password",
"account": "account_number",
"database": "EVADB",
"schema": "SAMPLE_DATA",
"warehouse": "warehouse",
}
# The dict is interpolated into the statement through its Python repr.
query = f"""CREATE DATABASE test_data_source
WITH ENGINE = "snowflake",
PARAMETERS = {params};"""
execute_query_fetch_all(self.evadb, query)

# Test executions.
# NOTE(review): both helpers are defined outside this excerpt; presumably
# they run queries against `test_data_source` - confirm.
self._execute_native_query()
self._execute_evadb_query()

def test_should_run_query_in_sqlite(self):
# Create database.
import os
Expand Down
Loading

0 comments on commit c637a71

Please sign in to comment.