Skip to content

Commit

Permalink
Revert "Auxiliary commit to revert individual files from 4d9a9e0"
Browse files Browse the repository at this point in the history
This reverts commit a82fdb699a13008b878deaab18ae85a440cf05af.
  • Loading branch information
ttnghia committed Oct 25, 2024
1 parent 4d9a9e0 commit 764a7a2
Showing 1 changed file with 274 additions and 5 deletions.
279 changes: 274 additions & 5 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,224 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) {
return readJSON(schema, opts, buffer, 0, buffer.length);
}

private static class DidViewChange {
ColumnVector changeWasNeeded = null;
boolean noChangeNeeded = false;

public static DidViewChange yes(ColumnVector cv) {
DidViewChange ret = new DidViewChange();
ret.changeWasNeeded = cv;
return ret;
}

public static DidViewChange no() {
DidViewChange ret = new DidViewChange();
ret.noChangeNeeded = true;
return ret;
}
}

private static DidViewChange gatherJSONColumns(Schema schema, TableWithMeta.NestedChildren children,
ColumnView cv) {
// We need to do this recursively to be sure it all matches as expected.
// If we run into problems where the data types don't match, we are not
// going to fix up the data types. We are only going to reorder the columns.
if (schema.getType() == DType.STRUCT) {
if (cv.getType() != DType.STRUCT) {
// The types don't match so just return the input unchanged...
return DidViewChange.no();
} else {
String[] foundNames;
if (children == null) {
foundNames = new String[0];
} else {
foundNames = children.getNames();
}
HashMap<String, Integer> indices = new HashMap<>();
for (int i = 0; i < foundNames.length; i++) {
indices.put(foundNames[i], i);
}
// We might need to rearrange the columns to match what we want.
DType[] types = schema.getChildTypes();
String[] neededNames = schema.getColumnNames();
ColumnView[] columns = new ColumnView[neededNames.length];
try {
boolean somethingChanged = false;
if (columns.length != foundNames.length) {
somethingChanged = true;
}
for (int i = 0; i < columns.length; i++) {
String neededColumnName = neededNames[i];
Integer index = indices.get(neededColumnName);
Schema childSchema = schema.getChild(i);
if (index != null) {
if (childSchema.isStructOrHasStructDescendant()) {
ColumnView child = cv.getChildColumnView(index);
boolean shouldCloseChild = true;
try {
if (index != i) {
somethingChanged = true;
}
DidViewChange childResult = gatherJSONColumns(schema.getChild(i),
children.getChild(index), child);
if (childResult.noChangeNeeded) {
shouldCloseChild = false;
columns[i] = child;
} else {
somethingChanged = true;
columns[i] = childResult.changeWasNeeded;
}
} finally {
if (shouldCloseChild) {
child.close();
}
}
} else {
if (index != i) {
somethingChanged = true;
}
columns[i] = cv.getChildColumnView(index);
}
} else {
somethingChanged = true;
if (types[i] == DType.LIST) {
try (Scalar s = Scalar.listFromNull(childSchema.getChild(0).asHostDataType())) {
columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount());
}
} else if (types[i] == DType.STRUCT) {
int numStructChildren = childSchema.getNumChildren();
HostColumnVector.DataType[] structChildren = new HostColumnVector.DataType[numStructChildren];
for (int structChildIndex = 0; structChildIndex < numStructChildren; structChildIndex++) {
structChildren[structChildIndex] = childSchema.getChild(structChildIndex).asHostDataType();
}
try (Scalar s = Scalar.structFromNull(structChildren)) {
columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount());
}
} else {
try (Scalar s = Scalar.fromNull(types[i])) {
columns[i] = ColumnVector.fromScalar(s, (int) cv.getRowCount());
}
}
}
}
if (somethingChanged) {
try (ColumnView ret = new ColumnView(cv.type, cv.rows, Optional.of(cv.nullCount),
cv.getValid(), null, columns)) {
return DidViewChange.yes(ret.copyToColumnVector());
}
} else {
return DidViewChange.no();
}
} finally {
for (ColumnView c: columns) {
if (c != null) {
c.close();
}
}
}
}
} else if (schema.getType() == DType.LIST && cv.getType() == DType.LIST) {
if (schema.isStructOrHasStructDescendant()) {
String [] childNames = children.getNames();
if (childNames.length == 2 &&
"offsets".equals(childNames[0]) &&
"element".equals(childNames[1])) {
try (ColumnView child = cv.getChildColumnView(0)){
DidViewChange listResult = gatherJSONColumns(schema.getChild(0),
children.getChild(1), child);
if (listResult.noChangeNeeded) {
return DidViewChange.no();
} else {
try (ColumnView listView = new ColumnView(cv.type, cv.rows,
Optional.of(cv.nullCount), cv.getValid(), cv.getOffsets(),
new ColumnView[]{listResult.changeWasNeeded})) {
return DidViewChange.yes(listView.copyToColumnVector());
} finally {
listResult.changeWasNeeded.close();
}
}
}
}
}
// Nothing to change so just return the input, but we need to inc a ref count to really
// make it work, so for now we are going to turn it into a ColumnVector.
return DidViewChange.no();
} else {
// Nothing to change so just return the input, but we need to inc a ref count to really
// make it work, so for now we are going to turn it into a ColumnVector.
return DidViewChange.no();
}
}

