Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport release-1.12] [r] Connect re-indexer to blockwise iterator #2748

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 141 additions & 20 deletions apis/r/R/BlockwiseIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ BlockwiseReadIterBase <- R6::R6Class(
coords,
axis,
...,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(sr)
stopifnot(
Expand Down Expand Up @@ -55,6 +55,16 @@ BlockwiseReadIterBase <- R6::R6Class(
}
private$.coords <- coords
# Check reindex_disable_on_axis
if (is_scalar_logical(reindex_disable_on_axis)) {
reindex_disable_on_axis <- if (isTRUE(reindex_disable_on_axis)) { # TRUE
bit64::seq.integer64(0L, ndim)
} else if (isFALSE(reindex_disable_on_axis)) { # FALSE
NULL
} else { # NA
ax <- bit64::seq.integer64(0L, ndim)
ax[ax != self$axis]
}
}
if (!is.null(reindex_disable_on_axis)) {
stopifnot(
"'reindex_disable_on_axis' must be a vector of integers" = (
Expand All @@ -63,11 +73,27 @@ BlockwiseReadIterBase <- R6::R6Class(
),
"'reindex_disable_on_axis' must be finite" = is.finite(reindex_disable_on_axis),
"'reindex_disable_on_axis' must be within the range of dimensions of the array" = all(
reindex_disable_on_axis >= 0 && reindex_disable_on_axis <= ndim
reindex_disable_on_axis >= 0 & reindex_disable_on_axis <= ndim
)
)
reindex_disable_on_axis <- unique(bit64::as.integer64(reindex_disable_on_axis))
}
private$.reindex_disable_on_axis <- reindex_disable_on_axis
axes_to_reindex <- self$axes_to_reindex
private$.reindexers <- vector("list", length = length(axes_to_reindex))
shape <- self$array$shape()
dnames <- self$array$dimnames()
for (i in seq_along(axes_to_reindex)) {
ax <- as.numeric(axes_to_reindex[i]) + 1L
coords <- as.list(CoordsStrider$new(start = 0L, end = shape[ax] - 1L))
coords <- if (length(coords) == 1L) {
coords[[1L]]
} else {
unlist64(coords)
}
private$.reindexers[[i]] <- IntIndexer$new(coords)
names(private$.reindexers)[i] <- dnames[ax]
}
},
#' @description Check if the iterated read is complete or not
#'
Expand All @@ -90,8 +116,8 @@ BlockwiseReadIterBase <- R6::R6Class(
}
private$reset()
dimnam <- self$array$dimnames()[self$axis + 1L]
nextelems <- self$coords_axis$next_element()
private$set_dim_points(dimnam, nextelems)
private$.nextelems <- self$coords_axis$next_element()
private$set_dim_points(dimnam, private$.nextelems)
return(private$.read_next())
}
),
Expand All @@ -102,6 +128,19 @@ BlockwiseReadIterBase <- R6::R6Class(
#' @field axis The axis to iterate over in a blockwise fashion
#'
axis = function() private$.axis,
#' @field axes_to_reindex The axes to re-index: every axis of the array
#' except the iterated axis and those in \code{reindex_disable_on_axis};
#' \code{NULL} if no axes remain
#'
axes_to_reindex = function() {
  # Zero-based indices of every axis of the array
  all_axes <- bit64::seq.integer64(0L, self$array$ndim() - 1L)
  # Drop axes for which re-indexing has been disabled
  candidates <- all_axes[!(all_axes %in% self$reindex_disable_on_axis)]
  # Drop the axis being iterated over
  if (length(candidates) > 0L) {
    candidates <- candidates[candidates != self$axis]
  }
  if (length(candidates) == 0L) NULL else candidates
},
#' @field coords A list of \code{\link{CoordsStrider}} objects
#'
coords = function() private$.coords,
Expand All @@ -113,13 +152,26 @@ BlockwiseReadIterBase <- R6::R6Class(
},
#' @field reindex_disable_on_axis Additional axes that will not be re-indexed
#'
reindex_disable_on_axis = function() private$.reindex_disable_on_axis
reindex_disable_on_axis = function() private$.reindex_disable_on_axis,
#' @field reindexable Shorthand to see if this iterator is poised to be
#' re-indexed or not; \code{TRUE} if there are any axes to re-index or if
#' the iterated axis is not listed in \code{reindex_disable_on_axis}
#'
reindexable = function() {
  has_axes_to_reindex <- length(self$axes_to_reindex) > 0L
  # The iterated axis is only consulted when no other axis is re-indexed
  has_axes_to_reindex ||
    !(bit64::as.integer64(self$axis) %in% self$reindex_disable_on_axis)
}
),
private = list(
.array = NULL,
.coords = list(),
.axis = integer(1L),
.nextelems = NULL,
.reindex_disable_on_axis = NULL,
.reindexers = list(),
# @description Signal a classed error ("notConcatenatableError") stating
# that re-indexed iterators are not concatenatable
.notConcatenatable = function() {
  cond <- errorCondition(
    message = "Re-indexed blockwise iterators are not concatenatable",
    class = "notConcatenatableError"
  )
  stop(cond)
},
# @description Reset internal state of SOMA Reader while keeping array open
reset = function() {
if (is.null(private$soma_reader_pointer)) {
Expand All @@ -128,6 +180,48 @@ BlockwiseReadIterBase <- R6::R6Class(
sr_reset(private$soma_reader_pointer)
return(invisible(NULL))
},
# @description Re-index an Arrow table
#
# The column for the iterated axis is passed through an `IntIndexer` built
# from the coordinates of the current block (unless re-indexing is disabled
# on that axis); the column for every other re-indexed axis is passed
# through the indexer prepared for it in `private$.reindexers`. The
# coordinates used for this block are attached to the result as the
# "coords" attribute
#
# @param tbl An Arrow table with a column for every dimension to re-index
#
# @return `tbl` with its dimension columns re-indexed and a "coords"
# attribute; errors if `tbl` is not an Arrow table or lacks a required
# dimension column
reindex_arrow_table = function(tbl) {
  stopifnot(
    "'tbl' must be an Arrow table" = R6::is.R6(tbl) && inherits(tbl, "Table")
  )
  axis_dname <- self$array$dimnames()[self$axis + 1L]
  if (!axis_dname %in% names(tbl)) {
    stop(
      "Cannot find ",
      sQuote(axis_dname),
      " in the provided Arrow table",
      call. = FALSE
    )
  }
  # Temporarily disable Arrow's int64 downcasting while columns are
  # converted with as_vector(); the previous option is restored on exit
  op <- options(arrow.int64_downcast = FALSE)
  on.exit(options(op), add = TRUE, after = FALSE)
  # Record the coordinates of the current block on the iterated axis,
  # keeping the stride of the original strider
  coords <- self$coords
  coords[[axis_dname]] <- CoordsStrider$new(
    private$.nextelems,
    stride = coords[[axis_dname]]$stride
  )
  if (!bit64::as.integer64(self$axis) %in% self$reindex_disable_on_axis) {
    indexer <- IntIndexer$new(private$.nextelems)
    tbl[[axis_dname]] <- indexer$get_indexer(
      tbl[[axis_dname]]$as_vector(),
      nomatch_na = TRUE
    )
    rm(indexer)
  }
  for (other_dname in names(private$.reindexers)) {
    # Fail with a clear message instead of an opaque error from calling
    # as_vector() on a missing column
    if (!other_dname %in% names(tbl)) {
      stop(
        "Cannot find ",
        sQuote(other_dname),
        " in the provided Arrow table",
        call. = FALSE
      )
    }
    indexer <- private$.reindexers[[other_dname]]
    tbl[[other_dname]] <- indexer$get_indexer(
      tbl[[other_dname]]$as_vector(),
      nomatch_na = TRUE
    )
  }
  attr(tbl, "coords") <- coords
  return(tbl)
},
# @description Set dimension selection on given axis
set_dim_points = function(dimname, points) {
stopifnot(
Expand Down Expand Up @@ -156,14 +250,22 @@ BlockwiseTableReadIter <- R6::R6Class(
classname = "BlockwiseTableReadIter",
inherit = BlockwiseReadIterBase,
public = list(
#' @description ...
#' @description Concatenate the remainder of the blockwise iterator
#'
#' @return ...
#' @return An Arrow Table with the remainder of the iterator
#'
concat = function() soma_array_to_arrow_table_concat(self)
concat = function() {
  # Only iterators that are not re-indexed can be concatenated
  if (!self$reindexable) {
    return(soma_array_to_arrow_table_concat(self))
  }
  private$.notConcatenatable()
}
),
private = list(
soma_reader_transform = function(x) soma_array_to_arrow_table(x)
soma_reader_transform = function(x) {
  # Convert the raw read to an Arrow table, then re-index its dimensions
  private$reindex_arrow_table(soma_array_to_arrow_table(x))
}
)
)

Expand Down Expand Up @@ -194,7 +296,7 @@ BlockwiseSparseReadIter <- R6::R6Class(
axis,
...,
repr = "T",
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(
sr,
Expand All @@ -204,14 +306,28 @@ BlockwiseSparseReadIter <- R6::R6Class(
...,
reindex_disable_on_axis = reindex_disable_on_axis
)
private$.repr <- match.arg(repr)
stopifnot(
"Sparse reads only work with two-dimensional arrays" = self$array$ndim() == 2L
)
reprs <- c(
'T',
if (!bit64::as.integer64(0L) %in% self$reindex_disable_on_axis)'R',
if (!bit64::as.integer64(1L) %in% self$reindex_disable_on_axis) 'C'
)
private$.repr <- match.arg(repr, choices = reprs)
private$.shape <- sapply(coords, length)
},
#' @description ...
#' @description Concatenate the remainder of the blockwise iterator
#'
#' @return ...
#' @return A sparse matrix (determined by \code{self$repr}) with
#' the remainder of the iterator
#'
concat = function() soma_array_to_sparse_matrix_concat(self, private$.zero_based)
concat = function() {
  # Only iterators that are not re-indexed can be concatenated
  if (!self$reindexable) {
    return(soma_array_to_sparse_matrix_concat(self, private$.zero_based))
  }
  private$.notConcatenatable()
}
),
active = list(
#' @field repr Representation of the sparse matrix to return
Expand All @@ -222,11 +338,16 @@ BlockwiseSparseReadIter <- R6::R6Class(
.repr = character(1L),
.shape = NULL,
.zero_based = FALSE,
soma_reader_transform = function(x) arrow_table_to_sparse(
soma_array_to_arrow_table(x),
repr = self$repr,
shape = private$.shape,
zero_based = private$.zero_based
)
soma_reader_transform = function(x) {
  # Read the block as an Arrow table and re-index its dimension columns
  arrow_tbl <- soma_array_to_arrow_table(x)
  reindexed <- private$reindex_arrow_table(arrow_tbl)
  sparse <- arrow_table_to_sparse(
    reindexed,
    repr = self$repr,
    shape = private$.shape,
    zero_based = private$.zero_based
  )
  # Carry the block's coordinates over from the table to the matrix
  attr(sparse, "coords") <- attr(reindexed, "coords", exact = TRUE)
  sparse
}
)
)
5 changes: 3 additions & 2 deletions apis/r/R/SOMASparseNDArrayRead.R
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ SOMASparseNDArrayRead <- R6::R6Class(
axis,
...,
size = NULL,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
return(SOMASparseNDArrayBlockwiseRead$new(
self$sr,
Expand Down Expand Up @@ -210,14 +210,15 @@ SOMASparseNDArrayBlockwiseRead <- R6::R6Class(
axis,
...,
size,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(sr, array, coords)
stopifnot(
"'size' must be a single integer value" = is.null(size) ||
rlang::is_integerish(size, 1L, finite = TRUE) ||
(inherits(size, 'integer64') && length(size) == 1L && is.finite(size)),
"'reindex_disable_on_axis' must be a vector of integers" = is.null(reindex_disable_on_axis) ||
is_scalar_logical(reindex_disable_on_axis) ||
rlang::is_integerish(reindex_disable_on_axis, finite = TRUE) ||
(inherits(reindex_disable_on_axis, 'integer64') && all(is.finite(reindex_disable_on_axis)))
)
Expand Down
16 changes: 14 additions & 2 deletions apis/r/man/BlockwiseReadIterBase.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 20 additions & 5 deletions apis/r/man/BlockwiseSparseReadIter.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apis/r/man/BlockwiseTableReadIter.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading