diff --git a/client/app/assets/less/ant.less b/client/app/assets/less/ant.less index 70a481ce9d..808ce5a208 100644 --- a/client/app/assets/less/ant.less +++ b/client/app/assets/less/ant.less @@ -14,6 +14,7 @@ @import '~antd/lib/radio/style/index'; @import '~antd/lib/time-picker/style/index'; @import '~antd/lib/pagination/style/index'; +@import '~antd/lib/drawer/style/index'; @import '~antd/lib/table/style/index'; @import '~antd/lib/popover/style/index'; @import '~antd/lib/icon/style/index'; diff --git a/client/app/assets/less/inc/schema-browser.less b/client/app/assets/less/inc/schema-browser.less index 0034391086..d547a78790 100644 --- a/client/app/assets/less/inc/schema-browser.less +++ b/client/app/assets/less/inc/schema-browser.less @@ -7,14 +7,14 @@ div.table-name { border-radius: @redash-radius; position: relative; - .copy-to-editor { + .copy-to-editor, .info { display: none; } &:hover { background: fade(@redash-gray, 10%); - .copy-to-editor { + .copy-to-editor, .info { display: flex; } } @@ -36,7 +36,7 @@ div.table-name { background: transparent; } - .copy-to-editor { + .copy-to-editor, .info { color: fade(@redash-gray, 90%); cursor: pointer; position: absolute; @@ -49,6 +49,10 @@ div.table-name { justify-content: center; } + .info { + right: 20px + } + .table-open { padding: 0 22px 0 26px; overflow: hidden; @@ -56,14 +60,14 @@ div.table-name { white-space: nowrap; position: relative; - .copy-to-editor { + .copy-to-editor, .info { display: none; } &:hover { background: fade(@redash-gray, 10%); - .copy-to-editor { + .copy-to-editor, .info { display: flex; } } diff --git a/client/app/components/proptypes.js b/client/app/components/proptypes.js index 05b585904d..b569545aba 100644 --- a/client/app/components/proptypes.js +++ b/client/app/components/proptypes.js @@ -11,6 +11,13 @@ export const DataSource = PropTypes.shape({ type_name: PropTypes.string, }); +export const DataSourceMetadata = PropTypes.shape({ + key: PropTypes.number, + name: PropTypes.string, + type: PropTypes.string, + example: PropTypes.string, +}); + export const Table = PropTypes.shape({ columns: PropTypes.arrayOf(PropTypes.string).isRequired, }); diff --git a/client/app/components/queries/SchemaData.jsx b/client/app/components/queries/SchemaData.jsx new file mode 100644 index 0000000000..664ec27972 --- /dev/null +++ b/client/app/components/queries/SchemaData.jsx @@ -0,0 +1,83 @@ +import React from 'react'; +import PropTypes from 'prop-types'; +import { react2angular } from 'react2angular'; +import Drawer from 'antd/lib/drawer'; +import Table from 'antd/lib/table'; + +import { DataSourceMetadata } from '@/components/proptypes'; + +function textWrapRenderer(text) { + return ( +
+      <div style={{ wordWrap: 'break-word', wordBreak: 'break-all' }}>{text}</div>
+ ); +} + +class SchemaData extends React.PureComponent { + static propTypes = { + show: PropTypes.bool.isRequired, + onClose: PropTypes.func.isRequired, + tableName: PropTypes.string, + tableMetadata: PropTypes.arrayOf(DataSourceMetadata), + }; + + static defaultProps = { + tableName: '', + tableMetadata: [], + }; + + render() { + const columns = [{ + title: 'Column Name', + dataIndex: 'name', + width: 400, + key: 'name', + render: textWrapRenderer, + }, { + title: 'Column Type', + dataIndex: 'type', + width: 400, + key: 'type', + render: textWrapRenderer, + }]; + + const hasExample = + this.props.tableMetadata.some(columnMetadata => columnMetadata.example); + + if (hasExample) { + columns.push({ + title: 'Example', + dataIndex: 'example', + width: 400, + key: 'example', + render: textWrapRenderer, + }); + } + + return ( + + + + ); + } +} + +export default function init(ngModule) { + ngModule.component('schemaData', react2angular(SchemaData, null, [])); +} + +init.init = true; diff --git a/client/app/components/queries/schema-browser.html b/client/app/components/queries/schema-browser.html index fe7e26669e..da96134170 100644 --- a/client/app/components/queries/schema-browser.html +++ b/client/app/components/queries/schema-browser.html @@ -26,15 +26,24 @@ {{table.name}} ({{table.size}}) +
-
{{column}} +
+ {{column.name}} + ng-click="$ctrl.itemSelected($event, [column.name])">
+ diff --git a/client/app/components/queries/schema-browser.js b/client/app/components/queries/schema-browser.js index ded89d09bc..d6f692e753 100644 --- a/client/app/components/queries/schema-browser.js +++ b/client/app/components/queries/schema-browser.js @@ -11,6 +11,17 @@ function SchemaBrowserCtrl($rootScope, $scope) { $scope.$broadcast('vsRepeatTrigger'); }; + $scope.showSchemaInfo = false; + $scope.openSchemaInfo = ($event, tableName, tableMetadata) => { + $scope.tableName = tableName; + $scope.tableMetadata = tableMetadata; + $scope.showSchemaInfo = true; + $event.stopPropagation(); + }; + $scope.closeSchemaInfo = () => { + $scope.$apply(() => { $scope.showSchemaInfo = false; }); + }; + this.getSize = (table) => { let size = 22; diff --git a/migrations/versions/280daa582976_.py b/migrations/versions/280daa582976_.py new file mode 100644 index 0000000000..6bf3b6bd19 --- /dev/null +++ b/migrations/versions/280daa582976_.py @@ -0,0 +1,55 @@ +"""Add column metadata and table metadata + +Revision ID: 280daa582976 +Revises: b8a479422596 +Create Date: 2019-01-24 18:23:53.040608 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '280daa582976' +down_revision = 'b8a479422596' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'table_metadata', + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('org_id', sa.Integer(), nullable=False), + sa.Column('data_source_id', sa.Integer(), nullable=False), + sa.Column('exists', sa.Boolean(), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('description', sa.String(length=4096), nullable=True), + sa.Column('column_metadata', sa.Boolean(), nullable=False), + sa.Column('sample_query', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['data_source_id'], ['data_sources.id'], ondelete="CASCADE"), + sa.ForeignKeyConstraint(['org_id'], ['organizations.id']), + sa.PrimaryKeyConstraint('id') + ) + op.create_table( + 'column_metadata', + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('org_id', sa.Integer(), nullable=False), + sa.Column('table_id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('type', sa.String(length=255), nullable=True), + sa.Column('example', sa.String(length=4096), nullable=True), + sa.Column('exists', sa.Boolean(), nullable=False), + sa.ForeignKeyConstraint(['table_id'], ['table_metadata.id'], ondelete="CASCADE"), + sa.ForeignKeyConstraint(['org_id'], ['organizations.id']), + sa.PrimaryKeyConstraint('id') + ) + + +def downgrade(): + op.drop_table('column_metadata') + op.drop_table('table_metadata') diff --git a/migrations/versions/6adb92e75691_.py b/migrations/versions/6adb92e75691_.py new file mode 100644 index 0000000000..c3997d0c9b --- /dev/null +++ b/migrations/versions/6adb92e75691_.py @@ -0,0 +1,25 @@ +"""Add sample_updated_at column to table_metadata + +Revision ID: 6adb92e75691 +Revises: 280daa582976 +Create Date: 2019-04-10 20:13:13.714589 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = '6adb92e75691' +down_revision = '280daa582976' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('table_metadata', sa.Column( + 'sample_updated_at', sa.DateTime(timezone=True), nullable=True)) + + +def downgrade(): + op.drop_column('table_metadata', 'sample_updated_at') diff --git a/redash/cli/data_sources.py b/redash/cli/data_sources.py index 76e54a88c7..d0cb39ebaa 100644 --- a/redash/cli/data_sources.py +++ b/redash/cli/data_sources.py @@ -9,6 +9,7 @@ from redash import models from redash.query_runner import (get_configuration_schema_for_query_runner_type, query_runners) +from redash.tasks import refresh_samples from redash.utils import json_loads from redash.utils.configuration import ConfigurationContainer @@ -110,7 +111,7 @@ def new(name=None, type=None, options=None, organization='default'): options_obj = {} - for k, prop in schema['properties'].iteritems(): + for k, prop in sorted(schema['properties'].iteritems()): required = k in schema.get('required', []) default_value = "<>" if required: @@ -172,6 +173,27 @@ def update_attr(obj, attr, new_value): setattr(obj, attr, new_value) +@manager.command() +@click.argument('name') +@click.option('--org', 'organization', default='default', + help="The organization the user belongs to (leave blank for " + "'default').") +@click.option('--count', 'num_tables', default=50, + help="number of tables to process data samples for") +def refresh_data_samples(name, num_tables=50, organization='default'): + """Refresh table samples by data source name.""" + try: + org = models.Organization.get_by_slug(organization) + data_source = models.DataSource.query.filter( + models.DataSource.name == name, + models.DataSource.org == org).one() + print("Refreshing samples for data source: {} (id={})".format(name, data_source.id)) + refresh_samples(data_source.id, num_tables) + except NoResultFound: + print("Couldn't find data source named: {}".format(name)) + exit(1) + + @manager.command() @click.argument('name') @click.option('--name', 'new_name', default=None, diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py index 82396925b7..e42393bf63 100644 --- a/redash/handlers/data_sources.py +++ b/redash/handlers/data_sources.py @@ -6,10 +6,11 @@ from six import text_type from sqlalchemy.exc import IntegrityError -from redash import models +from redash import models, settings from redash.handlers.base import BaseResource, get_object_or_404, require_fields from redash.permissions import (require_access, require_admin, require_permission, view_only) +from redash.tasks.queries import refresh_schema from redash.query_runner import (get_configuration_schema_for_query_runner_type, query_runners, NotSupported) from redash.utils import filter_none @@ -52,6 +53,9 @@ def post(self, data_source_id): data_source.name = req['name'] models.db.session.add(data_source) + # Refresh the stored schemas when a data source is updated + refresh_schema.apply_async(args=(data_source.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) + try: models.db.session.commit() except IntegrityError as e: @@ -127,6 +131,9 @@ def post(self): options=config) models.db.session.commit() + + # Refresh the stored schemas when a new data source is added to the list + refresh_schema.apply_async(args=(datasource.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) except IntegrityError as e: models.db.session.rollback() if req['name'] in e.message: @@ -150,9 +157,10 @@ def get(self, data_source_id): refresh = request.args.get('refresh') is not None response = {} - 
try: - response['schema'] = data_source.get_schema(refresh) + if refresh: + refresh_schema.apply_async(args=(data_source.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) + response['schema'] = data_source.get_schema() except NotSupported: response['error'] = { 'code': 1, diff --git a/redash/models/__init__.py b/redash/models/__init__.py index c85a394300..1c73df6345 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -7,6 +7,7 @@ import pytz import xlsxwriter +from operator import itemgetter from six import python_2_unicode_compatible, text_type from sqlalchemy import distinct, or_, and_, UniqueConstraint from sqlalchemy.dialects import postgresql @@ -65,6 +66,66 @@ def get(self, query_id): scheduled_queries_executions = ScheduledQueriesExecutions() +@python_2_unicode_compatible +@generic_repr('id', 'name', 'data_source_id', 'org_id', 'exists', 'column_metadata') +class TableMetadata(TimestampMixin, db.Model): + id = Column(db.Integer, primary_key=True) + org_id = Column(db.Integer, db.ForeignKey("organizations.id")) + data_source_id = Column(db.Integer, db.ForeignKey("data_sources.id", ondelete="CASCADE")) + exists = Column(db.Boolean, default=True) + name = Column(db.String(255)) + description = Column(db.String(4096), nullable=True) + column_metadata = Column(db.Boolean, default=False) + sample_query = Column("sample_query", db.Text, nullable=True) + sample_updated_at = Column(db.DateTime(True), nullable=True) + + __tablename__ = 'table_metadata' + + def __str__(self): + return text_type(self.name) + + def to_dict(self): + return { + 'id': self.id, + 'org_id': self.org_id, + 'data_source_id': self.data_source_id, + 'exists': self.exists, + 'name': self.name, + 'description': self.description, + 'column_metadata': self.column_metadata, + 'sample_query': self.sample_query, + 'sample_updated_at': self.sample_updated_at, + } + + +@python_2_unicode_compatible +@generic_repr('id', 'name', 'type', 'table_id', 'org_id', 'exists') +class ColumnMetadata(TimestampMixin, db.Model): + id = Column(db.Integer, primary_key=True) + org_id = Column(db.Integer, db.ForeignKey("organizations.id")) + table_id = Column(db.Integer, db.ForeignKey("table_metadata.id", ondelete="CASCADE")) + name = Column(db.String(255)) + type = Column(db.String(255), nullable=True) + example = Column(db.String(4096), nullable=True) + exists = Column(db.Boolean, default=True) + + __tablename__ = 'column_metadata' + + def __str__(self): + return text_type(self.name) + + def to_dict(self): + return { + 'id': self.id, + 'org_id': self.org_id, + 'table_id': self.table_id, + 'name': self.name, + 'type': self.type, + 'example': self.example, + 'exists': self.exists, + } + + @python_2_unicode_compatible @generic_repr('id', 'name', 'type', 'org_id', 'created_at') class DataSource(BelongsToOrgMixin, db.Model): @@ -145,22 +206,42 @@ def delete(self): db.session.commit() return res - def get_schema(self, refresh=False): - key = "data_source:schema:{}".format(self.id) - - cache = None - if not refresh: - cache = redis_connection.get(key) - - if cache is None: - query_runner = self.query_runner - schema = sorted(query_runner.get_schema(get_stats=refresh), key=lambda t: t['name']) - - redis_connection.set(key, json_dumps(schema)) - else: - schema = json_loads(cache) - - return schema + def get_schema(self): + schema = [] + columns_by_table_id = {} + + tables = TableMetadata.query.filter( + TableMetadata.data_source_id == self.id, + TableMetadata.exists.is_(True), + ).all() + table_ids = [table.id for table in tables] + + 
columns = ColumnMetadata.query.filter( + ColumnMetadata.exists.is_(True), + ColumnMetadata.table_id.in_(table_ids), + ).all() + + for column in columns: + columns_by_table_id.setdefault(column.table_id, []).append({ + 'key': column.id, + 'name': column.name, + 'type': column.type, + 'exists': column.exists, + 'example': column.example + }) + + for table in tables: + table_info = { + 'name': table.name, + 'exists': table.exists, + 'hasColumnMetadata': table.column_metadata, + 'columns': []} + + table_info['columns'] = sorted( + columns_by_table_id.get(table.id, []), key=itemgetter('name')) + schema.append(table_info) + + return sorted(schema, key=itemgetter('name')) def _pause_key(self): return 'ds:{}:pause'.format(self.id) diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py index 1162f181bf..6106a61b7c 100644 --- a/redash/query_runner/__init__.py +++ b/redash/query_runner/__init__.py @@ -55,6 +55,7 @@ class NotSupported(Exception): class BaseQueryRunner(object): noop_query = None configuration_properties = None + sample_query = None def __init__(self, configuration): self.syntax = 'sql' @@ -125,6 +126,25 @@ def _run_query_internal(self, query): raise Exception("Failed running query [%s]." % query) return json_loads(results)['rows'] + def get_table_sample(self, table_name): + if self.sample_query is None: + raise NotImplementedError() + + query = self.sample_query.format(table=table_name) + + results, error = self.run_query(query, None) + if error is not None: + logger.exception(error) + raise NotSupported() + + rows = json_loads(results).get('rows', []) + if len(rows) > 0: + sample = rows[0] + else: + sample = {} + + return sample + @classmethod def to_dict(cls): return { diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py index 12633363c3..5469223749 100644 --- a/redash/query_runner/athena.py +++ b/redash/query_runner/athena.py @@ -44,6 +44,10 @@ def format(self, operation, parameters=None): class Athena(BaseQueryRunner): noop_query = 'SELECT 1' + # This takes a 1% random sample from {table}, reducing + # the runtime and data scanned for the query + sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1" + @classmethod def name(cls): return "Amazon Athena" @@ -89,6 +93,10 @@ def configuration_schema(cls): "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight." 
}, + 'samples': { + 'type': 'boolean', + 'title': 'Show Data Samples' + }, }, 'required': ['region', 's3_staging_dir'], 'order': ['region', 'aws_access_key', 'aws_secret_key', 's3_staging_dir', 'schema', 'work_group'], @@ -143,9 +151,18 @@ def __get_schema_from_glue(self): table_name = '%s.%s' % (database['Name'], table['Name']) if table_name not in schema: column = [columns['Name'] for columns in table['StorageDescriptor']['Columns']] - schema[table_name] = {'name': table_name, 'columns': column} + metadata = [{ + "name": column_data['Name'], + "type": column_data['Type'] + } for column_data in table['StorageDescriptor']['Columns']] + schema[table_name] = {'name': table_name, 'columns': column, 'metadata': metadata} for partition in table.get('PartitionKeys', []): schema[table_name]['columns'].append(partition['Name']) + schema[table_name]['metadata'].append({ + "name": partition['Name'], + "type": partition['Type'] + }) + return schema.values() def get_schema(self, get_stats=False): @@ -154,7 +171,7 @@ def get_schema(self, get_stats=False): schema = {} query = """ - SELECT table_schema, table_name, column_name + SELECT table_schema, table_name, column_name, data_type AS column_type FROM information_schema.columns WHERE table_schema NOT IN ('information_schema') """ @@ -164,11 +181,17 @@ def get_schema(self, get_stats=False): raise Exception("Failed getting schema.") results = json_loads(results) - for row in results['rows']: + + for i, row in enumerate(results['rows']): table_name = '{0}.{1}'.format(row['table_schema'], row['table_name']) if table_name not in schema: - schema[table_name] = {'name': table_name, 'columns': []} + schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []} + schema[table_name]['columns'].append(row['column_name']) + schema[table_name]['metadata'].append({ + "name": row['column_name'], + "type": row['column_type'], + }) return schema.values() diff --git a/redash/query_runner/mysql.py b/redash/query_runner/mysql.py index a7246020f3..55d513043b 100644 --- a/redash/query_runner/mysql.py +++ b/redash/query_runner/mysql.py @@ -54,7 +54,12 @@ class Mysql(BaseSQLQueryRunner): "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight." 
}, + 'samples': { + 'type': 'boolean', + 'title': 'Show Data Samples' + }, } + sample_query = "SELECT * FROM {table} LIMIT 1" @classmethod def configuration_schema(cls): @@ -107,7 +112,8 @@ def _get_tables(self, schema): query = """ SELECT col.table_schema as table_schema, col.table_name as table_name, - col.column_name as column_name + col.column_name as column_name, + col.column_type as column_type FROM `information_schema`.`columns` col WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys'); """ @@ -119,16 +125,20 @@ def _get_tables(self, schema): results = json_loads(results) - for row in results['rows']: + for i, row in enumerate(results['rows']): if row['table_schema'] != self.configuration['db']: table_name = u'{}.{}'.format(row['table_schema'], row['table_name']) else: table_name = row['table_name'] if table_name not in schema: - schema[table_name] = {'name': table_name, 'columns': []} + schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []} schema[table_name]['columns'].append(row['column_name']) + schema[table_name]['metadata'].append({ + "name": row['column_name'], + "type": row['column_type'], + }) return schema.values() diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py index 40d7b0f778..4ac34eff9f 100644 --- a/redash/query_runner/pg.py +++ b/redash/query_runner/pg.py @@ -97,7 +97,12 @@ class PostgreSQL(BaseSQLQueryRunner): "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight." }, + "samples": { + "type": "boolean", + "title": "Show Data Samples" + }, } + sample_query = "SELECT * FROM {table} LIMIT 1" @classmethod def configuration_schema(cls): @@ -128,9 +133,13 @@ def _get_definitions(self, schema, query): table_name = row['table_name'] if table_name not in schema: - schema[table_name] = {'name': table_name, 'columns': []} + schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []} schema[table_name]['columns'].append(row['column_name']) + schema[table_name]['metadata'].append({ + "name": row['column_name'], + "type": row['column_type'], + }) def _get_tables(self, schema): ''' @@ -150,7 +159,8 @@ def _get_tables(self, schema): query = """ SELECT s.nspname as table_schema, c.relname as table_name, - a.attname as column_name + a.attname as column_name, + a.atttypid::regtype::varchar as column_type FROM pg_class c JOIN pg_namespace s ON c.relnamespace = s.oid @@ -159,13 +169,16 @@ def _get_tables(self, schema): ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped + JOIN pg_type t + ON a.atttypid = t.oid WHERE c.relkind IN ('m', 'f', 'p') UNION SELECT table_schema, table_name, - column_name + column_name, + data_type as column_type FROM information_schema.columns WHERE table_schema NOT IN ('pg_catalog', 'information_schema') """ @@ -250,6 +263,10 @@ class Redshift(PostgreSQL): "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight." 
}, + "samples": { + "type": "boolean", + "title": "Show Data Samples" + }, } @classmethod @@ -294,11 +311,12 @@ def _get_tables(self, schema): SELECT DISTINCT table_name, table_schema, column_name, + data_type AS column_type, ordinal_position AS pos FROM svv_columns WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema') ) - SELECT table_name, table_schema, column_name + SELECT table_name, table_schema, column_name, column_type FROM tables WHERE HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND diff --git a/redash/query_runner/presto.py b/redash/query_runner/presto.py index bd8a386994..22ffeac3dd 100644 --- a/redash/query_runner/presto.py +++ b/redash/query_runner/presto.py @@ -60,8 +60,16 @@ class Presto(BaseQueryRunner): "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight." }, + 'samples': { + 'type': 'boolean', + 'title': 'Show Data Samples' + }, } + # This takes a 1% random sample from {table}, reducing + # the runtime and data scanned for the query + sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1" + @classmethod def configuration_schema(cls): return { @@ -82,13 +90,12 @@ def type(cls): def get_schema(self, get_stats=False): schema = {} query = """ - SELECT table_schema, table_name, column_name + SELECT table_schema, table_name, column_name, data_type AS column_type FROM information_schema.columns WHERE table_schema NOT IN ('pg_catalog', 'information_schema') """ results, error = self.run_query(query, None) - if error is not None: raise Exception("Failed getting schema.") @@ -96,12 +103,14 @@ def get_schema(self, get_stats=False): for row in results['rows']: table_name = '{}.{}'.format(row['table_schema'], row['table_name']) - if table_name not in schema: - schema[table_name] = {'name': table_name, 'columns': []} + schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []} schema[table_name]['columns'].append(row['column_name']) - + schema[table_name]['metadata'].append({ + "name": row['column_name'], + "type": row['column_type'], + }) return schema.values() def run_query(self, query, user): diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 864d04385a..e51aba1543 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -36,8 +36,9 @@ QUERY_RESULTS_CLEANUP_COUNT = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_COUNT", "100")) QUERY_RESULTS_CLEANUP_MAX_AGE = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_MAX_AGE", "7")) -SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 30)) +SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 360)) SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "celery") +SAMPLES_REFRESH_QUEUE = os.environ.get("REDASH_SAMPLES_REFRESH_QUEUE", "celery") AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key") INVITATION_TOKEN_MAX_AGE = int(os.environ.get("REDASH_INVITATION_TOKEN_MAX_AGE", 60 * 60 * 24 * 7)) @@ -330,6 +331,12 @@ def email_server_is_configured(): # Enhance schema fetching SCHEMA_RUN_TABLE_SIZE_CALCULATIONS = parse_boolean(os.environ.get("REDASH_SCHEMA_RUN_TABLE_SIZE_CALCULATIONS", "false")) +# Frequency of clearing out old schema metadata. 
+SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60)) + +# Frequency of schema samples refresh +SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 14)) + # kylin KYLIN_OFFSET = int(os.environ.get('REDASH_KYLIN_OFFSET', 0)) KYLIN_LIMIT = int(os.environ.get('REDASH_KYLIN_LIMIT', 50000)) diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index e5de680381..da4a3bd7ce 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -1,3 +1,3 @@ from .general import record_event, version_check, send_mail, sync_user_details -from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query +from .queries import QueryTask, refresh_queries, refresh_schemas, refresh_schema, cleanup_query_results, execute_query, update_sample, cleanup_schema_metadata, refresh_samples from .alerts import check_alerts_for_query diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index c666d5eeef..bd4a688b25 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -1,15 +1,20 @@ import logging import signal import time +import datetime import redis +from celery import group from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded from celery.result import AsyncResult from celery.utils.log import get_task_logger from six import text_type +from sqlalchemy.orm import load_only +from sqlalchemy import or_ -from redash import models, redis_connection, settings, statsd_client -from redash.query_runner import InterruptException +from redash import models, redis_connection, settings, statsd_client, utils +from redash.models import TableMetadata, ColumnMetadata, db +from redash.query_runner import InterruptException, NotSupported from redash.tasks.alerts import check_alerts_for_query from redash.utils import gen_query_hash, json_dumps, utcnow, mustache_render from redash.worker import celery @@ -229,13 +234,268 @@ def cleanup_query_results(): logger.info("Deleted %d unused query results.", deleted_count) -@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60) +def truncate_long_string(original_str, max_length): + # Remove null characters so we can save as string to postgres + new_str = original_str.replace('\x00', '') + + if new_str and len(new_str) > max_length: + new_str = u'{}...'.format(new_str[:max_length]) + return new_str + + +@celery.task(name="redash.tasks.update_sample") +def update_sample(data_source_id, table_name, table_id): + """ + For a given table, find look up a sample row for it and update + the "example" fields for it in the column_metadata table. + """ + logger.info(u"task=update_sample state=start table_name=%s", table_name) + start_time = time.time() + ds = models.DataSource.get_by_id(data_source_id) + sample = None + try: + sample = ds.query_runner.get_table_sample(table_name) + except NotSupported: + logger.info(u"Unable to fetch samples for {}".format(table_name)) + + if not sample: + return + + persisted_columns = ColumnMetadata.query.filter( + ColumnMetadata.exists.is_(True), + ColumnMetadata.table_id == table_id, + ).options(load_only('id')).all() + + # If a column exists, add a sample to it. 
+ column_examples = [] + for persisted_column in persisted_columns: + column_example = sample.get(persisted_column.name, None) + column_example = column_example if isinstance( + column_example, unicode) else str(column_example) + column_example = truncate_long_string(column_example, 4000) + + column_examples.append({ + "id": persisted_column.id, + "example": column_example + }) + + models.db.session.bulk_update_mappings( + ColumnMetadata, + column_examples + ) + models.db.session.commit() + logger.info(u"task=update_sample state=finished table_name=%s runtime=%.2f", + table_name, time.time() - start_time) + + +@celery.task(name="redash.tasks.refresh_samples") +def refresh_samples(data_source_id, table_sample_limit): + """ + For a given data source, refresh the data samples stored for each + table. This is done for tables with no samples or samples older + than DAYS_AGO + """ + logger.info(u"task=refresh_samples state=start ds_id=%s", data_source_id) + ds = models.DataSource.get_by_id(data_source_id) + + if not ds.query_runner.configuration.get('samples', False): + return + + DAYS_AGO = ( + utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS)) + + # Find all existing tables that have an empty or old sample_updated_at + tables_to_sample = TableMetadata.query.filter( + TableMetadata.exists.is_(True), + TableMetadata.data_source_id == data_source_id, + or_( + TableMetadata.sample_updated_at.is_(None), + TableMetadata.sample_updated_at < DAYS_AGO + ) + ).limit(table_sample_limit).all() + + tasks = [] + for table in tables_to_sample: + tasks.append( + update_sample.signature( + args=(ds.id, table.name, table.id), + queue=settings.SCHEMAS_REFRESH_QUEUE + ) + ) + table.sample_updated_at = db.func.now() + models.db.session.add(table) + + group(tasks).apply_async() + models.db.session.commit() + + +def cleanup_data_in_table(table_model): + TTL_DAYS_AGO = ( + utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_METADATA_TTL_DAYS)) + + table_model.query.filter( + table_model.exists.is_(False), + table_model.updated_at < TTL_DAYS_AGO + ).delete() + + db.session.commit() + + +@celery.task(name="redash.tasks.cleanup_schema_metadata") +def cleanup_schema_metadata(): + cleanup_data_in_table(TableMetadata) + cleanup_data_in_table(ColumnMetadata) + + +def insert_or_update_table_metadata(data_source, existing_tables_set, table_data): + # Update all persisted tables that exist to reflect this. + persisted_tables = TableMetadata.query.filter( + TableMetadata.name.in_(existing_tables_set), + TableMetadata.data_source_id == data_source.id, + ) + persisted_table_data = [] + for persisted_table in persisted_tables: + # Add IDs to persisted table data so it can be used for updates. 
+ table_data[persisted_table.name]['id'] = persisted_table.id + persisted_table_data.append(table_data[persisted_table.name]) + + models.db.session.bulk_update_mappings( + TableMetadata, + persisted_table_data + ) + + # Find the tables that need to be created by subtracting the sets: + persisted_table_set = set([col_data['name'] for col_data in persisted_table_data]) + tables_to_create = existing_tables_set.difference(persisted_table_set) + + table_metadata = [table_data[table_name] for table_name in tables_to_create] + + models.db.session.bulk_insert_mappings( + TableMetadata, + table_metadata + ) + + +def insert_or_update_column_metadata(table, existing_columns_set, column_data): + persisted_columns = ColumnMetadata.query.filter( + ColumnMetadata.name.in_(existing_columns_set), + ColumnMetadata.table_id == table.id, + ).all() + + persisted_column_data = [] + for persisted_column in persisted_columns: + # Add IDs to persisted column data so it can be used for updates. + column_data[persisted_column.name]['id'] = persisted_column.id + persisted_column_data.append(column_data[persisted_column.name]) + + models.db.session.bulk_update_mappings( + ColumnMetadata, + persisted_column_data + ) + + # Find the columns that need to be created by subtracting the sets: + persisted_column_set = set([col_data['name'] for col_data in persisted_column_data]) + columns_to_create = existing_columns_set.difference(persisted_column_set) + + column_metadata = [column_data[col_name] for col_name in columns_to_create] + + models.db.session.bulk_insert_mappings( + ColumnMetadata, + column_metadata + ) + + +@celery.task(name="redash.tasks.refresh_schema", time_limit=600, soft_time_limit=300) def refresh_schema(data_source_id): ds = models.DataSource.get_by_id(data_source_id) logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) start_time = time.time() + + MAX_TYPE_STRING_LENGTH = 250 try: - ds.get_schema(refresh=True) + schema = ds.query_runner.get_schema(get_stats=True) + + # Stores data from the updated schema that tells us which + # columns and which tables currently exist + existing_tables_set = set() + existing_columns_set = set() + + # Stores data that will be inserted into postgres + table_data = {} + column_data = {} + + new_column_names = {} + new_column_metadata = {} + for table in schema: + table_name = table['name'] + existing_tables_set.add(table_name) + + table_data[table_name] = { + 'org_id': ds.org_id, + 'name': table_name, + 'data_source_id': ds.id, + 'column_metadata': "metadata" in table, + 'exists': True + } + new_column_names[table_name] = table['columns'] + new_column_metadata[table_name] = table.get('metadata', None) + + insert_or_update_table_metadata(ds, existing_tables_set, table_data) + models.db.session.commit() + + all_existing_persisted_tables = TableMetadata.query.filter( + TableMetadata.exists.is_(True), + TableMetadata.data_source_id == ds.id, + ).all() + + for table in all_existing_persisted_tables: + for i, column in enumerate(new_column_names.get(table.name, [])): + existing_columns_set.add(column) + column_data[column] = { + 'org_id': ds.org_id, + 'table_id': table.id, + 'name': column, + 'type': None, + 'exists': True + } + + if table.column_metadata: + column_type = new_column_metadata[table.name][i]['type'] + column_type = truncate_long_string(column_type, MAX_TYPE_STRING_LENGTH) + column_data[column]['type'] = column_type + + insert_or_update_column_metadata(table, existing_columns_set, column_data) + models.db.session.commit() + + existing_columns_list = 
list(existing_columns_set) + + # If a column did not exist, set the 'column_exists' flag to false. + ColumnMetadata.query.filter( + ColumnMetadata.exists.is_(True), + ColumnMetadata.table_id == table.id, + ~ColumnMetadata.name.in_(existing_columns_list), + ).update({ + "exists": False, + "updated_at": db.func.now() + }, synchronize_session='fetch') + + # Clear the set for the next round + existing_columns_set.clear() + + # If a table did not exist in the get_schema() response above, + # set the 'exists' flag to false. + existing_tables_list = list(existing_tables_set) + TableMetadata.query.filter( + TableMetadata.exists.is_(True), + TableMetadata.data_source_id == ds.id, + ~TableMetadata.name.in_(existing_tables_list) + ).update({ + "exists": False, + "updated_at": db.func.now() + }, synchronize_session='fetch') + + models.db.session.commit() + logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) statsd_client.incr('refresh_schema.success') except SoftTimeLimitExceeded: @@ -252,6 +512,7 @@ def refresh_schemas(): """ Refreshes the data sources schemas. """ + TABLE_SAMPLE_LIMIT = 50 blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id] global_start_time = time.time() @@ -266,6 +527,7 @@ def refresh_schemas(): logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) else: refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) + refresh_samples.apply_async(args=(ds.id, TABLE_SAMPLE_LIMIT), queue=settings.SAMPLES_REFRESH_QUEUE) logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) diff --git a/redash/worker.py b/redash/worker.py index b46db432e2..c9f716e207 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -29,6 +29,10 @@ 'sync_user_details': { 'task': 'redash.tasks.sync_user_details', 'schedule': timedelta(minutes=1), + }, + 'cleanup_schema_metadata': { + 'task': 'redash.tasks.cleanup_schema_metadata', + 'schedule': timedelta(days=3), } } diff --git a/tests/factories.py b/tests/factories.py index 06feb3480c..671aaa0ba0 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -79,6 +79,15 @@ def __call__(self): data_source=data_source_factory.create, org_id=1) +table_metadata_factory = ModelFactory(redash.models.TableMetadata, + data_source_id=1, + exists=True, + name='table') + +column_metadata_factory = ModelFactory(redash.models.ColumnMetadata, + table_id=1, + name='column') + query_with_params_factory = ModelFactory(redash.models.Query, name='New Query with Params', description='', @@ -178,6 +187,12 @@ def create_org(self, **kwargs): return org + def create_table_metadata(self, **kwargs): + return table_metadata_factory.create(**kwargs) + + def create_column_metadata(self, **kwargs): + return column_metadata_factory.create(**kwargs) + def create_user(self, **kwargs): args = { 'org': self.org, diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py index 037ff77a05..429cbbd5f4 100644 --- a/tests/models/test_data_sources.py +++ b/tests/models/test_data_sources.py @@ -1,4 +1,3 @@ -import mock from tests import BaseTestCase from redash.models import DataSource, Query, QueryResult @@ -7,38 +6,43 @@ class DataSourceTest(BaseTestCase): def test_get_schema(self): - return_value = [{'name': 'table', 'columns': []}] - - with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema: - patched_get_schema.return_value = return_value - - schema 
= self.factory.data_source.get_schema() - - self.assertEqual(return_value, schema) - - def test_get_schema_uses_cache(self): - return_value = [{'name': 'table', 'columns': []}] - with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema: - patched_get_schema.return_value = return_value - - self.factory.data_source.get_schema() - schema = self.factory.data_source.get_schema() - - self.assertEqual(return_value, schema) - self.assertEqual(patched_get_schema.call_count, 1) - - def test_get_schema_skips_cache_with_refresh_true(self): - return_value = [{'name': 'table', 'columns': []}] - with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema: - patched_get_schema.return_value = return_value - - self.factory.data_source.get_schema() - new_return_value = [{'name': 'new_table', 'columns': []}] - patched_get_schema.return_value = new_return_value - schema = self.factory.data_source.get_schema(refresh=True) + data_source = self.factory.create_data_source() - self.assertEqual(new_return_value, schema) - self.assertEqual(patched_get_schema.call_count, 2) + # Create an existing table with a non-existing column + table_metadata = self.factory.create_table_metadata( + data_source_id=data_source.id, + org_id=data_source.org_id + ) + column_metadata = self.factory.create_column_metadata( + table_id=table_metadata.id, + org_id=data_source.org_id, + type='boolean', + example=True, + exists=False + ) + + # Create a non-existing table with an existing column + table_metadata = self.factory.create_table_metadata( + data_source_id=data_source.id, + org_id=data_source.org_id, + name='table_doesnt_exist', + exists=False + ) + column_metadata = self.factory.create_column_metadata( + table_id=table_metadata.id, + org_id=data_source.org_id, + type='boolean', + example=True, + ) + + return_value = [{ + 'name': 'table', + 'hasColumnMetadata': False, + 'exists': True, + 'columns': [] + }] + schema = data_source.get_schema() + self.assertEqual(return_value, schema) class TestDataSourceCreate(BaseTestCase): diff --git a/tests/query_runner/test_athena.py b/tests/query_runner/test_athena.py index fe444de64f..7c7a139a13 100644 --- a/tests/query_runner/test_athena.py +++ b/tests/query_runner/test_athena.py @@ -72,7 +72,11 @@ def test_external_table(self): {'DatabaseName': 'test1'}, ) with self.stubber: - assert query_runner.get_schema() == [{'columns': ['row_id'], 'name': 'test1.jdbc_table'}] + assert query_runner.get_schema() == [{ + 'columns': ['row_id'], + 'name': 'test1.jdbc_table', + 'metadata': [{'type': 'int', 'name': 'row_id'}] + }] def test_partitioned_table(self): """ @@ -118,7 +122,11 @@ def test_partitioned_table(self): {'DatabaseName': 'test1'}, ) with self.stubber: - assert query_runner.get_schema() == [{'columns': ['sk', 'category'], 'name': 'test1.partitioned_table'}] + assert query_runner.get_schema() == [{ + 'columns': ['sk', 'category'], + 'name': 'test1.partitioned_table', + 'metadata': [{'type': 'int', 'name': 'sk'}, {'type': 'int', 'name': 'category'}] + }] def test_view(self): query_runner = Athena({'glue': True, 'region': 'mars-east-1'}) @@ -150,7 +158,11 @@ def test_view(self): {'DatabaseName': 'test1'}, ) with self.stubber: - assert query_runner.get_schema() == [{'columns': ['sk'], 'name': 'test1.view'}] + assert query_runner.get_schema() == [{ + 'columns': ['sk'], + 'name': 'test1.view', + 'metadata': [{'type': 'int', 'name': 'sk'}] + }] def test_dodgy_table_does_not_break_schema_listing(self): """ @@ -187,4 +199,8 @@ def 
test_dodgy_table_does_not_break_schema_listing(self): {'DatabaseName': 'test1'}, ) with self.stubber: - assert query_runner.get_schema() == [{'columns': ['region'], 'name': 'test1.csv'}] + assert query_runner.get_schema() == [{ + 'columns': ['region'], + 'name': 'test1.csv', + 'metadata': [{'type': 'string', 'name': 'region'}] + }] diff --git a/tests/query_runner/test_get_schema_format.py b/tests/query_runner/test_get_schema_format.py new file mode 100644 index 0000000000..7ebbf6eb56 --- /dev/null +++ b/tests/query_runner/test_get_schema_format.py @@ -0,0 +1,77 @@ +import json +import mock + +from unittest import TestCase + +from redash.query_runner.presto import Presto +from redash.query_runner.athena import Athena +from redash.query_runner.mysql import Mysql +from redash.query_runner.pg import PostgreSQL, Redshift + +class TestBaseQueryRunner(TestCase): + def setUp(self): + self.query_runners = [{ + 'instance': Presto({}), + 'mock_location': 'presto.Presto' + }, { + 'instance': Athena({}), + 'mock_location': 'athena.Athena' + }, { + 'instance': Mysql({'db': None}), + 'mock_location': 'mysql.Mysql' + }, { + 'instance': PostgreSQL({}), + 'mock_location': 'pg.PostgreSQL' + }, { + 'instance': Redshift({}), + 'mock_location': 'pg.Redshift' + }] + + def _setup_mock(self, function_to_patch): + patcher = mock.patch(function_to_patch) + patched_function = patcher.start() + self.addCleanup(patcher.stop) + return patched_function + + def assert_correct_schema_format(self, query_runner, mock_location): + EXPECTED_SCHEMA_RESULT = [{ + 'columns': ['created_date'], + 'metadata': [{ + 'name': 'created_date', + 'type': 'varchar', + }], + 'name': 'default.table_name' + }] + + get_schema_query_response = { + "rows": [{ + "table_schema": "default", + "table_name": "table_name", + "column_type": "varchar", + "column_name": "created_date" + }] + } + get_samples_query_response = { + "rows": [{ + "created_date": "2017-10-26" + }] + } + + self.run_count = 0 + def query_runner_resonses(query, user): + response = (json.dumps(get_schema_query_response), None) + if self.run_count > 0: + response = (json.dumps(get_samples_query_response), None) + self.run_count += 1 + return response + + self.patched_run_query = self._setup_mock( + 'redash.query_runner.{location}.run_query'.format(location=mock_location)) + self.patched_run_query.side_effect = query_runner_resonses + + schema = query_runner.get_schema() + self.assertEqual(schema, EXPECTED_SCHEMA_RESULT) + + def test_get_schema_format(self): + for runner in self.query_runners: + self.assert_correct_schema_format(runner['instance'], runner['mock_location']) diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 758d6e5402..50a0f5a7ac 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -3,11 +3,14 @@ import uuid import mock +import datetime from tests import BaseTestCase -from redash import redis_connection, models +from redash import redis_connection, models, utils +from redash.models import TableMetadata from redash.query_runner.pg import PostgreSQL -from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query +from redash.tasks.queries import (QueryExecutionError, enqueue_query, + execute_query, cleanup_data_in_table) FakeResult = namedtuple('FakeResult', 'id') @@ -124,3 +127,24 @@ def test_success_after_failure(self): scheduled_query_id=q.id) q = models.Query.get_by_id(q.id) self.assertEqual(q.schedule_failures, 0) + + +class TestPruneSchemaMetadata(BaseTestCase): + + def 
test_cleanup_data_in_table(self): + data_source = self.factory.create_data_source() + + # Create an existing table with a non-existing column + table_metadata = self.factory.create_table_metadata( + data_source_id=data_source.id, + org_id=data_source.org_id, + exists=False, + updated_at=(utils.utcnow() - datetime.timedelta(days=70)) + ) + all_tables = TableMetadata.query.all() + self.assertEqual(len(all_tables), 1) + + cleanup_data_in_table(TableMetadata) + + all_tables = TableMetadata.query.all() + self.assertEqual(len(all_tables), 0) diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py index df29f5f207..cdad208b01 100644 --- a/tests/tasks/test_refresh_schemas.py +++ b/tests/tasks/test_refresh_schemas.py @@ -1,10 +1,53 @@ +import copy +import datetime + from mock import patch from tests import BaseTestCase -from redash.tasks import refresh_schemas +from redash import models, utils +from redash.tasks import (refresh_schemas, refresh_schema, + update_sample, refresh_samples) +from redash.models import TableMetadata, ColumnMetadata class TestRefreshSchemas(BaseTestCase): + def setUp(self): + super(TestRefreshSchemas, self).setUp() + + self.COLUMN_NAME = 'first_column' + self.COLUMN_TYPE = 'text' + self.COLUMN_EXAMPLE = 'some text for column value' + self.EXPECTED_COLUMN_METADATA = { + 'id': 1, + 'org_id': 1, + 'table_id': 1, + 'name': self.COLUMN_NAME, + 'type': self.COLUMN_TYPE, + 'example': self.COLUMN_EXAMPLE, + 'exists': True, + } + + get_schema_patcher = patch('redash.query_runner.pg.PostgreSQL.get_schema') + self.patched_get_schema = get_schema_patcher.start() + self.addCleanup(get_schema_patcher.stop) + self.default_schema_return_value = [{ + 'name': 'table', + 'columns': [self.COLUMN_NAME], + 'metadata': [{ + 'name': self.COLUMN_NAME, + 'type': self.COLUMN_TYPE, + }] + }] + self.patched_get_schema.return_value = self.default_schema_return_value + + get_table_sample_patcher = patch('redash.query_runner.BaseQueryRunner.get_table_sample') + patched_get_table_sample = get_table_sample_patcher.start() + self.addCleanup(get_table_sample_patcher.stop) + patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE} + + def tearDown(self): + self.factory.data_source.query_runner.configuration['samples'] = False + def test_calls_refresh_of_all_data_sources(self): self.factory.data_source # trigger creation with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job: @@ -23,3 +66,251 @@ def test_skips_paused_data_sources(self): with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job: refresh_schemas() refresh_job.assert_called() + + def test_refresh_schema_creates_tables(self): + EXPECTED_TABLE_METADATA = { + 'id': 1, + 'org_id': 1, + 'exists': True, + 'name': 'table', + 'sample_query': None, + 'description': None, + 'column_metadata': True, + 'data_source_id': 1, + 'sample_updated_at': None, + } + + refresh_schema(self.factory.data_source.id) + update_sample( + self.factory.data_source.id, + 'table', + 1 + ) + table_metadata = TableMetadata.query.all() + column_metadata = ColumnMetadata.query.all() + + self.assertEqual(len(table_metadata), 1) + self.assertEqual(len(column_metadata), 1) + self.assertEqual(table_metadata[0].to_dict(), EXPECTED_TABLE_METADATA) + self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA) + + def test_refresh_schema_deleted_table_marked(self): + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + column_metadata = 
ColumnMetadata.query.all() + + self.assertEqual(len(table_metadata), 1) + self.assertEqual(len(column_metadata), 1) + self.assertTrue(table_metadata[0].to_dict()['exists']) + + # Table is gone, `exists` should be False. + self.patched_get_schema.return_value = [] + + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + column_metadata = ColumnMetadata.query.all() + + self.assertEqual(len(table_metadata), 1) + self.assertEqual(len(column_metadata), 1) + self.assertFalse(table_metadata[0].to_dict()['exists']) + + # Table is back, `exists` should be True again. + self.patched_get_schema.return_value = self.default_schema_return_value + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + self.assertTrue(table_metadata[0].to_dict()['exists']) + + def test_refresh_schema_table_with_new_metadata_updated(self): + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + column_metadata = ColumnMetadata.query.all() + + self.assertEqual(len(table_metadata), 1) + self.assertEqual(len(column_metadata), 1) + self.assertTrue(table_metadata[0].to_dict()['column_metadata']) + + # Table has no metdata field, `column_metadata` should be False. + self.patched_get_schema.return_value = [{ + 'name': 'table', + 'columns': [self.COLUMN_NAME], + }] + + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + column_metadata = ColumnMetadata.query.all() + + self.assertEqual(len(table_metadata), 1) + self.assertEqual(len(column_metadata), 1) + self.assertFalse(table_metadata[0].to_dict()['column_metadata']) + + # Table metadata field is back, `column_metadata` should be True again. + self.patched_get_schema.return_value = self.default_schema_return_value + refresh_schema(self.factory.data_source.id) + table_metadata = TableMetadata.query.all() + self.assertTrue(table_metadata[0].to_dict()['column_metadata']) + + def test_refresh_schema_delete_column(self): + NEW_COLUMN_NAME = 'new_column' + refresh_schema(self.factory.data_source.id) + column_metadata = ColumnMetadata.query.all() + + self.assertTrue(column_metadata[0].to_dict()['exists']) + + self.patched_get_schema.return_value = [{ + 'name': 'table', + 'columns': [NEW_COLUMN_NAME], + 'metadata': [{ + 'name': NEW_COLUMN_NAME, + 'type': self.COLUMN_TYPE, + }] + }] + + refresh_schema(self.factory.data_source.id) + column_metadata = ColumnMetadata.query.all() + self.assertEqual(len(column_metadata), 2) + + self.assertFalse(column_metadata[1].to_dict()['exists']) + self.assertTrue(column_metadata[0].to_dict()['exists']) + + def test_refresh_schema_update_column(self): + UPDATED_COLUMN_TYPE = 'varchar' + + refresh_schema(self.factory.data_source.id) + update_sample( + self.factory.data_source.id, + 'table', + 1 + ) + column_metadata = ColumnMetadata.query.all() + self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA) + + updated_schema = copy.deepcopy(self.default_schema_return_value) + updated_schema[0]['metadata'][0]['type'] = UPDATED_COLUMN_TYPE + self.patched_get_schema.return_value = updated_schema + + refresh_schema(self.factory.data_source.id) + column_metadata = ColumnMetadata.query.all() + self.assertNotEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA) + self.assertEqual(column_metadata[0].to_dict()['type'], UPDATED_COLUMN_TYPE) + + def test_refresh_samples_rate_limits(self): + NEW_COLUMN_NAME = 'new_column' + NUM_TABLES = 105 + tables = [] + + for i in range(NUM_TABLES): + 
tables.append({ + 'name': 'table{}'.format(i), + 'columns': [NEW_COLUMN_NAME], + 'metadata': [{ + 'name': NEW_COLUMN_NAME, + 'type': self.COLUMN_TYPE, + }] + }) + + self.patched_get_schema.return_value = tables + self.factory.data_source.query_runner.configuration['samples'] = True + + refresh_schema(self.factory.data_source.id) + refresh_samples(self.factory.data_source.id, 50) + + # There's a total of 105 tables + table_metadata = TableMetadata.query.count() + self.assertEqual(table_metadata, NUM_TABLES) + + # 50 tables are processed on the first call + table_metadata = TableMetadata.query.filter( + TableMetadata.sample_updated_at.is_(None) + ).all() + self.assertEqual(len(table_metadata), 55) + + # 50 more tables are processed on the second call + refresh_samples(self.factory.data_source.id, 50) + table_metadata = TableMetadata.query.filter( + TableMetadata.sample_updated_at.is_(None) + ).all() + self.assertEqual(len(table_metadata), 5) + + # All tables are processed by the third call + refresh_samples(self.factory.data_source.id, 50) + table_metadata = TableMetadata.query.filter( + TableMetadata.sample_updated_at.is_(None) + ).all() + self.assertEqual(len(table_metadata), 0) + + def test_refresh_samples_refreshes(self): + NEW_COLUMN_NAME = 'new_column' + NUM_TABLES = 5 + TIME_BEFORE_UPDATE = utils.utcnow() + tables = [] + + for i in range(NUM_TABLES): + tables.append({ + 'name': 'table{}'.format(i), + 'columns': [NEW_COLUMN_NAME], + 'metadata': [{ + 'name': NEW_COLUMN_NAME, + 'type': self.COLUMN_TYPE, + }] + }) + + self.patched_get_schema.return_value = tables + self.factory.data_source.query_runner.configuration['samples'] = True + + refresh_schema(self.factory.data_source.id) + refresh_samples(self.factory.data_source.id, 50) + + # There's a total of 5 processed tables + table_metadata = TableMetadata.query.filter( + TableMetadata.sample_updated_at.isnot(None) + ) + self.assertEqual(table_metadata.count(), NUM_TABLES) + self.assertTrue(table_metadata.first().sample_updated_at > TIME_BEFORE_UPDATE) + + table_metadata.update({ + 'sample_updated_at': utils.utcnow() - datetime.timedelta(days=30) + }) + models.db.session.commit() + + TIME_BEFORE_UPDATE = utils.utcnow() + refresh_samples(self.factory.data_source.id, 50) + table_metadata_list = TableMetadata.query.filter( + TableMetadata.sample_updated_at.isnot(None) + ) + self.assertTrue(table_metadata_list.first().sample_updated_at > TIME_BEFORE_UPDATE) + + def test_refresh_schema_doesnt_overwrite_samples(self): + self.factory.data_source.query_runner.configuration['samples'] = True + + refresh_schema(self.factory.data_source.id) + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, None) + + update_sample( + self.factory.data_source.id, + 'table', + 1 + ) + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE) + + # Check that a schema refresh doesn't overwrite examples + refresh_schema(self.factory.data_source.id) + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE) + + def test_refresh_samples_applied_to_one_data_source(self): + ds1 = self.factory.create_data_source() + ds2 = self.factory.create_data_source() + + ds1.query_runner.configuration['samples'] = True + ds2.query_runner.configuration['samples'] = True + + refresh_schema(ds1.id) + refresh_schema(ds2.id) + refresh_samples(ds1.id, 50) + + table_metadata = TableMetadata.query.filter( + 
TableMetadata.sample_updated_at.isnot(None) + ) + self.assertEqual(table_metadata.count(), len(self.default_schema_return_value)) diff --git a/tests/test_cli.py b/tests/test_cli.py index 0546272cda..3c927ba5ae 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,10 +3,11 @@ from click.testing import CliRunner from tests import BaseTestCase +from redash import utils from redash.utils.configuration import ConfigurationContainer from redash.query_runner import query_runners from redash.cli import manager -from redash.models import DataSource, Group, Organization, User, db +from redash.models import TableMetadata, DataSource, Group, Organization, User, db class DataSourceCommandTests(BaseTestCase): @@ -16,7 +17,7 @@ def test_interactive_new(self): result = runner.invoke( manager, ['ds', 'new'], - input="test\n%s\n\n\n\n\nexample.com\n\n\ntestdb\n" % (pg_i,)) + input="test\n%s\ntestdb\n" % (pg_i,)) self.assertFalse(result.exception) self.assertEqual(result.exit_code, 0) self.assertEqual(DataSource.query.count(), 1) @@ -139,6 +140,16 @@ def test_connection_bad_delete(self): self.assertIn("Couldn't find", result.output) self.assertEqual(DataSource.query.count(), 1) + def test_refresh_samples(self): + ds = self.factory.create_data_source( + name='test1', type='sqlite', + options=ConfigurationContainer({"dbpath": "/tmp/test.db"})) + runner = CliRunner() + result = runner.invoke(manager, ['ds', 'refresh_data_samples', 'test1']) + self.assertFalse(result.exception) + self.assertEqual(result.exit_code, 0) + self.assertIn('Refreshing', result.output) + def test_options_edit(self): self.factory.create_data_source( name='test1', type='sqlite', diff --git a/tests/test_models.py b/tests/test_models.py index 357cc245e5..cc6490563c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -242,7 +242,10 @@ def test_failure_extends_schedule(self): Execution failures recorded for a query result in exponential backoff for scheduling future execution. """ - query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, schedule_failures=4) + + query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}) + # can't be set in create_query due to gen_query_hash + query.schedule_failures = 4 retrieved_at = utcnow() - datetime.timedelta(minutes=16) query_result = self.factory.create_query_result( retrieved_at=retrieved_at, query_text=query.query_text,