private static Table gatherJSONColumns(Schema schema, TableWithMeta twm, int emptyRowCount) {
String[] neededColumns = schema.getColumnNames();
if (neededColumns == null || neededColumns.length == 0) {
return twm.releaseTable();
} else {
String[] foundNames = twm.getColumnNames();
HashMap<String, Integer> indices = new HashMap<>();
for (int i = 0; i < foundNames.length; i++) {
indices.put(foundNames[i], i);
}
// We might need to rearrange the columns to match what we want.
DType[] types = schema.getChildTypes();
ColumnVector[] columns = new ColumnVector[neededColumns.length];
try (Table tbl = twm.releaseTable()) {
int rowCount = tbl == null ? emptyRowCount : (int)tbl.getRowCount();
if (rowCount < 0) {
throw new IllegalStateException(
"No empty row count provided and the table read has no row count or columns");
}
for (int i = 0; i < columns.length; i++) {
String neededColumnName = neededColumns[i];
Integer index = indices.get(neededColumnName);
if (index != null) {
if (schema.getChild(i).isStructOrHasStructDescendant()) {
DidViewChange gathered = gatherJSONColumns(schema.getChild(i), twm.getChild(index),
tbl.getColumn(index));
if (gathered.noChangeNeeded) {
columns[i] = tbl.getColumn(index).incRefCount();
} else {
columns[i] = gathered.changeWasNeeded;
}
} else {
columns[i] = tbl.getColumn(index).incRefCount();
}
} else {
if (types[i] == DType.LIST) {
Schema listSchema = schema.getChild(i);
Schema elementSchema = listSchema.getChild(0);
try (Scalar s = Scalar.listFromNull(elementSchema.asHostDataType())) {
columns[i] = ColumnVector.fromScalar(s, rowCount);
}
} else if (types[i] == DType.STRUCT) {
Schema structSchema = schema.getChild(i);
int numStructChildren = structSchema.getNumChildren();
DataType[] structChildrenTypes = new DataType[numStructChildren];
for (int j = 0; j < numStructChildren; j++) {
structChildrenTypes[j] = structSchema.getChild(j).asHostDataType();
}
try (Scalar s = Scalar.structFromNull(structChildrenTypes)) {
columns[i] = ColumnVector.fromScalar(s, rowCount);
}
} else {
try (Scalar s = Scalar.fromNull(types[i])) {
columns[i] = ColumnVector.fromScalar(s, rowCount);
}
}
}
}
return new Table(columns);
} finally {
for (ColumnVector c: columns) {
if (c != null) {
c.close();
}
}
}
}
}

