WIP: Use bigrquerystorage for downloads #604

Merged Sep 30, 2024 (34 commits)

Commits
eb48ffc
WIP: Use bigrquerystorage for downloads
hadley Apr 2, 2024
1b95bf3
Pass bq-download tests
hadley Apr 10, 2024
d90ac6f
Fix doc buglet; re-document
hadley Apr 10, 2024
6f6783e
Final api = 'json' args
hadley Apr 10, 2024
36ed179
When possible, Use arrow in dbFetch()
hadley Apr 11, 2024
793a251
Add API argument to collect()
hadley Apr 11, 2024
c68a49d
Clarify bug workaround
hadley Apr 11, 2024
9d1f36d
R CMD check fixes
hadley Apr 11, 2024
c28f180
use_tidy_description()
hadley Apr 11, 2024
4a18fa2
Start tests for arrow api
hadley Apr 11, 2024
5590b6d
Polish news
hadley Apr 11, 2024
5c25718
Polish docs
hadley Apr 11, 2024
b8f44a8
Use bigrquerystorage without DBI methods
hadley Apr 18, 2024
7265975
Wrap DB references in I()
hadley Apr 18, 2024
9b52bf3
Don't commit snapshots
hadley Apr 18, 2024
fea1b2f
Restore bigrquerystorage to correct place
hadley Apr 18, 2024
bb4ff9b
Implement `bq_perform_query_schema()`
hadley Apr 18, 2024
17c0d3a
Use `bq_perform_query_schema()` to get vars
hadley Apr 18, 2024
f81a82b
Add some basic type tests
hadley Apr 19, 2024
6043c82
Polish docs some more
hadley Apr 19, 2024
55694bb
Polishing tests
hadley Apr 19, 2024
f777d65
Test & improve argument warnings
hadley Apr 19, 2024
26104ba
Drop R3.6 check since we're losing it soon anyway
hadley Apr 19, 2024
50b69c1
Re-document
hadley Apr 19, 2024
79ee191
Merge commit '60cfc4acca9c584caa15899c827888be990d777f'
hadley Sep 19, 2024
7f59af7
Can now use CRAN nanoparquet
hadley Sep 19, 2024
27c3dc7
Re-add accidentally dropped dep
hadley Sep 20, 2024
120d5a5
Can use CRAN bigrquerystorage
hadley Sep 20, 2024
3858c5e
Fix merge issue in NEWS
hadley Sep 20, 2024
9c51373
Polish docs
hadley Sep 20, 2024
9e6ecee
Switch back to dev version
hadley Sep 20, 2024
97e87ab
Restore more types supported by dev bigrquerystorage
hadley Sep 20, 2024
1446d9f
Improve tests
hadley Sep 20, 2024
fcfb00c
Use correct function name
hadley Sep 20, 2024
15 changes: 8 additions & 7 deletions DESCRIPTION
@@ -12,9 +12,9 @@ Description: Easily talk to Google's 'BigQuery' database from R.
License: MIT + file LICENSE
URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
BugReports: https://github.com/r-dbi/bigrquery/issues
Depends:
Depends:
R (>= 4.0)
Imports:
Imports:
bit64,
brio,
cli,
@@ -29,8 +29,9 @@ Imports:
prettyunits,
rlang (>= 1.1.0),
tibble,
nanoparquet (> 0.3.1)
nanoparquet (>= 0.3.1)
Suggests:
bigrquerystorage (>= 1.1.0.9000),
blob,
covr,
dbplyr (>= 2.4.0),
@@ -41,9 +42,7 @@ Suggests:
testthat (>= 3.1.5),
wk (>= 0.3.2),
withr
Remotes:
r-lib/nanoparquet
LinkingTo:
LinkingTo:
cli,
cpp11,
rapidjsonr
@@ -54,7 +53,7 @@ Config/testthat/start-first: bq-table, dplyr
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.3.2
Collate:
Collate:
'bigrquery-package.R'
'bq-auth.R'
'bq-dataset.R'
@@ -84,3 +83,5 @@ Collate:
'import-standalone-types-check.R'
'utils.R'
'zzz.R'
Remotes:
meztez/bigrquerystorage
1 change: 1 addition & 0 deletions NAMESPACE
@@ -88,6 +88,7 @@ export(bq_perform_extract)
export(bq_perform_load)
export(bq_perform_query)
export(bq_perform_query_dry_run)
export(bq_perform_query_schema)
export(bq_perform_upload)
export(bq_project_datasets)
export(bq_project_jobs)
148 changes: 77 additions & 71 deletions NEWS.md

Large diffs are not rendered by default.

109 changes: 84 additions & 25 deletions R/bq-download.R
@@ -1,61 +1,77 @@
#' Download table data
#'
#' This retrieves rows in chunks of `page_size`. It is most suitable for results
#' of smaller queries (<100 MB, say). For larger queries, it is better to
#' export the results to a CSV file stored on google cloud and use the
#' bq command line tool to download locally.
#' @description
#' This function provides two ways to download data from BigQuery, transferring
#' data using either JSON or arrow, depending on the `api` argument. If
#' bigrquerystorage is installed, `api = "arrow"` will be used (because it's
#' so much faster, but see the limitations below); otherwise you can choose
#' deliberately by using `api = "json"` or `api = "arrow"`.
#'
#' @section Complex data:
#' bigrquery will retrieve nested and repeated columns in to list-columns
#' ## Arrow API
#'
#' The arrow API is much faster, but has heavier dependencies: bigrquerystorage
#' requires the arrow package, which can be tricky to compile on Linux (but you
#' should usually be able to get a binary from
#' [Posit Public Package Manager](https://posit.co/products/cloud/public-package-manager/)).
#'
#' There's one known limitation of `api = "arrow"`: when querying public data,
#' you'll now need to provide a `billing` project.
#'
#' ## JSON API
#'
#' The JSON API retrieves rows in chunks of `page_size`. It is most suitable
#' for results of smaller queries (<100 MB, say). Unfortunately, due to
#' limitations in the BigQuery API, you may need to vary this parameter
#' depending on the complexity of the underlying data.
#'
#' The JSON API will convert nested and repeated columns into list-columns
#' as follows:
#'
#' * Repeated values (arrays) will become a list-column of vectors.
#' * Records will become list-columns of named lists.
#' * Repeated records will become list-columns of data frames.
#'
#' @section Larger datasets:
#' In my timings, this code takes around 1 minute per 100 MB of data.
#' If you need to download considerably more than this, I recommend:
#'
#' * Export a `.csv` file to Cloud Storage using [bq_table_save()].
#' * Use the `gsutil` command line utility to download it.
#' * Read the csv file into R with `readr::read_csv()` or `data.table::fread()`.
#'
#' Unfortunately you cannot export nested or repeated formats into CSV, and
#' the formats that BigQuery supports for nested/repeated values (avro and
#' ndjson) are not well supported in R.
#'
#' @return Because data retrieval may generate list-columns and the `data.frame`
#' print method can have problems with list-columns, this method returns
#' a tibble. If you need a `data.frame`, coerce the results with
#' [as.data.frame()].
#' @param x A [bq_table]
#' @param n_max Maximum number of results to retrieve. Use `Inf` to retrieve all
#' rows.
#' @param page_size The number of rows requested per chunk. It is recommended to
#' leave this unspecified until you have evidence that the `page_size`
#' selected automatically by `bq_table_download()` is problematic.
#' @param page_size (JSON only) The number of rows requested per chunk. It is
#' recommended to leave this unspecified until you have evidence that the
#' `page_size` selected automatically by `bq_table_download()` is problematic.
#'
#' When `page_size = NULL` bigrquery determines a conservative, natural chunk
#' size empirically. If you specify the `page_size`, it is important that each
#' chunk fits on one page, i.e. that the requested row limit is low enough to
#' prevent the API from paginating based on response size.
#' @param start_index Starting row index (zero-based).
#' @param max_connections Number of maximum simultaneous connections to
#' BigQuery servers.
#' @param start_index (JSON only) Starting row index (zero-based).
#' @param max_connections (JSON only) Number of maximum simultaneous
#' connections to BigQuery servers.
#' @param api Which API to use? The `"json"` API works wherever bigrquery
#' does, but is slow and can require fiddling with the `page_size` parameter.
#' The `"arrow"` API is faster and more reliable, but only works if you
#' have also installed the bigrquerystorage package.
#'
#' Because the `"arrow"` API is so much faster, it will be used automatically
#' if the bigrquerystorage package is installed.
#' @inheritParams api-job
#' @param bigint The R type that BigQuery's 64-bit integer types should be
#' mapped to. The default is `"integer"`, which returns R's `integer` type,
#' but results in `NA` for values above/below +/- 2147483647. `"integer64"`
#' returns a [bit64::integer64], which allows the full range of 64 bit
#' integers.
#' @param billing (Arrow only) Project to bill; defaults to the project of `x`,
#' and typically only needs to be specified if you're working with public
#' datasets.
#' @param max_results `r lifecycle::badge("deprecated")` Deprecated. Please use
#' `n_max` instead.
#' @section Google BigQuery API documentation:
#' * [list](https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list)
#' @export
#' @examplesIf bq_testable()
#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000)
#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000, billing = bq_test_project())
bq_table_download <-
function(x,
n_max = Inf,
@@ -64,20 +80,55 @@ bq_table_download <-
max_connections = 6L,
quiet = NA,
bigint = c("integer", "integer64", "numeric", "character"),
api = c("json", "arrow"),
billing = x$project,
max_results = deprecated()) {
x <- as_bq_table(x)
check_number_whole(n_max, min = 0, allow_infinite = TRUE)
check_number_whole(start_index, min = 0)
check_number_whole(max_connections, min = 1)
quiet <- check_quiet(quiet)
bigint <- arg_match(bigint)
api <- check_api(api)

if (lifecycle::is_present(max_results)) {
lifecycle::deprecate_warn(
"1.4.0", "bq_table_download(max_results)", "bq_table_download(n_max)"
)
n_max <- max_results
}

if (api == "arrow") {
check_installed("bigrquerystorage", "required to download using arrow API")
if (!missing(page_size)) {
cli::cli_warn(
'{.arg page_size} is ignored when {.code api == "arrow"}',
call = environment()
)
}
if (!missing(start_index)) {
cli::cli_warn(
'{.arg start_index} is ignored when {.code api == "arrow"}',
call = environment()
)
}
if (!missing(max_connections)) {
cli::cli_warn(
'{.arg max_connections} is ignored when {.code api == "arrow"}',
call = environment()
)
}

return(bigrquerystorage::bqs_table_download(
x = toString(x),
parent = billing,
n_max = n_max,
quiet = quiet,
bigint = bigint,
as_tibble = TRUE
))
}

params <- set_row_params(
nrow = bq_table_nrow(x),
n_max = n_max,
@@ -202,6 +253,14 @@ bq_table_download <-
parse_postprocess(table_data, bigint = bigint)
}

check_api <- function(api = c("json", "arrow"), error_call = caller_env()) {
if (identical(api, c("json", "arrow"))) {
if (has_bigrquerystorage()) "arrow" else "json"
} else {
arg_match(api, error_call = error_call)
}
}

# This function is a modified version of
# https://github.com/r-dbi/RPostgres/blob/master/R/PqResult.R
parse_postprocess <- function(df, bigint) {
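A rough usage sketch of the new `api` argument (not part of the diff; the table and billing project follow the example in the documentation above, and the arrow path assumes bigrquerystorage is installed):

library(bigrquery)

# Arrow API: fast path via bigrquerystorage; public datasets need an
# explicit billing project (bq_test_project() here stands in for yours).
df_arrow <- bq_table_download(
  "publicdata.samples.natality",
  n_max = 35000,
  billing = bq_test_project(),
  api = "arrow"
)

# JSON API: works wherever bigrquery does, but is slower and may need
# page_size tuning for wide or deeply nested tables.
df_json <- bq_table_download(
  "publicdata.samples.natality",
  n_max = 35000,
  api = "json"
)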
72 changes: 58 additions & 14 deletions R/bq-perform.R
@@ -210,7 +210,7 @@ export_json <- function(values) {
#' Google Cloud.
#'
#' For Google Cloud Storage URIs: Each URI can contain one
#' `'*'`` wildcard character and it must come after the 'bucket' name.
#' `'*'` wildcard character and it must come after the 'bucket' name.
#' Size limits related to load jobs apply to external data sources.
#'
#' For Google Cloud Bigtable URIs: Exactly one URI can be specified and
@@ -358,21 +358,13 @@ bq_perform_query_dry_run <- function(query, billing,
parameters = NULL,
use_legacy_sql = FALSE) {

check_string(query)
check_string(billing)
check_bool(use_legacy_sql)

query <- list(
query = unbox(query),
useLegacySql = unbox(use_legacy_sql)
query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = use_legacy_sql
)
if (!is.null(parameters)) {
parameters <- as_bq_params(parameters)
query$queryParameters <- as_json(parameters)
}
if (!is.null(default_dataset)) {
query$defaultDataset <- datasetReference(default_dataset)
}

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))
@@ -386,6 +378,58 @@
structure(bytes, class = "bq_bytes")
}

#' @export
#' @rdname api-perform
bq_perform_query_schema <- function(query, billing,
...,
default_dataset = NULL,
parameters = NULL) {

query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = FALSE
)

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))

res <- bq_post(
url,
body = bq_body(body, ...),
query = list(fields = "statistics")
)
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
res$statistics$query$schema$fields
}

bq_perform_query_data <- function(query,
...,
default_dataset = NULL,
parameters = NULL,
use_legacy_sql = FALSE,
call = caller_env()) {
check_string(query, error_call = call)
check_bool(use_legacy_sql, error_call = call)

query <- list(
query = unbox(query),
useLegacySql = unbox(use_legacy_sql)
)
if (!is.null(parameters)) {
parameters <- as_bq_params(parameters)
query$queryParameters <- as_json(parameters)
}
if (!is.null(default_dataset)) {
query$defaultDataset <- datasetReference(default_dataset)
}

query
}



#' @export
#' @rdname api-perform
bq_perform_copy <- function(src, dest,
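A hedged sketch of how the newly exported `bq_perform_query_schema()` might be used to inspect result columns before downloading; the query and billing project are placeholders, and the per-field `$name`/`$type` access is an assumption based on the TableSchema reference linked in the code above:

# Dry-run the query and return the schema of the result set
fields <- bq_perform_query_schema(
  "SELECT 1 AS n, 'a' AS label",
  billing = bq_test_project()  # placeholder billing project
)

# Assumed shape: a list of field records, each with name/type entries
vapply(fields, function(f) f$name, character(1))
vapply(fields, function(f) f$type, character(1))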
2 changes: 1 addition & 1 deletion R/dbi-connection.R
@@ -318,7 +318,7 @@ setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq)

dbReadTable_bq <- function(conn, name, ...) {
tb <- as_bq_table(conn, name)
bq_table_download(tb, ...)
bq_table_download(tb, ..., api = "json")
}

#' @rdname DBI
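For context, a minimal DBI-level sketch (connection details are placeholders): `dbReadTable()` now pins `api = "json"`, so arguments such as `page_size` passed through `...` keep their JSON-API meaning:

con <- DBI::dbConnect(
  bigrquery::bigquery(),
  project = "publicdata",       # placeholder project
  dataset = "samples",
  billing = bq_test_project()   # placeholder billing project
)

# Passed through ... to bq_table_download(), always via the JSON API
natality <- DBI::dbReadTable(con, "natality", n_max = 1000)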
31 changes: 22 additions & 9 deletions R/dbi-result.R
@@ -100,18 +100,31 @@ setMethod(
"dbFetch", "BigQueryResult",
function(res, n = -1, ...) {
check_number_whole(n, min = -1, allow_infinite = TRUE)
if (n == -1) n <- Inf

if (n == -1 || n == Inf) {
if (has_bigrquerystorage() && n == Inf && res@cursor$cur() == 0) {
# https://github.com/meztez/bigrquerystorage/issues/48
n <- res@cursor$left()

# If possible, download complete dataset using arrow
data <- bq_table_download(res@bq_table,
n_max = n,
bigint = res@bigint,
quiet = res@quiet,
api = "arrow"
)
} else {
# Otherwise, fall back to slower JSON API
data <- bq_table_download(res@bq_table,
n_max = n,
start_index = res@cursor$cur(),
page_size = res@page_size,
bigint = res@bigint,
quiet = res@quiet,
api = "json"
)
}

data <- bq_table_download(res@bq_table,
n_max = n,
start_index = res@cursor$cur(),
page_size = res@page_size,
bigint = res@bigint,
quiet = res@quiet
)

res@cursor$adv(nrow(data))

data
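A usage sketch of the updated `dbFetch()` behaviour, reusing the placeholder connection from the previous sketch: fetching the complete result with the cursor at the start can take the arrow path when bigrquerystorage is installed, while partial fetches fall back to the JSON API:

res <- DBI::dbSendQuery(con, "SELECT year, month FROM natality LIMIT 100000")

# Whole result, cursor still at the start: eligible for the arrow fast path
all_rows <- DBI::dbFetch(res, n = -1)
DBI::dbClearResult(res)

res <- DBI::dbSendQuery(con, "SELECT year, month FROM natality LIMIT 100000")

# Partial fetches advance the cursor, so they go through the JSON API
first_chunk <- DBI::dbFetch(res, n = 10000)
rest <- DBI::dbFetch(res, n = -1)
DBI::dbClearResult(res)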