Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SnowFlake Integration for EvaDB #1289

Merged
merged 10 commits into from
Oct 20, 2023
1 change: 1 addition & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ parts:
- file: source/reference/databases/mariadb
- file: source/reference/databases/clickhouse
- file: source/reference/databases/github
- file: source/reference/databases/snowflake

- file: source/reference/vector_databases/index
title: Vector Databases
Expand Down
47 changes: 47 additions & 0 deletions docs/source/reference/databases/snowflake.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Snowflake
==========

The connection to Snowflake is based on the `snowflake-connector-python <https://pypi.org/project/snowflake-connector-python/>`_ library.

Dependency
----------

* snowflake-connector-python

Parameters
----------

Required:

* `user` is the database user.
* `password` is the snowflake account password.
* `database` is the database name.
* `warehouse` is the snowflake warehouse name.
* `account` is the snowflake account number ( can be found in the url ).
* `schema` is the schema name.


.. warning::

Provide the parameters of an already running ``Snowflake`` Data Warehouse. EvaDB only connects to an existing ``Snowflake`` Data Warehouse.

Create Connection
-----------------

.. code-block:: text

CREATE DATABASE snowflake_data WITH ENGINE = 'snowflake', PARAMETERS = {
"user": "<username>",
"password": "<my_password>"
"account": "<account_number>",
"database": "EVADB",
"warehouse": "COMPUTE_WH",
"schema": "SAMPLE_DATA"
};

.. warning::

| In Snowflake Terminology, ``Database`` and ``Schema`` refer to the following.
| A database is a logical grouping of schemas. Each database belongs to a single Snowflake account.
| A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database.

2 changes: 2 additions & 0 deletions evadb/third_party/databases/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def _get_database_handler(engine: str, **kwargs):
return mod.MariaDbHandler(engine, **kwargs)
elif engine == "clickhouse":
return mod.ClickHouseHandler(engine, **kwargs)
elif engine == "snowflake":
return mod.SnowFlakeDbHandler(engine, **kwargs)
elif engine == "github":
return mod.GithubHandler(engine, **kwargs)
else:
Expand Down
15 changes: 15 additions & 0 deletions evadb/third_party/databases/snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""snowflake integrations"""
3 changes: 3 additions & 0 deletions evadb/third_party/databases/snowflake/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
snowflake-connector-python
pyarrow
pandas
184 changes: 184 additions & 0 deletions evadb/third_party/databases/snowflake/snowflake_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

import pandas as pd
import snowflake.connector

from evadb.third_party.databases.types import (
DBHandler,
DBHandlerResponse,
DBHandlerStatus,
)


class SnowFlakeDbHandler(DBHandler):

"""
Class for implementing the SnowFlake DB handler as a backend store for
EvaDB.
"""

def __init__(self, name: str, **kwargs):
"""
Initialize the handler.
Args:
name (str): name of the DB handler instance
**kwargs: arbitrary keyword arguments for establishing the connection.
"""
super().__init__(name)
self.user = kwargs.get("user")
self.password = kwargs.get("password")
self.database = kwargs.get("database")
self.warehouse = kwargs.get("warehouse")
self.account = kwargs.get("account")
self.schema = kwargs.get("schema")

def connect(self):
"""
Establish connection to the database.
Returns:
DBHandlerStatus
"""
try:
self.connection = snowflake.connector.connect(
user=self.user,
password=self.password,
database=self.database,
warehouse=self.warehouse,
schema=self.schema,
account=self.account,
)
# Auto commit is off by default.
self.connection.autocommit = True
return DBHandlerStatus(status=True)
except snowflake.connector.errors.Error as e:
return DBHandlerStatus(status=False, error=str(e))

def disconnect(self):
"""
Disconnect from the database.
"""
if self.connection:
self.connection.close()

def check_connection(self) -> DBHandlerStatus:
"""
Method for checking the status of database connection.
Returns:
DBHandlerStatus
"""
if self.connection:
return DBHandlerStatus(status=True)
else:
return DBHandlerStatus(status=False, error="Not connected to the database.")

def get_tables(self) -> DBHandlerResponse:
"""
Method to get the list of tables from database.
Returns:
DBHandlerStatus
"""
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
query = f"SELECT table_name as table_name FROM information_schema.tables WHERE table_schema='{self.schema}'"
cursor = self.connection.cursor()
cursor.execute(query)
tables_df = self._fetch_results_as_df(cursor)
return DBHandlerResponse(data=tables_df)
except snowflake.connector.errors.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def get_columns(self, table_name: str) -> DBHandlerResponse:
"""
Method to retrieve the columns of the specified table from the database.
Args:
table_name (str): name of the table whose columns are to be retrieved.
Returns:
DBHandlerStatus
"""
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
query = f"SELECT column_name as name, data_type as dtype FROM information_schema.columns WHERE table_name='{table_name}'"
cursor = self.connection.cursor()
cursor.execute(query)
columns_df = self._fetch_results_as_df(cursor)
columns_df["dtype"] = columns_df["dtype"].apply(
self._snowflake_to_python_types
)
return DBHandlerResponse(data=columns_df)
except snowflake.connector.errors.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def _fetch_results_as_df(self, cursor):
"""
Fetch results from the cursor for the executed query and return the
query results as dataframe.
"""
try:
res = cursor.fetchall()
res_df = pd.DataFrame(
res, columns=[desc[0].lower() for desc in cursor.description]
)
return res_df
except snowflake.connector.errors.ProgrammingError as e:
if str(e) == "no results to fetch":
return pd.DataFrame({"status": ["success"]})
raise e

def execute_native_query(self, query_string: str) -> DBHandlerResponse:
"""
Executes the native query on the database.
Args:
query_string (str): query in native format
Returns:
DBHandlerResponse
"""
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
cursor = self.connection.cursor()
cursor.execute(query_string)
return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
except snowflake.connector.errors.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def _snowflake_to_python_types(self, snowflake_type: str):
mapping = {
"TEXT": str,
"NUMBER": int,
"INT": int,
"DECIMAL": float,
"STRING": str,
"CHAR": str,
"BOOLEAN": bool,
"BINARY": bytes,
"DATE": datetime.date,
"TIME": datetime.time,
"TIMESTAMP": datetime.datetime
# Add more mappings as needed
}

if snowflake_type in mapping:
return mapping[snowflake_type]
else:
raise Exception(
f"Unsupported column {snowflake_type} encountered in the snowflake. Please raise a feature request!"
)
22 changes: 22 additions & 0 deletions test/third_party_tests/test_native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,28 @@ def test_should_run_query_in_clickhouse(self):
self._execute_native_query()
self._execute_evadb_query()

@pytest.mark.skip(
reason="Snowflake does not come with a free version of account, so integration test is not feasible"
)
def test_should_run_query_in_snowflake(self):
# Create database.
params = {
"user": "eva",
"password": "password",
"account": "account_number",
"database": "EVADB",
"schema": "SAMPLE_DATA",
"warehouse": "warehouse",
}
xzdandy marked this conversation as resolved.
Show resolved Hide resolved
query = f"""CREATE DATABASE test_data_source
WITH ENGINE = "snowflake",
PARAMETERS = {params};"""
execute_query_fetch_all(self.evadb, query)

# Test executions.
self._execute_native_query()
self._execute_evadb_query()

def test_should_run_query_in_sqlite(self):
# Create database.
import os
Expand Down
Loading