Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Escape ids more consistently in ml module #1074

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bigframes/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
"""
# Column values will be loaded as null if the column name has spaces.
    # https://github.com/googleapis/python-bigquery/issues/1566
identifier = str(label).replace(" ", "_")

identifier = str(label)
if strict:
identifier = str(label).replace(" ", "_")
identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier)
if not identifier:
identifier = "id"
Expand Down
14 changes: 6 additions & 8 deletions bigframes/ml/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from google.cloud import bigquery

from bigframes.core import log_adapter
import bigframes.core.compile.googlesql as sql_utils
from bigframes.ml import base, core, globals, impute, preprocessing, utils
import bigframes.pandas as bpd

Expand Down Expand Up @@ -98,25 +99,22 @@ class SQLScalarColumnTransformer:
def __init__(self, sql: str, target_column: str = "transformed_{0}"):
super().__init__()
self._sql = sql
# TODO: More robust unescaping
self._target_column = target_column.replace("`", "")

PLAIN_COLNAME_RX = re.compile("^[a-z][a-z0-9_]*$", re.IGNORECASE)

def escape(self, colname: str):
colname = colname.replace("`", "")
if self.PLAIN_COLNAME_RX.match(colname):
return colname
return f"`{colname}`"

def _compile_to_sql(
self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
) -> List[str]:
if columns is None:
columns = X.columns
result = []
for column in columns:
current_sql = self._sql.format(self.escape(column))
current_target_column = self.escape(self._target_column.format(column))
current_sql = self._sql.format(sql_utils.identifier(column))
current_target_column = sql_utils.identifier(
self._target_column.format(column)
)
result.append(f"{current_sql} AS {current_target_column}")
return result

Expand Down
2 changes: 1 addition & 1 deletion bigframes/ml/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, session: bigframes.Session, model: bigquery.Model):
self._session = session
self._model = model
self._model_manipulation_sql_generator = ml_sql.ModelManipulationSqlGenerator(
self.model_name
self._model.reference
)

def _apply_ml_tvf(
Expand Down
87 changes: 47 additions & 40 deletions bigframes/ml/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import bigframes_vendored.constants as constants
import google.cloud.bigquery

import bigframes.core.compile.googlesql as sql_utils
import bigframes.core.sql as sql_vals


# TODO: Add proper escaping logic from core/compile module
class BaseSqlGenerator:
Expand All @@ -29,10 +32,8 @@ class BaseSqlGenerator:
# General methods
def encode_value(self, v: Union[str, int, float, Iterable[str]]) -> str:
"""Encode a parameter value for SQL"""
if isinstance(v, str):
return f'"{v}"'
elif isinstance(v, int) or isinstance(v, float):
return f"{v}"
if isinstance(v, (str, int, float)):
return sql_vals.simple_literal(v)
elif isinstance(v, Iterable):
inner = ", ".join([self.encode_value(x) for x in v])
return f"[{inner}]"
Expand All @@ -50,7 +51,10 @@ def build_parameters(self, **kwargs: Union[str, int, float, Iterable[str]]) -> s
def build_structs(self, **kwargs: Union[int, float]) -> str:
"""Encode a dict of values into a formatted STRUCT items for SQL"""
indent_str = " "
param_strs = [f"{v} AS {k}" for k, v in kwargs.items()]
param_strs = [
f"{sql_vals.simple_literal(v)} AS {sql_utils.identifier(k)}"
for k, v in kwargs.items()
]
return "\n" + indent_str + f",\n{indent_str}".join(param_strs)

def build_expressions(self, *expr_sqls: str) -> str:
Expand All @@ -61,7 +65,7 @@ def build_expressions(self, *expr_sqls: str) -> str:
def build_schema(self, **kwargs: str) -> str:
"""Encode a dict of values into a formatted schema type items for SQL"""
indent_str = " "
param_strs = [f"{k} {v}" for k, v in kwargs.items()]
param_strs = [f"{sql_utils.identifier(k)} {v}" for k, v in kwargs.items()]
return "\n" + indent_str + f",\n{indent_str}".join(param_strs)

def options(self, **kwargs: Union[str, int, float, Iterable[str]]) -> str:
Expand All @@ -74,7 +78,7 @@ def struct_options(self, **kwargs: Union[int, float]) -> str:

def struct_columns(self, columns: Iterable[str]) -> str:
"""Encode a BQ Table columns to a STRUCT."""
columns_str = ", ".join(columns)
columns_str = ", ".join(map(sql_utils.identifier, columns))
return f"STRUCT({columns_str})"

def input(self, **kwargs: str) -> str:
Expand All @@ -97,38 +101,38 @@ def transform(self, *expr_sqls: str) -> str:

def ml_standard_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.STANDARD_SCALER for BQML"""
return f"""ML.STANDARD_SCALER({numeric_expr_sql}) OVER() AS {name}"""
return f"""ML.STANDARD_SCALER({sql_utils.identifier(numeric_expr_sql)}) OVER() AS {sql_utils.identifier(name)}"""

