'Bulk' download #167

Closed

max-sixty opened this issue Apr 21, 2018 · 3 comments
max-sixty (Contributor) commented Apr 21, 2018

ref: #133

Here's some code we're trying internally to make larger downloads possible and faster.

Let me know your thoughts (anyone is welcome to take this and turn it into a PR if they'd like, similar to the bulk upload).

import logging
import os
import tempfile
import uuid

import pandas as pd
from google.cloud import bigquery, storage

logger = logging.getLogger(__name__)

# Project defaults to the standard environment variable.
GOOGLE_CLOUD_PROJECT = os.environ.get('GOOGLE_CLOUD_PROJECT')

# Map pandas-style if_exists values to BigQuery write dispositions.
if_exists_map = {
    'append': 'WRITE_APPEND',
    'fail': 'WRITE_EMPTY',
    'replace': 'WRITE_TRUNCATE',
}


# https://stackoverflow.com/questions/14622526
def create_from_query(query, dataset, table, block=False, if_exists='fail',
                      project=None, credentials=None):
    """
    Create a bigquery table from a query

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values
    dataset : str
        dataset id
    table : str
        name of table
    block : bool (default False)
        If True, wait for the query job to complete before returning
    if_exists : str (default: 'fail')
        append - Specifies that rows may be appended to an existing table
        fail - Specifies that the output table must be empty
        replace - Specifies that write should replace a table
    project : str (default to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery Account project ID.
    credentials : google.auth.credentials.Credentials (optional)
        Credentials to use for authentication; defaults to application
        default credentials

    Returns
    -------
    job: google.cloud.bigquery.job.QueryJob
        Returns the inserted QueryJob
    """
    client = bigquery.Client(project=project, credentials=credentials)
    if dataset not in [x.dataset_id for x in client.list_datasets()]:
        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4930
        # Keep `dataset` as a string so it can be reused below.
        client.create_dataset(bigquery.Dataset(client.dataset(dataset)))
    config = bigquery.job.QueryJobConfig()
    config.use_legacy_sql = False
    config.allow_large_results = True
    config.destination = client.dataset(dataset).table(table)
    config.write_disposition = if_exists_map[if_exists]
    job_id = uuid.uuid4().hex[:10]
    job = client.query(query=query, job_id=job_id, job_config=config)
    if block:
        job.result()  # block until the query job completes
    return job


def export_table_to_gcs(
    dataset,
    table,
    timeout_in_seconds=600,
    bucket=None,
    blob=None,
    zipped=True,
):
    """
    export table to gcs.  returns tuple of (bucket, blob)
    """
    client = bigquery.Client(project=GOOGLE_CLOUD_PROJECT)
    table_ref = client.dataset(dataset).table(table)
    job_config = bigquery.ExtractJobConfig()
    job_config.compression = 'GZIP' if zipped else 'NONE'
    bucket = bucket or '{}-temp'.format(client.project)
    blob = blob or '{}/{}.csv'.format(dataset, table)
    if zipped and not blob.endswith('.gz'):
        blob += '.gz'
    destination_uri = 'gs://{}/{}'.format(bucket, blob)
    extract_job = client.extract_table(
        table_ref, destination_uri, job_config=job_config)
    extract_job.result(timeout=timeout_in_seconds)
    logger.info('Exported {}.{} -> {}'.format(dataset, table, destination_uri))
    return bucket, blob


DEFAULT_BUCKET = '{}-temp'.format(GOOGLE_CLOUD_PROJECT)


def read_table(
        dataset,
        table,
        project=None,
        credentials=None,
        timeout_in_seconds=600,
        bucket=DEFAULT_BUCKET,
):
    """
    reads an entire table from gbq into a dataframe
    """

    # Use a distinct name so the imported ``storage`` module isn't shadowed.
    gcs = storage.Client(project=project, credentials=credentials)
    prefix = 'gbq-exports/{}/{}/'.format(dataset, table)
    bucket = gcs.get_bucket(bucket)

    for old_blob in bucket.list_blobs(prefix=prefix):
        old_blob.delete()
        logger.info('Old Blob Deleted: {}'.format(old_blob.name))

    bq = bigquery.Client(project=project, credentials=credentials)
    table = bq.dataset(dataset).table(table)

    conf = bigquery.ExtractJobConfig()
    conf.compression = 'GZIP'

    extract_job = bq.extract_table(
        table,
        'gs://{}/{}*.csv.gz'.format(bucket.name, prefix),
        job_config=conf)

    extract_job.result(timeout=timeout_in_seconds)

    frames = []

    path = tempfile.mkdtemp()
    for blob in bucket.list_blobs(prefix=prefix):
        filename = '{}/{}'.format(path, blob.name.replace('/', '__'))

        with open(filename, 'wb') as f:
            blob.download_to_file(f)

        frames.append(pd.read_csv(filename))
        blob.delete()
        logger.info('Processed: {}'.format(blob.name))

    return pd.concat(frames)


def read_gbq_bulk(
    query,
    project=None,
    bucket=None,
    dataset='temp_pandas',
    credentials=None,
):

    table_name = uuid.uuid4().hex[:6]
    create_from_query(
        query=query,
        dataset=dataset,
        table=table_name,
        project=project,
        credentials=credentials,
        block=True,
    )

    df = read_table(
        dataset=dataset,
        table=table_name,
        project=project,
        credentials=credentials,
        bucket=bucket or DEFAULT_BUCKET,
    )

    bq = bigquery.Client(project=project, credentials=credentials)
    table = bq.dataset(dataset).table(table_name)
    bq.delete_table(table)

    return df
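
For illustration, usage might look roughly like the sketch below; the query, project, and bucket names are placeholders, and it assumes the helpers above are importable and a temp bucket already exists.

# Hypothetical usage of read_gbq_bulk; query, project, and bucket
# names below are placeholders.
df = read_gbq_bulk(
    'SELECT name, value FROM `my-project.my_dataset.big_table`',
    project='my-project',
    bucket='my-project-temp',
)
print(df.shape)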
smrgit commented Sep 10, 2018

I'm looking at the same sort of issue, @max-sixty -- specifically, I want to export a large amount of 'tidy' data from BQ into a pandas DataFrame (typically with 3 columns) and then pivot it into a matrix for downstream matrix manipulations. Have you come up with any improvements on the approach you posted above, or any suggestions for my particular use-case?
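
For what it's worth, once the three-column tidy frame is in memory, the pivot itself is plain pandas; a minimal sketch, with hypothetical column names row_id, col_id, and value:

import pandas as pd

# Hypothetical tidy frame with three columns.
tidy = pd.DataFrame({
    'row_id': ['a', 'a', 'b'],
    'col_id': ['x', 'y', 'x'],
    'value': [1.0, 2.0, 3.0],
})

# Pivot to a matrix: one row per row_id, one column per col_id.
matrix = tidy.pivot(index='row_id', columns='col_id', values='value')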

max-sixty (Contributor, Author) commented

How large, @smrgit?

If it fits in memory, this approach works well. If it's larger than memory, then pandas likely isn't going to be the best tool.

We also made some changes to the vanilla read_gbq which improve performance, so it may be worth checking the latest release.
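
For reference, the vanilla path is just pandas_gbq.read_gbq; a minimal sketch, with the query and project ID as placeholders:

import pandas_gbq

# Standard read path; the query and project_id are placeholders.
df = pandas_gbq.read_gbq(
    'SELECT id, metric, value FROM `my-project.my_dataset.my_table`',
    project_id='my-project',
    dialect='standard',
)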

tswast (Collaborator) commented Aug 9, 2019

Closed by #270

tswast closed this as completed Aug 9, 2019