/**
* Read a JSON file.
* @param schema the schema of the file. You may use Schema.INFERRED to infer the schema.
Expand Down Expand Up @@ -1121,7 +1339,8 @@ public static Table readJSON(Schema schema, JSONOptions opts, File path) {
cudfPruneSchema,
opts.experimental(),
opts.getLineDelimiter()))) {
return twm.releaseTable();

return gatherJSONColumns(schema, twm, -1);
}
}

Expand All @@ -1137,6 +1356,23 @@ public static Table readJSON(Schema schema, JSONOptions opts, File path) {
*/
public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset,
long len, HostMemoryAllocator hostMemoryAllocator) {
return readJSON(schema, opts, buffer, offset, len, hostMemoryAllocator, -1);
}

/**
* Read JSON formatted data.
* @param schema the schema of the data. You may use Schema.INFERRED to infer the schema.
* @param opts various JSON parsing options.
* @param buffer raw UTF8 formatted bytes.
* @param offset the starting offset into buffer.
* @param len the number of bytes to parse.
* @param hostMemoryAllocator allocator for host memory buffers
* @param emptyRowCount the number of rows to return if no columns were read.
* @return the data parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset,
long len, HostMemoryAllocator hostMemoryAllocator,
int emptyRowCount) {
if (len <= 0) {
len = buffer.length - offset;
}
Expand All @@ -1145,10 +1381,16 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, lon
assert offset >= 0 && offset < buffer.length;
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readJSON(schema, opts, newBuf, 0, len);
return readJSON(schema, opts, newBuf, 0, len, emptyRowCount);
}
}

public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset,
long len, int emptyRowCount) {
return readJSON(schema, opts, buffer, offset, len, DefaultHostMemoryAllocator.get(),
emptyRowCount);
}

public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset,
long len) {
return readJSON(schema, opts, buffer, offset, len, DefaultHostMemoryAllocator.get());
Expand Down Expand Up @@ -1222,7 +1464,22 @@ public static TableWithMeta readAndInferJSON(JSONOptions opts, DataSource ds) {
* @return the data parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
long offset, long len) {
return readJSON(schema, opts, buffer, offset, len, -1);
}

/**
* Read JSON formatted data.
* @param schema the schema of the data. You may use Schema.INFERRED to infer the schema.
* @param opts various JSON parsing options.
* @param buffer raw UTF8 formatted bytes.
* @param offset the starting offset into buffer.
* @param len the number of bytes to parse.
* @param emptyRowCount the number of rows to use if no columns were found.
* @return the data parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer buffer,
long offset, long len, int emptyRowCount) {
if (len <= 0) {
len = buffer.length - offset;
}
Expand Down Expand Up @@ -1251,7 +1508,7 @@ public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer b
cudfPruneSchema,
opts.experimental(),
opts.getLineDelimiter()))) {
return twm.releaseTable();
return gatherJSONColumns(schema, twm, emptyRowCount);
}
}

Expand All @@ -1263,6 +1520,18 @@ public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer b
* @return the data parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, DataSource ds) {
return readJSON(schema, opts, ds, -1);
}

/**
* Read JSON formatted data.
* @param schema the schema of the data. You may use Schema.INFERRED to infer the schema.
* @param opts various JSON parsing options.
* @param ds the DataSource to read from.
* @param emptyRowCount the number of rows to return if no columns were read.
* @return the data parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, DataSource ds, int emptyRowCount) {
long dsHandle = DataSourceHelper.createWrapperDataSource(ds);
// only prune the schema if one is provided
boolean cudfPruneSchema = schema.getColumnNames() != null &&
Expand All @@ -1285,7 +1554,7 @@ public static Table readJSON(Schema schema, JSONOptions opts, DataSource ds) {
opts.experimental(),
opts.getLineDelimiter(),
dsHandle))) {
return twm.releaseTable();
return gatherJSONColumns(schema, twm, emptyRowCount);
} finally {
DataSourceHelper.destroyWrapperDataSource(dsHandle);
}
Expand Down

0 comments on commit 764a7a2

Please sign in to comment.