def ml_max_abs_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.MAX_ABS_SCALER for BQML"""
return f"""ML.MAX_ABS_SCALER({numeric_expr_sql}) OVER() AS {name}"""
return f"""ML.MAX_ABS_SCALER({sql_utils.identifier(numeric_expr_sql)}) OVER() AS {sql_utils.identifier(name)}"""

def ml_min_max_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.MIN_MAX_SCALER for BQML"""
return f"""ML.MIN_MAX_SCALER({numeric_expr_sql}) OVER() AS {name}"""
return f"""ML.MIN_MAX_SCALER({sql_utils.identifier(numeric_expr_sql)}) OVER() AS {sql_utils.identifier(name)}"""

def ml_imputer(
self,
expr_sql: str,
col_name: str,
strategy: str,
name: str,
) -> str:
"""Encode ML.IMPUTER for BQML"""
return f"""ML.IMPUTER({expr_sql}, '{strategy}') OVER() AS {name}"""
return f"""ML.IMPUTER({sql_utils.identifier(col_name)}, '{strategy}') OVER() AS {sql_utils.identifier(name)}"""

def ml_bucketize(
self,
numeric_expr_sql: str,
input_id: str,
array_split_points: Iterable[Union[int, float]],
name: str,
output_id: str,
) -> str:
"""Encode ML.BUCKETIZE for BQML"""
        # Use Python values rather than NumPy values for serialization.
points = [
point.item() if hasattr(point, "item") else point
for point in array_split_points
]
return f"""ML.BUCKETIZE({numeric_expr_sql}, {points}, FALSE) AS {name}"""
return f"""ML.BUCKETIZE({sql_utils.identifier(input_id)}, {points}, FALSE) AS {sql_utils.identifier(output_id)}"""

def ml_quantile_bucketize(
self,
Expand All @@ -137,7 +141,7 @@ def ml_quantile_bucketize(
name: str,
) -> str:
"""Encode ML.QUANTILE_BUCKETIZE for BQML"""
return f"""ML.QUANTILE_BUCKETIZE({numeric_expr_sql}, {num_bucket}) OVER() AS {name}"""
return f"""ML.QUANTILE_BUCKETIZE({sql_utils.identifier(numeric_expr_sql)}, {num_bucket}) OVER() AS {sql_utils.identifier(name)}"""

def ml_one_hot_encoder(
self,
Expand All @@ -149,7 +153,7 @@ def ml_one_hot_encoder(
) -> str:
"""Encode ML.ONE_HOT_ENCODER for BQML.
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-one-hot-encoder for params."""
return f"""ML.ONE_HOT_ENCODER({numeric_expr_sql}, '{drop}', {top_k}, {frequency_threshold}) OVER() AS {name}"""
return f"""ML.ONE_HOT_ENCODER({sql_utils.identifier(numeric_expr_sql)}, '{drop}', {top_k}, {frequency_threshold}) OVER() AS {sql_utils.identifier(name)}"""

def ml_label_encoder(
self,
Expand All @@ -160,14 +164,14 @@ def ml_label_encoder(
) -> str:
"""Encode ML.LABEL_ENCODER for BQML.
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-label-encoder for params."""
return f"""ML.LABEL_ENCODER({numeric_expr_sql}, {top_k}, {frequency_threshold}) OVER() AS {name}"""
return f"""ML.LABEL_ENCODER({sql_utils.identifier(numeric_expr_sql)}, {top_k}, {frequency_threshold}) OVER() AS {sql_utils.identifier(name)}"""

def ml_polynomial_expand(
self, columns: Iterable[str], degree: int, name: str
) -> str:
"""Encode ML.POLYNOMIAL_EXPAND.
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-polynomial-expand"""
return f"""ML.POLYNOMIAL_EXPAND({self.struct_columns(columns)}, {degree}) AS {name}"""
return f"""ML.POLYNOMIAL_EXPAND({self.struct_columns(columns)}, {degree}) AS {sql_utils.identifier(name)}"""

def ml_distance(
self,
Expand All @@ -179,7 +183,7 @@ def ml_distance(
) -> str:
"""Encode ML.DISTANCE for BQML.
https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-distance"""
return f"""SELECT *, ML.DISTANCE({col_x}, {col_y}, '{type}') AS {name} FROM ({source_sql})"""
return f"""SELECT *, ML.DISTANCE({sql_utils.identifier(col_x)}, {sql_utils.identifier(col_y)}, '{type}') AS {sql_utils.identifier(name)} FROM ({source_sql})"""


class ModelCreationSqlGenerator(BaseSqlGenerator):
Expand All @@ -189,7 +193,7 @@ def _model_id_sql(
self,
model_ref: google.cloud.bigquery.ModelReference,
):
return f"`{model_ref.project}`.`{model_ref.dataset_id}`.`{model_ref.model_id}`"
return f"{sql_utils.identifier(model_ref.project)}.{sql_utils.identifier(model_ref.dataset_id)}.{sql_utils.identifier(model_ref.model_id)}"

# Model create and alter
def create_model(
Expand Down Expand Up @@ -276,8 +280,11 @@ def create_xgboost_imported_model(
class ModelManipulationSqlGenerator(BaseSqlGenerator):
"""Sql generator for manipulating a model entity. Model name is the full model path of project_id.dataset_id.model_id."""

def __init__(self, model_name: str):
self._model_name = model_name
def __init__(self, model_ref: google.cloud.bigquery.ModelReference):
self._model_ref = model_ref

def _model_ref_sql(self) -> str:
return f"{sql_utils.identifier(self._model_ref.project)}.{sql_utils.identifier(self._model_ref.dataset_id)}.{sql_utils.identifier(self._model_ref.model_id)}"

# Alter model
def alter_model(
Expand All @@ -287,88 +294,88 @@ def alter_model(
"""Encode the ALTER MODEL statement for BQML"""
options_sql = self.options(**options)

parts = [f"ALTER MODEL `{self._model_name}`"]
parts = [f"ALTER MODEL {self._model_ref_sql()}"]
parts.append(f"SET {options_sql}")
return "\n".join(parts)

# ML prediction TVFs
def ml_predict(self, source_sql: str) -> str:
"""Encode ML.PREDICT for BQML"""
return f"""SELECT * FROM ML.PREDICT(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.PREDICT(MODEL {self._model_ref_sql()},
({source_sql}))"""

def ml_forecast(self, struct_options: Mapping[str, Union[int, float]]) -> str:
"""Encode ML.FORECAST for BQML"""
struct_options_sql = self.struct_options(**struct_options)
return f"""SELECT * FROM ML.FORECAST(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.FORECAST(MODEL {self._model_ref_sql()},
{struct_options_sql})"""

def ml_generate_text(
self, source_sql: str, struct_options: Mapping[str, Union[int, float]]
) -> str:
"""Encode ML.GENERATE_TEXT for BQML"""
struct_options_sql = self.struct_options(**struct_options)
return f"""SELECT * FROM ML.GENERATE_TEXT(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.GENERATE_TEXT(MODEL {self._model_ref_sql()},
({source_sql}), {struct_options_sql})"""

def ml_generate_embedding(
self, source_sql: str, struct_options: Mapping[str, Union[int, float]]
) -> str:
"""Encode ML.GENERATE_EMBEDDING for BQML"""
struct_options_sql = self.struct_options(**struct_options)
return f"""SELECT * FROM ML.GENERATE_EMBEDDING(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.GENERATE_EMBEDDING(MODEL {self._model_ref_sql()},
({source_sql}), {struct_options_sql})"""

def ml_detect_anomalies(
self, source_sql: str, struct_options: Mapping[str, Union[int, float]]
) -> str:
"""Encode ML.DETECT_ANOMALIES for BQML"""
struct_options_sql = self.struct_options(**struct_options)
return f"""SELECT * FROM ML.DETECT_ANOMALIES(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.DETECT_ANOMALIES(MODEL {self._model_ref_sql()},
{struct_options_sql}, ({source_sql}))"""

# ML evaluation TVFs
def ml_evaluate(self, source_sql: Optional[str] = None) -> str:
"""Encode ML.EVALUATE for BQML"""
if source_sql is None:
return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`)"""
return f"""SELECT * FROM ML.EVALUATE(MODEL {self._model_ref_sql()})"""
else:
return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.EVALUATE(MODEL {self._model_ref_sql()},
({source_sql}))"""

def ml_arima_coefficients(self) -> str:
"""Encode ML.ARIMA_COEFFICIENTS for BQML"""
return f"""SELECT * FROM ML.ARIMA_COEFFICIENTS(MODEL `{self._model_name}`)"""
return f"""SELECT * FROM ML.ARIMA_COEFFICIENTS(MODEL {self._model_ref_sql()})"""

# ML evaluation TVFs
def ml_llm_evaluate(self, source_sql: str, task_type: Optional[str] = None) -> str:
"""Encode ML.EVALUATE for BQML"""
# Note: don't need index as evaluate returns a new table
return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.EVALUATE(MODEL {self._model_ref_sql()},
({source_sql}), STRUCT("{task_type}" AS task_type))"""

# ML evaluation TVFs
def ml_arima_evaluate(self, show_all_candidate_models: bool = False) -> str:
"""Encode ML.ARMIA_EVALUATE for BQML"""
return f"""SELECT * FROM ML.ARIMA_EVALUATE(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.ARIMA_EVALUATE(MODEL {self._model_ref_sql()},
STRUCT({show_all_candidate_models} AS show_all_candidate_models))"""

def ml_centroids(self) -> str:
"""Encode ML.CENTROIDS for BQML"""
return f"""SELECT * FROM ML.CENTROIDS(MODEL `{self._model_name}`)"""
return f"""SELECT * FROM ML.CENTROIDS(MODEL {self._model_ref_sql()})"""

def ml_principal_components(self) -> str:
"""Encode ML.PRINCIPAL_COMPONENTS for BQML"""
return f"""SELECT * FROM ML.PRINCIPAL_COMPONENTS(MODEL `{self._model_name}`)"""
return (
f"""SELECT * FROM ML.PRINCIPAL_COMPONENTS(MODEL {self._model_ref_sql()})"""
)

def ml_principal_component_info(self) -> str:
"""Encode ML.PRINCIPAL_COMPONENT_INFO for BQML"""
return (
f"""SELECT * FROM ML.PRINCIPAL_COMPONENT_INFO(MODEL `{self._model_name}`)"""
)
return f"""SELECT * FROM ML.PRINCIPAL_COMPONENT_INFO(MODEL {self._model_ref_sql()})"""

# ML transform TVF, that require a transform_only type model
def ml_transform(self, source_sql: str) -> str:
"""Encode ML.TRANSFORM for BQML"""
return f"""SELECT * FROM ML.TRANSFORM(MODEL `{self._model_name}`,
return f"""SELECT * FROM ML.TRANSFORM(MODEL {self._model_ref_sql()},
({source_sql}))"""
16 changes: 9 additions & 7 deletions tests/system/large/ml/test_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ def test_columntransformer_standalone_fit_and_transform(


def test_columntransformer_standalone_fit_transform(new_penguins_df):
# rename column to ensure robustness to column names that must be escaped
new_penguins_df = new_penguins_df.rename(columns={"species": "123 'species'"})
transformer = compose.ColumnTransformer(
[
(
"onehot",
preprocessing.OneHotEncoder(),
"species",
"123 'species'",
),
(
"standard_scale",
Expand All @@ -108,7 +110,7 @@ def test_columntransformer_standalone_fit_transform(new_penguins_df):
"CASE WHEN {0} IS NULL THEN -1 ELSE LENGTH({0}) END",
target_column="len_{0}",
),
"species",
"123 'species'",
),
(
"identity",
Expand All @@ -119,16 +121,16 @@ def test_columntransformer_standalone_fit_transform(new_penguins_df):
)

result = transformer.fit_transform(
new_penguins_df[["species", "culmen_length_mm", "flipper_length_mm"]]
new_penguins_df[["123 'species'", "culmen_length_mm", "flipper_length_mm"]]
).to_pandas()

utils.check_pandas_df_schema_and_index(
result,
columns=[
"onehotencoded_species",
"onehotencoded_123 'species'",
"standard_scaled_culmen_length_mm",
"standard_scaled_flipper_length_mm",
"len_species",
"len_123 'species'",
"culmen_length_mm",
"flipper_length_mm",
],
Expand Down Expand Up @@ -189,8 +191,8 @@ def test_columntransformer_save_load(new_penguins_df, dataset_id):
preprocessing.OneHotEncoder(max_categories=1000001, min_frequency=0),
"species",
),
("standard_scaler", preprocessing.StandardScaler(), "culmen_length_mm"),
("standard_scaler", preprocessing.StandardScaler(), "flipper_length_mm"),
("standard_scaler", preprocessing.StandardScaler(), "culmen lengthmm"),
("standard_scaler", preprocessing.StandardScaler(), "flipper length mm"),
(
"sql_scalar_column_transformer",
compose.SQLScalarColumnTransformer(
Expand Down
Loading