diff --git a/ci/check_disallowed_imports.py b/ci/check_disallowed_imports.py
index 517e12547073..391f6d302510 100755
--- a/ci/check_disallowed_imports.py
+++ b/ci/check_disallowed_imports.py
@@ -14,7 +14,7 @@ def generate_dependency_graph(*args):
     command = ("pydeps", "--show-deps", *args)
     print(f"Running: {' '.join(command)}")  # noqa: T201
-    result = subprocess.check_output(command, text=True)
+    result = subprocess.check_output(command, text=True)  # noqa: S603
     return json.loads(result)
diff --git a/ci/make_geography_db.py b/ci/make_geography_db.py
index 966e5292a4d8..3c89df802ea9 100755
--- a/ci/make_geography_db.py
+++ b/ci/make_geography_db.py
@@ -90,7 +90,7 @@ def main() -> None:
     args = parser.parse_args()
 
-    response = requests.get(args.input_data_url)
+    response = requests.get(args.input_data_url, timeout=600)
     response.raise_for_status()
     input_data = response.json()
     db_path = Path(args.output_directory).joinpath("geography.duckdb")
diff --git a/docs/how-to/visualization/example_streamlit_app/example_streamlit_app.py b/docs/how-to/visualization/example_streamlit_app/example_streamlit_app.py
index 4c6563a298ab..85e31a7e3f99 100644
--- a/docs/how-to/visualization/example_streamlit_app/example_streamlit_app.py
+++ b/docs/how-to/visualization/example_streamlit_app/example_streamlit_app.py
@@ -13,7 +13,8 @@
 @st.cache_data
 def get_emoji():
     resp = requests.get(
-        "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json"
+        "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json",
+        timeout=60,
     )
     resp.raise_for_status()
     emojis = resp.json()
diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py
index eef174a0ce00..2e09a04185fb 100644
--- a/ibis/backends/bigquery/__init__.py
+++ b/ibis/backends/bigquery/__init__.py
@@ -54,7 +54,7 @@
     "https://www.googleapis.com/auth/drive",
 ]
 CLIENT_ID = "546535678771-gvffde27nd83kfl6qbrnletqvkdmsese.apps.googleusercontent.com"
-CLIENT_SECRET = "iU5ohAF2qcqrujegE3hQ1cPt"
+CLIENT_SECRET = "iU5ohAF2qcqrujegE3hQ1cPt"  # noqa: S105
 
 
 def _create_user_agent(application_name: str) -> str:
diff --git a/ibis/backends/bigquery/tests/unit/udf/test_core.py b/ibis/backends/bigquery/tests/unit/udf/test_core.py
index 309887d13279..6486a595bbb4 100644
--- a/ibis/backends/bigquery/tests/unit/udf/test_core.py
+++ b/ibis/backends/bigquery/tests/unit/udf/test_core.py
@@ -56,7 +56,7 @@ def f(a):
     )
     f.seek(0)
     code = builtins.compile(f.read(), f.name, "exec")
-    exec(code, d)
+    exec(code, d)  # noqa: S102
     f = d["f"]
     js = compile(f)
     snapshot.assert_match(js, "out.js")
diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py
index d676d95a7870..cc99b4d6251b 100644
--- a/ibis/backends/clickhouse/__init__.py
+++ b/ibis/backends/clickhouse/__init__.py
@@ -423,7 +423,7 @@ def insert(
         elif not isinstance(obj, ir.Table):
             obj = ibis.memtable(obj)
 
-        query = self._build_insert_query(target=name, source=obj)
+        query = self._build_insert_from_table(target=name, source=obj)
         external_tables = self._collect_in_memory_tables(obj, {})
         external_data = self._normalize_external_tables(external_tables)
         return self.con.command(query.sql(self.name), external_data=external_data)
diff --git a/ibis/backends/duckdb/tests/test_register.py b/ibis/backends/duckdb/tests/test_register.py
index 0bffd3b4d86d..0030c33f6018 100644
--- a/ibis/backends/duckdb/tests/test_register.py
+++ b/ibis/backends/duckdb/tests/test_register.py
@@ -166,7 +166,9 @@ def test_temp_directory(tmp_path):
 @pytest.fixture(scope="session")
 def pgurl():  # pragma: no cover
     pgcon = ibis.postgres.connect(
-        user="postgres", password="postgres", host="localhost"
+        user="postgres",
+        password="postgres",  # noqa: S106
+        host="localhost",
     )
     df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 1.0], "y": ["a", "b", "c", "a"]})
@@ -193,7 +195,11 @@ def test_read_postgres(con, pgurl):  # pragma: no cover
 
 @pytest.fixture(scope="session")
 def mysqlurl():  # pragma: no cover
-    mysqlcon = ibis.mysql.connect(user="ibis", password="ibis", database="ibis_testing")
+    mysqlcon = ibis.mysql.connect(
+        user="ibis",
+        password="ibis",  # noqa: S106
+        database="ibis_testing",
+    )
     df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 1.0], "y": ["a", "b", "c", "a"]})
 
     s = ibis.schema(dict(x="float64", y="str"))
diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py
index 0371d3fffe8c..65d89abc149a 100644
--- a/ibis/backends/impala/__init__.py
+++ b/ibis/backends/impala/__init__.py
@@ -1239,9 +1239,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         ).sql(self.name, pretty=True)
 
         data = op.data.to_frame().itertuples(index=False)
