Skip to content

Commit

Permalink
db.py NO SQL injection problem anymore!
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilippMaschi authored and SongminYu committed Apr 18, 2024
1 parent ed60c07 commit 3b58939
Showing 1 changed file with 31 additions and 34 deletions.
65 changes: 31 additions & 34 deletions utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,29 @@
class DB:

def __init__(self, path):
self.connection = sqlalchemy.create_engine(f'sqlite:///{path}')
self.engine = sqlalchemy.create_engine(f'sqlite:///{path}')
self.metadata = sqlalchemy.MetaData()
self.metadata.reflect(bind=self.engine)

def if_exists(self, table_name: str) -> bool:
return table_name in self.get_table_names()

def get_engine(self):
return self.connection
return self.engine

def close(self):
self.connection.dispose()
self.engine.dispose()

def get_table_names(self):
return sqlalchemy.inspect(self.connection).get_table_names()
return sqlalchemy.inspect(self.engine).get_table_names()

def clear_database(self):
for table_name in self.get_table_names():
with self.connection.connect() as conn:
with self.engine.connect() as conn:
result = conn.execute(sqlalchemy.text(f"drop table {table_name}"))

def drop_table(self, table_name: str):
with self.connection.connect() as conn:
with self.engine.connect() as conn:
result = conn.execute(sqlalchemy.text(f"drop table if exists {table_name}"))

def write_dataframe(
Expand All @@ -45,41 +47,36 @@ def write_dataframe(
): # if_exists: {'replace', 'fail', 'append'}
data_frame.to_sql(
table_name,
self.connection,
self.engine,
index=False,
dtype=data_types,
if_exists=if_exists,
chunksize=10_000,
)

def read_dataframe(self, table_name: str, filter: dict = None, column_names: List[str] = None) -> pd.DataFrame:
"""column_names will only extract certain columns (list)
filter have to be dict with: column_name: value. The table where the column has that value is returned
Returns:
object: pandas Dataframe"""
if filter is None:
filter = {}
if column_names is None:
column_names = []
if len(column_names) > 0:
columns2extract = ""
for name in column_names:
columns2extract += name + ','
columns2extract = columns2extract[:-1] # delete last ","
else:
columns2extract = "*" # select all columns
if len(filter) > 0:
condition_temp = ""
for key, value in filter.items():
condition_temp += key + " == '" + str(value) + "' and "
condition = " where " + condition_temp
condition = condition[0:-5] # deleting last "and"
"""Reads data from a database table with optional filtering and column selection.
Args:
table_name (str): Name of the table to query.
filter (dict, optional): Dictionary with {column_name: value} to filter the data.
column_names (list of str, optional): List of column names to extract.
Returns:
pd.DataFrame: Resulting dataframe.
"""
table = self.metadata.tables[table_name]

if column_names:
query = sqlalchemy.select([table.columns[name] for name in column_names])
else:
condition = ''
query = sqlalchemy.select(table)

dataframe = pd.read_sql('select ' + columns2extract + ' from ' + table_name + condition, con=self.connection)
return dataframe
if filter:
for key, value in filter.items():
query = query.where(table.columns[key] == value)
with self.engine.connect() as conn:
return pd.read_sql(query, conn)

def delete_row_from_table(
self,
Expand All @@ -93,10 +90,10 @@ def delete_row_from_table(
condition = condition[0:-5] # deleting last "and"

query = f"DELETE FROM {table_name}" + condition
self.connection.execute(query)
self.engine.execute(query)

def query(self, sql) -> pd.DataFrame:
return pd.read_sql(sql, self.connection)
return pd.read_sql(sql, self.engine)


def create_db_conn(config: "Config") -> DB:
Expand Down

0 comments on commit 3b58939

Please sign in to comment.