From 3b589392743f0b05f8518bf8addb1d3f2095f6b3 Mon Sep 17 00:00:00 2001 From: PhilippMaschi Date: Wed, 17 Apr 2024 23:15:37 +0200 Subject: [PATCH] db.py NO SQL injection problem anymore! --- utils/db.py | 65 +++++++++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/utils/db.py b/utils/db.py index 3f822ea..05505a8 100644 --- a/utils/db.py +++ b/utils/db.py @@ -13,27 +13,29 @@ class DB: def __init__(self, path): - self.connection = sqlalchemy.create_engine(f'sqlite:///{path}') + self.engine = sqlalchemy.create_engine(f'sqlite:///{path}') + self.metadata = sqlalchemy.MetaData() + self.metadata.reflect(bind=self.engine) def if_exists(self, table_name: str) -> bool: return table_name in self.get_table_names() def get_engine(self): - return self.connection + return self.engine def close(self): - self.connection.dispose() + self.engine.dispose() def get_table_names(self): - return sqlalchemy.inspect(self.connection).get_table_names() + return sqlalchemy.inspect(self.engine).get_table_names() def clear_database(self): for table_name in self.get_table_names(): - with self.connection.connect() as conn: + with self.engine.connect() as conn: result = conn.execute(sqlalchemy.text(f"drop table {table_name}")) def drop_table(self, table_name: str): - with self.connection.connect() as conn: + with self.engine.connect() as conn: result = conn.execute(sqlalchemy.text(f"drop table if exists {table_name}")) def write_dataframe( @@ -45,7 +47,7 @@ def write_dataframe( ): # if_exists: {'replace', 'fail', 'append'} data_frame.to_sql( table_name, - self.connection, + self.engine, index=False, dtype=data_types, if_exists=if_exists, @@ -53,33 +55,28 @@ def write_dataframe( ) def read_dataframe(self, table_name: str, filter: dict = None, column_names: List[str] = None) -> pd.DataFrame: - """column_names will only extract certain columns (list) - filter have to be dict with: column_name: value. The table where the column has that value is returned - - Returns: - object: pandas Dataframe""" - if filter is None: - filter = {} - if column_names is None: - column_names = [] - if len(column_names) > 0: - columns2extract = "" - for name in column_names: - columns2extract += name + ',' - columns2extract = columns2extract[:-1] # delete last "," - else: - columns2extract = "*" # select all columns - if len(filter) > 0: - condition_temp = "" - for key, value in filter.items(): - condition_temp += key + " == '" + str(value) + "' and " - condition = " where " + condition_temp - condition = condition[0:-5] # deleting last "and" + """Reads data from a database table with optional filtering and column selection. + + Args: + table_name (str): Name of the table to query. + filter (dict, optional): Dictionary with {column_name: value} to filter the data. + column_names (list of str, optional): List of column names to extract. + + Returns: + pd.DataFrame: Resulting dataframe. + """ + table = self.metadata.tables[table_name] + + if column_names: + query = sqlalchemy.select([table.columns[name] for name in column_names]) else: - condition = '' + query = sqlalchemy.select(table) - dataframe = pd.read_sql('select ' + columns2extract + ' from ' + table_name + condition, con=self.connection) - return dataframe + if filter: + for key, value in filter.items(): + query = query.where(table.columns[key] == value) + with self.engine.connect() as conn: + return pd.read_sql(query, conn) def delete_row_from_table( self, @@ -93,10 +90,10 @@ def delete_row_from_table( condition = condition[0:-5] # deleting last "and" query = f"DELETE FROM {table_name}" + condition - self.connection.execute(query) + self.engine.execute(query) def query(self, sql) -> pd.DataFrame: - return pd.read_sql(sql, self.connection) + return pd.read_sql(sql, self.engine) def create_db_conn(config: "Config") -> DB: