From 1f630e07835e38c13d6f68251f3ac91cf1514c73 Mon Sep 17 00:00:00 2001 From: Ishan Date: Thu, 8 Feb 2024 16:53:17 +0530 Subject: [PATCH 1/4] setting up pagination for fetching table data --- dbt_automation/utils/bigquery.py | 14 +++++++++++--- .../utils/interfaces/warehouse_interface.py | 4 +++- dbt_automation/utils/postgres.py | 19 +++++++++++++++---- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/dbt_automation/utils/bigquery.py b/dbt_automation/utils/bigquery.py index c54095f..2bcaf2d 100644 --- a/dbt_automation/utils/bigquery.py +++ b/dbt_automation/utils/bigquery.py @@ -50,14 +50,22 @@ def get_table_columns(self, schema: str, table: str) -> list: column_names = [field.name for field in table.schema] return column_names - def get_table_data(self, schema: str, table: str, limit: int) -> list: + def get_table_data( + self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + ) -> dict: """returns limited rows from the specified table in the given schema""" table_ref = f"{schema}.{table}" table: bigquery.Table = self.bqclient.get_table(table_ref) - records = self.bqclient.list_rows(table=table, max_results=limit) + records = self.bqclient.list_rows( + table=table, max_results=limit, page_token=page_token + ) rows = [dict(record) for record in records] - return rows + return { + "total_rows": records.total_rows, + "next_page": records.next_page_token, + "rows": rows, + } def get_columnspec(self, schema: str, table_id: str): """fetch the list of columns from a BigQuery table.""" diff --git a/dbt_automation/utils/interfaces/warehouse_interface.py b/dbt_automation/utils/interfaces/warehouse_interface.py index 07e13e0..e54ec76 100644 --- a/dbt_automation/utils/interfaces/warehouse_interface.py +++ b/dbt_automation/utils/interfaces/warehouse_interface.py @@ -15,7 +15,9 @@ def get_schemas(self): pass @abstractmethod - def get_table_data(self, schema: str, table: str, limit: int): + def get_table_data( + self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + ): pass @abstractmethod diff --git a/dbt_automation/utils/postgres.py b/dbt_automation/utils/postgres.py index 4b671ee..019300c 100644 --- a/dbt_automation/utils/postgres.py +++ b/dbt_automation/utils/postgres.py @@ -1,4 +1,5 @@ """helpers for postgres""" + from logging import basicConfig, getLogger, INFO import psycopg2 import os @@ -81,20 +82,30 @@ def get_schemas(self) -> list: ) return [x[0] for x in resultset] - def get_table_data(self, schema: str, table: str, limit: int) -> list: - """returns limited rows from the specified table in the given schema""" + def get_table_data( + self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + ) -> dict: + """ + returns limited rows from the specified table in the given schema + """ + offset = (page - 1) * limit + total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0] resultset = self.execute( f""" SELECT * FROM {schema}.{table} - LIMIT {limit}; + OFFSET {offset} LIMIT {limit}; """ ) # returns an array of tuples of values col_names = [desc[0] for desc in self.cursor.description] rows = [dict(zip(col_names, row)) for row in resultset] - return rows + return { + "total_rows": total_rows, + "next_page": page + 1 if (page * limit) < total_rows else None, + "rows": rows, + } def get_table_columns(self, schema: str, table: str) -> list: """returns the column names of the specified table in the given schema""" From 61e2d20e9a77a295c17c409de0a67f6032aae1a2 Mon Sep 17 00:00:00 2001 From: Ishan Date: Thu, 8 Feb 2024 17:45:03 +0530 Subject: [PATCH 2/4] using raw query in bigquery to fetch table data --- dbt_automation/utils/bigquery.py | 46 +++++++++++++++---- .../utils/interfaces/warehouse_interface.py | 8 +++- dbt_automation/utils/postgres.py | 32 ++++++++++--- 3 files changed, 68 insertions(+), 18 deletions(-) diff --git a/dbt_automation/utils/bigquery.py b/dbt_automation/utils/bigquery.py index 2bcaf2d..1bf3154 100644 --- a/dbt_automation/utils/bigquery.py +++ b/dbt_automation/utils/bigquery.py @@ -6,6 +6,7 @@ from google.cloud.exceptions import NotFound from google.oauth2 import service_account import json +from dbt_automation.utils.columnutils import quote_columnname from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface basicConfig(level=INFO) @@ -30,7 +31,7 @@ def __init__(self, conn_info=None, location=None): def execute(self, statement: str, **kwargs) -> list: """run a query and return the results""" - query_job = self.bqclient.query(statement, **kwargs) + query_job = self.bqclient.query(statement, location=self.location, **kwargs) return query_job.result() def get_tables(self, schema: str) -> list: @@ -51,20 +52,45 @@ def get_table_columns(self, schema: str, table: str) -> list: return column_names def get_table_data( - self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + self, + schema: str, + table: str, + limit: int, + page: int = 1, + order_by: str = None, + order: int = 1, # ASC ) -> dict: """returns limited rows from the specified table in the given schema""" - table_ref = f"{schema}.{table}" - table: bigquery.Table = self.bqclient.get_table(table_ref) - records = self.bqclient.list_rows( - table=table, max_results=limit, page_token=page_token + + offset = (page - 1) * limit + total_rows = self.execute( + f"SELECT COUNT(*) as total_rows FROM `{schema}`.`{table}`" ) - rows = [dict(record) for record in records] + total_rows = next(total_rows).total_rows + + # select + query = f""" + SELECT * + FROM `{schema}`.`{table}` + """ + + # order + if order_by: + query += f""" + ORDER BY {quote_columnname(order_by)} {"ASC" if order == 1 else "DESC"} + """ + + # offset, limit + query += f""" + LIMIT {limit} OFFSET {offset} + """ + + result = self.execute(query) return { - "total_rows": records.total_rows, - "next_page": records.next_page_token, - "rows": rows, + "total_rows": total_rows, + "next_page": (page + 1) if (page * limit) < total_rows else None, + "rows": [dict(record) for record in result], } def get_columnspec(self, schema: str, table_id: str): diff --git a/dbt_automation/utils/interfaces/warehouse_interface.py b/dbt_automation/utils/interfaces/warehouse_interface.py index e54ec76..6ebe465 100644 --- a/dbt_automation/utils/interfaces/warehouse_interface.py +++ b/dbt_automation/utils/interfaces/warehouse_interface.py @@ -16,7 +16,13 @@ def get_schemas(self): @abstractmethod def get_table_data( - self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + self, + schema: str, + table: str, + limit: int, + page: int = 1, + order_by: str = None, + order: int = 1, # ASC ): pass diff --git a/dbt_automation/utils/postgres.py b/dbt_automation/utils/postgres.py index 019300c..0ec455b 100644 --- a/dbt_automation/utils/postgres.py +++ b/dbt_automation/utils/postgres.py @@ -3,6 +3,7 @@ from logging import basicConfig, getLogger, INFO import psycopg2 import os +from dbt_automation.utils.columnutils import quote_columnname from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface @@ -83,7 +84,13 @@ def get_schemas(self) -> list: return [x[0] for x in resultset] def get_table_data( - self, schema: str, table: str, limit: int, page: int = 1, page_token: str = None + self, + schema: str, + table: str, + limit: int, + page: int = 1, + order_by: str = None, + order: int = 1, # ASC ) -> dict: """ returns limited rows from the specified table in the given schema @@ -91,13 +98,24 @@ def get_table_data( offset = (page - 1) * limit total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0] - resultset = self.execute( - f""" - SELECT * - FROM {schema}.{table} - OFFSET {offset} LIMIT {limit}; + # select + query = f""" + SELECT * + FROM {schema}.{table} + """ + + # order + if order_by: + query += f""" + ORDER BY {quote_columnname(order_by)} {"ASC" if order == 1 else "DESC"} """ - ) # returns an array of tuples of values + + # offset, limit + query += f""" + OFFSET {offset} LIMIT {limit}; + """ + + resultset = self.execute(query) # returns an array of tuples of values col_names = [desc[0] for desc in self.cursor.description] rows = [dict(zip(col_names, row)) for row in resultset] From a3bc570056bab453b12654316cff6145f2b17cda Mon Sep 17 00:00:00 2001 From: Ishan Date: Thu, 8 Feb 2024 18:05:44 +0530 Subject: [PATCH 3/4] frontend will handle the pagination logic via another api; just send the data back in get_table_data --- dbt_automation/utils/bigquery.py | 17 +++++++---------- dbt_automation/utils/postgres.py | 10 +++------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/dbt_automation/utils/bigquery.py b/dbt_automation/utils/bigquery.py index 1bf3154..a02ffd2 100644 --- a/dbt_automation/utils/bigquery.py +++ b/dbt_automation/utils/bigquery.py @@ -59,14 +59,14 @@ def get_table_data( page: int = 1, order_by: str = None, order: int = 1, # ASC - ) -> dict: + ) -> list: """returns limited rows from the specified table in the given schema""" offset = (page - 1) * limit - total_rows = self.execute( - f"SELECT COUNT(*) as total_rows FROM `{schema}`.`{table}`" - ) - total_rows = next(total_rows).total_rows + # total_rows = self.execute( + # f"SELECT COUNT(*) as total_rows FROM `{schema}`.`{table}`" + # ) + # total_rows = next(total_rows).total_rows # select query = f""" @@ -86,12 +86,9 @@ def get_table_data( """ result = self.execute(query) + rows = [dict(record) for record in result] - return { - "total_rows": total_rows, - "next_page": (page + 1) if (page * limit) < total_rows else None, - "rows": [dict(record) for record in result], - } + return rows def get_columnspec(self, schema: str, table_id: str): """fetch the list of columns from a BigQuery table.""" diff --git a/dbt_automation/utils/postgres.py b/dbt_automation/utils/postgres.py index 0ec455b..3124370 100644 --- a/dbt_automation/utils/postgres.py +++ b/dbt_automation/utils/postgres.py @@ -91,12 +91,12 @@ def get_table_data( page: int = 1, order_by: str = None, order: int = 1, # ASC - ) -> dict: + ) -> list: """ returns limited rows from the specified table in the given schema """ offset = (page - 1) * limit - total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0] + # total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0] # select query = f""" @@ -119,11 +119,7 @@ def get_table_data( col_names = [desc[0] for desc in self.cursor.description] rows = [dict(zip(col_names, row)) for row in resultset] - return { - "total_rows": total_rows, - "next_page": page + 1 if (page * limit) < total_rows else None, - "rows": rows, - } + return rows def get_table_columns(self, schema: str, table: str) -> list: """returns the column names of the specified table in the given schema""" From ea9b756b3f0232d7e2b1351ea8fbe02d14b16cae Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Thu, 8 Feb 2024 18:36:41 +0530 Subject: [PATCH 4/4] location is now passed in every call to execute() --- dbt_automation/utils/bigquery.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt_automation/utils/bigquery.py b/dbt_automation/utils/bigquery.py index a02ffd2..5bb30e8 100644 --- a/dbt_automation/utils/bigquery.py +++ b/dbt_automation/utils/bigquery.py @@ -117,7 +117,6 @@ def get_json_columnspec( FROM keys CROSS JOIN UNNEST(keys.keys) AS k ''', - location=self.location, ) return [json_field["k"] for json_field in query]