Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed Jan 22, 2024
1 parent b984b9c commit f512f33
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions superset/sql_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,32 +325,13 @@ def _extract_tables_from_statement(self, statement: exp.Expression) -> set[Table

pseudo_query = parse_one(f"SELECT {literal.this}", dialect=self._dialect)
sources = pseudo_query.find_all(exp.Table)
elif statement:
sources = []
for scope in traverse_scope(statement):
for source in scope.sources.values():
if not isinstance(source, exp.Table):
continue

# CTEs in the parent scope look like tables (and are represented by
# exp.Table objects), but should not be considered as such;
# otherwise a user with access to table `foo` could access any table
# with a query like this:
#
# WITH foo AS (SELECT * FROM bar) SELECT * FROM foo
#
parent_sources = scope.parent.sources if scope.parent else {}
ctes_in_scope = {
name
for name, parent_scope in parent_sources.items()
if isinstance(parent_scope, Scope)
and parent_scope.scope_type == ScopeType.CTE
}
if source.name not in ctes_in_scope:
sources.append(source)

else:
return set()
sources = [
source
for scope in traverse_scope(statement)
for source in scope.sources.values()
if isinstance(source, exp.Table) and not self._is_cte(source, scope)
]

return {
Table(
Expand All @@ -361,6 +342,28 @@ def _extract_tables_from_statement(self, statement: exp.Expression) -> set[Table
for source in sources
}

def _is_cte(self, source: exp.Table, scope: Scope) -> bool:
"""
Is the source a CTE?
CTEs in the parent scope look like tables (and are represented by
exp.Table objects), but should not be considered as such;
otherwise a user with access to table `foo` could access any table
with a query like this:
WITH foo AS (SELECT * FROM target_table) SELECT * FROM foo
"""
parent_sources = scope.parent.sources if scope.parent else {}
ctes_in_scope = {
name
for name, parent_scope in parent_sources.items()
if isinstance(parent_scope, Scope)
and parent_scope.scope_type == ScopeType.CTE
}

return source.name in ctes_in_scope

@property
def limit(self) -> Optional[int]:
return self._limit
Expand Down

0 comments on commit f512f33

Please sign in to comment.