Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce fexplode function #4156

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export(nafill)
export(setnafill)
export(.Last.updated)
export(fcoalesce)
export(funnest)

S3method("[", data.table)
S3method("[<-", data.table)
Expand Down
2 changes: 0 additions & 2 deletions R/setkey.R
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,6 @@ CJ = function(..., sorted = TRUE, unique = FALSE)
if (unique) l[[i]] = unique(y)
}
}
nrow = prod( vapply_1i(l, length) ) # lengths(l) will work from R 3.2.0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrated this to C, epsilon more efficient but anyway cleaner to have more logic there; will also make it easier to switch to long vector support

if (nrow > .Machine$integer.max) stop(gettextf("Cross product of elements provided to CJ() would result in %.0f rows which exceeds .Machine$integer.max == %d", nrow, .Machine$integer.max, domain='R-data.table'))
l = .Call(Ccj, l)
setDT(l)
l = setalloccol(l) # a tiny bit wasteful to over-allocate a fixed join table (column slots only), doing it anyway for consistency since
Expand Down
2 changes: 2 additions & 0 deletions R/wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ fcase = function(..., default=NA) .Call(CfcaseR, default, parent.frame(), as.l
colnamesInt = function(x, cols, check_dups=FALSE) .Call(CcolnamesInt, x, cols, check_dups)
coerceFill = function(x) .Call(CcoerceFillR, x)

# Unnest ("explode") list columns of a data.table; see ?funnest.
# x:    a data.table (validated in C by Cunnest).
# cols: integer indices of the columns to unnest; defaults to every list column of x.
# The heavy lifting is done in C; the plain list it returns is converted to a
# data.table by reference with setDT() and then returned via `[]`.
funnest = function(x, cols = which(vapply_1b(x, is.list))) {
  ans = .Call(Cunnest, x, cols)
  setDT(ans)
  ans[]
}

testMsg = function(status=0L, nx=2L, nk=2L) .Call(CtestMsgR, as.integer(status)[1L], as.integer(nx)[1L], as.integer(nk)[1L])
68 changes: 67 additions & 1 deletion inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -12343,7 +12343,7 @@ unlink(f)
test(1882.1, .Machine$integer.max, 2147483647L) # same on all platforms and very unlikely to change in R (which is good)
test(1882.2, ceiling(.Machine$integer.max^(1/3)), 1291)
v = seq_len(1291L)
test(1882.3, CJ(v, v, v), error="Cross product of elements provided to CJ() would result in 2151685171 rows which exceeds .Machine$integer.max == 2147483647")
test(1882.3, CJ(v, v, v), error="Cross product of elements provided for cross-join would result in 2151685171 rows which exceeds .Machine$integer.max == 2147483647")

# no re-read for particular file, #2509
if (test_R.utils) test(1883, fread(testDir("SA2-by-DJZ.csv.gz"), verbose=TRUE, header=FALSE)[c(1,2,1381,.N),],
Expand Down Expand Up @@ -16726,6 +16726,72 @@ DT = data.table(
s4class(x=2L, y="yes", z=1)))
test(2130.03, print(DT), output=c(" x y", "1: 1 <ex_class[3]>", "2: 2 <ex_class[3]>"))

# funnest: unnest ("explode") list columns
# NB: x and ans are modified by reference from test to test, so the order of the
#   statements below matters.
# baseline: V3 and V4 are both list columns, so each input row is expanded to the
#   cross product of its V3 and V4 elements (3*1 rows for row 1, 2*3 rows for row 2)
x = setDT(list(V1=1:2, V2=c(3,4), V3=list(1:3, 1:2), V4=list(1L, 1:3)))
ans = data.table(
V1 = c(1L, 1L, 1L, 2L, 2L, 2L, 2L, 2L, 2L),
V2 = c(3, 3, 3, 4, 4, 4, 4, 4, 4),
V3 = c(1L, 2L, 3L, 1L, 1L, 1L, 2L, 2L, 2L),
V4 = c(1L, 1L, 1L, 1L, 2L, 3L, 1L, 2L, 3L)
)
test(2131.01, funnest(x), ans)
# non-list columns of other types are repeated alongside: character, then factor
x[ , V1 := letters[V1]]
ans[ , V1 := letters[V1]]
test(2131.02, funnest(x), ans)
x[ , V1 := factor(V1)]
ans[ , V1 := factor(V1)]
test(2131.03, funnest(x), ans)

# an expression column that is not being unnested cannot be repeated
x[ , e := expression(1, 2)]
test(2131.04, funnest(x), error='Unsupported column type')
x[ , e := NULL]

# raw and complex non-list columns are repeated too
x[ , c('r', 'z') := .(as.raw(0), 0+1i)]
ans[ , c('r', 'z') := .(as.raw(0), 0+1i)]
test(2131.05, funnest(x), ans)
x[ , c('r', 'z') := NULL]
ans[ , c('r', 'z') := NULL]

# list column whose elements are character vectors
x[ , V3 := .(lapply(V3, function(i) letters[i]))]
ans[ , V3 := letters[V3]]
test(2131.06, funnest(x), ans)

# list column whose elements are factors
x[ , V3 := .(lapply(V3, factor))]
ans[ , V3 := factor(V3)]
test(2131.07, funnest(x), ans)

# an expression nested inside a list column is rejected
x[1L, V3 := .(list(expression(1)))]
test(2131.08, funnest(x), error="Type 'expression' not supported")

# list elements of mixed type: integer + factor is expected to give a factor
x[1L, V3 := .(list(1:3))]
ans[1:3, V3 := factor(1:3)]
ans[ , V3 := factor(V3, levels = c('1', '2', '3', 'a', 'b'))]
test(2131.09, funnest(x), ans)

# ... and integer + character is expected to give character
x[2L, V3 := .(list(c('a', 'b')))]
ans[ , V3 := as.character(V3)]
test(2131.10, funnest(x), ans)

# cols argument: unnest only V3, so V4 stays a list column and is repeated;
#   non-list columns named in cols (here column 2) are skipped; invalid cols error;
#   when nothing is unnested the result equals x but is a copy (different address)
ans = unique(ans[ , !'V4'])[ , V4 := .(rep(x$V4, 3:2))]
test(2131.11, funnest(x, cols=3L), ans)
test(2131.12, funnest(x, cols=2:3), ans)
test(2131.13, funnest(x, cols='a'), error='cols must be an integer vector, got')
test(2131.14, funnest(x, cols=10L), error='cols to unnest must be in [1, ncol(x)=4]')
test(2131.15, funnest(x, cols=1L), x)
test(2131.16, address(funnest(x, cols=1L)) != address(x))

# a single list column
x[ , V4 := NULL]
ans[ , V4 := NULL]
ans = unique(ans)
test(2131.17, funnest(x), ans)

# input validation, and a table with no list columns (returned as a copy)
test(2131.18, funnest(1), error='Input to funnest must be a data.table')
x = data.table(a=1)
test(2131.19, funnest(x), x)
test(2131.20, address(funnest(x)) != address(x))

# explicitly asking to unnest a column of unsupported type
x[ , e := expression(2)]
test(2131.21, funnest(x, cols=2L), error='Unsupported type for unnesting')

########################
# Add new tests here #
Expand Down
27 changes: 27 additions & 0 deletions man/unnest.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
\name{funnest}
\alias{funnest}
\title{ Unnest/explode list columns }
\description{
For tables with non-rectangular columns (i.e., \code{list}), \code{funnest} "stretches" the table by creating a row for each list element, while also preserving the structure of rectangular columns. Akin to \code{EXPLODE} in U-SQL, HiveQL or Spark SQL, or \code{UNNEST} in Presto or BigQuery, and similar to \code{\link{melt}} -- both reshape "long", but \code{funnest} does so for "ragged" tables more naturally.
}
\usage{
funnest(x, cols = which(vapply_1b(x, is.list)))
}
\arguments{
\item{x}{ A \code{data.table} }
\item{cols}{ An \code{integer} vector of column indices of which columns to unnest; defaults to all \code{list} columns. Can be useful for unnesting only a subset of a table's \code{list} columns. Note that non-\code{list} columns are skipped; if there are no \code{list} columns provided, a \code{\link{copy}} of the table is returned. }
}
\details{
By default, when \code{length(cols) > 1L}, a \emph{cartesian unnest} is performed, that is, the cross-product (\emph{a la} \code{\link{CJ}}) of each row's list elements is returned. If there are two columns in \code{cols}, \code{A} and \code{B}, the output will thus have \code{sum(lengths(A) * lengths(B))} rows.
}
\value{
A \code{data.table}
}
\seealso{
\code{\link{CJ}}, \code{\link{rbindlist}}
}
\examples{
x = setDT(list(V1 = 1:2, V2 = 3:4, V3 = list(1:3, 1:2), V4 = list(1L, 1:3)))
funnest(x)
}
\keyword{ data }
163 changes: 153 additions & 10 deletions src/cj.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
SEXP cj(SEXP base_list) {
int ncol = LENGTH(base_list);
SEXP out = PROTECT(allocVector(VECSXP, ncol));
int nrow = 1;
// already confirmed to be less than .Machine$integer.max at R level
for (int j=0; j<ncol; ++j) nrow *= length(VECTOR_ELT(base_list, j));
// start with double to allow overflow
double nrow_dbl=1;
for (int j=0; j<ncol; ++j) nrow_dbl *= length(VECTOR_ELT(base_list, j));
if (nrow_dbl > INT_MAX)
error(_("Cross product of elements provided for cross-join would result in %.0f rows which exceeds .Machine$integer.max == %d"), nrow_dbl, INT_MAX);
int nrow = (int) nrow_dbl;
int eachrep = 1;
for (int j=ncol-1; j>=0; --j) {
SEXP source = VECTOR_ELT(base_list, j), target;
Expand All @@ -20,43 +23,43 @@ SEXP cj(SEXP base_list) {
case INTSXP: {
const int *restrict sourceP = INTEGER(source);
int *restrict targetP = INTEGER(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (thislen > OMP_MIN_VALUE)
// default static schedule so two threads won't write to same cache line in last column
// if they did write to same cache line (and will when last column's thislen is small) there's no correctness issue
for (int i=0; i<thislen; ++i) {
const int item = sourceP[i];
const int end = (i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item; // no div, mod or read ops inside loop; just rep a const contiguous write
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (ncopy > OMP_MIN_VALUE)
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(int));
}
} break;
case REALSXP: {
const double *restrict sourceP = REAL(source);
double *restrict targetP = REAL(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (thislen > OMP_MIN_VALUE)
for (int i=0; i<thislen; ++i) {
const double item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (ncopy > OMP_MIN_VALUE)
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(double));
}
} break;
case CPLXSXP: {
const Rcomplex *restrict sourceP = COMPLEX(source);
Rcomplex *restrict targetP = COMPLEX(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (thislen > OMP_MIN_VALUE)
for (int i=0; i<thislen; ++i) {
const Rcomplex item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads()) if (ncopy > OMP_MIN_VALUE)
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(Rcomplex));
}
Expand Down Expand Up @@ -86,11 +89,151 @@ SEXP cj(SEXP base_list) {
}
} break;
default:
error(_("Type '%s' not supported by CJ."), type2char(TYPEOF(source)));
error(_("Type '%s' not supported by cross-join."), type2char(TYPEOF(source)));
}
eachrep *= thislen;
}
UNPROTECT(1);
return out;
}

/*
 * Unnest ("explode") list columns of a data.table; C back end of funnest().
 *
 * x    : a data.table; anything else is an error.
 * cols : integer vector of 1-based column indices to unnest. Entries that point at
 *        atomic columns are ignored; entries outside [1, ncol(x)] or pointing at an
 *        unsupported column type are an error. Order and duplicates in cols do not
 *        matter: each selected list column is unnested once, in table order.
 *
 * For every row, the elements of the selected list columns are crossed with cj();
 * the per-row results are stacked with rbindlist() (which also handles type
 * coercion between rows), and every other column is repeated to match.
 *
 * Returns a new list (VECSXP) carrying the names of x; when nothing needs
 * unnesting a duplicate of x is returned. The R wrapper applies setDT().
 */
SEXP unnest(SEXP x, SEXP cols) {
  if (!INHERITS(x, char_datatable))
    error(_("Input to funnest must be a data.table"));
  const int k = LENGTH(cols);
  // nothing to unnest -- no need to go further, just be sure to copy
  if (k == 0)
    return (duplicate(x));
  if (TYPEOF(cols) != INTSXP)
    error(_("cols must be an integer vector, got %s"), type2char(TYPEOF(cols)));
  const int *colp = INTEGER(cols);
  const int p = LENGTH(x);

  // flag the list columns requested in cols. R_alloc (released by R when .Call returns)
  //   rather than a stack VLA, and a per-column flag rather than a list of indices so
  //   that unsorted or duplicated cols cannot confuse the assembly step below
  bool *is_list = (bool *)R_alloc(p, sizeof(bool));
  for (int j=0; j<p; j++) is_list[j] = false;
  for (int i=0; i<k; i++) {
    const int j = colp[i];
    if (j < 1 || j > p) // also catches NA_integer_
      error(_("cols to unnest must be in [1, ncol(x)=%d], but cols[%d]=%d"), p, i+1, j);
    switch(TYPEOF(VECTOR_ELT(x, j-1))) {
    case RAWSXP :
    case LGLSXP :
    case INTSXP :
    case REALSXP :
    case CPLXSXP :
    case STRSXP : break; // atomic columns have nothing to unnest; silently skipped
    case VECSXP : {
      is_list[j-1] = true; // move to 0-based
    } break;
    default:
      error(_("Unsupported type for unnesting: '%s'"), type2char(TYPEOF(VECTOR_ELT(x, j-1))));
    }
  }
  // 0-based indices of the list columns to unnest, ascending and unique
  int *lcols = (int *)R_alloc(p, sizeof(int));
  int lk=0;
  for (int j=0; j<p; j++) if (is_list[j]) lcols[lk++] = j;
  if (lk == 0)
    return (duplicate(x));

  // p>=1 is guaranteed here because at least one entry of cols passed the range check
  const int n = LENGTH(VECTOR_ELT(x, 0));
  // zero-row input: there are no rows to expand, so return a copy.
  //   NOTE(review): assumes rbindlist() hands back an empty list for empty input, which
  //   would make the column lookup below read out of bounds -- confirm against rbindlist.c
  if (n == 0)
    return (duplicate(x));

  int *row_counts = (int *)R_alloc(n, sizeof(int)); // # of output rows generated by each input row

  // allocated first so it stays on the protect stack until return; the temporaries
  //   protected after it can then be released as soon as they are no longer needed
  SEXP ans = PROTECT(allocVector(VECSXP, p));

  /* unnest the specified cols; each row is expanded with cj,
   *   then the end result is concatenated with rbindlist. in this way,
   *   we can let cj handle the crossing logic and rbindlist handle such
   *   things as type coercion
   */
  SEXP cj_rowwise = PROTECT(allocVector(VECSXP, n));
  for (int i=0; i<n; i++) {
    SEXP nest_vals = PROTECT(allocVector(VECSXP, lk));
    for (int j=0; j<lk; j++)
      SET_VECTOR_ELT(nest_vals, j, VECTOR_ELT(VECTOR_ELT(x, lcols[j]), i));
    SEXP crossed = cj(nest_vals);
    SET_VECTOR_ELT(cj_rowwise, i, crossed); // no allocation between cj() and here
    row_counts[i] = LENGTH(VECTOR_ELT(crossed, 0));
    UNPROTECT(1); // nest_vals
  }
  SEXP no = PROTECT(ScalarLogical(FALSE));
  // the result must be protected: allocVector() is called again before we are done with it
  SEXP unnest_lcols = PROTECT(rbindlist(cj_rowwise, no, no, R_NilValue));
  if (TYPEOF(unnest_lcols) != VECSXP || LENGTH(unnest_lcols) != lk)
    error(_("Internal error: rbindlist returned an unexpected result while unnesting")); // # nocov
  const int out_rows = LENGTH(VECTOR_ELT(unnest_lcols, 0));
  // vec cols: plonk from unnested value above
  for (int lj=0; lj<lk; lj++)
    SET_VECTOR_ELT(ans, lcols[lj], VECTOR_ELT(unnest_lcols, lj));
  UNPROTECT(3); // cj_rowwise, no, unnest_lcols; the unnested columns are now reachable from ans

  // non-vec cols: simply cascade each row the right # of times
  for (int j=0; j<p; j++) {
    if (is_list[j]) continue;
    SEXP xj = VECTOR_ELT(x, j), ansj;
    SET_VECTOR_ELT(ans, j, ansj=allocVector(TYPEOF(xj), out_rows));
    int outi=0; // next position to write in ansj
    switch(TYPEOF(xj)) {
    case RAWSXP: {
      const Rbyte *source = RAW(xj);
      Rbyte *target = RAW(ansj);
      for (int i=0; i<n; i++) {
        const Rbyte item = source[i];
        for (int end=outi+row_counts[i]; outi<end; outi++) target[outi] = item;
      }
    } break;
    case LGLSXP:
    case INTSXP: {
      const int *source = INTEGER(xj);
      int *target = INTEGER(ansj);
      for (int i=0; i<n; i++) {
        const int item = source[i];
        for (int end=outi+row_counts[i]; outi<end; outi++) target[outi] = item;
      }
    } break;
    case REALSXP: {
      const double *source = REAL(xj);
      double *target = REAL(ansj);
      for (int i=0; i<n; i++) {
        const double item = source[i];
        for (int end=outi+row_counts[i]; outi<end; outi++) target[outi] = item;
      }
    } break;
    case CPLXSXP: {
      const Rcomplex *source = COMPLEX(xj);
      Rcomplex *target = COMPLEX(ansj);
      for (int i=0; i<n; i++) {
        const Rcomplex item = source[i];
        for (int end=outi+row_counts[i]; outi<end; outi++) target[outi] = item;
      }
    } break;
    case STRSXP: {
      for (int i=0; i<n; i++) {
        SEXP item = STRING_ELT(xj, i);
        for (int end=outi+row_counts[i]; outi<end; outi++) SET_STRING_ELT(ansj, outi, item);
      }
    } break;
    case VECSXP: { // a list column that was not selected in cols: repeated like any other
      for (int i=0; i<n; i++) {
        SEXP item = VECTOR_ELT(xj, i);
        for (int end=outi+row_counts[i]; outi<end; outi++) SET_VECTOR_ELT(ansj, outi, item);
      }
    } break;
    default: error(_("Unsupported column type '%s'"), type2char(TYPEOF(xj)));
    }
    copyMostAttrib(xj, ansj); // e.g. keeps factor levels and class on repeated columns
  }

  // copy names
  SEXP xNames = PROTECT(getAttrib(x, R_NamesSymbol));
  SEXP ansNames = PROTECT(allocVector(STRSXP, p));
  if (!isNull(xNames)) {
    for (int j=0; j<p; j++) {
      SET_STRING_ELT(ansNames, j, STRING_ELT(xNames, j));
    }
    setAttrib(ans, R_NamesSymbol, ansNames);
  }
  UNPROTECT(3); // ans, xNames, ansNames
  return ans;
}
4 changes: 4 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ bool GetVerbose();

// cj.c
SEXP cj(SEXP base_list);
SEXP unnest(SEXP x, SEXP cols);

// rbindlist.c
SEXP rbindlist(SEXP l, SEXP usenamesArg, SEXP fillArg, SEXP idcolArg);

// dogroups.c
SEXP keepattr(SEXP to, SEXP from);
Expand Down
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ SEXP lock();
SEXP unlock();
SEXP islockedR();
SEXP allNAR();
SEXP unnest();

// .Externals
SEXP fastmean();
Expand Down Expand Up @@ -211,6 +212,7 @@ R_CallMethodDef callMethods[] = {
{"CfrollapplyR", (DL_FUNC) &frollapplyR, -1},
{"CtestMsgR", (DL_FUNC) &testMsgR, -1},
{"C_allNAR", (DL_FUNC) &allNAR, -1},
{"Cunnest", (DL_FUNC) &unnest, -1},
{NULL, NULL, 0}
};

Expand Down
1 change: 1 addition & 0 deletions src/myomp.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
#define omp_get_wtime() 0
#endif

// Threshold used in the if() clause of OpenMP pragmas (see cj.c): a loop is only
// run in parallel when its iteration count exceeds this value.
#define OMP_MIN_VALUE 1024
1 change: 0 additions & 1 deletion src/rbindlist.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "data.table.h"
#include <Rdefines.h>
#include <ctype.h> // for isdigit

SEXP rbindlist(SEXP l, SEXP usenamesArg, SEXP fillArg, SEXP idcolArg)
{
Expand Down