Skip to content

Commit

Permalink
Merge pull request #52 from DalgoT4D/order-by-pagination
Browse files Browse the repository at this point in the history
Order by pagination
  • Loading branch information
fatchat authored Feb 8, 2024
2 parents 4476576 + ea9b756 commit d584094
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 16 deletions.
44 changes: 37 additions & 7 deletions dbt_automation/utils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
import json
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface

basicConfig(level=INFO)
Expand All @@ -30,7 +31,7 @@ def __init__(self, conn_info=None, location=None):

def execute(self, statement: str, **kwargs) -> list:
"""run a query and return the results"""
query_job = self.bqclient.query(statement, **kwargs)
query_job = self.bqclient.query(statement, location=self.location, **kwargs)
return query_job.result()

def get_tables(self, schema: str) -> list:
Expand All @@ -50,12 +51,42 @@ def get_table_columns(self, schema: str, table: str) -> list:
column_names = [field.name for field in table.schema]
return column_names

def get_table_data(self, schema: str, table: str, limit: int) -> list:
def get_table_data(
self,
schema: str,
table: str,
limit: int,
page: int = 1,
order_by: str = None,
order: int = 1, # ASC
) -> list:
"""returns limited rows from the specified table in the given schema"""
table_ref = f"{schema}.{table}"
table: bigquery.Table = self.bqclient.get_table(table_ref)
records = self.bqclient.list_rows(table=table, max_results=limit)
rows = [dict(record) for record in records]

offset = (page - 1) * limit
# total_rows = self.execute(
# f"SELECT COUNT(*) as total_rows FROM `{schema}`.`{table}`"
# )
# total_rows = next(total_rows).total_rows

# select
query = f"""
SELECT *
FROM `{schema}`.`{table}`
"""

# order
if order_by:
query += f"""
ORDER BY {quote_columnname(order_by)} {"ASC" if order == 1 else "DESC"}
"""

# offset, limit
query += f"""
LIMIT {limit} OFFSET {offset}
"""

result = self.execute(query)
rows = [dict(record) for record in result]

return rows

Expand Down Expand Up @@ -86,7 +117,6 @@ def get_json_columnspec(
FROM keys
CROSS JOIN UNNEST(keys.keys) AS k
''',
location=self.location,
)
return [json_field["k"] for json_field in query]

Expand Down
10 changes: 9 additions & 1 deletion dbt_automation/utils/interfaces/warehouse_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ def get_schemas(self):
pass

@abstractmethod
def get_table_data(self, schema: str, table: str, limit: int):
def get_table_data(
self,
schema: str,
table: str,
limit: int,
page: int = 1,
order_by: str = None,
order: int = 1, # ASC
):
pass

@abstractmethod
Expand Down
41 changes: 33 additions & 8 deletions dbt_automation/utils/postgres.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""helpers for postgres"""

from logging import basicConfig, getLogger, INFO
import psycopg2
import os
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


Expand Down Expand Up @@ -81,16 +83,39 @@ def get_schemas(self) -> list:
)
return [x[0] for x in resultset]

def get_table_data(self, schema: str, table: str, limit: int) -> list:
"""returns limited rows from the specified table in the given schema"""
def get_table_data(
self,
schema: str,
table: str,
limit: int,
page: int = 1,
order_by: str = None,
order: int = 1, # ASC
) -> list:
"""
returns limited rows from the specified table in the given schema
"""
offset = (page - 1) * limit
# total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0]

resultset = self.execute(
f"""
SELECT *
FROM {schema}.{table}
LIMIT {limit};
# select
query = f"""
SELECT *
FROM {schema}.{table}
"""

# order
if order_by:
query += f"""
ORDER BY {quote_columnname(order_by)} {"ASC" if order == 1 else "DESC"}
"""
) # returns an array of tuples of values

# offset, limit
query += f"""
OFFSET {offset} LIMIT {limit};
"""

resultset = self.execute(query) # returns an array of tuples of values
col_names = [desc[0] for desc in self.cursor.description]
rows = [dict(zip(col_names, row)) for row in resultset]

Expand Down

0 comments on commit d584094

Please sign in to comment.