Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refreshing snowflake schema w/o waking cluster #4285

Merged
merged 3 commits into from
Dec 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions redash/query_runner/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def determine_type(cls, data_type, scale):
return TYPE_FLOAT
return t

def run_query(self, query, user):
def _get_connection(self):
region = self.configuration.get('region')

# for us-west we don't need to pass a region (and if we do, it fails to connect)
Expand All @@ -81,6 +81,19 @@ def run_query(self, query, user):
region=region
)

return connection

def _parse_results(self, cursor):
    """Turn an executed cursor into a ``{'columns': ..., 'rows': ...}`` dict.

    Each entry of ``cursor.description`` contributes a ``(name, type)`` pair:
    element 0 is used as the column name, and elements 1 and 5 are passed to
    ``determine_type`` as ``data_type`` and ``scale``. The pairs are handed to
    ``fetch_columns``, and every row yielded by iterating the cursor is
    converted to a dict keyed by the resulting column names.
    """
    described = []
    for field in cursor.description:
        described.append((field[0], self.determine_type(field[1], field[5])))
    columns = self.fetch_columns(described)

    # Column names are the same for every row, so collect them once.
    names = [column['name'] for column in columns]
    rows = []
    for record in cursor:
        rows.append(dict(zip(names, record)))

    return {'columns': columns, 'rows': rows}

def run_query(self, query, user):
connection = self._get_connection()
cursor = connection.cursor()

try:
Expand All @@ -90,12 +103,7 @@ def run_query(self, query, user):

cursor.execute(query)

columns = self.fetch_columns(
[(i[0], self.determine_type(i[1], i[5])) for i in cursor.description])
rows = [dict(zip((column['name'] for column in columns), row))
for row in cursor]

data = {'columns': columns, 'rows': rows}
data = self._parse_results(cursor)
error = None
json_data = json_dumps(data)
finally:
Expand All @@ -104,30 +112,42 @@ def run_query(self, query, user):

return json_data, error

def _run_query_without_warehouse(self, query):
    """Run ``query`` on a fresh connection after selecting only the database.

    The only statement issued before ``query`` is ``USE <database>``, taken
    from ``self.configuration['database']``. NOTE(review): judging by the
    method name, this is meant for statements that do not need a running
    warehouse; confirm against the caller.

    Returns a ``(data, error)`` tuple where ``data`` is the dict produced by
    ``_parse_results`` and ``error`` is always ``None``; failures propagate
    as exceptions raised by the connection or cursor.
    """
    connection = self._get_connection()
    try:
        # Create the cursor inside the guarded region so the connection is
        # closed even when cursor creation itself fails.
        cursor = connection.cursor()
        try:
            cursor.execute("USE {}".format(self.configuration['database']))
            cursor.execute(query)
            data = self._parse_results(cursor)
        finally:
            cursor.close()
    finally:
        # Runs even if cursor.close() raised, so the connection never leaks.
        connection.close()

    # The error slot is kept so callers can unpack the usual (data, error) pair.
    return data, None

def get_schema(self, get_stats=False):
    """Return the tables and columns of the configured database.

    Issues ``SHOW COLUMNS IN DATABASE <database>`` through
    ``_run_query_without_warehouse`` and groups the returned rows by
    ``<schema_name>.<table_name>``. Rows whose ``kind`` is not ``'COLUMN'``
    are ignored.

    ``get_stats`` is not used by this method.

    Returns a list of ``{'name': <schema.table>, 'columns': [<column>, ...]}``
    dicts. Raises ``Exception`` when the query helper reports an error.
    """
    query = """
    SHOW COLUMNS IN DATABASE {database}
    """.format(database=self.configuration['database'])

    # The helper returns an already-parsed dict, so no JSON decoding is needed.
    results, error = self._run_query_without_warehouse(query)

    if error is not None:
        raise Exception("Failed getting schema.")

    schema = {}
    for row in results['rows']:
        if row['kind'] != 'COLUMN':
            continue

        table_name = '{}.{}'.format(row['schema_name'], row['table_name'])
        entry = schema.setdefault(table_name, {'name': table_name, 'columns': []})
        entry['columns'].append(row['column_name'])

    return list(schema.values())

Expand Down