Wraedy/bigquery db connector (#875)
* Create the DatabaseConnector

* Implement DatabaseConnector for the DB connectors

* Add DatabaseConnector to std imports

* Flake8 fix

* Remove reference to padding in copy()

* Add database_discover and fix inheritance

* Remove strict_length from copy()

* Put strict_length back in original order

* Remove strict_length stub from BQ

* Fix discover_database export statement

* Add return annotation to mysql table_exists

* Black formatter pass

* create bigquery folder in databases folder

* create query parity between bigquery and redshift

* mock up copy functionality for bigquery

* fix typo

* add duplicate function to bigquery

* move transaction to helper function

* implement upsert

* fix imports and packages

* add get tables and views methods

* add query return flexibility

* match bigquery apis with redshift

* make s3 to gcs more generic

* add transaction support to bigquery

* remove logs

* add gcs docs

* process job config in function

* finish todo's (and add one more lol)

* [ wip ] AttributeError

* add raw download param

* drop raw download

* copy from GCS docstring

* copy s3 docs

* copy docs

* docstrings

* control flow

* add source path to aws transfer spec

* add Code object to imports

* cleaning up slightly

* check status code

* nice

* pass in required param

* add pattern handling

* add quote character to LoadJobConfig

* add schema to copy from gcs

* drop dist and sortkeys

No longer input params

* add delimiter param

* use schema definition

* write column mapping helper

* pass in formatted schema to load_uri fn

* rename new file

* move file with jason's changes

* move new changes back into file to maintain history

* remove extraneous fn and move project job config

* get back to test parity

* fix bad merge conflict

* remove extra params from copy sig

* clarify transaction guidance

* clean up list blobs

* clean up storage transfer polling

* upgrade cloud storage package

* use list of schema mappings

* scaffolded big file function 😎

* add to docs

* default to compression

we can make this more flexible, just scaffolding

* add temp logging

we can drop this later just trying to get a handle on cycle time

* use decompress

* add logging

* implement unzipping and reuploading cloud file

* logging error

* Add destination path

* Small fix

* add todo's

* drop max wait time

* add kwargs to put blob

Potentially useful for metadata (content type, etc.)

* add verbosity to description

* black formatted

* add gcs to/from helpers

* write to_bigquery function

* update big file logic

* allow jagged rows logic

* test additional methods

* add duplicate table test

* test drop flag for duplicate

* basic test for upsert

* add typing

* move non-essential logs to debug

* move logs to debug

* hey, it works!

* add UUID support for bigquery type map

* add datetime to bigquery type map

* address comments

* address comments

* drop GCS class function

we can pick this up later but it doesn't currently work

* move class back to old location with new import

* revert to old name

* remove transaction error handler

* add description conditional block for s3

* change one more conditional to s3

* handle empty source paths

* reverting new import path

---------

Co-authored-by: Jason Walker <[email protected]>
Co-authored-by: Ian <[email protected]>
Co-authored-by: Kasia Hinkson <[email protected]>
4 people authored Oct 6, 2023
1 parent f6ee530 commit 55d3717
Showing 12 changed files with 1,736 additions and 442 deletions.
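
The diff introduces a BigQuery connector intended to match the Redshift connector's API. As a rough usage sketch: query and copy appear in the diffs and docstrings below, while upsert and the duplicate-table helper are only named in the commit messages above, so those calls and their signatures are assumptions shown for orientation only.

from parsons import Table
from parsons.databases.bigquery.bigquery import BigQuery

# Credentials come from GOOGLE_APPLICATION_CREDENTIALS when app_creds is omitted
bq = BigQuery()

# query() returns a Parsons Table with the result set
tbl = bq.query("SELECT 1 AS x")

# copy() loads a Parsons Table into a dataset.table destination; if_exists is
# listed in the to_bigquery docstring below, and its accepted values are assumed
# here to mirror the Redshift connector ('fail', 'truncate', 'append', 'drop')
bq.copy(Table([{"x": 1}]), table_name="my_dataset.my_table", if_exists="drop")

# Named in the commit history; the signatures below are guesses, not the actual API
# bq.upsert(tbl, "my_dataset.my_table", primary_key="x")
# bq.duplicate_table("my_dataset.my_table", "my_dataset.my_table_copy")
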
2 changes: 1 addition & 1 deletion Dockerfile
@@ -42,4 +42,4 @@ RUN python setup.py develop
RUN mkdir /app
WORKDIR /app
# Useful for importing modules that are associated with your python scripts:
-env PYTHONPATH=.:/app
+ENV PYTHONPATH=.:/app
4 changes: 2 additions & 2 deletions parsons/databases/discover_database.py
@@ -5,7 +5,7 @@
from parsons.databases.redshift import Redshift
from parsons.databases.mysql import MySQL
from parsons.databases.postgres import Postgres
-from parsons.google.google_bigquery import GoogleBigQuery
+from parsons.databases.bigquery import BigQuery


def discover_database(
@@ -40,7 +40,7 @@ def discover_database(
"Redshift": Redshift,
"MySQL": MySQL,
"Postgres": Postgres,
"GoogleBigQuery": GoogleBigQuery,
"GoogleBigQuery": BigQuery,
}

password_vars = {
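
With the import pointing at parsons.databases.bigquery, discover_database resolves the "GoogleBigQuery" entry to the same connector class as before. A minimal usage sketch, assuming the function's existing default_connector parameter as the tiebreaker when several databases are configured:

from parsons.databases.discover_database import discover_database
from parsons.databases.bigquery import BigQuery

# Returns an instance of whichever connector has credentials in the environment;
# BigQuery is preferred here if more than one set of credentials is present
db = discover_database(default_connector=BigQuery)
tbl = db.query("SELECT 1 AS x")
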
2 changes: 1 addition & 1 deletion parsons/databases/redshift/redshift.py
@@ -724,7 +724,7 @@ def unload(
sql: str
The SQL string to execute to generate the data to unload.
-        buckey: str
+        bucket: str
The destination S3 bucket
key_prefix: str
The prefix of the key names that will be written
4 changes: 0 additions & 4 deletions parsons/etl/etl.py
@@ -7,7 +7,6 @@

class ETL(object):
def __init__(self):

pass

def add_column(self, column, value=None, index=None, if_exists="fail"):
@@ -177,7 +176,6 @@ def get_column_max_width(self, column):
max_width = 0

for v in petl.values(self.table, column):

if len(str(v).encode("utf-8")) > max_width:
max_width = len(str(v).encode("utf-8"))

@@ -285,7 +283,6 @@ def map_columns(self, column_map, exact_match=True):
"""

for col in self.columns:

if not exact_match:
cleaned_col = col.lower().replace("_", "").replace(" ", "")
else:
@@ -801,7 +798,6 @@ def _prepend_dict(self, dict_obj, prepend):
new_dict = {}

for k, v in dict_obj.items():

new_dict[prepend + "_" + k] = v

return new_dict
9 changes: 0 additions & 9 deletions parsons/etl/table.py
@@ -28,13 +28,11 @@ class Table(ETL, ToFrom):
"""

def __init__(self, lst=[]):

self.table = None

lst_type = type(lst)

if lst_type in [list, tuple]:

# Check for empty list
if not len(lst):
self.table = petl.fromdicts([])
@@ -59,33 +57,26 @@ def __init__(self, lst=[]):
self._index_count = 0

def __repr__(self):

return repr(petl.dicts(self.table))

def __iter__(self):

return iter(petl.dicts(self.table))

def __getitem__(self, index):

if isinstance(index, int):

return self.row_data(index)

elif isinstance(index, str):

return self.column_data(index)

elif isinstance(index, slice):
tblslice = petl.rowslice(self.table, index.start, index.stop, index.step)
return [row for row in tblslice]

else:

raise TypeError("You must pass a string or an index as a value.")

def __bool__(self):

# Try to get a single row from our table
head_one = petl.head(self.table)

63 changes: 62 additions & 1 deletion parsons/etl/tofrom.py
@@ -2,6 +2,7 @@
import json
import io
import gzip
from typing import Optional
from parsons.utilities import files, zip_archive


@@ -619,8 +620,39 @@ def to_postgres(
pg = Postgres(username=username, password=password, host=host, db=db, port=port)
pg.copy(self, table_name, **copy_args)

def to_petl(self):
def to_bigquery(
self,
table_name: str,
app_creds: Optional[str] = None,
project: Optional[str] = None,
**kwargs,
):
"""
Write a table to BigQuery
`Args`:
table_name: str
Table name to write to in BigQuery; this should be in `schema.table` format
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
\**kwargs: kwargs
Additional keyword arguments passed into the `.copy()` function (`if_exists`,
`max_errors`, etc.)
`Returns`:
``None``
"""

from parsons.databases.bigquery.bigquery import BigQuery

bq = BigQuery(app_creds=app_creds, project=project)
bq.copy(self, table_name=table_name, **kwargs)

def to_petl(self):
return self.table

def to_civis(
@@ -898,6 +930,35 @@ def from_s3_csv(

return cls(petl.cat(*tbls))

@classmethod
def from_bigquery(cls, sql: str, app_creds: str = None, project: str = None):
"""
Create a ``parsons table`` from a BigQuery statement.
To pull an entire BigQuery table, use a query like ``SELECT * FROM {{ table }}``.
`Args`:
sql: str
A valid SQL statement
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
TODO - Should users be able to pass in kwargs here? For parameters?
`Returns`:
Parsons Table
See :ref:`parsons-table` for output options.
"""

from parsons.databases.bigquery.bigquery import BigQuery

bq = BigQuery(app_creds=app_creds, project=project)

return bq.query(sql=sql)

@classmethod
def from_dataframe(cls, dataframe, include_index=False):
"""
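
Taken together, the two methods above give Table a round trip to BigQuery. A short usage sketch with placeholder dataset and project names:

from parsons import Table

tbl = Table([{"first": "Jane", "last": "Doe"}])

# Write the table to BigQuery; table_name uses schema.table (dataset.table) format
tbl.to_bigquery("my_dataset.people", project="my-project")

# Read it back as a Parsons Table via a query
people = Table.from_bigquery("SELECT * FROM my_dataset.people", project="my-project")
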
(Diffs for the remaining changed files are not shown.)