-        specs = ", ".join("?" * len(schema))
-        table = sg.table(name, quoted=quoted).sql(self.name)
-        insert_stmt = f"INSERT INTO {table} VALUES ({specs})"
+        insert_stmt = self._build_insert_template(name, schema=schema)
         with self._safe_raw_sql(create_stmt) as cur:
             for row in data:
                 cur.execute(insert_stmt, row)
diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py
index 0bde35b60b52..4789700f4472 100644
--- a/ibis/backends/mssql/__init__.py
+++ b/ibis/backends/mssql/__init__.py
@@ -6,8 +6,6 @@
 import datetime
 import struct
 from contextlib import closing
-from functools import partial
-from itertools import repeat
 from operator import itemgetter
 from typing import TYPE_CHECKING, Any
 
@@ -25,7 +23,7 @@
 from ibis.backends import CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl
 from ibis.backends.mssql.compiler import MSSQLCompiler
 from ibis.backends.sql import SQLBackend
-from ibis.backends.sql.compiler import C
+from ibis.backends.sql.compiler import STAR, C
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Mapping
@@ -287,25 +285,35 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
         return cursor
 
     def create_catalog(self, name: str, force: bool = False) -> None:
-        name = self._quote(name)
+        expr = (
+            sg.select(STAR)
+            .from_(sg.table("databases", db="sys"))
+            .where(C.name.eq(sge.convert(name)))
+        )
+        stmt = sge.Create(
+            kind="DATABASE", this=sg.to_identifier(name, quoted=self.compiler.quoted)
+        ).sql(self.dialect)
         create_stmt = (
             f"""\
-IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = {name})
+IF NOT EXISTS ({expr.sql(self.dialect)})
 BEGIN
-  CREATE DATABASE {name};
+  {stmt};
 END;
 GO"""
             if force
-            else f"CREATE DATABASE {name}"
+            else stmt
         )
         with self._safe_raw_sql(create_stmt):
             pass
 
     def drop_catalog(self, name: str, force: bool = False) -> None:
-        name = self._quote(name)
-        if_exists = "IF EXISTS " * force
-
-        with self._safe_raw_sql(f"DROP DATABASE {if_exists}{name}"):
+        with self._safe_raw_sql(
+            sge.Drop(
+                kind="DATABASE",
+                this=sg.to_identifier(name, quoted=self.compiler.quoted),
+                exists=force,
+            )
+        ):
             pass
 
     def create_database(
@@ -313,31 +321,44 @@
     ) -> None:
         current_catalog = self.current_catalog
         should_switch_catalog = catalog is not None and catalog != current_catalog
+        quoted = self.compiler.quoted
 
-        name = self._quote(name)
+        expr = (
+            sg.select(STAR)
+            .from_(sg.table("schemas", db="sys"))
+            .where(C.name.eq(sge.convert(name)))
+        )
+        stmt = sge.Create(
+            kind="SCHEMA", this=sg.to_identifier(name, quoted=quoted)
+        ).sql(self.dialect)
 
         create_stmt = (
             f"""\
-IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = {name})
+IF NOT EXISTS ({expr.sql(self.dialect)})
 BEGIN
-  CREATE SCHEMA {name};
+  {stmt};
 END;
 GO"""
             if force
-            else f"CREATE SCHEMA {name}"
+            else stmt
         )
 
         with self.begin() as cur:
             if should_switch_catalog:
-                cur.execute(f"USE {self._quote(catalog)}")
+                cur.execute(
+                    sge.Use(this=sg.to_identifier(catalog, quoted=quoted)).sql(
+                        self.dialect
+                    )
+                )
 
             cur.execute(create_stmt)
 
             if should_switch_catalog:
