- {column}
- onClick={e => handleSelect(e, column)}
+ {column.name}
+ onClick={e => handleSelect(e, column.name)}
/>
))}
@@ -81,6 +97,14 @@ SchemaItem.defaultProps = {
onSelect: () => {},
};
+function itemExists(item) {
+ if ("visible" in item) {
+ return item.visible;
+ } else {
+ return false;
+ }
+};
+
function applyFilter(schema, filterString, showHidden, toggleString) {
const filters = filter(filterString.toLowerCase().split(/\s+/), s => s.length > 0);
@@ -97,6 +121,9 @@ function applyFilter(schema, filterString, showHidden, toggleString) {
}
}
+ // Filter out tables whose "visible" flag is set to false
+ schema = filter(schema, itemExists);
+
// Empty string: return original schema
if (filters.length === 0) {
return schema;
@@ -110,7 +137,7 @@ function applyFilter(schema, filterString, showHidden, toggleString) {
schema,
item =>
includes(item.name.toLowerCase(), nameFilter) ||
- some(item.columns, column => includes(column.toLowerCase(), columnFilter))
+ some(item.columns, column => includes(column.name.toLowerCase(), columnFilter))
);
}
@@ -120,7 +147,7 @@ function applyFilter(schema, filterString, showHidden, toggleString) {
return filter(
map(schema, item => {
if (includes(item.name.toLowerCase(), nameFilter)) {
- item = { ...item, columns: filter(item.columns, column => includes(column.toLowerCase(), columnFilter)) };
+ item = { ...item, columns: filter(item.columns, column => includes(column.name.toLowerCase(), columnFilter)) };
return item.columns.length > 0 ? item : null;
}
})
@@ -155,11 +182,16 @@ export default function SchemaBrowser({ schema, dataSourceId, onRefresh, onItemS
const [handleToggleChange] = useDebouncedCallback(setShowHidden, 100);
const [listRef, setListRef] = useState(null);
+ const [showSchemaInfo, setShowSchemaInfo] = useState(false);
+ const [tableName, setTableName] = useState("");
+ const [tableDescription, setTableDescription] = useState("");
+ const [tableMetadata, setTableMetadata] = useState([]);
+ const [sampleQueries, setSampleQueries] = useState([]);
+
useEffect(() => {
setExpandedFlags({});
}, [schema]);
-
useEffect(() => {
if (listRef) {
listRef.recomputeRowHeights();
@@ -177,6 +209,18 @@ export default function SchemaBrowser({ schema, dataSourceId, onRefresh, onItemS
});
}
+ function openSchemaInfo(table) {
+ setTableName(table.name);
+ setTableDescription(table.description);
+ setTableMetadata(table.columns);
+ setSampleQueries(Object.values(table.sample_queries));
+ setShowSchemaInfo(true);
+ }
+
+ function closeSchemaInfo() {
+ setShowSchemaInfo(false);
+ };
+
return (
@@ -224,6 +268,7 @@ export default function SchemaBrowser({ schema, dataSourceId, onRefresh, onItemS
expanded={expandedFlags[item.name]}
onToggle={() => toggleTable(item.name)}
onSelect={onItemSelect}
+ onShowSchema={openSchemaInfo}
/>
);
}}
@@ -231,6 +276,14 @@ export default function SchemaBrowser({ schema, dataSourceId, onRefresh, onItemS
)}
+
);
}
diff --git a/client/app/services/data-source.js b/client/app/services/data-source.js
index c7de5d086d..827daa9f05 100644
--- a/client/app/services/data-source.js
+++ b/client/app/services/data-source.js
@@ -7,11 +7,13 @@ export const IMG_ROOT = "/static/images/db-logos";
const DataSource = {
query: () => axios.get("api/data_sources"),
get: ({ id }) => axios.get(`api/data_sources/${id}`),
+ post: ({ id }) => axios.post(`api/data_sources/${id}`),
types: () => axios.get("api/data_sources/types"),
create: data => axios.post(`api/data_sources`, data),
save: data => axios.post(`api/data_sources/${data.id}`, data),
test: data => axios.post(`api/data_sources/${data.id}/test`),
delete: ({ id }) => axios.delete(`api/data_sources/${id}`),
+ updateSchema: ({ id, data }) => axios.post(`api/data_sources/${id}/schema`, data),
fetchSchema: (data, refresh = false) => {
const params = {};
diff --git a/migrations/versions/118aa16f565b_.py b/migrations/versions/118aa16f565b_.py
new file mode 100644
index 0000000000..6265d6eddb
--- /dev/null
+++ b/migrations/versions/118aa16f565b_.py
@@ -0,0 +1,38 @@
+"""empty message
+
+Revision ID: 118aa16f565b
+Revises: cf135a57332e
+Create Date: 2019-02-05 20:16:52.182780
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "118aa16f565b"
+down_revision = "cf135a57332e"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table(
+ "tablemetadata_queries_link",
+ sa.Column("table_id", sa.Integer(), nullable=False),
+ sa.Column("query_id", sa.Integer(), nullable=False),
+ sa.ForeignKeyConstraint(["query_id"], ["queries.id"], ondelete="CASCADE"),
+ sa.ForeignKeyConstraint(
+ ["table_id"], ["table_metadata.id"], ondelete="CASCADE"
+ ),
+ sa.PrimaryKeyConstraint("table_id", "query_id"),
+ )
+ op.drop_column(u"table_metadata", "sample_query")
+
+
+def downgrade():
+ op.add_column(
+ u"table_metadata",
+ sa.Column("sample_query", sa.TEXT(), autoincrement=False, nullable=True),
+ )
+ op.drop_table("tablemetadata_queries_link")
diff --git a/migrations/versions/151a4c333e96_.py b/migrations/versions/151a4c333e96_.py
new file mode 100644
index 0000000000..57e42527b1
--- /dev/null
+++ b/migrations/versions/151a4c333e96_.py
@@ -0,0 +1,24 @@
+"""empty message
+
+Revision ID: 151a4c333e96
+Revises: 2ba47e9812b1, e5c7a4e2df4d
+Create Date: 2019-03-26 19:32:29.052222
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "151a4c333e96"
+down_revision = ("2ba47e9812b1", "e5c7a4e2df4d")
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ pass
+
+
+def downgrade():
+ pass
diff --git a/migrations/versions/171aaafb2d52_add_more_db_indexes.py b/migrations/versions/171aaafb2d52_add_more_db_indexes.py
new file mode 100644
index 0000000000..fa95c8db15
--- /dev/null
+++ b/migrations/versions/171aaafb2d52_add_more_db_indexes.py
@@ -0,0 +1,101 @@
+"""Add more db indexes.
+
+Revision ID: 171aaafb2d52
+Revises: ba150362b02e
+Create Date: 2019-12-02 11:48:52.611441
+
+"""
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = "171aaafb2d52"
+down_revision = "ba150362b02e"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_index(
+ op.f("ix_column_metadata_exists"), "column_metadata", ["exists"], unique=False
+ )
+ op.create_index(
+ op.f("ix_column_metadata_name"), "column_metadata", ["name"], unique=False
+ )
+ op.create_index(
+ op.f("ix_column_metadata_table_id"),
+ "column_metadata",
+ ["table_id"],
+ unique=False,
+ )
+ op.create_index(
+ "ix_column_metadata_table_id_exists",
+ "column_metadata",
+ ["table_id", "exists"],
+ unique=False,
+ )
+ op.create_index(
+ "ix_column_metadata_table_id_name_exists",
+ "column_metadata",
+ ["table_id", "exists", "name"],
+ unique=False,
+ )
+ op.create_index(
+ "ix_column_metadata_table_id_pkey",
+ "column_metadata",
+ ["table_id", "id"],
+ unique=False,
+ )
+ op.create_index(
+ op.f("ix_table_metadata_data_source_id"),
+ "table_metadata",
+ ["data_source_id"],
+ unique=False,
+ )
+ op.create_index(
+ "ix_table_metadata_data_source_id_exists",
+ "table_metadata",
+ ["data_source_id", "exists"],
+ unique=False,
+ )
+ op.create_index(
+ "ix_table_metadata_data_source_id_name_exists",
+ "table_metadata",
+ ["data_source_id", "exists", "name"],
+ unique=False,
+ )
+ op.create_index(
+ op.f("ix_table_metadata_exists"), "table_metadata", ["exists"], unique=False
+ )
+ op.create_index(
+ op.f("ix_table_metadata_name"), "table_metadata", ["name"], unique=False
+ )
+ op.create_index(
+ op.f("ix_table_metadata_sample_updated_at"),
+ "table_metadata",
+ ["sample_updated_at"],
+ unique=False,
+ )
+
+
+def downgrade():
+ op.drop_index(
+ op.f("ix_table_metadata_sample_updated_at"), table_name="table_metadata"
+ )
+ op.drop_index(op.f("ix_table_metadata_name"), table_name="table_metadata")
+ op.drop_index(op.f("ix_table_metadata_exists"), table_name="table_metadata")
+ op.drop_index(
+ "ix_table_metadata_data_source_id_name_exists", table_name="table_metadata"
+ )
+ op.drop_index(
+ "ix_table_metadata_data_source_id_exists", table_name="table_metadata"
+ )
+ op.drop_index(op.f("ix_table_metadata_data_source_id"), table_name="table_metadata")
+ op.drop_index("ix_column_metadata_table_id_pkey", table_name="column_metadata")
+ op.drop_index(
+ "ix_column_metadata_table_id_name_exists", table_name="column_metadata"
+ )
+ op.drop_index("ix_column_metadata_table_id_exists", table_name="column_metadata")
+ op.drop_index(op.f("ix_column_metadata_table_id"), table_name="column_metadata")
+ op.drop_index(op.f("ix_column_metadata_name"), table_name="column_metadata")
+ op.drop_index(op.f("ix_column_metadata_exists"), table_name="column_metadata")
diff --git a/migrations/versions/280daa582976_.py b/migrations/versions/280daa582976_.py
new file mode 100644
index 0000000000..983efbc862
--- /dev/null
+++ b/migrations/versions/280daa582976_.py
@@ -0,0 +1,59 @@
+"""Add column metadata and table metadata
+
+Revision ID: 280daa582976
+Revises: 151a4c333e96
+Create Date: 2019-01-24 18:23:53.040608
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "280daa582976"
+down_revision = "151a4c333e96"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table(
+ "table_metadata",
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("org_id", sa.Integer(), nullable=False),
+ sa.Column("data_source_id", sa.Integer(), nullable=False),
+ sa.Column("exists", sa.Boolean(), nullable=False),
+ sa.Column("name", sa.String(length=255), nullable=False),
+ sa.Column("description", sa.String(length=4096), nullable=True),
+ sa.Column("column_metadata", sa.Boolean(), nullable=False),
+ sa.Column("sample_query", sa.Text(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["data_source_id"], ["data_sources.id"], ondelete="CASCADE"
+ ),
+ sa.ForeignKeyConstraint(["org_id"], ["organizations.id"]),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_table(
+ "column_metadata",
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("org_id", sa.Integer(), nullable=False),
+ sa.Column("table_id", sa.Integer(), nullable=False),
+ sa.Column("name", sa.String(length=255), nullable=False),
+ sa.Column("type", sa.String(length=255), nullable=True),
+ sa.Column("example", sa.String(length=4096), nullable=True),
+ sa.Column("exists", sa.Boolean(), nullable=False),
+ sa.ForeignKeyConstraint(
+ ["table_id"], ["table_metadata.id"], ondelete="CASCADE"
+ ),
+ sa.ForeignKeyConstraint(["org_id"], ["organizations.id"]),
+ sa.PrimaryKeyConstraint("id"),
+ )
+
+
+def downgrade():
+ op.drop_table("column_metadata")
+ op.drop_table("table_metadata")
diff --git a/migrations/versions/6adb92e75691_.py b/migrations/versions/6adb92e75691_.py
new file mode 100644
index 0000000000..c2825e4867
--- /dev/null
+++ b/migrations/versions/6adb92e75691_.py
@@ -0,0 +1,27 @@
+"""Add sample_updated_at column to table_metadata
+
+Revision ID: 6adb92e75691
+Revises: 280daa582976
+Create Date: 2019-04-10 20:13:13.714589
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "6adb92e75691"
+down_revision = "280daa582976"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.add_column(
+ "table_metadata",
+ sa.Column("sample_updated_at", sa.DateTime(timezone=True), nullable=True),
+ )
+
+
+def downgrade():
+ op.drop_column("table_metadata", "sample_updated_at")
diff --git a/migrations/versions/ba150362b02e_.py b/migrations/versions/ba150362b02e_.py
new file mode 100644
index 0000000000..636415557a
--- /dev/null
+++ b/migrations/versions/ba150362b02e_.py
@@ -0,0 +1,26 @@
+"""Add description field for data_sources
+
+Revision ID: ba150362b02e
+Revises: 118aa16f565b
+Create Date: 2019-02-05 21:21:08.069390
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "ba150362b02e"
+down_revision = "118aa16f565b"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.add_column(
+ "data_sources", sa.Column("description", sa.String(length=4096), nullable=True)
+ )
+
+
+def downgrade():
+ op.drop_column("data_sources", "description")
diff --git a/migrations/versions/cf135a57332e_.py b/migrations/versions/cf135a57332e_.py
new file mode 100644
index 0000000000..01694219e5
--- /dev/null
+++ b/migrations/versions/cf135a57332e_.py
@@ -0,0 +1,32 @@
+"""Add column description and table visibility fields
+
+Revision ID: cf135a57332e
+Revises: 6adb92e75691
+Create Date: 2019-02-05 19:52:48.233070
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "cf135a57332e"
+down_revision = "6adb92e75691"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.add_column(
+ "column_metadata",
+ sa.Column("description", sa.String(length=4096), nullable=True),
+ )
+ op.add_column(
+ "table_metadata",
+ sa.Column("visible", sa.Boolean(), nullable=False, server_default="True"),
+ )
+
+
+def downgrade():
+ op.drop_column("table_metadata", "visible")
+ op.drop_column("column_metadata", "description")
diff --git a/migrations/versions/eb2f788f997e_.py b/migrations/versions/eb2f788f997e_.py
new file mode 100644
index 0000000000..f1e420906c
--- /dev/null
+++ b/migrations/versions/eb2f788f997e_.py
@@ -0,0 +1,23 @@
+"""empty message
+
+Revision ID: eb2f788f997e
+Revises: d1eae8b9893e
+Create Date: 2017-03-02 12:20:00.029066
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "eb2f788f997e"
+down_revision = "d1eae8b9893e"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ pass
+
+
+def downgrade():
+ pass
diff --git a/redash/cli/data_sources.py b/redash/cli/data_sources.py
index 4a94bfd12d..fe15075395 100644
--- a/redash/cli/data_sources.py
+++ b/redash/cli/data_sources.py
@@ -4,7 +4,7 @@
from flask.cli import AppGroup
from sqlalchemy.orm.exc import NoResultFound
-from redash import models
+from redash import models, tasks
from redash.query_runner import (
get_configuration_schema_for_query_runner_type,
query_runners,
@@ -202,6 +202,38 @@ def update_attr(obj, attr, new_value):
setattr(obj, attr, new_value)
+@manager.command()
+@click.argument("name")
+@click.option(
+ "--org",
+ "organization",
+ default="default",
+ help="The organization the user belongs to (leave blank for " "'default').",
+)
+@click.option(
+ "--count",
+ "num_tables",
+ default=50,
+ help="number of tables to process data samples for",
+)
+def refresh_samples(name, num_tables=50, organization="default"):
+ """Refresh table samples by data source name."""
+ try:
+ org = models.Organization.get_by_slug(organization)
+ data_source = models.DataSource.query.filter(
+ models.DataSource.name == name, models.DataSource.org == org
+ ).one()
+ print(
+ "Refreshing samples for data source: {} (id={})".format(
+ name, data_source.id
+ )
+ )
+ tasks.refresh_samples(data_source.id, num_tables)
+ except NoResultFound:
+ print("Couldn't find data source named: {}".format(name))
+ exit(1)
+
+
@manager.command()
@click.argument("name")
@click.option("--name", "new_name", default=None, help="new name for the data source")
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index e26ffd329f..be29664f12 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -153,7 +153,9 @@ def json_representation(data, code, headers=None):
api.add_org_resource(
DataSourceResource, "/api/data_sources/
", endpoint="data_source"
)
-api.add_resource(DataSourceToggleStringResource, "/api/data_sources/<data_source_id>/toggle_string")
+api.add_resource(
+    DataSourceToggleStringResource, "/api/data_sources/<data_source_id>/toggle_string"
+)
api.add_org_resource(GroupListResource, "/api/groups", endpoint="groups")
diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py
index f41202f54d..bba47582db 100644
--- a/redash/handlers/data_sources.py
+++ b/redash/handlers/data_sources.py
@@ -5,7 +5,7 @@
from funcy import project
from sqlalchemy.exc import IntegrityError
-from redash import models
+from redash import models, settings
from redash.handlers.base import BaseResource, get_object_or_404, require_fields
from redash.permissions import (
require_access,
@@ -18,6 +18,7 @@
query_runners,
NotSupported,
)
+from redash.tasks.queries import refresh_schema
from redash.utils import filter_none
from redash.utils.configuration import ConfigurationContainer, ValidationError
@@ -25,7 +26,9 @@
class DataSourceTypeListResource(BaseResource):
@require_admin
def get(self):
- return [q.to_dict() for q in sorted(query_runners.values(), key=lambda q: q.name())]
+ return [
+ q.to_dict() for q in sorted(query_runners.values(), key=lambda q: q.name())
+ ]
class DataSourceResource(BaseResource):
@@ -58,8 +61,12 @@ def post(self, data_source_id):
data_source.type = req["type"]
data_source.name = req["name"]
+ data_source.description = req["description"] if "description" in req else ""
models.db.session.add(data_source)
+ # Refresh the stored schemas when a data source is updated
+ refresh_schema.delay(data_source.id)
+
try:
models.db.session.commit()
except IntegrityError as e:
@@ -114,7 +121,7 @@ def get(self):
continue
try:
- d = ds.to_dict()
+ d = ds.to_dict(all=True)
d["view_only"] = all(
project(ds.groups, self.current_user.group_ids).values()
)
@@ -149,10 +156,18 @@ def post(self):
try:
datasource = models.DataSource.create_with_group(
- org=self.current_org, name=req["name"], type=req["type"], options=config
+ org=self.current_org,
+ name=req["name"],
+ type=req["type"],
+ description=req["description"] if "description" in req else "",
+ options=config,
)
models.db.session.commit()
+
+ # Refresh the stored schemas when a new data source is added to the list
+ refresh_schema.delay(datasource.id)
+
except IntegrityError as e:
models.db.session.rollback()
if req["name"] in str(e):
@@ -177,6 +192,16 @@ def post(self):
class DataSourceSchemaResource(BaseResource):
+ @require_admin
+ def post(self, data_source_id):
+ data_source = get_object_or_404(
+ models.DataSource.get_by_id_and_org, data_source_id, self.current_org
+ )
+ new_schema_data = request.get_json(force=True)
+ models.DataSource.save_schema(new_schema_data)
+ # Force update the schema cache to have all changes available right away
+ data_source.schema_cache.populate(forced=True)
+
def get(self, data_source_id):
data_source = get_object_or_404(
models.DataSource.get_by_id_and_org, data_source_id, self.current_org
@@ -272,8 +297,6 @@ def get(self, data_source_id):
)
require_access(data_source.groups, self.current_user, view_only)
try:
- return {
- "toggle_string": data_source.options.get("toggle_table_string", "")
- }
+ return {"toggle_string": data_source.options.get("toggle_table_string", "")}
except Exception:
abort(400)
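For reference, a minimal sketch of the JSON bodies the new DataSourceSchemaResource.post forwards to DataSource.save_schema (the shapes follow save_schema further down in this diff; the ids and field values here are invented):

# Column-level edit: updates a single ColumnMetadata row.
column_update = {
    "tableId": 12,
    "columnId": 345,
    "schema": {"description": "Total order value in USD"},
}

# Table-level edit: updates TableMetadata; "sample_queries" is optional and
# only the query ids are used server-side to relink sample queries.
table_update = {
    "tableId": 12,
    "schema": {
        "description": "One row per order",
        "visible": True,
        "sample_queries": {"7": {"id": 7}, "19": {"id": 19}},
    },
}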
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
index b1a05fb680..99c2948793 100644
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ -9,7 +9,14 @@
from sqlalchemy.dialects import postgresql
from sqlalchemy.event import listens_for
from sqlalchemy.ext.hybrid import hybrid_property
-from sqlalchemy.orm import backref, contains_eager, joinedload, subqueryload, load_only
+from sqlalchemy.orm import (
+ backref,
+ contains_eager,
+ joinedload,
+ subqueryload,
+ load_only,
+ relationship,
+)
from sqlalchemy.orm.exc import NoResultFound # noqa: F401
from sqlalchemy import func
from sqlalchemy_utils import generic_relationship
@@ -80,6 +87,219 @@ def get(self, query_id):
scheduled_queries_executions = ScheduledQueriesExecutions()
+def cleanup_data_in_table(table_model):
+ ttl_days_ago = utils.utcnow() - datetime.timedelta(
+ days=settings.SCHEMA_METADATA_TTL_DAYS
+ )
+
+ table_model.query.filter(
+ table_model.exists.is_(False), table_model.updated_at < ttl_days_ago
+ ).delete()
+
+ db.session.commit()
+
+
+class SchemaCache:
+ """
+ This caches schema requests in redis and uses a method to
+ serve stale values while the cache is being populated or
+ updated to handle the thundering herd problem.
+ """
+
+ # SCHEMAS_REFRESH_SCHEDULE is in minutes, converting to seconds here:
+ timeout = settings.SCHEMAS_REFRESH_SCHEDULE * 60
+ # keep stale cached items for 10 minutes longer than the
+ # timeout above so repopulation can fall back to them
+ stale_cache_timeout = 60 * 10
+
+ def __init__(self, data_source):
+ self.data_source = data_source
+ self.client = redis_connection
+ self.cache_key = f"data_source:schema:cache:{self.data_source.id}"
+ self.lock_key = f"{self.cache_key}:lock"
+ self.fresh_key = f"{self.cache_key}:fresh"
+
+ def populate(self, schema=None, forced=False):
+ """
+ This is the central method to populate the cache and return
+ either the provided fallback schema or the value loaded
+ from the database.
+
+ It uses Redis locking to make sure the retrieval from the
+ database isn't run many times at once.
+
+ It also sets a separate key that indicates freshness that has
+ a shorter ttl than the actual cache key that contains the
+ schema.
+
+ In the get_schema method it'll check the freshness key first
+ and trigger a repopulation of the cache key if it's stale.
+ """
+ lock = redis_connection.lock(self.lock_key, timeout=self.timeout)
+ acquired = lock.acquire(blocking=False)
+
+ if acquired or forced:
+ try:
+ schema = TableMetadata.load(self.data_source)
+ except Exception:
+ raise
+ else:
+ key_timeout = self.timeout + self.stale_cache_timeout
+ pipeline = redis_connection.pipeline()
+ pipeline.set(self.cache_key, utils.json_dumps(schema), key_timeout)
+ pipeline.set(self.fresh_key, 1, self.timeout)
+ pipeline.execute()
+ finally:
+ if acquired:
+ lock.release()
+
+ return schema or []
+
+
+@generic_repr("id", "name", "data_source_id", "org_id", "exists", "column_metadata")
+class TableMetadata(TimestampMixin, db.Model):
+ id = Column(db.Integer, primary_key=True)
+ org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
+ data_source_id = Column(
+ db.Integer, db.ForeignKey("data_sources.id", ondelete="CASCADE"), index=True
+ )
+ exists = Column(db.Boolean, default=True, index=True)
+ visible = Column(db.Boolean, default=True)
+ name = Column(db.String(255), index=True)
+ description = Column(db.String(4096), nullable=True)
+ column_metadata = Column(db.Boolean, default=False)
+ sample_updated_at = Column(db.DateTime(True), nullable=True, index=True)
+ sample_queries = relationship(
+ "Query", secondary="tablemetadata_queries_link", backref="relevant_tables"
+ )
+ existing_columns = db.relationship(
+ "ColumnMetadata",
+ backref="table",
+ order_by="ColumnMetadata.name",
+ primaryjoin="and_(TableMetadata.id == ColumnMetadata.table_id, ColumnMetadata.exists.is_(True))",
+ )
+
+ __tablename__ = "table_metadata"
+ __table_args__ = (
+ db.Index("ix_table_metadata_data_source_id_exists", "data_source_id", "exists"),
+ db.Index(
+ "ix_table_metadata_data_source_id_name_exists",
+ "data_source_id",
+ "exists",
+ "name",
+ ),
+ )
+
+ def __str__(self):
+ return str(self.name)
+
+ @classmethod
+ def store(cls, data_source, existing_tables_set, table_data):
+ """
+ Insert new or update all existing tables to reflect the provided data.
+ """
+ existing_tables = cls.query.filter(
+ cls.name.in_(existing_tables_set), cls.data_source_id == data_source.id,
+ )
+ table_names = set()
+ for table in existing_tables:
+ table_names.add(table.name)
+ for name, value in table_data[table.name].items():
+ setattr(table, name, value)
+ db.session.add(table)
+
+ # Find the tables that need to be created by subtracting the sets:
+ for table_name in existing_tables_set.difference(table_names):
+ db.session.add(cls(**table_data[table_name]))
+ db.session.commit()
+
+ @classmethod
+ def load(cls, data_source):
+ """
+ When called will fetch all table and column metadata from
+ the database and serialize it with the TableMetadataSerializer.
+ """
+ # due to the unfortunate import time side effects of
+ # Redash's package layout this needs to be done inline
+ from redash.serializers import TableMetadataSerializer
+
+ schema = []
+ tables = (
+ cls.query.filter(
+ cls.data_source_id == data_source.id, cls.exists.is_(True),
+ )
+ .order_by(cls.name)
+ .options(joinedload(cls.existing_columns), joinedload(cls.sample_queries),)
+ )
+
+ for table in tables:
+ schema.append(
+ TableMetadataSerializer(table, with_favorite_state=False).serialize()
+ )
+
+ return schema
+
+
+@generic_repr("id", "name", "type", "table_id", "org_id", "exists")
+class ColumnMetadata(TimestampMixin, db.Model):
+ id = Column(db.Integer, primary_key=True)
+ org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
+ table_id = Column(
+ db.Integer, db.ForeignKey("table_metadata.id", ondelete="CASCADE"), index=True
+ )
+ name = Column(db.String(255), index=True)
+ type = Column(db.String(255), nullable=True)
+ example = Column(db.String(4096), nullable=True)
+ exists = Column(db.Boolean, default=True, index=True)
+ description = Column(db.String(4096), nullable=True)
+
+ __tablename__ = "column_metadata"
+ __table_args__ = (
+ db.Index("ix_column_metadata_table_id_pkey", "table_id", "id"),
+ db.Index("ix_column_metadata_table_id_exists", "table_id", "exists"),
+ db.Index(
+ "ix_column_metadata_table_id_name_exists", "table_id", "exists", "name"
+ ),
+ )
+
+ def __str__(self):
+ return str(self.name)
+
+ @classmethod
+ def store(cls, table, existing_columns_set, column_data):
+ existing_columns = cls.query.filter(
+ cls.name.in_(existing_columns_set), cls.table_id == table.id,
+ ).all()
+
+ column_names = set()
+ for column in existing_columns:
+ column_names.add(column.name)
+ for name, value in column_data[column.name].items():
+ setattr(column, name, value)
+ db.session.add(column)
+
+ # Find the columns that need to be created by subtracting the sets:
+ for column_name in existing_columns_set.difference(column_names):
+ db.session.add(cls(**column_data[column_name]))
+ db.session.commit()
+
+
+class TableMetadataQueriesLink(db.Model):
+ table_id = Column(
+ db.Integer,
+ db.ForeignKey("table_metadata.id", ondelete="CASCADE"),
+ primary_key=True,
+ )
+ query_id = Column(
+ db.Integer, db.ForeignKey("queries.id", ondelete="CASCADE"), primary_key=True
+ )
+
+ __tablename__ = "tablemetadata_queries_link"
+
+ def __str__(self):
+ return str(self.id)
+
+
@generic_repr("id", "name", "type", "org_id", "created_at")
class DataSource(BelongsToOrgMixin, db.Model):
id = Column(db.Integer, primary_key=True)
@@ -96,6 +316,7 @@ class DataSource(BelongsToOrgMixin, db.Model):
)
),
)
+ description = Column(db.String(4096), nullable=True)
queue_name = Column(db.String(255), default="queries")
scheduled_queue_name = Column(db.String(255), default="scheduled_queries")
created_at = Column(db.DateTime(True), default=db.func.now())
@@ -117,6 +338,7 @@ def to_dict(self, all=False, with_permissions_for=None):
"id": self.id,
"name": self.name,
"type": self.type,
+ "description": self.description,
"syntax": self.query_runner.syntax,
"paused": self.paused,
"pause_reason": self.pause_reason,
@@ -169,6 +391,35 @@ def all(cls, org, group_ids=None):
def get_by_id(cls, _id):
return cls.query.filter(cls.id == _id).one()
+ @classmethod
+ def save_schema(cls, schema_info):
+ # There was a change in column data.
+ if "columnId" in schema_info:
+ ColumnMetadata.query.filter(
+ ColumnMetadata.table_id == schema_info["tableId"],
+ ColumnMetadata.id == schema_info["columnId"],
+ ).update(schema_info["schema"])
+ db.session.commit()
+ return
+
+ sample_queries = schema_info["schema"].pop("sample_queries", None)
+ if sample_queries is not None:
+ table_metadata_object = TableMetadata.query.filter(
+ TableMetadata.id == schema_info["tableId"]
+ ).first()
+ table_metadata_object.sample_queries = []
+
+ query_ids = [sample_query["id"] for sample_query in sample_queries.values()]
+ query_objects = Query.query.filter(Query.id.in_(query_ids))
+ table_metadata_object.sample_queries.extend(query_objects)
+ db.session.add(table_metadata_object)
+ db.session.commit()
+
+ TableMetadata.query.filter(TableMetadata.id == schema_info["tableId"]).update(
+ schema_info["schema"]
+ )
+ db.session.commit()
+
def delete(self):
Query.query.filter(Query.data_source == self).update(
dict(data_source_id=None, latest_query_data_id=None)
@@ -176,43 +427,52 @@ def delete(self):
QueryResult.query.filter(QueryResult.data_source == self).delete()
res = db.session.delete(self)
db.session.commit()
-
- redis_connection.delete(self._schema_key)
-
return res
def get_schema(self, refresh=False):
- cache = None
- if not refresh:
- cache = redis_connection.get(self._schema_key)
-
- if cache is None:
- query_runner = self.query_runner
- schema = query_runner.get_schema(get_stats=refresh)
-
- try:
- out_schema = self._sort_schema(schema)
- except Exception:
- logging.exception(
- "Error sorting schema columns for data_source {}".format(self.id)
- )
- out_schema = schema
- finally:
- redis_connection.set(self._schema_key, json_dumps(out_schema))
+ """
+ Get or set the schema from Redis.
+
+ This will first check for the fresh key and either
+ return the schema value if it's still fresh or
+ repopulate the cache key and return the stale value.
+
+ This will refresh the schema from the data source's API
+ when requested with the refresh parameter, which will also
+ (re)populate the cache.
+ """
+ if refresh:
+ from redash.tasks.queries import refresh_schema
+
+ refresh_schema.delay(self.id)
+
+ # First let's try to find out if there is a cached schema
+ # already and hasn't timed out yet and load it with json.
+ schema = redis_connection.get(self.schema_cache.cache_key)
+ if schema:
+ schema = utils.json_loads(schema)
else:
- out_schema = json_loads(cache)
-
- return out_schema
-
- def _sort_schema(self, schema):
- return [
- {"name": i["name"], "columns": sorted(i["columns"])}
- for i in sorted(schema, key=lambda x: x["name"])
- ]
+ # Otherwise we assume the cache key has timed out or was
+ # never populated before.
+ schema = []
+
+ # Now check if there is a fresh key from the last time populating.
+ is_fresh = redis_connection.get(self.schema_cache.fresh_key)
+ if is_fresh:
+ # If the cache value is still fresh, just return it.
+ return schema
+ else:
+ # Otherwise pass the stale value to the populate method
+ # so it can use it as a fallback in case a population
+ # lock is in place already (e.g. another user has already
+ # tried to fetch the schema). If the lock can be created
+ # successfully, it'll actually load the schema using the
+ # load method and set the cache and refresh keys.
+ return self.schema_cache.populate(schema)
@property
- def _schema_key(self):
- return "data_source:schema:{}".format(self.id)
+ def schema_cache(self):
+ return SchemaCache(self)
@property
def _pause_key(self):
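To summarize the cache flow introduced above (key names come from SchemaCache, timings from the settings changes later in this diff): each data source gets three Redis keys, data_source:schema:cache:<id> holding the serialized schema (TTL = refresh schedule + 10 minutes), ...:fresh marking the cached value as fresh (TTL = refresh schedule), and ...:lock guarding repopulation. get_schema() returns the cached value as long as the fresh key exists; once it expires, the next caller that wins the lock reloads the schema from Postgres via TableMetadata.load and resets both keys, while everyone else keeps receiving the stale value.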
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
index 8fbe0bd1a4..120c53a73e 100644
--- a/redash/query_runner/__init__.py
+++ b/redash/query_runner/__init__.py
@@ -53,6 +53,7 @@ class BaseQueryRunner(object):
deprecated = False
should_annotate_query = True
noop_query = None
+ sample_query = None
def __init__(self, configuration):
self.syntax = "sql"
@@ -121,6 +122,25 @@ def _run_query_internal(self, query):
raise Exception("Failed running query [%s]." % query)
return json_loads(results)["rows"]
+ def get_table_sample(self, table_name):
+ if self.sample_query is None:
+ raise NotImplementedError()
+
+ query = self.sample_query.format(table=table_name)
+
+ results, error = self.run_query(query, None)
+ if error is not None:
+ logger.exception(error)
+ raise NotSupported()
+
+ rows = json_loads(results).get("rows", [])
+ if len(rows) > 0:
+ sample = rows[0]
+ else:
+ sample = {}
+
+ return sample
+
@classmethod
def to_dict(cls):
return {
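A minimal sketch of how a runner opts into sampling via the new hook (the class and table names are hypothetical; sample_query and get_table_sample come from the diff above):

from redash.query_runner import BaseQueryRunner

class ExampleRunner(BaseQueryRunner):  # hypothetical runner, for illustration only
    noop_query = "SELECT 1"
    # {table} is filled in by BaseQueryRunner.get_table_sample();
    # leaving sample_query as None makes get_table_sample raise NotImplementedError.
    sample_query = "SELECT * FROM {table} LIMIT 1"

# runner.get_table_sample("events") formats and runs the query through
# run_query() and returns the first row as a dict, or {} for an empty table.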
diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py
index 292ebdac51..98f49dc637 100644
--- a/redash/query_runner/athena.py
+++ b/redash/query_runner/athena.py
@@ -50,6 +50,10 @@ def format(self, operation, parameters=None):
class Athena(BaseQueryRunner):
noop_query = "SELECT 1"
+ # This takes a 1% random sample from {table}, reducing
+ # the runtime and data scanned for the query
+ sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"
+
@classmethod
def name(cls):
return "Amazon Athena"
@@ -83,6 +87,7 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"required": ["region", "s3_staging_dir"],
"extra_options": ["glue"],
@@ -180,9 +185,20 @@ def __get_schema_from_glue(self):
columns["Name"]
for columns in table["StorageDescriptor"]["Columns"]
]
- schema[table_name] = {"name": table_name, "columns": column}
+ metadata = [
+ {"name": column_data["Name"], "type": column_data["Type"]}
+ for column_data in table["StorageDescriptor"]["Columns"]
+ ]
+ schema[table_name] = {
+ "name": table_name,
+ "columns": column,
+ "metadata": metadata,
+ }
for partition in table.get("PartitionKeys", []):
schema[table_name]["columns"].append(partition["Name"])
+ schema[table_name]["metadata"].append(
+ {"name": partition["Name"], "type": partition["Type"]}
+ )
return list(schema.values())
def get_schema(self, get_stats=False):
@@ -191,7 +207,7 @@ def get_schema(self, get_stats=False):
schema = {}
query = """
- SELECT table_schema, table_name, column_name
+ SELECT table_schema, table_name, column_name, data_type AS column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('information_schema')
"""
@@ -204,8 +220,11 @@ def get_schema(self, get_stats=False):
for row in results["rows"]:
table_name = "{0}.{1}".format(row["table_schema"], row["table_name"])
if table_name not in schema:
- schema[table_name] = {"name": table_name, "columns": []}
+ schema[table_name] = {"name": table_name, "columns": [], "metadata": []}
schema[table_name]["columns"].append(row["column_name"])
+ schema[table_name]["metadata"].append(
+ {"name": row["column_name"], "type": row["column_type"],}
+ )
return list(schema.values())
diff --git a/redash/query_runner/big_query.py b/redash/query_runner/big_query.py
index 5607832639..69df3043eb 100644
--- a/redash/query_runner/big_query.py
+++ b/redash/query_runner/big_query.py
@@ -1,5 +1,6 @@
import datetime
import logging
+import operator
import sys
import time
from base64 import b64decode
@@ -86,6 +87,7 @@ def _get_query_results(jobs, project_id, location, job_id, start_index):
class BigQuery(BaseQueryRunner):
should_annotate_query = False
noop_query = "SELECT 1"
+ sample_query = "#standardSQL\n SELECT * FROM {table} LIMIT 1"
@classmethod
def enabled(cls):
@@ -123,6 +125,7 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"required": ["jsonKeyFile", "projectId"],
"order": [
@@ -250,22 +253,122 @@ def _get_query_result(self, jobs, query):
def _get_columns_schema(self, table_data):
columns = []
+ metadata = []
for column in table_data.get("schema", {}).get("fields", []):
- columns.extend(self._get_columns_schema_column(column))
+ metadatum = self._get_column_metadata(column)
+ metadata.extend(metadatum)
+ columns.extend(map(operator.itemgetter("name"), metadatum))
project_id = self._get_project_id()
table_name = table_data["id"].replace("%s:" % project_id, "")
- return {"name": table_name, "columns": columns}
+ return {"name": table_name, "columns": columns, "metadata": metadata}
- def _get_columns_schema_column(self, column):
- columns = []
+ def _get_column_metadata(self, column):
+ metadata = []
if column["type"] == "RECORD":
for field in column["fields"]:
- columns.append("{}.{}".format(column["name"], field["name"]))
+ field_name = u"{}.{}".format(column["name"], field["name"])
+ metadata.append({"name": field_name, "type": field["type"]})
else:
- columns.append(column["name"])
+ metadata.append({"name": column["name"], "type": column["type"]})
+ return metadata
+
+ def _columns_and_samples_to_dict(self, schema, samples):
+ samples_dict = {}
+ if not samples:
+ return samples_dict
+
+ # If a sample exists, its shape/length should be analogous to
+ # the schema provided (i.e. their lengths should match up)
+ for i, column in enumerate(schema):
+ if column["type"] == "RECORD":
+ if column.get("mode", None) == "REPEATED":
+ # Repeated fields have multiple samples of the same format.
+ # We only need to show the first one as an example.
+ associated_sample = [] if len(samples[i]) == 0 else samples[i][0]
+ else:
+ associated_sample = samples[i] or []
+
+ for j, field in enumerate(column["fields"]):
+ field_name = u"{}.{}".format(column["name"], field["name"])
+ samples_dict[field_name] = None
+ if len(associated_sample) > 0:
+ samples_dict[field_name] = associated_sample[j]
+ else:
+ samples_dict[column["name"]] = samples[i]
+
+ return samples_dict
+
+ def _flatten_samples(self, samples):
+ samples_list = []
+ for field in samples:
+ value = field["v"]
+ if isinstance(value, dict):
+ samples_list.append(self._flatten_samples(value.get("f", [])))
+ elif isinstance(value, list):
+ samples_list.append(self._flatten_samples(value))
+ else:
+ samples_list.append(value)
+
+ return samples_list
+
+ def get_table_sample(self, table_name):
+ if not self.configuration.get("loadSchema", False):
+ return {}
+
+ service = self._get_bigquery_service()
+ project_id = self._get_project_id()
+
+ dataset_id, table_id = table_name.split(".", 1)
+
+ try:
+ # NOTE: the `sample_response` is limited by `maxResults` here.
+ # Without this limit, the response would be very large and require
+ # pagination using `nextPageToken`.
+ sample_response = (
+ service.tabledata()
+ .list(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ fields="rows",
+ maxResults=1,
+ )
+ .execute()
+ )
+ schema_response = (
+ service.tables()
+ .get(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ fields="schema,id",
+ )
+ .execute()
+ )
+ table_rows = sample_response.get("rows", [])
+
+ if len(table_rows) == 0:
+ samples = []
+ else:
+ samples = table_rows[0].get("f", [])
+
+ schema = schema_response.get("schema", {}).get("fields", [])
+ columns = self._get_columns_schema(schema_response).get("columns", [])
+
+ flattened_samples = self._flatten_samples(samples)
+ samples_dict = self._columns_and_samples_to_dict(schema, flattened_samples)
+ return samples_dict
+ except HttpError as http_error:
+ logger.exception(
+ "Error communicating with server for sample for table %s: %s",
+ table_name,
+ http_error,
+ )
- return columns
+ # If there is an error getting the sample using the API,
+ # try to do it by running a `select *` with a limit.
+ return super().get_table_sample(table_name)
def get_schema(self, get_stats=False):
if not self.configuration.get("loadSchema", False):
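A worked example (values invented) of how the two BigQuery helpers above turn a tabledata row into the per-column samples dict:

row_fields = [  # one row as returned by tabledata().list(), i.e. row["f"]
    {"v": "42"},
    {"v": {"f": [{"v": "Berlin"}, {"v": "10115"}]}},  # nested RECORD value
]
schema_fields = [
    {"name": "id", "type": "INTEGER"},
    {"name": "address", "type": "RECORD", "fields": [
        {"name": "city", "type": "STRING"}, {"name": "zip", "type": "STRING"}]},
]
# _flatten_samples(row_fields)
#   -> ["42", ["Berlin", "10115"]]
# _columns_and_samples_to_dict(schema_fields, ["42", ["Berlin", "10115"]])
#   -> {"id": "42", "address.city": "Berlin", "address.zip": "10115"}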
diff --git a/redash/query_runner/mysql.py b/redash/query_runner/mysql.py
index a5b73bef32..f1c03d05d3 100644
--- a/redash/query_runner/mysql.py
+++ b/redash/query_runner/mysql.py
@@ -51,6 +51,7 @@ def __init__(self):
class Mysql(BaseSQLQueryRunner):
noop_query = "SELECT 1"
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -72,6 +73,7 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"order": ["host", "port", "user", "passwd", "db"],
"required": ["db"],
@@ -132,7 +134,8 @@ def _get_tables(self, schema):
query = """
SELECT col.table_schema as table_schema,
col.table_name as table_name,
- col.column_name as column_name
+ col.column_name as column_name,
+ col.column_type as column_type
FROM `information_schema`.`columns` col
WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"""
@@ -151,9 +154,12 @@ def _get_tables(self, schema):
table_name = row["table_name"]
if table_name not in schema:
- schema[table_name] = {"name": table_name, "columns": []}
+ schema[table_name] = {"name": table_name, "columns": [], "metadata": []}
schema[table_name]["columns"].append(row["column_name"])
+ schema[table_name]["metadata"].append(
+ {"name": row["column_name"], "type": row["column_type"]}
+ )
return list(schema.values())
diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py
index efbea9a626..6c2f3f0b81 100644
--- a/redash/query_runner/pg.py
+++ b/redash/query_runner/pg.py
@@ -102,9 +102,12 @@ def build_schema(query_result, schema):
table_name = row["table_name"]
if table_name not in schema:
- schema[table_name] = {"name": table_name, "columns": []}
+ schema[table_name] = {"name": table_name, "columns": [], "metadata": []}
schema[table_name]["columns"].append(row["column_name"])
+ schema[table_name]["metadata"].append(
+ {"name": row["column_name"], "type": row["column_type"]}
+ )
def _create_cert_file(configuration, key, ssl_config):
@@ -135,6 +138,7 @@ def _get_ssl_config(configuration):
class PostgreSQL(BaseSQLQueryRunner):
noop_query = "SELECT 1"
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -177,6 +181,7 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"order": ["host", "port", "user", "password"],
"required": ["dbname"],
@@ -216,7 +221,8 @@ def _get_tables(self, schema):
query = """
SELECT s.nspname as table_schema,
c.relname as table_name,
- a.attname as column_name
+ a.attname as column_name,
+ a.atttypid::regtype::varchar as column_type
FROM pg_class c
JOIN pg_namespace s
ON c.relnamespace = s.oid
@@ -225,13 +231,16 @@ def _get_tables(self, schema):
ON a.attrelid = c.oid
AND a.attnum > 0
AND NOT a.attisdropped
+ JOIN pg_type t
+ ON a.atttypid = t.oid
WHERE c.relkind IN ('m', 'f', 'p')
UNION
SELECT table_schema,
table_name,
- column_name
+ column_name,
+ data_type as column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
@@ -296,6 +305,7 @@ def run_query(self, query, user):
class Redshift(PostgreSQL):
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def type(cls):
@@ -346,6 +356,13 @@ def configuration_schema(cls):
"title": "Query Group for Scheduled Queries",
"default": "default",
},
+ "toggle_table_string": {
+ "type": "string",
+ "title": "Toggle Table String",
+ "default": "_v",
+ "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
+ },
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"order": [
"host",
@@ -388,12 +405,13 @@ def _get_tables(self, schema):
SELECT DISTINCT table_name,
table_schema,
column_name,
+ data_type AS column_type,
ordinal_position AS pos
FROM svv_columns
WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema')
AND table_schema NOT LIKE 'pg_temp_%'
)
- SELECT table_name, table_schema, column_name
+ SELECT table_name, table_schema, column_name, column_type
FROM tables
WHERE
HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND
diff --git a/redash/query_runner/presto.py b/redash/query_runner/presto.py
index a7ffddc3f9..d16d0167b1 100644
--- a/redash/query_runner/presto.py
+++ b/redash/query_runner/presto.py
@@ -33,6 +33,7 @@
class Presto(BaseQueryRunner):
noop_query = "SHOW TABLES"
+ sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -52,6 +53,7 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
+ "samples": {"type": "boolean", "title": "Show Data Samples"},
},
"order": [
"host",
@@ -76,7 +78,7 @@ def type(cls):
def get_schema(self, get_stats=False):
schema = {}
query = """
- SELECT table_schema, table_name, column_name
+ SELECT table_schema, table_name, column_name, data_type AS column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
@@ -92,9 +94,12 @@ def get_schema(self, get_stats=False):
table_name = "{}.{}".format(row["table_schema"], row["table_name"])
if table_name not in schema:
- schema[table_name] = {"name": table_name, "columns": []}
+ schema[table_name] = {"name": table_name, "columns": [], "metadata": []}
schema[table_name]["columns"].append(row["column_name"])
+ schema[table_name]["metadata"].append(
+ {"name": row["column_name"], "type": row["column_type"],}
+ )
return list(schema.values())
diff --git a/redash/serializers/__init__.py b/redash/serializers/__init__.py
index 93516814f6..3d0239d6a6 100644
--- a/redash/serializers/__init__.py
+++ b/redash/serializers/__init__.py
@@ -75,15 +75,13 @@ class Serializer(object):
class QuerySerializer(Serializer):
def __init__(self, object_or_list, **kwargs):
self.object_or_list = object_or_list
+ self.with_favorite_state = kwargs.pop("with_favorite_state", True)
self.options = kwargs
def serialize(self):
if isinstance(self.object_or_list, models.Query):
result = serialize_query(self.object_or_list, **self.options)
- if (
- self.options.get("with_favorite_state", True)
- and not current_user.is_api_user()
- ):
+ if self.with_favorite_state and not current_user.is_api_user():
result["is_favorite"] = models.Favorite.is_favorite(
current_user.id, self.object_or_list
)
@@ -91,7 +89,7 @@ def serialize(self):
result = [
serialize_query(query, **self.options) for query in self.object_or_list
]
- if self.options.get("with_favorite_state", True):
+ if self.with_favorite_state:
favorite_ids = models.Favorite.are_favorites(
current_user.id, self.object_or_list
)
@@ -101,6 +99,80 @@ def serialize(self):
return result
+class ColumnMetadataSerializer(Serializer):
+ def __init__(self, object_or_list):
+ self.object_or_list = object_or_list
+
+ def serialize(self):
+ if isinstance(self.object_or_list, models.ColumnMetadata):
+ result = serialize_column_metadata(self.object_or_list)
+ else:
+ result = [
+ serialize_column_metadata(column_metadata)
+ for column_metadata in self.object_or_list
+ ]
+ return result
+
+
+class TableMetadataSerializer(Serializer):
+ def __init__(self, object_or_list, **kwargs):
+ self.object_or_list = object_or_list
+ self.options = kwargs
+
+ def serialize(self):
+ if isinstance(self.object_or_list, models.TableMetadata):
+ result = serialize_table_metadata(self.object_or_list, self.options)
+ else:
+ result = [
+ serialize_table_metadata(column_metadata, self.options)
+ for column_metadata in self.object_or_list
+ ]
+ return result
+
+
+def serialize_table_metadata(table_metadata, options, include_columns=True):
+ sample_queries_dict = dict(
+ [
+ (v["id"], v)
+ for v in QuerySerializer(
+ table_metadata.sample_queries, **options
+ ).serialize()
+ ]
+ )
+ d = {
+ "id": table_metadata.id,
+ "org_id": table_metadata.org_id,
+ "data_source_id": table_metadata.data_source_id,
+ "exists": table_metadata.exists,
+ "visible": table_metadata.visible,
+ "name": table_metadata.name,
+ "description": table_metadata.description,
+ "column_metadata": table_metadata.column_metadata,
+ "sample_updated_at": table_metadata.sample_updated_at,
+ "sample_queries": sample_queries_dict,
+ }
+ if include_columns:
+ d["columns"] = [
+ ColumnMetadataSerializer(column).serialize()
+ for column in table_metadata.existing_columns
+ ]
+ return d
+
+
+def serialize_column_metadata(column_metadata):
+ d = {
+ "id": column_metadata.id,
+ "org_id": column_metadata.org_id,
+ "table_id": column_metadata.table_id,
+ "name": column_metadata.name,
+ "type": column_metadata.type,
+ "example": column_metadata.example,
+ "exists": column_metadata.exists,
+ "description": column_metadata.description,
+ }
+ return d
+
+
def serialize_query(
query,
with_stats=False,
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index 3765db4cef..307d1e3cd7 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -55,7 +55,11 @@
os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_MAX_AGE", "7")
)
-SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 30))
+SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 360))
+SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "schemas")
+SCHEMA_REFRESH_TIME_LIMIT = int(
+ os.environ.get("REDASH_SCHEMA_REFRESH_TIME_LIMIT", 3600)
+)
AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key")
INVITATION_TOKEN_MAX_AGE = int(
@@ -479,6 +483,22 @@ def email_server_is_configured():
os.environ.get("REDASH_SCHEMA_RUN_TABLE_SIZE_CALCULATIONS", "false")
)
+# How long (in days) to keep schema metadata for tables and columns that no longer exist.
+SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60))
+
+# Frequency of schema samples updates
+SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS = int(
+ os.environ.get("REDASH_SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS", 14)
+)
+SCHEMA_SAMPLE_UPDATE_TIMEOUT = int(
+ os.environ.get("REDASH_SCHEMA_SAMPLE_UPDATE_TIMEOUT", SCHEMA_REFRESH_TIME_LIMIT)
+)
+
+# Frequency of schema samples refresh when no samples are stored
+SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(
+ os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 2)
+)
+
# kylin
KYLIN_OFFSET = int(os.environ.get("REDASH_KYLIN_OFFSET", 0))
KYLIN_LIMIT = int(os.environ.get("REDASH_KYLIN_LIMIT", 50000))
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index 98df1d12ff..ab1827d9ca 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -10,8 +10,12 @@
execute_query,
refresh_queries,
refresh_schemas,
+ refresh_schema,
cleanup_query_results,
empty_schedules,
+ cleanup_schema_metadata,
+ refresh_samples,
+ update_sample,
)
from .alerts import check_alerts_for_query
from .failure_report import send_aggregated_errors
diff --git a/redash/tasks/queries/__init__.py b/redash/tasks/queries/__init__.py
index 7c5d34fb0f..35d43b739a 100644
--- a/redash/tasks/queries/__init__.py
+++ b/redash/tasks/queries/__init__.py
@@ -1,7 +1,9 @@
from .maintenance import (
refresh_queries,
+ refresh_schema,
refresh_schemas,
cleanup_query_results,
empty_schedules,
)
from .execution import execute_query, enqueue_query
+from .samples import cleanup_schema_metadata, refresh_samples, update_sample
diff --git a/redash/tasks/queries/maintenance.py b/redash/tasks/queries/maintenance.py
index 6b944c4903..e73f1c7161 100644
--- a/redash/tasks/queries/maintenance.py
+++ b/redash/tasks/queries/maintenance.py
@@ -8,10 +8,12 @@
QueryDetachedFromDataSourceError,
)
from redash.tasks.failure_report import track_failure
+from redash.tasks.queries.samples import refresh_samples
from redash.utils import json_dumps, sentry
-from redash.worker import job, get_job_logger
+from redash.worker import get_job_logger, job
from .execution import enqueue_query
+from .samples import truncate_long_string
logger = get_job_logger(__name__)
@@ -129,36 +131,132 @@ def cleanup_query_results():
logger.info("Deleted %d unused query results.", deleted_count)
-@job("schemas")
-def refresh_schema(data_source_id):
+@job(settings.SCHEMAS_REFRESH_QUEUE, timeout=settings.SCHEMA_REFRESH_TIME_LIMIT)
+def refresh_schema(data_source_id, max_type_string_length=250):
ds = models.DataSource.get_by_id(data_source_id)
- logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id)
+ logger.info("task=refresh_schema state=start ds_id=%s", ds.id)
+ lock_key = "data_source:schema:refresh:{}:lock".format(data_source_id)
+ lock = redis_connection.lock(lock_key, timeout=settings.SCHEMA_REFRESH_TIME_LIMIT)
+ acquired = lock.acquire(blocking=False)
start_time = time.time()
- try:
- ds.get_schema(refresh=True)
- logger.info(
- u"task=refresh_schema state=finished ds_id=%s runtime=%.2f",
- ds.id,
- time.time() - start_time,
- )
- statsd_client.incr("refresh_schema.success")
- except JobTimeoutException:
- logger.info(
- u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
- ds.id,
- time.time() - start_time,
- )
- statsd_client.incr("refresh_schema.timeout")
- except Exception:
- logger.warning(
- u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1
- )
- statsd_client.incr("refresh_schema.error")
- logger.info(
- u"task=refresh_schema state=failed ds_id=%s runtime=%.2f",
- ds.id,
- time.time() - start_time,
- )
+
+ if acquired:
+ logger.info("task=refresh_schema state=locked ds_id=%s", ds.id)
+ try:
+ # Stores data from the updated schema that tells us which
+ # columns and which tables currently exist
+ existing_tables_set = set()
+ existing_columns_set = set()
+
+ # Stores data that will be inserted into postgres
+ table_data = {}
+ column_data = {}
+
+ new_column_names = {}
+ new_column_metadata = {}
+
+ for table in ds.query_runner.get_schema(get_stats=True):
+ table_name = table["name"]
+ existing_tables_set.add(table_name)
+
+ table_data[table_name] = {
+ "org_id": ds.org_id,
+ "name": table_name,
+ "data_source_id": ds.id,
+ "column_metadata": "metadata" in table,
+ "exists": True,
+ }
+ new_column_names[table_name] = table["columns"]
+ new_column_metadata[table_name] = table.get("metadata", None)
+
+ models.TableMetadata.store(ds, existing_tables_set, table_data)
+
+ all_existing_persisted_tables = models.TableMetadata.query.filter(
+ models.TableMetadata.exists.is_(True),
+ models.TableMetadata.data_source_id == ds.id,
+ ).all()
+
+ for table in all_existing_persisted_tables:
+ for i, column in enumerate(new_column_names.get(table.name, [])):
+ existing_columns_set.add(column)
+ column_data[column] = {
+ "org_id": ds.org_id,
+ "table_id": table.id,
+ "name": column,
+ "type": None,
+ "exists": True,
+ }
+
+ if table.column_metadata:
+ column_type = new_column_metadata[table.name][i]["type"]
+ column_type = truncate_long_string(
+ column_type, max_type_string_length
+ )
+ column_data[column]["type"] = column_type
+
+ models.ColumnMetadata.store(table, existing_columns_set, column_data)
+
+ existing_columns_list = list(existing_columns_set)
+
+ # Mark columns that no longer appear in the schema with exists=False.
+ models.ColumnMetadata.query.filter(
+ models.ColumnMetadata.exists.is_(True),
+ models.ColumnMetadata.table_id == table.id,
+ ~models.ColumnMetadata.name.in_(existing_columns_list),
+ ).update(
+ {"exists": False, "updated_at": models.db.func.now()},
+ synchronize_session="fetch",
+ )
+
+ # Clear the set for the next round
+ existing_columns_set.clear()
+
+ # If a table did not exist in the get_schema() response above,
+ # set the 'exists' flag to false.
+ existing_tables_list = list(existing_tables_set)
+ models.TableMetadata.query.filter(
+ models.TableMetadata.exists.is_(True),
+ models.TableMetadata.data_source_id == ds.id,
+ ~models.TableMetadata.name.in_(existing_tables_list),
+ ).update(
+ {"exists": False, "updated_at": models.db.func.now()},
+ synchronize_session="fetch",
+ )
+
+ models.db.session.commit()
+
+ logger.info("task=refresh_schema state=caching ds_id=%s", ds.id)
+ ds.schema_cache.populate(forced=True)
+ logger.info("task=refresh_schema state=cached ds_id=%s", ds.id)
+
+ logger.info(
+ "task=refresh_schema state=finished ds_id=%s runtime=%.2f",
+ ds.id,
+ time.time() - start_time,
+ )
+ statsd_client.incr("refresh_schema.success")
+ except JobTimeoutException:
+ logger.info(
+ "task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
+ ds.id,
+ time.time() - start_time,
+ )
+ statsd_client.incr("refresh_schema.timeout")
+ except Exception:
+ logger.warning(
+ "Failed refreshing schema for the data source: %s", ds.name, exc_info=1
+ )
+ statsd_client.incr("refresh_schema.error")
+ logger.info(
+ "task=refresh_schema state=failed ds_id=%s runtime=%.2f",
+ ds.id,
+ time.time() - start_time,
+ )
+ finally:
+ lock.release()
+ logger.info("task=refresh_schema state=unlocked ds_id=%s", ds.id)
+ else:
+ logger.info("task=refresh_schema state=alreadylocked ds_id=%s", ds.id)
def refresh_schemas():
@@ -191,9 +289,9 @@ def refresh_schemas():
)
else:
refresh_schema.delay(ds.id)
+ refresh_samples.delay(ds.id, table_sample_limit=50)
logger.info(
u"task=refresh_schemas state=finish total_runtime=%.2f",
time.time() - global_start_time,
)
-
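For orientation, this is the shape of a single get_schema() entry that the rewritten refresh_schema consumes; it mirrors the query runner changes elsewhere in this diff (the table and column names are invented). "metadata" is optional, and its presence sets the column_metadata flag on TableMetadata:

table_entry = {
    "name": "public.orders",
    "columns": ["id", "total", "created_at"],
    "metadata": [
        {"name": "id", "type": "integer"},
        {"name": "total", "type": "numeric"},
        {"name": "created_at", "type": "timestamp"},
    ],
}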
diff --git a/redash/tasks/queries/samples.py b/redash/tasks/queries/samples.py
new file mode 100644
index 0000000000..0990739d77
--- /dev/null
+++ b/redash/tasks/queries/samples.py
@@ -0,0 +1,128 @@
+import datetime
+import time
+
+from redash import models, settings, utils
+from redash.query_runner import NotSupported
+from redash.worker import get_job_logger, job
+from sqlalchemy import or_
+from sqlalchemy.orm import load_only
+
+logger = get_job_logger(__name__)
+
+
+def truncate_long_string(original_str, max_length):
+ # Remove null characters so we can save as string to postgres
+ new_str = original_str.replace("\x00", "")
+
+ if new_str and len(new_str) > max_length:
+ new_str = "{}...".format(new_str[:max_length])
+ return new_str
+
+
+@job(settings.SCHEMAS_REFRESH_QUEUE, timeout=settings.SCHEMA_SAMPLE_UPDATE_TIMEOUT)
+def update_sample(data_source_id, table_name, table_id, sample_updated_at):
+ """
+ For a given table, look up a sample row for it and update
+ the "example" fields for it in the column_metadata table.
+ """
+ logger.info(u"task=update_sample state=start table_name=%s", table_name)
+ start_time = time.time()
+ ds = models.DataSource.get_by_id(data_source_id)
+
+ persisted_columns = models.ColumnMetadata.query.filter(
+ models.ColumnMetadata.exists.is_(True),
+ models.ColumnMetadata.table_id == table_id,
+ ).options(load_only("id", "name", "example"))
+
+ update_threshold = utils.utcnow() - datetime.timedelta(
+ days=settings.SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS
+ )
+
+ first_column = persisted_columns.first()
+
+ if (
+ first_column
+ and sample_updated_at
+ and first_column.example
+ and sample_updated_at > update_threshold
+ ):
+ # Look at the first example in the persisted columns.
+ # If it is *not* empty AND sample_updated_at is recent, skip updating the sample.
+ logger.info(
+ u"task=update_sample state=abort - recent sample exists table_name=%s",
+ table_name,
+ )
+ return
+
+ sample = None
+ try:
+ sample = ds.query_runner.get_table_sample(table_name)
+ except NotSupported:
+ logger.info(u"Unable to fetch samples for {}".format(table_name))
+
+ if not sample:
+ return
+
+ # If a column exists, add a sample to it.
+ for persisted_column in persisted_columns.all():
+ column_example = sample.get(persisted_column.name, None)
+ column_example = (
+ column_example if isinstance(column_example, str) else str(column_example)
+ ) # noqa: F821
+ persisted_column.example = truncate_long_string(column_example, 4000)
+ models.db.session.add(persisted_column)
+
+ models.db.session.commit()
+ logger.info(
+ u"task=update_sample state=finished table_name=%s runtime=%.2f",
+ table_name,
+ time.time() - start_time,
+ )
+ return sample
+
+
+@job(settings.SCHEMAS_REFRESH_QUEUE, timeout=settings.SCHEMA_REFRESH_TIME_LIMIT)
+def refresh_samples(data_source_id, table_sample_limit):
+ """
+ For a given data source, refresh the data samples stored for each
+ table. This is done for tables that have no samples, or whose samples are
+ older than SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS days.
+ """
+ logger.info(u"task=refresh_samples state=start ds_id=%s", data_source_id)
+ ds = models.DataSource.get_by_id(data_source_id)
+
+ if not ds.query_runner.configuration.get("samples", False):
+ return
+
+ DAYS_AGO = utils.utcnow() - datetime.timedelta(
+ days=settings.SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS
+ )
+
+ # Find all existing tables that have an empty or old sample_updated_at
+ tables_to_sample = (
+ models.TableMetadata.query.filter(
+ models.TableMetadata.exists.is_(True),
+ models.TableMetadata.data_source_id == data_source_id,
+ or_(
+ models.TableMetadata.sample_updated_at.is_(None),
+ models.TableMetadata.sample_updated_at < DAYS_AGO,
+ ),
+ )
+ .limit(table_sample_limit)
+ .all()
+ )
+
+ tasks = []
+ for table in tables_to_sample:
+ tasks.append((ds.id, table.name, table.id, table.sample_updated_at))
+ table.sample_updated_at = models.db.func.now()
+ models.db.session.add(table)
+ models.db.session.commit()
+
+ for task_args in tasks:
+ update_sample.delay(*task_args)
+
+
+def cleanup_schema_metadata():
+ models.cleanup_data_in_table(models.TableMetadata)
+ models.cleanup_data_in_table(models.ColumnMetadata)
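refresh_samples above selects up to table_sample_limit tables whose sample_updated_at is empty or older than SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS and enqueues one update_sample job per table, which in turn skips tables that already carry a recent example. A minimal sketch of that staleness and rate-limit check follows; plain dicts stand in for ORM rows, and the refresh window is an assumed value.

```python
# Illustrative only: REFRESH_DAYS stands in for settings.SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS,
# and dicts stand in for TableMetadata rows.
import datetime

REFRESH_DAYS = 14  # assumed refresh window


def tables_needing_samples(tables, limit, now=None):
    """Return up to `limit` existing tables whose sample is missing or stale."""
    now = now or datetime.datetime.utcnow()
    threshold = now - datetime.timedelta(days=REFRESH_DAYS)
    stale = [
        t
        for t in tables
        if t["exists"]
        and (t["sample_updated_at"] is None or t["sample_updated_at"] < threshold)
    ]
    return stale[:limit]


now = datetime.datetime.utcnow()
tables = [
    {"name": "fresh", "exists": True, "sample_updated_at": now},
    {"name": "stale", "exists": True, "sample_updated_at": now - datetime.timedelta(days=30)},
    {"name": "never", "exists": True, "sample_updated_at": None},
]
print([t["name"] for t in tables_needing_samples(tables, limit=50, now=now)])  # ['stale', 'never']
```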
diff --git a/redash/tasks/schedule.py b/redash/tasks/schedule.py
index bbce3b5024..c9dbaf653e 100644
--- a/redash/tasks/schedule.py
+++ b/redash/tasks/schedule.py
@@ -13,6 +13,7 @@
refresh_queries,
empty_schedules,
refresh_schemas,
+ cleanup_schema_metadata,
cleanup_query_results,
purge_failed_jobs,
version_check,
@@ -66,6 +67,7 @@ def periodic_job_definitions():
"func": send_aggregated_errors,
"interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
},
+ {"func": cleanup_schema_metadata, "interval": timedelta(days=3)},
]
if settings.VERSION_CHECK:
diff --git a/tests/factories.py b/tests/factories.py
index e46ca9c0ad..fab6a36698 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -102,6 +102,15 @@ def __call__(self):
org_id=1,
)
+table_metadata_factory = ModelFactory(
+ redash.models.TableMetadata, data_source_id=1, exists=True, name="table", org_id=1
+)
+
+column_metadata_factory = ModelFactory(
+ redash.models.ColumnMetadata, table_id=1, name="column", org_id=1
+)
+
+
access_permission_factory = ModelFactory(
redash.models.AccessPermission,
object_id=query_factory.create,
@@ -212,6 +221,12 @@ def create_org(self, **kwargs):
return org
+ def create_table_metadata(self, **kwargs):
+ return table_metadata_factory.create(**kwargs)
+
+ def create_column_metadata(self, **kwargs):
+ return column_metadata_factory.create(**kwargs)
+
def create_user(self, **kwargs):
args = {"org": self.org, "group_ids": [self.default_group.id]}
diff --git a/tests/handlers/test_data_sources.py b/tests/handlers/test_data_sources.py
index 9185a986bf..ca86e58ef6 100644
--- a/tests/handlers/test_data_sources.py
+++ b/tests/handlers/test_data_sources.py
@@ -24,6 +24,48 @@ def test_fails_if_user_doesnt_belong_to_org(self):
)
self.assertEqual(response.status_code, 404)
+ def test_get_schema_returns_expected_values(self):
+ data_source = self.factory.create_data_source()
+ table_metadata = self.factory.create_table_metadata(
+ data_source_id=data_source.id
+ )
+ column_metadata = self.factory.create_column_metadata(
+ table_id=table_metadata.id, type="boolean", example=True
+ )
+ admin = self.factory.create_admin()
+ data_source.schema_cache.populate()
+ response = self.make_request(
+ "get", "/api/data_sources/{}/schema".format(data_source.id), user=admin
+ )
+
+ return_value = [
+ {
+ "id": table_metadata.id,
+ "data_source_id": 1,
+ "org_id": 1,
+ "name": "table",
+ "column_metadata": False,
+ "exists": True,
+ "visible": True,
+ "description": None,
+ "sample_updated_at": None,
+ "sample_queries": {},
+ "columns": [
+ {
+ "id": 1,
+ "org_id": 1,
+ "table_id": 1,
+ "name": "column",
+ "type": "boolean",
+ "description": None,
+ "exists": True,
+ "example": True,
+ }
+ ],
+ }
+ ]
+ self.assertEqual(return_value, response.json["schema"])
+
class TestDataSourceListGet(BaseTestCase):
def test_returns_each_data_source_once(self):
diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py
index fa503979d6..8e94fe1402 100644
--- a/tests/models/test_data_sources.py
+++ b/tests/models/test_data_sources.py
@@ -1,100 +1,9 @@
-import mock
-from mock import patch
from tests import BaseTestCase
from redash.models import DataSource, Query, QueryResult
from redash.utils.configuration import ConfigurationContainer
-class DataSourceTest(BaseTestCase):
- def test_get_schema(self):
- return_value = [{"name": "table", "columns": []}]
-
- with mock.patch(
- "redash.query_runner.pg.PostgreSQL.get_schema"
- ) as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- schema = self.factory.data_source.get_schema()
-
- self.assertEqual(return_value, schema)
-
- def test_get_schema_uses_cache(self):
- return_value = [{"name": "table", "columns": []}]
- with mock.patch(
- "redash.query_runner.pg.PostgreSQL.get_schema"
- ) as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- self.factory.data_source.get_schema()
- schema = self.factory.data_source.get_schema()
-
- self.assertEqual(return_value, schema)
- self.assertEqual(patched_get_schema.call_count, 1)
-
- def test_get_schema_skips_cache_with_refresh_true(self):
- return_value = [{"name": "table", "columns": []}]
- with mock.patch(
- "redash.query_runner.pg.PostgreSQL.get_schema"
- ) as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- self.factory.data_source.get_schema()
- new_return_value = [{"name": "new_table", "columns": []}]
- patched_get_schema.return_value = new_return_value
- schema = self.factory.data_source.get_schema(refresh=True)
-
- self.assertEqual(new_return_value, schema)
- self.assertEqual(patched_get_schema.call_count, 2)
-
- def test_schema_sorter(self):
- input_data = [
- {"name": "zoo", "columns": ["is_zebra", "is_snake", "is_cow"]},
- {
- "name": "all_terain_vehicle",
- "columns": ["has_wheels", "has_engine", "has_all_wheel_drive"],
- },
- ]
-
- expected_output = [
- {
- "name": "all_terain_vehicle",
- "columns": ["has_all_wheel_drive", "has_engine", "has_wheels"],
- },
- {"name": "zoo", "columns": ["is_cow", "is_snake", "is_zebra"]},
- ]
-
- real_output = self.factory.data_source._sort_schema(input_data)
-
- self.assertEqual(real_output, expected_output)
-
- def test_model_uses_schema_sorter(self):
- orig_schema = [
- {"name": "zoo", "columns": ["is_zebra", "is_snake", "is_cow"]},
- {
- "name": "all_terain_vehicle",
- "columns": ["has_wheels", "has_engine", "has_all_wheel_drive"],
- },
- ]
-
- sorted_schema = [
- {
- "name": "all_terain_vehicle",
- "columns": ["has_all_wheel_drive", "has_engine", "has_wheels"],
- },
- {"name": "zoo", "columns": ["is_cow", "is_snake", "is_zebra"]},
- ]
-
- with mock.patch(
- "redash.query_runner.pg.PostgreSQL.get_schema"
- ) as patched_get_schema:
- patched_get_schema.return_value = orig_schema
-
- out_schema = self.factory.data_source.get_schema()
-
- self.assertEqual(out_schema, sorted_schema)
-
-
class TestDataSourceCreate(BaseTestCase):
def test_adds_data_source_to_default_group(self):
data_source = DataSource.create_with_group(
@@ -160,10 +69,3 @@ def test_deletes_child_models(self):
self.assertEqual(
0, QueryResult.query.filter(QueryResult.data_source == data_source).count()
)
-
- @patch("redash.redis_connection.delete")
- def test_deletes_schema(self, mock_redis):
- data_source = self.factory.create_data_source()
- data_source.delete()
-
- mock_redis.assert_called_with(data_source._schema_key)
diff --git a/tests/query_runner/test_athena.py b/tests/query_runner/test_athena.py
index 9db7b52f84..27b89529f0 100644
--- a/tests/query_runner/test_athena.py
+++ b/tests/query_runner/test_athena.py
@@ -78,7 +78,11 @@ def test_external_table(self):
)
with self.stubber:
assert query_runner.get_schema() == [
- {"columns": ["row_id"], "name": "test1.jdbc_table"}
+ {
+ "columns": ["row_id"],
+ "name": "test1.jdbc_table",
+ "metadata": [{"type": "int", "name": "row_id"}],
+ }
]
def test_partitioned_table(self):
@@ -131,7 +135,14 @@ def test_partitioned_table(self):
)
with self.stubber:
assert query_runner.get_schema() == [
- {"columns": ["sk", "category"], "name": "test1.partitioned_table"}
+ {
+ "columns": ["sk", "category"],
+ "name": "test1.partitioned_table",
+ "metadata": [
+ {"type": "int", "name": "sk"},
+ {"type": "int", "name": "category"},
+ ],
+ }
]
def test_view(self):
@@ -167,7 +178,11 @@ def test_view(self):
)
with self.stubber:
assert query_runner.get_schema() == [
- {"columns": ["sk"], "name": "test1.view"}
+ {
+ "columns": ["sk"],
+ "name": "test1.view",
+ "metadata": [{"type": "int", "name": "sk"}],
+ }
]
def test_dodgy_table_does_not_break_schema_listing(self):
@@ -211,5 +226,9 @@ def test_dodgy_table_does_not_break_schema_listing(self):
)
with self.stubber:
assert query_runner.get_schema() == [
- {"columns": ["region"], "name": "test1.csv"}
+ {
+ "columns": ["region"],
+ "name": "test1.csv",
+ "metadata": [{"type": "string", "name": "region"}],
+ }
]
diff --git a/tests/query_runner/test_bigquery.py b/tests/query_runner/test_bigquery.py
new file mode 100644
index 0000000000..183df3dade
--- /dev/null
+++ b/tests/query_runner/test_bigquery.py
@@ -0,0 +1,54 @@
+from mock import patch
+from tests import BaseTestCase
+
+from redash.query_runner.big_query import BigQuery
+
+
+class TestBigQuery(BaseTestCase):
+ def test_get_table_sample_returns_expected_result(self):
+ SAMPLES_RESPONSE = {
+ "rows": [
+ {
+ "f": [
+ {"v": "2017-10-28"},
+ {"v": "2019-03-28T18:57:04.485091"},
+ {"v": "3341"},
+ {"v": "2451"},
+ {"v": "Iran"},
+ ]
+ }
+ ]
+ }
+
+ SCHEMA_RESPONSE = {
+ "id": "project:dataset.table",
+ "schema": {
+ "fields": [
+ {"type": "DATE", "name": "submission_date", "mode": "NULLABLE"},
+ {"type": "DATETIME", "name": "generated_time", "mode": "NULLABLE"},
+ {"type": "INTEGER", "name": "mau", "mode": "NULLABLE"},
+ {"type": "INTEGER", "name": "wau", "mode": "NULLABLE"},
+ {"type": "STRING", "name": "country", "mode": "NULLABLE"},
+ ]
+ },
+ }
+
+ EXPECTED_SAMPLES_DICT = {
+ "submission_date": "2017-10-28",
+ "country": "Iran",
+ "wau": "2451",
+ "mau": "3341",
+ "generated_time": "2019-03-28T18:57:04.485091",
+ }
+
+ with patch.object(BigQuery, "_get_bigquery_service") as get_bq_service:
+ tabledata_list = get_bq_service.return_value.tabledata.return_value.list
+ tabledata_list.return_value.execute.return_value = SAMPLES_RESPONSE
+
+ tables_get = get_bq_service.return_value.tables.return_value.get
+ tables_get.return_value.execute.return_value = SCHEMA_RESPONSE
+
+ query_runner = BigQuery({"loadSchema": True, "projectId": "test_project"})
+ table_sample = query_runner.get_table_sample("dataset.table")
+
+ self.assertEqual(table_sample, EXPECTED_SAMPLES_DICT)
diff --git a/tests/query_runner/test_get_schema_format.py b/tests/query_runner/test_get_schema_format.py
new file mode 100644
index 0000000000..81c750b5d5
--- /dev/null
+++ b/tests/query_runner/test_get_schema_format.py
@@ -0,0 +1,70 @@
+import json
+import mock
+
+from unittest import TestCase
+
+from redash.query_runner.presto import Presto
+from redash.query_runner.athena import Athena
+from redash.query_runner.mysql import Mysql
+from redash.query_runner.pg import PostgreSQL, Redshift
+
+
+class TestBaseQueryRunner(TestCase):
+ def setUp(self):
+ self.query_runners = [
+ {"instance": Presto({}), "mock_location": "presto.Presto"},
+ {"instance": Athena({}), "mock_location": "athena.Athena"},
+ {"instance": Mysql({"db": None}), "mock_location": "mysql.Mysql"},
+ {"instance": PostgreSQL({}), "mock_location": "pg.PostgreSQL"},
+ {"instance": Redshift({}), "mock_location": "pg.Redshift"},
+ ]
+
+ def _setup_mock(self, function_to_patch):
+ patcher = mock.patch(function_to_patch)
+ patched_function = patcher.start()
+ self.addCleanup(patcher.stop)
+ return patched_function
+
+ def assert_correct_schema_format(self, query_runner, mock_location):
+ EXPECTED_SCHEMA_RESULT = [
+ {
+ "columns": ["created_date"],
+ "metadata": [{"name": "created_date", "type": "varchar",}],
+ "name": "default.table_name",
+ }
+ ]
+
+ get_schema_query_response = {
+ "rows": [
+ {
+ "table_schema": "default",
+ "table_name": "table_name",
+ "column_type": "varchar",
+ "column_name": "created_date",
+ }
+ ]
+ }
+ get_samples_query_response = {"rows": [{"created_date": "2017-10-26"}]}
+
+ self.run_count = 0
+
+ def query_runner_responses(query, user):
+ response = (json.dumps(get_schema_query_response), None)
+ if self.run_count > 0:
+ response = (json.dumps(get_samples_query_response), None)
+ self.run_count += 1
+ return response
+
+ self.patched_run_query = self._setup_mock(
+ "redash.query_runner.{location}.run_query".format(location=mock_location)
+ )
+ self.patched_run_query.side_effect = query_runner_responses
+
+ schema = query_runner.get_schema()
+ self.assertEqual(schema, EXPECTED_SCHEMA_RESULT)
+
+ def test_get_schema_format(self):
+ for runner in self.query_runners:
+ self.assert_correct_schema_format(
+ runner["instance"], runner["mock_location"]
+ )
diff --git a/tests/query_runner/test_pg.py b/tests/query_runner/test_pg.py
index 17fc35315b..eba60857eb 100644
--- a/tests/query_runner/test_pg.py
+++ b/tests/query_runner/test_pg.py
@@ -10,9 +10,20 @@ def test_handles_dups_between_public_and_other_schemas(self):
"table_schema": "public",
"table_name": "main.users",
"column_name": "id",
+ "column_type": "character varying",
+ },
+ {
+ "table_schema": "main",
+ "table_name": "users",
+ "column_name": "id",
+ "column_type": "character varying",
+ },
+ {
+ "table_schema": "main",
+ "table_name": "users",
+ "column_name": "name",
+ "column_type": "character varying",
},
- {"table_schema": "main", "table_name": "users", "column_name": "id"},
- {"table_schema": "main", "table_name": "users", "column_name": "name"},
]
}
diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py
index 063e7c2780..d7ef7008cf 100644
--- a/tests/tasks/test_queries.py
+++ b/tests/tasks/test_queries.py
@@ -1,3 +1,4 @@
+import datetime
from unittest import TestCase
import uuid
@@ -8,7 +9,7 @@
from tests import BaseTestCase
from redash import redis_connection, rq_redis_connection, models
-from redash.utils import json_dumps
+from redash.utils import json_dumps, utcnow
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries.execution import (
QueryExecutionError,
@@ -222,3 +223,23 @@ def test_success_after_failure(self, _):
)
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 0)
+
+
+class TestPruneSchemaMetadata(BaseTestCase):
+ def test_cleanup_data_in_table(self):
+ data_source = self.factory.create_data_source()
+
+ # Create a table flagged as non-existing with a stale updated_at
+ self.factory.create_table_metadata(
+ data_source_id=data_source.id,
+ org_id=data_source.org_id,
+ exists=False,
+ updated_at=(utcnow() - datetime.timedelta(days=70)),
+ )
+ all_tables = models.TableMetadata.query.all()
+ self.assertEqual(len(all_tables), 1)
+
+ models.cleanup_data_in_table(models.TableMetadata)
+
+ all_tables = models.TableMetadata.query.all()
+ self.assertEqual(len(all_tables), 0)
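TestPruneSchemaMetadata above only asserts that a row flagged exists=False and roughly 70 days old is removed by cleanup_data_in_table; the exact retention window is not visible in this diff. A hedged sketch of that pruning behaviour, with an assumed cutoff and plain dicts in place of ORM rows, might look like:

```python
# Illustrative only: RETENTION_DAYS is an assumption; the test merely shows that
# an exists=False row ~70 days old is gone after cleanup_data_in_table runs.
import datetime

RETENTION_DAYS = 60  # assumed retention window


def prune_stale_rows(rows, now=None):
    """Keep rows that still exist, or were updated within the retention window."""
    now = now or datetime.datetime.utcnow()
    cutoff = now - datetime.timedelta(days=RETENTION_DAYS)
    return [r for r in rows if r["exists"] or r["updated_at"] >= cutoff]


now = datetime.datetime.utcnow()
rows = [
    {"name": "kept", "exists": True, "updated_at": now - datetime.timedelta(days=90)},
    {"name": "pruned", "exists": False, "updated_at": now - datetime.timedelta(days=70)},
]
print([r["name"] for r in prune_stale_rows(rows, now=now)])  # ['kept']
```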
diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py
index 8cb1210b90..6895ef57ce 100644
--- a/tests/tasks/test_refresh_schemas.py
+++ b/tests/tasks/test_refresh_schemas.py
@@ -1,10 +1,59 @@
+import copy
+import datetime
+
from mock import patch
from tests import BaseTestCase
-from redash.tasks import refresh_schemas
+from redash import models, redis_connection, utils
+from redash.tasks import refresh_schemas, refresh_schema, update_sample, refresh_samples
+from redash.models import TableMetadata, ColumnMetadata
+from redash.serializers import ColumnMetadataSerializer, TableMetadataSerializer
class TestRefreshSchemas(BaseTestCase):
+ def setUp(self):
+ super(TestRefreshSchemas, self).setUp()
+
+ self.COLUMN_NAME = "first_column"
+ self.COLUMN_TYPE = "text"
+ self.COLUMN_EXAMPLE = "some text for column value"
+ self.EXPECTED_COLUMN_METADATA = {
+ "id": 1,
+ "org_id": 1,
+ "table_id": 1,
+ "name": self.COLUMN_NAME,
+ "type": self.COLUMN_TYPE,
+ "example": self.COLUMN_EXAMPLE,
+ "exists": True,
+ "description": None,
+ }
+
+ get_schema_patcher = patch("redash.query_runner.pg.PostgreSQL.get_schema")
+ self.patched_get_schema = get_schema_patcher.start()
+ self.addCleanup(get_schema_patcher.stop)
+ self.default_schema_return_value = [
+ {
+ "name": "table",
+ "columns": [self.COLUMN_NAME],
+ "metadata": [{"name": self.COLUMN_NAME, "type": self.COLUMN_TYPE,}],
+ }
+ ]
+ self.patched_get_schema.return_value = self.default_schema_return_value
+
+ get_table_sample_patcher = patch(
+ "redash.query_runner.BaseQueryRunner.get_table_sample"
+ )
+ patched_get_table_sample = get_table_sample_patcher.start()
+ self.addCleanup(get_table_sample_patcher.stop)
+ patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}
+ lock_key = "data_source:schema:refresh:{}:lock".format(
+ self.factory.data_source.id
+ )
+ redis_connection.delete(lock_key)
+
+ def tearDown(self):
+ self.factory.data_source.query_runner.configuration["samples"] = False
+
def test_calls_refresh_of_all_data_sources(self):
self.factory.data_source # trigger creation
with patch(
@@ -29,3 +78,301 @@ def test_skips_paused_data_sources(self):
) as refresh_job:
refresh_schemas()
refresh_job.assert_called()
+
+ def test_refresh_schema_creates_tables(self):
+ EXPECTED_TABLE_METADATA = {
+ "id": 1,
+ "org_id": 1,
+ "exists": True,
+ "name": u"table",
+ "visible": True,
+ "description": None,
+ "column_metadata": True,
+ "data_source_id": 1,
+ "sample_updated_at": None,
+ "sample_queries": {},
+ "columns": [self.EXPECTED_COLUMN_METADATA],
+ }
+
+ refresh_schema(self.factory.data_source.id)
+ update_sample(
+ self.factory.data_source.id,
+ "table",
+ 1,
+ utils.utcnow() - datetime.timedelta(days=90),
+ )
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertEqual(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize(),
+ EXPECTED_TABLE_METADATA,
+ )
+ self.assertEqual(
+ ColumnMetadataSerializer(column_metadata[0]).serialize(),
+ self.EXPECTED_COLUMN_METADATA,
+ )
+
+ def test_refresh_schema_deleted_table_marked(self):
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertTrue(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["exists"]
+ )
+
+ # Table is gone, `exists` should be False.
+ self.patched_get_schema.return_value = []
+
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertFalse(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["exists"]
+ )
+
+ # Table is back, `exists` should be True again.
+ self.patched_get_schema.return_value = self.default_schema_return_value
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ self.assertTrue(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["exists"]
+ )
+
+ def test_refresh_schema_table_with_new_metadata_updated(self):
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertTrue(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["column_metadata"]
+ )
+
+ # Table has no metadata field, `column_metadata` should be False.
+ self.patched_get_schema.return_value = [
+ {"name": "table", "columns": [self.COLUMN_NAME],}
+ ]
+
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertFalse(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["column_metadata"]
+ )
+
+ # Table metadata field is back, `column_metadata` should be True again.
+ self.patched_get_schema.return_value = self.default_schema_return_value
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ self.assertTrue(
+ TableMetadataSerializer(
+ table_metadata[0], with_favorite_state=False
+ ).serialize()["column_metadata"]
+ )
+
+ def test_refresh_schema_delete_column(self):
+ NEW_COLUMN_NAME = "new_column"
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertTrue(
+ ColumnMetadataSerializer(column_metadata[0]).serialize()["exists"]
+ )
+
+ self.patched_get_schema.return_value = [
+ {
+ "name": "table",
+ "columns": [NEW_COLUMN_NAME],
+ "metadata": [{"name": NEW_COLUMN_NAME, "type": self.COLUMN_TYPE,}],
+ }
+ ]
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+ self.assertEqual(len(column_metadata), 2)
+
+ self.assertFalse(
+ ColumnMetadataSerializer(column_metadata[1]).serialize()["exists"]
+ )
+ self.assertTrue(
+ ColumnMetadataSerializer(column_metadata[0]).serialize()["exists"]
+ )
+
+ def test_refresh_schema_update_column(self):
+ UPDATED_COLUMN_TYPE = "varchar"
+
+ refresh_schema(self.factory.data_source.id)
+ update_sample(
+ self.factory.data_source.id,
+ "table",
+ 1,
+ utils.utcnow() - datetime.timedelta(days=90),
+ )
+ column_metadata = ColumnMetadata.query.all()
+ self.assertEqual(
+ ColumnMetadataSerializer(column_metadata[0]).serialize(),
+ self.EXPECTED_COLUMN_METADATA,
+ )
+
+ updated_schema = copy.deepcopy(self.default_schema_return_value)
+ updated_schema[0]["metadata"][0]["type"] = UPDATED_COLUMN_TYPE
+ self.patched_get_schema.return_value = updated_schema
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+ self.assertNotEqual(
+ ColumnMetadataSerializer(column_metadata[0]).serialize(),
+ self.EXPECTED_COLUMN_METADATA,
+ )
+ self.assertEqual(
+ ColumnMetadataSerializer(column_metadata[0]).serialize()["type"],
+ UPDATED_COLUMN_TYPE,
+ )
+
+ def test_refresh_samples_rate_limits(self):
+ NEW_COLUMN_NAME = "new_column"
+ NUM_TABLES = 105
+ tables = []
+
+ for i in range(NUM_TABLES):
+ tables.append(
+ {
+ "name": "table{}".format(i),
+ "columns": [NEW_COLUMN_NAME],
+ "metadata": [{"name": NEW_COLUMN_NAME, "type": self.COLUMN_TYPE,}],
+ }
+ )
+
+ self.patched_get_schema.return_value = tables
+ self.factory.data_source.query_runner.configuration["samples"] = True
+
+ refresh_schema(self.factory.data_source.id)
+ refresh_samples(self.factory.data_source.id, 50)
+
+ # There's a total of 105 tables
+ table_metadata = TableMetadata.query.count()
+ self.assertEqual(table_metadata, NUM_TABLES)
+
+ # 50 tables are processed on the first call
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 55)
+
+ # 50 more tables are processed on the second call
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 5)
+
+ # All tables are processed by the third call
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 0)
+
+ def test_refresh_samples_refreshes(self):
+ NEW_COLUMN_NAME = "new_column"
+ NUM_TABLES = 5
+ TIME_BEFORE_UPDATE = utils.utcnow()
+ tables = []
+
+ for i in range(NUM_TABLES):
+ tables.append(
+ {
+ "name": "table{}".format(i),
+ "columns": [NEW_COLUMN_NAME],
+ "metadata": [{"name": NEW_COLUMN_NAME, "type": self.COLUMN_TYPE,}],
+ }
+ )
+
+ self.patched_get_schema.return_value = tables
+ self.factory.data_source.query_runner.configuration["samples"] = True
+
+ refresh_schema(self.factory.data_source.id)
+ refresh_samples(self.factory.data_source.id, 50)
+
+ # There's a total of 5 processed tables
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertEqual(table_metadata.count(), NUM_TABLES)
+ self.assertTrue(table_metadata.first().sample_updated_at > TIME_BEFORE_UPDATE)
+
+ table_metadata.update(
+ {"sample_updated_at": utils.utcnow() - datetime.timedelta(days=30)}
+ )
+ models.db.session.commit()
+
+ TIME_BEFORE_UPDATE = utils.utcnow()
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata_list = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertTrue(
+ table_metadata_list.first().sample_updated_at > TIME_BEFORE_UPDATE
+ )
+
+ def test_refresh_schema_doesnt_overwrite_samples(self):
+ self.factory.data_source.query_runner.configuration["samples"] = True
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, None)
+
+ update_sample(
+ self.factory.data_source.id,
+ "table",
+ 1,
+ utils.utcnow() - datetime.timedelta(days=90),
+ )
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ # Check that a schema refresh doesn't overwrite examples
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ def test_refresh_samples_applied_to_one_data_source(self):
+ ds1 = self.factory.create_data_source()
+ ds2 = self.factory.create_data_source()
+
+ ds1.query_runner.configuration["samples"] = True
+ ds2.query_runner.configuration["samples"] = True
+
+ refresh_schema(ds1.id)
+ refresh_schema(ds2.id)
+ refresh_samples(ds1.id, 50)
+
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertEqual(table_metadata.count(), len(self.default_schema_return_value))
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 537c83d03d..8ce4fbbf48 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -3,10 +3,11 @@
from click.testing import CliRunner
from tests import BaseTestCase
+from redash import utils
from redash.utils.configuration import ConfigurationContainer
from redash.query_runner import query_runners
from redash.cli import manager
-from redash.models import DataSource, Group, Organization, User, db
+from redash.models import TableMetadata, DataSource, Group, Organization, User, db
class DataSourceCommandTests(BaseTestCase):
@@ -169,6 +170,18 @@ def test_connection_bad_delete(self):
self.assertIn("Couldn't find", result.output)
self.assertEqual(DataSource.query.count(), 1)
+ def test_refresh_samples(self):
+ self.factory.create_data_source(
+ name="test1",
+ type="sqlite",
+ options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
+ )
+ runner = CliRunner()
+ result = runner.invoke(manager, ["ds", "refresh_samples", "test1"])
+ self.assertFalse(result.exception)
+ self.assertEqual(result.exit_code, 0)
+ self.assertIn("Refreshing", result.output)
+
def test_options_edit(self):
self.factory.create_data_source(
name="test1",