Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Delta Kernel dependency from 3.1.0 to 3.2.0 #16513

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions-contrib/druid-deltalake-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<delta-kernel.version>3.1.0</delta-kernel.version>
<delta-kernel.version>3.2.0</delta-kernel.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
Expand Down Expand Up @@ -120,7 +120,7 @@ public boolean needsFormat()

/**
* Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied,
* the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is
* the Delta files and schema are obtained from it to instantiate the reader. Otherwise, the Delta engine is
* instantiated with the supplied configuration to read the table.
*
* @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow}
Expand All @@ -134,40 +134,40 @@ public InputSourceReader reader(
File temporaryDirectory
)
{
final TableClient tableClient = createTableClient();
final Engine engine = createDeltaEngine();
try {
final List<CloseableIterator<FilteredColumnarBatch>> scanFileDataIters = new ArrayList<>();

if (deltaSplit != null) {
final Row scanState = deserialize(tableClient, deltaSplit.getStateRow());
final Row scanState = deserialize(engine, deltaSplit.getStateRow());
final StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

for (String file : deltaSplit.getFiles()) {
final Row scanFile = deserialize(tableClient, file);
final Row scanFile = deserialize(engine, file);
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty())
getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, Optional.empty())
);
}
} else {
final Table table = Table.forPath(tableClient, tablePath);
final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final Table table = Table.forPath(engine, tablePath);
final Snapshot latestSnapshot = table.getLatestSnapshot(engine);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType prunedSchema = pruneSchema(
fullSnapshotSchema,
inputRowSchema.getColumnsFilter()
);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, prunedSchema).build();
final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(tableClient);
final Row scanState = scan.getScanState(tableClient);
final Scan scan = scanBuilder.withReadSchema(engine, prunedSchema).build();
final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(engine);
final Row scanState = scan.getScanState(engine);

final StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

while (scanFilesIter.hasNext()) {
final FilteredColumnarBatch scanFileBatch = scanFilesIter.next();
Expand All @@ -176,7 +176,7 @@ public InputSourceReader reader(
while (scanFileRows.hasNext()) {
final Row scanFile = scanFileRows.next();
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
);
}
}
Expand All @@ -203,26 +203,26 @@ public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nul
return Stream.of(new InputSplit<>(deltaSplit));
}

final TableClient tableClient = createTableClient();
final Engine engine = createDeltaEngine();
final Snapshot latestSnapshot;
final Table table = Table.forPath(engine, tablePath);
try {
final Table table = Table.forPath(tableClient, tablePath);
latestSnapshot = table.getLatestSnapshot(tableClient);
latestSnapshot = table.getLatestSnapshot(engine);
}
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
}
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, fullSnapshotSchema).build();
final Scan scan = scanBuilder.withReadSchema(engine, fullSnapshotSchema).build();
// scan files iterator for the current snapshot
final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);
final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(engine);

final Row scanState = scan.getScanState(tableClient);
final Row scanState = scan.getScanState(engine);
final String scanStateStr = RowSerde.serializeRowToJson(scanState);

Iterator<DeltaSplit> deltaSplitIterator = Iterators.transform(
Expand Down Expand Up @@ -256,9 +256,9 @@ public InputSource withSplit(InputSplit<DeltaSplit> split)
);
}

private Row deserialize(TableClient tableClient, String row)
private Row deserialize(Engine engine, String row)
{
return RowSerde.deserializeRowFromJson(tableClient, row);
return RowSerde.deserializeRowFromJson(engine, row);
}

/**
Expand All @@ -285,17 +285,17 @@ private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter
}

/**
* @return a table client where the client is initialized with {@link Configuration} class that uses the class's
* @return a Delta engine initialized with {@link Configuration} class that uses the class's
* class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
* so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
* so the Delta engine cannot load runtime classes resulting in {@link ClassNotFoundException}.
*/
private TableClient createTableClient()
private Engine createDeltaEngine()
{
final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Configuration conf = new Configuration();
return DefaultTableClient.create(conf);
return DefaultEngine.create(conf);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxClassloader);
Expand All @@ -308,7 +308,7 @@ private TableClient createTableClient()
* SingleThreadedTableReader.java</a>.
*/
private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
final TableClient tableClient,
final Engine engine,
final Row scanState,
final Row scanFile,
final StructType physicalReadSchema,
Expand All @@ -317,14 +317,14 @@ private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
{
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);