-                cur.execute(f"USE {self._quote(current_catalog)}")
-
-    def _quote(self, name: str):
-        return sg.to_identifier(name, quoted=True).sql(self.dialect)
+                cur.execute(
+                    sge.Use(this=sg.to_identifier(current_catalog, quoted=quoted)).sql(
+                        self.dialect
+                    )
+                )
 
     def drop_database(
         self, name: str, catalog: str | None = None, force: bool = False
@@ -345,18 +366,30 @@ def drop_database(
         current_catalog = self.current_catalog
         should_switch_catalog = catalog is not None and catalog != current_catalog
 
-        name = self._quote(name)
-
-        if_exists = "IF EXISTS " * force
+        quoted = self.compiler.quoted
 
         with self.begin() as cur:
             if should_switch_catalog:
-                cur.execute(f"USE {self._quote(catalog)}")
+                cur.execute(
+                    sge.Use(this=sg.to_identifier(catalog, quoted=quoted)).sql(
+                        self.dialect
+                    )
+                )
 
-            cur.execute(f"DROP SCHEMA {if_exists}{name}")
+            cur.execute(
+                sge.Drop(
+                    kind="SCHEMA",
+                    exists=force,
+                    this=sg.to_identifier(name, quoted=quoted),
+                ).sql(self.dialect)
+            )
 
             if should_switch_catalog:
-                cur.execute(f"USE {self._quote(current_catalog)}")
+                cur.execute(
+                    sge.Use(this=sg.to_identifier(current_catalog, quoted=quoted)).sql(
+                        self.dialect
+                    )
+                )
 
     def list_tables(
         self,
@@ -570,19 +603,11 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         df = op.data.to_frame()
         data = df.itertuples(index=False)
-        cols = ", ".join(
-            ident.sql(self.dialect)
-            for ident in map(
-                partial(sg.to_identifier, quoted=quoted), schema.keys()
-            )
-        )
-        specs = ", ".join(repeat("?", len(schema)))
-        table = sg.table(name, quoted=quoted)
-        sql = f"INSERT INTO {table.sql(self.dialect)} ({cols}) VALUES ({specs})"
+        insert_stmt = self._build_insert_template(name, schema=schema, columns=True)
 
         with self._safe_raw_sql(create_stmt) as cur:
             if not df.empty:
-                cur.executemany(sql, data)
+                cur.executemany(insert_stmt, data)
diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py
index 467fa9b7155e..db6ba97b37ac 100644
--- a/ibis/backends/mysql/__init__.py
+++ b/ibis/backends/mysql/__init__.py
@@ -5,8 +5,7 @@
 import contextlib
 import re
 import warnings
-from functools import cached_property, partial
-from itertools import repeat
+from functools import cached_property
 from operator import itemgetter
 from typing import TYPE_CHECKING, Any
 from urllib.parse import parse_qs, urlparse
@@ -26,7 +25,7 @@
 from ibis.backends.mysql.compiler import MySQLCompiler
 from ibis.backends.mysql.datatypes import _type_from_cursor_info
 from ibis.backends.sql import SQLBackend
-from ibis.backends.sql.compiler import TRUE, C
+from ibis.backends.sql.compiler import STAR, TRUE, C
 
 if TYPE_CHECKING:
     from collections.abc import Mapping
@@ -195,7 +194,16 @@ def list_databases(self, like: str | None = None) -> list[str]:
 
     def _get_schema_using_query(self, query: str) -> sch.Schema:
         with self.begin() as cur:
-            cur.execute(f"SELECT * FROM ({query}) AS tmp LIMIT 0")
+            cur.execute(
+                sg.select(STAR)
+                .from_(
+                    sg.parse_one(query, dialect=self.dialect).subquery(
+                        sg.to_identifier("tmp", quoted=self.compiler.quoted)
+                    )
+                )
+                .limit(0)
+                .sql(self.dialect)
+            )
 
         return sch.Schema(
             {
@@ -207,10 +215,12 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
     def get_schema(
         self, name: str, *, catalog: str | None = None, database: str | None = None
     ) -> sch.Schema:
-        table = sg.table(name, db=database, catalog=catalog, quoted=True).sql(self.name)
+        table = sg.table(
+            name, db=database, catalog=catalog, quoted=self.compiler.quoted
+        ).sql(self.dialect)
 
         with self.begin() as cur:
-            cur.execute(f"DESCRIBE {table}")
+            cur.execute(sge.Describe(this=table).sql(self.dialect))
             result = cur.fetchall()
 
         type_mapper = self.compiler.type_mapper
@@ -497,19 +507,14 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         )
         create_stmt_sql = create_stmt.sql(self.name)
 
-        columns = schema.keys()
         df = op.data.to_frame()
         # nan can not be used with MySQL
         df = df.replace(np.nan, None)
 
         data = df.itertuples(index=False)
-        cols = ", ".join(
-            ident.sql(self.name)
-            for ident in map(partial(sg.to_identifier, quoted=quoted), columns)
+        sql = self._build_insert_template(
+            name, schema=schema, columns=True, placeholder="%s"
         )
-        specs = ", ".join(repeat("%s", len(columns)))
-        table = sg.table(name, quoted=quoted)
-        sql = f"INSERT INTO {table.sql(self.name)} ({cols}) VALUES ({specs})"
 
         with self.begin() as cur:
             cur.execute(create_stmt_sql)
diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py
index 1107a5bc2cbc..0fab694e532f 100644
--- a/ibis/backends/oracle/__init__.py
+++ b/ibis/backends/oracle/__init__.py
@@ -10,6 +10,7 @@
 from operator import itemgetter
 from typing import TYPE_CHECKING, Any
 
+import numpy as np
 import oracledb
 import sqlglot as sg
 import sqlglot.expressions as sge
@@ -501,16 +502,18 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
                 this=sg.to_identifier(name, quoted=quoted), expressions=column_defs
             ),
             properties=sge.Properties(expressions=[sge.TemporaryProperty()]),
-        ).sql(self.name, pretty=True)
+        ).sql(self.name)
 
-        data = op.data.to_frame().itertuples(index=False)
-        specs = ", ".join(f":{i}" for i, _ in enumerate(schema))
-        table = sg.table(name, quoted=quoted).sql(self.name)
-        insert_stmt = f"INSERT INTO {table} VALUES ({specs})"
+        data = op.data.to_frame().replace({np.nan: None})
+        insert_stmt = self._build_insert_template(
+            name, schema=schema, placeholder=":{i:d}"
+        )
         with self.begin() as cur:
             cur.execute(create_stmt)
-            for row in data:
-                cur.execute(insert_stmt, row)
+            for start, end in util.chunks(len(data), chunk_size=128):
+                cur.executemany(
+                    insert_stmt, list(data.iloc[start:end].itertuples(index=False))
+                )
 
         atexit.register(self._clean_up_tmp_table, name)
diff --git a/ibis/backends/postgres/__init__.py b/ibis/backends/postgres/__init__.py
index 1deaa654b895..6f6be6a61b7e 100644
--- a/ibis/backends/postgres/__init__.py
+++ b/ibis/backends/postgres/__init__.py
@@ -6,7 +6,7 @@
 import inspect
 import textwrap
 from functools import partial
-from itertools import repeat, takewhile
+from itertools import takewhile
 from operator import itemgetter
 from typing import TYPE_CHECKING, Any
 from urllib.parse import parse_qs, urlparse
@@ -148,7 +148,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         )
         create_stmt_sql = create_stmt.sql(self.dialect)
 
-        columns = schema.keys()
        df = op.data.to_frame()
         # nan gets compiled into 'NaN'::float which throws errors in non-float columns
         # In order to hold NaN values, pandas automatically converts integer columns
@@ -161,13 +160,9 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
             df[col] = df[col].replace(np.nan, None)
 
         data = df.itertuples(index=False)
-        cols = ", ".join(
-            ident.sql(self.dialect)
-            for ident in map(partial(sg.to_identifier, quoted=quoted), columns)
+        sql = self._build_insert_template(
+            name, schema=schema, columns=True, placeholder="%s"
         )
-        specs = ", ".join(repeat("%s", len(columns)))
-        table = sg.table(name, quoted=quoted)
-        sql = f"INSERT INTO {table.sql(self.dialect)} ({cols}) VALUES ({specs})"
 
         with self.begin() as cur:
             cur.execute(create_stmt_sql)
diff --git a/ibis/backends/postgres/tests/test_client.py b/ibis/backends/postgres/tests/test_client.py
index 6a2d635786d4..6f465d74d13c 100644
--- a/ibis/backends/postgres/tests/test_client.py
+++ b/ibis/backends/postgres/tests/test_client.py
@@ -14,8 +14,9 @@
 from __future__ import annotations
 
 import os
-import random
 
+import hypothesis as h
+import hypothesis.strategies as st
 import numpy as np
 import pandas as pd
 import pandas.testing as tm
@@ -260,7 +261,8 @@ def test_port():
         ibis.connect("postgresql://postgres:postgres@localhost:1337/ibis_testing")
 
 
-def test_pgvector_type_load(con):
+@h.given(st.integers(min_value=4, max_value=1000))
+def test_pgvector_type_load(con, vector_size):
     """
     CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));
     INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');
@@ -279,7 +281,7 @@ def test_pgvector_type_load(con, vector_size):
 
     query = f"""
     DROP TABLE IF EXISTS itemsvrandom;
-    CREATE TABLE itemsvrandom (id bigserial PRIMARY KEY, embedding vector({random.randint(4, 1000)}));
+    CREATE TABLE itemsvrandom (id bigserial PRIMARY KEY, embedding vector({vector_size}));
     """
 
     with con.raw_sql(query):
diff --git a/ibis/backends/pyspark/tests/test_ddl.py b/ibis/backends/pyspark/tests/test_ddl.py
index 834b61aa517a..835cb07c550c 100644
--- a/ibis/backends/pyspark/tests/test_ddl.py
+++ b/ibis/backends/pyspark/tests/test_ddl.py
@@ -2,6 +2,7 @@
 
 import os
 import shutil
+import tempfile
 from posixpath import join as pjoin
 
 import pytest
@@ -14,7 +15,7 @@
 
 
 @pytest.fixture
-def temp_view(con) -> str:
+def temp_view(con):
     name = util.gen_name("view")
     yield name
     con.drop_view(name, force=True)
@@ -42,7 +43,10 @@ def test_drop_non_empty_database(con, alltypes, temp_table_db):
 
 @pytest.fixture
 def temp_base():
-    base = pjoin(f"/tmp/{util.gen_name('pyspark_testing')}", util.gen_name("temp_base"))
+    base = pjoin(
+        f"{tempfile.gettempdir()}/{util.gen_name('pyspark_testing')}",
+        util.gen_name("temp_base"),
+    )
     yield base
     shutil.rmtree(base, ignore_errors=True)
diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py
index a6ee2e696e0f..f90f22de0f48 100644
--- a/ibis/backends/risingwave/__init__.py
+++ b/ibis/backends/risingwave/__init__.py
@@ -2,8 +2,6 @@
 
 from __future__ import annotations
 
-from functools import partial
-from itertools import repeat
 from operator import itemgetter
 from typing import TYPE_CHECKING
 
@@ -307,16 +305,11 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         )
         create_stmt_sql = create_stmt.sql(self.dialect)
 
-        columns = schema.keys()
         df = op.data.to_frame()
         data = df.itertuples(index=False)
-        cols = ", ".join(
-            ident.sql(self.dialect)
-            for ident in map(partial(sg.to_identifier, quoted=quoted), columns)
+        sql = self._build_insert_template(
+            name, schema=schema, columns=True, placeholder="%s"
         )
-        specs = ", ".join(repeat("%s", len(columns)))
-        table = sg.table(name, quoted=quoted)
-        sql = f"INSERT INTO {table.sql(self.dialect)} ({cols}) VALUES ({specs})"
 
         with self.begin() as cur:
             cur.execute(create_stmt_sql)
             extras.execute_batch(cur, sql, data, 128)
diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py
index db227cfe9789..158603ffbef6 100644
--- a/ibis/backends/snowflake/__init__.py
+++ b/ibis/backends/snowflake/__init__.py
@@ -35,6 +35,7 @@
 from ibis.backends.snowflake.compiler import SnowflakeCompiler
 from ibis.backends.snowflake.converter import SnowflakePandasData
 from ibis.backends.sql import SQLBackend
+from ibis.backends.sql.compiler import STAR
 from ibis.backends.sql.datatypes import SnowflakeType
 
 if TYPE_CHECKING:
@@ -873,7 +874,8 @@ def read_csv(
         # https://docs.snowflake.com/en/sql-reference/sql/put#optional-parameters
         threads = min((os.cpu_count() or 2) // 2, 99)
         table = table_name or ibis.util.gen_name("read_csv_snowflake")
-        quoted = self.compiler.quoted
+        compiler = self.compiler
+        quoted = compiler.quoted
         qtable = sg.to_identifier(table, quoted=quoted)
 
         parse_header = header = kwargs.pop("parse_header", True)
@@ -904,7 +906,7 @@ def read_csv(
             if str(path).startswith("https://"):
                 with tempfile.NamedTemporaryFile() as tmp:
                     tmpname = tmp.name
-                    urlretrieve(path, filename=tmpname)
+                    urlretrieve(path, filename=tmpname)  # noqa: S310
                     tmp.flush()
                     cur.execute(
                         f"PUT 'file://{tmpname}' @{stage} PARALLEL = {threads:d}"
@@ -916,33 +918,53 @@
             # handle setting up the schema in python because snowflake is
             # broken for csv globs: it cannot parse the result of the following
-            # query in USING TEMPLATE
-            (info,) = cur.execute(
-                f"""
-                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
-                WITHIN GROUP (ORDER BY ORDER_ID ASC)
-                FROM TABLE(
-                    INFER_SCHEMA(
-                        LOCATION => '@{stage}',
-                        FILE_FORMAT => '{file_format}'
-                    )
-                )
-                """
-            ).fetchone()
-            columns = ", ".join(
-                "{} {}{}".format(
-                    sg.to_identifier(field["COLUMN_NAME"], quoted=quoted).sql(
-                        self.name
+            # query in USING TEMPLATE
+            query = sg.select(
+                sge.WithinGroup(
+                    this=sge.ArrayAgg(this=sge.StarMap(this=STAR)),
+                    expression=sge.Order(
+                        expressions=[sge.Ordered(this=sg.column("ORDER_ID"))]
                     ),
-                    field["TYPE"],
-                    " NOT NULL" if not field["NULLABLE"] else "",
                 )
-                for field in json.loads(info)
+            ).from_(
+                compiler.f.anon.TABLE(
+                    compiler.f.anon.INFER_SCHEMA(
+                        sge.Kwarg(
+                            this=compiler.v.LOCATION,
+                            expression=sge.convert(f"@{stage}"),
+                        ),
+                        sge.Kwarg(
+                            this=compiler.v.FILE_FORMAT,
+                            expression=sge.convert(file_format),
+                        ),
+                    )
+                )
             )
+            (info,) = cur.execute(query.sql(self.dialect)).fetchone()
 
             stmts = [
                 # create a temporary table using the stage and format inferred
                 # from the CSV
-                f"CREATE TEMP TABLE {qtable} ({columns})",
+                sge.Create(
+                    kind="TABLE",
+                    this=sge.Schema(
+                        this=qtable,
+                        expressions=[
+                            sge.ColumnDef(
+                                this=sg.to_identifier(
+                                    field["COLUMN_NAME"], quoted=quoted
+                                ),
+                                kind=field["TYPE"],
+                                constraints=(
+                                    [sge.NotNullColumnConstraint()]
+                                    if not field["NULLABLE"]
+                                    else None
+                                ),
+                            )
+                            for field in json.loads(info)
+                        ],
+                    ),
+                    properties=sge.Properties(expressions=[sge.TemporaryProperty()]),
+                ).sql(self.dialect),
                 # load the CSV into the table
                 f"""
                 COPY INTO {qtable}
@@ -979,7 +1001,8 @@ def read_json(
         stage = util.gen_name("read_json_stage")
         file_format = util.gen_name("read_json_format")
         table = table_name or util.gen_name("read_json_snowflake")
-        qtable = sg.to_identifier(table, quoted=self.compiler.quoted)
+        quoted = self.compiler.quoted
+        qtable = sg.table(table, quoted=quoted)
         threads = min((os.cpu_count() or 2) // 2, 99)
 
         kwargs.setdefault("strip_outer_array", True)
@@ -994,6 +1017,28 @@ def read_json(
             f"CREATE TEMP STAGE {stage} FILE_FORMAT = {file_format}",
         ]
 
+        compiler = self.compiler
+        query = sg.select(
+            sge.WithinGroup(
+                this=sge.ArrayAgg(this=sge.StarMap(this=STAR)),
+                expression=sge.Order(
+                    expressions=[sge.Ordered(this=sg.column("ORDER_ID"))]
+                ),
+            )
+        ).from_(
+            compiler.f.anon.TABLE(
+                compiler.f.anon.INFER_SCHEMA(
+                    sge.Kwarg(
+                        this=compiler.v.LOCATION,
+                        expression=sge.convert(f"@{stage}"),
+                    ),
+                    sge.Kwarg(
+                        this=compiler.v.FILE_FORMAT,
+                        expression=sge.convert(file_format),
+                    ),
+                )
+            )
+        )
 
         with self._safe_raw_sql(";\n".join(stmts)) as cur:
             cur.execute(
                 f"PUT 'file://{Path(path).absolute()}' @{stage} PARALLEL = {threads:d}"
             )
             cur.execute(
                 ";\n".join(
                     [
-                        f"""
-                        CREATE TEMP TABLE {qtable}
-                        USING TEMPLATE (
-                            SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
-                            WITHIN GROUP (ORDER BY ORDER_ID ASC)
-                            FROM TABLE(
-                                INFER_SCHEMA(
-                                    LOCATION => '@{stage}',
-                                    FILE_FORMAT => '{file_format}'
-                                )
-                            )
-                        )
-                        """,
+                        f"CREATE TEMP TABLE {qtable} USING TEMPLATE ({query.sql(self.dialect)})",
                         # load the JSON file into the table
-                        f"""
-                        COPY INTO {qtable}
-                        FROM @{stage}
-                        MATCH_BY_COLUMN_NAME = {str(match_by_column_name).upper()}
-                        """,
+                        sge.Copy(
+                            this=qtable,
+                            kind=True,
+                            files=[sge.Table(this=sge.Var(this=f"@{stage}"))],
+                            params=[
+                                sge.CopyParameter(
+                                    this=self.compiler.v.MATCH_BY_COLUMN_NAME,
+                                    expression=sge.convert(match_by_column_name),
+                                )
+                            ],
+                        ).sql(self.dialect),
                     ]
                 )
             )
@@ -1060,7 +1099,7 @@ def read_parquet(
         stage = util.gen_name("read_parquet_stage")
         table = table_name or util.gen_name("read_parquet_snowflake")
         quoted = self.compiler.quoted
-        qtable = sg.to_identifier(table, quoted=quoted)
+        qtable = sg.table(table, quoted=quoted)
         threads = min((os.cpu_count() or 2) // 2, 99)
 
         kwargs.setdefault("USE_LOGICAL_TYPE", True)
@@ -1069,27 +1108,52 @@ def read_parquet(
         )
 
         type_mapper = self.compiler.type_mapper
-        names_types = [
-            (name, type_mapper.to_string(typ), typ.nullable)
-            for name, typ in schema.items()
-        ]
-
-        snowflake_schema = ", ".join(
-            f"{sg.to_identifier(col, quoted=quoted)} {typ}{' NOT NULL' * (not is_nullable)}"
-            for col, typ, is_nullable in names_types
-        )
-
-        cols = ", ".join(f"$1:{col}::{typ}" for col, typ, _ in names_types)
 
         stmts = [
             f"CREATE TEMP STAGE {stage} FILE_FORMAT = (TYPE = PARQUET {options})",
-            f"CREATE TEMP TABLE {qtable} ({snowflake_schema})",
+            sge.Create(
+                kind="TABLE",
+                this=sge.Schema(
+                    this=qtable,
+                    expressions=[
+                        sge.ColumnDef(
+                            this=sg.to_identifier(col, quoted=quoted),
+                            kind=type_mapper.from_ibis(typ),
+                            constraints=(
+                                [sge.NotNullColumnConstraint()]
+                                if not typ.nullable
+                                else None
+                            ),
+                        )
+                        for col, typ in schema.items()
+                    ],
+                ),
+                properties=sge.Properties(expressions=[sge.TemporaryProperty()]),
+            ).sql(self.dialect),
         ]
 
         query = ";\n".join(stmts)
+
+        param = sge.Parameter(this=sge.convert(1))
+        copy_select = (
+            sg.select(
+                *(
+                    sg.cast(
+                        self.compiler.f.get_path(param, sge.convert(col)),
+                        type_mapper.from_ibis(typ),
+                    )
+                    for col, typ in schema.items()
+                )
+            )
+            .from_(sge.Table(this=sge.Var(this=f"@{stage}")))
+            .subquery()
+        )
+        copy_query = sge.Copy(this=qtable, kind=True, files=[copy_select]).sql(
+            self.dialect
+        )
 
         with self._safe_raw_sql(query) as cur:
             cur.execute(f"PUT 'file://{abspath}' @{stage} PARALLEL = {threads:d}")
-            cur.execute(f"COPY INTO {qtable} FROM (SELECT {cols} FROM @{stage})")
+            cur.execute(copy_query)
 
         return self.table(table)
@@ -1140,7 +1204,7 @@ def insert(
 
         self._run_pre_execute_hooks(obj)
 
-        query = self._build_insert_query(
+        query = self._build_insert_from_table(
             target=table_name, source=obj, db=db, catalog=catalog
         )
         table = sg.table(
diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py
index b03d7db0fda8..fa99048e1f56 100644
--- a/ibis/backends/sql/__init__.py
+++ b/ibis/backends/sql/__init__.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import abc
+from functools import partial
 from typing import TYPE_CHECKING, Any, ClassVar
 
 import sqlglot as sg
@@ -427,14 +428,14 @@ def insert(
 
         self._run_pre_execute_hooks(obj)
 
-        query = self._build_insert_query(
+        query = self._build_insert_from_table(
             target=table_name, source=obj, db=db, catalog=catalog
         )
 
         with self._safe_raw_sql(query):
             pass
 
-    def _build_insert_query(
+    def _build_insert_from_table(
         self, *, target: str, source, db: str | None = None, catalog: str | None = None
     ):
         compiler = self.compiler
@@ -459,6 +460,55 @@ def _build_insert_query(
         )
         return query
 
+    def _build_insert_template(
+        self,
+        name,
+        *,
+        schema: sch.Schema,
+        catalog: str | None = None,
+        columns: bool = False,
+        placeholder: str = "?",
+    ) -> str:
+        """Build an INSERT INTO table VALUES query string with placeholders.
+
+        Parameters
+        ----------
+        name
+            Name of the table to insert into
+        schema
+            Ibis schema of the table to insert into
+        catalog
+            Catalog name of the table to insert into
+        columns
+            Whether to render the columns to insert into
+        placeholder
+            Placeholder string. Can be a format string with a single `{i}` spec.
+
+        Returns
+        -------
+        str
+            The query string
+        """
+        quoted = self.compiler.quoted
+        return sge.insert(
+            sge.Values(
+                expressions=[
+                    sge.Tuple(
+                        expressions=[
+                            sge.Var(this=placeholder.format(i=i))
+                            for i in range(len(schema))
+                        ]
+                    )
+                ]
+            ),
+            into=sg.table(name, catalog=catalog, quoted=quoted),
+            columns=(
+                map(partial(sg.to_identifier, quoted=quoted), schema.keys())
+                if columns
+                else None
+            ),
+        ).sql(self.dialect)
+
     def truncate_table(
         self, name: str, database: str | None = None, schema: str | None = None
     ) -> None:
diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py
index b7166deeb90a..f51e38d3bd75 100644
--- a/ibis/backends/sql/dialects.py
+++ b/ibis/backends/sql/dialects.py
@@ -392,7 +392,7 @@ class Generator(Postgres.Generator):
         TABLE_HINTS = False
         QUERY_HINTS = False
         NVL2_SUPPORTED = False
-        PARAMETER_TOKEN = "$"
+        PARAMETER_TOKEN = "$"  # noqa: S105
         TABLESAMPLE_SIZE_IS_ROWS = False
         TABLESAMPLE_SEED_KEYWORD = "REPEATABLE"
         SUPPORTS_SELECT_INTO = True
diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py
index 960729ea894d..cde03ac18dca 100644
--- a/ibis/backends/sqlite/__init__.py
+++ b/ibis/backends/sqlite/__init__.py
@@ -178,10 +178,20 @@ def _inspect_schema(
         if database is None:
             database = "main"
 
-        quoted_db = _quote(database)
-        quoted_table = _quote(table_name)
+        quoted = self.compiler.quoted
+        quoted_db = sg.to_identifier(database, quoted=quoted)
+        quoted_table = sg.to_identifier(table_name, quoted=quoted)
 
-        sql = f'SELECT name, type, "notnull" FROM {quoted_db}.pragma_table_info({quoted_table})'
+        sql = (
+            sg.select("name", "type", sg.to_identifier("notnull", quoted=quoted))
+            .from_(
+                sge.Table(
+                    this=self.compiler.f.anon.pragma_table_info(quoted_table),
+                    db=quoted_db,
+                )
+            )
+            .sql(self.dialect)
+        )
         cur.execute(sql)
         rows = cur.fetchall()
         if not rows:
@@ -193,8 +203,16 @@ def _inspect_schema(
         # first row and assume that matches the rest of the rows
         unknown = [name for name, (typ, _) in table_info.items() if not typ]
         if unknown:
-            queries = ", ".join(f"typeof({_quote(name)})" for name in unknown)
-            cur.execute(f"SELECT {queries} FROM {quoted_db}.{quoted_table} LIMIT 1")
+            queries = (
+                self.compiler.f.typeof(sg.to_identifier(name, quoted=quoted))
+                for name in unknown
+            )
+            cur.execute(
+                sg.select(*queries)
+                .from_(sg.table(table_name, db=database, quoted=quoted))
+                .limit(1)
+                .sql(self.dialect)
+            )
             row = cur.fetchone()
             if row is not None:
                 for name, typ in zip(unknown, row):
@@ -310,10 +328,8 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         df = op.data.to_frame()
         data = df.itertuples(index=False)
 
-        cols = ", ".join(_quote(col) for col in op.schema.keys())
-        specs = ", ".join(["?"] * len(op.schema))
-        insert_stmt = (
-            f"INSERT INTO {table.sql(self.name)} ({cols}) VALUES ({specs})"
+        insert_stmt = self._build_insert_template(
+            op.name, schema=op.schema, catalog="temp", columns=True
         )
 
         with self.begin() as cur:
@@ -578,12 +594,12 @@ def insert(
 
         self._run_pre_execute_hooks(obj)
 
-        query = self._build_insert_query(
+        query = self._build_insert_from_table(
             target=table_name, source=obj, catalog=database
         )
         insert_stmt = query.sql(self.name)
 
         with self.begin() as cur:
             if overwrite:
-                cur.execute(f"DELETE FROM {table.sql(self.name)}")
+                cur.execute(sge.Delete(this=table).sql(self.dialect))
             cur.execute(insert_stmt)
diff --git a/ibis/backends/trino/__init__.py b/ibis/backends/trino/__init__.py
index a02ac97568e8..c933833ab03c 100644
--- a/ibis/backends/trino/__init__.py
+++ b/ibis/backends/trino/__init__.py
@@ -562,9 +562,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         ).sql(self.name, pretty=True)
 
         data = op.data.to_frame().itertuples(index=False)
-        specs = ", ".join("?" * len(schema))
-        table = sg.table(name, quoted=quoted).sql(self.name)
-        insert_stmt = f"INSERT INTO {table} VALUES ({specs})"
+        insert_stmt = self._build_insert_template(name, schema=schema)
         with self.begin() as cur:
             cur.execute(create_stmt)
             for row in data:
diff --git a/ibis/common/typing.py b/ibis/common/typing.py
index bcc11265a5cd..e538e284f306 100644
--- a/ibis/common/typing.py
+++ b/ibis/common/typing.py
@@ -193,7 +193,7 @@ def evaluate_annotations(
     for k, v in annots.items():
         if isinstance(v, str):
             try:
-                v = eval(v, globalns, localns)
+                v = eval(v, globalns, localns)  # noqa: S307
             except NameError:
                 if not best_effort:
                     raise
diff --git a/ibis/examples/gen_registry.py b/ibis/examples/gen_registry.py
index 24657e608e2c..2178df87ed76 100755
--- a/ibis/examples/gen_registry.py
+++ b/ibis/examples/gen_registry.py
@@ -82,7 +82,7 @@ def add_movielens_example(
         raw_bytes = source_zip.read_bytes()
     else:
         resp = requests.get(
-            f"https://files.grouplens.org/datasets/movielens/{filename}"
+            f"https://files.grouplens.org/datasets/movielens/{filename}", timeout=600
         )
         resp.raise_for_status()
         raw_bytes = resp.content
@@ -135,7 +135,7 @@ def download_and_convert(filename: str, *, bar: tqdm.tqdm):
         table = con.read_csv(BASE_URL.format(filename))
         table.to_parquet(parquet_path, codec="zstd")
     else:
-        resp = requests.get(BASE_URL.format(filename))
+        resp = requests.get(BASE_URL.format(filename), timeout=600)
         resp.raise_for_status()
         raw_bytes = resp.content
 
@@ -172,7 +172,7 @@ def add_zones_geojson(data_path: Path) -> None:
     file_path = Path(file_name)
 
     if not file_path.exists():
-        urlretrieve(url, data_path / file_path)
+        urlretrieve(url, filename=data_path / file_path)  # noqa: S310
 
 
 def add_imdb_example(data_path: Path) -> None:
@@ -307,8 +307,9 @@ def main(parser):
     add_nycflights13_example(data_path, metadata=metadata)
 
     print("Adding R examples...")  # noqa: T201
+
     # generate data from R
-    subprocess.check_call(["Rscript", str(EXAMPLES_DIRECTORY / "gen_examples.R")])
+    subprocess.check_call(["Rscript", str(EXAMPLES_DIRECTORY / "gen_examples.R")])  # noqa: S603, S607
 
     verify_case(parser, metadata)
diff --git a/ibis/expr/tests/test_decompile.py b/ibis/expr/tests/test_decompile.py
index 35186311742e..983264232e10 100644
--- a/ibis/expr/tests/test_decompile.py
+++ b/ibis/expr/tests/test_decompile.py
@@ -69,7 +69,7 @@ def test_basic(expr, expected):
     rendered = decompile(expr)
 
     locals_ = {}
-    exec(rendered, {}, locals_)
+    exec(rendered, {}, locals_)  # noqa: S102
     restored = locals_["result"]
 
     if isinstance(expected, ir.Expr):
diff --git a/ibis/tests/util.py b/ibis/tests/util.py
index 1b4826168e00..5f08fd9b6812 100644
--- a/ibis/tests/util.py
+++ b/ibis/tests/util.py
@@ -68,7 +68,7 @@ def assert_decompile_roundtrip(
     # execute the rendered python code
     locals_ = {}
-    exec(rendered, {}, locals_)
+    exec(rendered, {}, locals_)  # noqa: S102
     restored = locals_["result"]
 
     assert eq(expr.unbind(), restored)
diff --git a/ibis/util.py b/ibis/util.py
index 611c088ce711..610bd224f375 100644
--- a/ibis/util.py
+++ b/ibis/util.py
@@ -678,3 +678,28 @@ def __ne__(self, other):
             return self.obj != other.obj
         else:
             return NotImplemented
+
+
+def chunks(n: int, *, chunk_size: int) -> Iterator[tuple[int, int]]:
+    """Return an iterator of chunk start and end indices.
+
+    Parameters
+    ----------
+    n
+        The total number of elements.
+    chunk_size
+        The size of each chunk.
+
+    Returns
+    -------
+    Iterator[tuple[int, int]]
+        The start and end indices of each chunk.
+
+    Examples
+    --------
+    >>> list(chunks(10, chunk_size=3))
+    [(0, 3), (3, 6), (6, 9), (9, 10)]
+    >>> list(chunks(10, chunk_size=4))
+    [(0, 4), (4, 8), (8, 10)]
+    """
+    return ((start, min(start + chunk_size, n)) for start in range(0, n, chunk_size))
diff --git a/pyproject.toml b/pyproject.toml
index 113addaa95c4..448932780bde 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -384,6 +384,7 @@ select = [
   "UP", # pyupgrade
   "W", # pycodestyle
   "YTT", # flake8-2020
+  "S", # flake8-bandit
 ]
 ignore = [
   "B028", # required stacklevel argument to warn
@@ -432,6 +433,7 @@ ignore = [
   "SIM300", # yoda conditions
   "UP007", # Optional[str] -> str | None
   "UP038", # non-pep604-isinstance, results in slower code
+  "S101", # ignore "Use of `assert` detected"
 ]
 # none of these codes will be automatically fixed by ruff
 unfixable = [
@@ -446,7 +448,12 @@ required-imports = ["from __future__ import annotations"]
 
 [tool.ruff.lint.per-file-ignores]
 "*test*.py" = [
-  "D", # ignore all docstring lints in tests
+  "D", # ignore all docstring lints in tests
+  "S301", # pickle is allowed in tests
+  "S603", # ignore subprocess untrusted input warnings in test files, input is under control of ibis
+  "S607", # ignore subprocess untrusted exe path warnings in test files, input is under control of ibis
+  "S608", # ignore sql injection warnings in test files, input is under control of ibis
+  "S108", # /tmp usage refers to hdfs path
 ]
 "{docs,ci}/**/*.py" = ["INP001"]
 "{ci/release/verify_release,docs/**/*_impl}.py" = [