final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
final CloseableIterator<ColumnarBatch> physicalDataIter = engine.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
optionalPredicate
);

return Scan.transformPhysicalData(
tableClient,
engine,
scanState,
scanFile,
physicalDataIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BooleanType;
Expand Down Expand Up @@ -84,13 +84,12 @@ public static String serializeRowToJson(Row row)
/**
* Utility method to deserialize a {@link Row} object from the JSON form.
*/
public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
public static Row deserializeRowFromJson(Engine engine, String jsonRowWithSchema)
{
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
JsonNode schemaNode = jsonNode.get("schema");
StructType schema =
tableClient.getJsonHandler().deserializeStructType(schemaNode.asText());
StructType schema = engine.getJsonHandler().deserializeStructType(schemaNode.asText());
return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
}
catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package org.apache.druid.delta.input;

import io.delta.kernel.Scan;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
Expand Down Expand Up @@ -68,13 +68,13 @@ public void testDeltaInputRow(
final List<Map<String, Object>> expectedRows
) throws TableNotFoundException, IOException
{
final TableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient, deltaTablePath);
final Engine engine = DefaultEngine.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(engine, deltaTablePath);

final Row scanState = scan.getScanState(tableClient);
final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
final Row scanState = scan.getScanState(engine);
final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

final CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
final CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(engine);
int totalRecordCount = 0;
while (scanFileIter.hasNext()) {
final FilteredColumnarBatch scanFileBatch = scanFileIter.next();
Expand All @@ -84,13 +84,13 @@ public void testDeltaInputRow(
final Row scanFile = scanFileRows.next();
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);

final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
final CloseableIterator<ColumnarBatch> physicalDataIter = engine.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
Optional.empty()
);
final CloseableIterator<FilteredColumnarBatch> dataIter = Scan.transformPhysicalData(
tableClient,
engine,
scanState,
scanFile,
physicalDataIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.types.StructType;

public class DeltaTestUtils
{
public static Scan getScan(final TableClient tableClient, final String deltaTablePath) throws TableNotFoundException
public static Scan getScan(final Engine engine, final String deltaTablePath) throws TableNotFoundException
{
final Table table = Table.forPath(tableClient, deltaTablePath);
final Snapshot snapshot = table.getLatestSnapshot(tableClient);
final StructType readSchema = snapshot.getSchema(tableClient);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
.withReadSchema(tableClient, readSchema);
final Table table = Table.forPath(engine, deltaTablePath);
final Snapshot snapshot = table.getLatestSnapshot(engine);
final StructType readSchema = snapshot.getSchema(engine);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
.withReadSchema(engine, readSchema);
return scanBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.druid.delta.input;

import io.delta.kernel.Scan;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.exceptions.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -46,13 +46,13 @@ public static Collection<Object[]> data()
@ParameterizedTest(name = "{index}:with context {0}")
public void testSerializeDeserializeRoundtrip(final String tablePath) throws TableNotFoundException
{
final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient, tablePath);
final Row scanState = scan.getScanState(tableClient);
final DefaultEngine engine = DefaultEngine.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(engine, tablePath);
final Row scanState = scan.getScanState(engine);

final String rowJson = RowSerde.serializeRowToJson(scanState);
final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson);
final Row row = RowSerde.deserializeRowFromJson(engine, rowJson);

Assert.assertEquals(scanState.getSchema(), row.getSchema());
Assertions.assertEquals(scanState.getSchema(), row.getSchema());
}
}
Loading