Skip to content

Commit

Permalink
ARROW-6601: [Java] Improve JDBC adapter performance & add benchmark
Browse files Browse the repository at this point in the history
Related to [ARROW-6601](https://issues.apache.org/jira/browse/ARROW-6601).

Add a performance test as well to get a baseline number, to avoid performance regression when we change related code.

When working on the JDBC adapter benchmark, I found the JMH result was very poor (about 1680000 ns/op). I eventually found that when we initialize a VectorSchemaRoot, JdbcToArrowUtils#allocateVectors is invoked, which is time-consuming — and this is not necessary since we use the setSafe API in consumers. After removing it, the JMH result is about 2000 ns/op (3 columns with valueCount = 3000).

I think this one should be merged into the 0.15 release.

Closes apache#5472 from tianchen92/ARROW-6601 and squashes the following commits:

fa97680 <tianchen>  Improve JDBC adapter performance & add benchmark

Authored-by: tianchen <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
  • Loading branch information
tianchen92 authored and emkornfield committed Sep 24, 2019
1 parent cde09f7 commit 35b6d07
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ private VectorSchemaRoot createVectorSchemaRoot() {
VectorSchemaRoot root = null;
try {
root = VectorSchemaRoot.create(schema, config.getAllocator());
JdbcToArrowUtils.allocateVectors(root, JdbcToArrowUtils.DEFAULT_BUFFER_SIZE);
} catch (Exception e) {
if (root != null) {
root.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.arrow.adapter.jdbc.consumer.VarCharConsumer;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateMilliVector;
Expand Down Expand Up @@ -87,8 +86,6 @@
*/
public class JdbcToArrowUtils {

public static final int DEFAULT_BUFFER_SIZE = 256;

private static final int JDBC_ARRAY_VALUE_COLUMN = 2;

/**
Expand Down Expand Up @@ -308,18 +305,6 @@ private static JdbcFieldInfo getJdbcFieldInfoForArraySubType(
return fieldInfo;
}

/**
 * Eagerly allocates buffers of the given capacity for every vector in {@code root}.
 * Fixed-width vectors get an exact element-count allocation; all other vectors use
 * their default allocation. Every vector's initial capacity is then set to {@code size}.
 */
static void allocateVectors(VectorSchemaRoot root, int size) {
  for (FieldVector vector : root.getFieldVectors()) {
    if (vector instanceof BaseFixedWidthVector) {
      ((BaseFixedWidthVector) vector).allocateNew(size);
    } else {
      vector.allocateNew();
    }
    vector.setInitialCapacity(size);
  }
}

/**
* Iterate the given JDBC {@link ResultSet} object to fetch the data and transpose it to populate
* the given Arrow Vector objects.
Expand Down Expand Up @@ -352,7 +337,6 @@ public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, JdbcT

ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();
allocateVectors(root, DEFAULT_BUFFER_SIZE);

JdbcConsumer[] consumers = new JdbcConsumer[columnCount];
for (int i = 1; i <= columnCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class BinaryConsumer implements JdbcConsumer<VarBinaryVector> {
* Instantiate a BinaryConsumer.
*/
/**
 * Instantiate a BinaryConsumer reading column {@code index} of a JDBC result set
 * into {@code vector}. A non-null vector is eagerly allocated up front so that
 * per-row allocation is avoided during consumption; a null vector is tolerated.
 */
public BinaryConsumer(VarBinaryVector vector, int index) {
  this.vector = vector;
  this.columnIndexInResultSet = index;
  if (vector != null) {
    vector.allocateNewSafe();
  }
}
Expand Down Expand Up @@ -97,6 +100,7 @@ public void close() throws Exception {
/**
 * Replaces the target vector and resets the write position to the start.
 * The new vector is eagerly allocated, mirroring the constructor's behavior.
 */
@Override
public void resetValueVector(VarBinaryVector vector) {
  this.vector = vector;
  // Guard against null for consistency with the constructor, which tolerates a null vector.
  if (vector != null) {
    vector.allocateNewSafe();
  }
  this.currentIndex = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ClobConsumer implements JdbcConsumer<VarCharVector> {
* Instantiate a ClobConsumer.
*/
/**
 * Instantiate a ClobConsumer reading column {@code index} of a JDBC result set
 * into {@code vector}. A non-null vector is eagerly allocated up front so that
 * per-row allocation is avoided during consumption; a null vector is tolerated.
 */
public ClobConsumer(VarCharVector vector, int index) {
  this.vector = vector;
  this.columnIndexInResultSet = index;
  if (vector != null) {
    vector.allocateNewSafe();
  }
}
Expand Down Expand Up @@ -92,6 +95,7 @@ public void close() throws Exception {
/**
 * Replaces the target vector and resets the write position to the start.
 * The new vector is eagerly allocated, mirroring the constructor's behavior.
 */
@Override
public void resetValueVector(VarCharVector vector) {
  this.vector = vector;
  // Guard against null for consistency with the constructor, which tolerates a null vector.
  if (vector != null) {
    vector.allocateNewSafe();
  }
  this.currentIndex = 0;
}
}
11 changes: 11 additions & 0 deletions java/performance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@
<artifactId>arrow-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.196</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adapter;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.junit.Test;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * Benchmarks for the JDBC adapter.
 */
@State(Scope.Benchmark)
public class JdbcAdapterBenchmarks {

  // Number of rows inserted into (and read back from) the benchmark table.
  private final int valueCount = 3000;

  // Conversion configuration: allocator plus target batch size.
  private JdbcToArrowConfig config;

  private Connection conn = null;
  private ResultSet resultSet = null;

  private static final String CREATE_STATEMENT =
      "CREATE TABLE test_table (f0 INT, f1 LONG, f2 VARCHAR, f3 BOOLEAN);";
  private static final String INSERT_STATEMENT =
      "INSERT INTO test_table (f0, f1, f2, f3) VALUES (?, ?, ?, ?);";
  private static final String QUERY = "SELECT f0, f1, f2, f3 FROM test_table;";
  private static final String DROP_STATEMENT = "DROP TABLE test_table;";

  /**
   * Setup benchmarks: create an in-memory H2 table, populate it with
   * {@code valueCount} rows, and open the result set the benchmark consumes.
   */
  @Setup
  public void prepare() throws Exception {
    BaseAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
    config = new JdbcToArrowConfigBuilder().setAllocator(allocator).setTargetBatchSize(1024).build();

    String url = "jdbc:h2:mem:JdbcAdapterBenchmarks";
    String driver = "org.h2.Driver";
    Class.forName(driver);
    conn = DriverManager.getConnection(url);
    try (Statement stmt = conn.createStatement()) {
      stmt.executeUpdate(CREATE_STATEMENT);
    }

    // Prepare the insert statement once and reuse it for every row,
    // instead of re-preparing it per insert.
    try (PreparedStatement stmt = conn.prepareStatement(INSERT_STATEMENT)) {
      for (int i = 0; i < valueCount; i++) {
        stmt.setInt(1, i);
        stmt.setLong(2, i);
        stmt.setString(3, "test" + i);
        stmt.setBoolean(4, i % 2 == 0);
        stmt.executeUpdate();
      }
    }

    // NOTE(review): this ResultSet is shared across benchmark invocations; once the
    // first invocation drains it, subsequent invocations observe no rows. Confirm
    // whether each invocation should re-execute the query instead.
    resultSet = conn.createStatement().executeQuery(QUERY);
  }

  /**
   * Tear down benchmarks: drop the table and close all JDBC resources.
   */
  @TearDown
  public void tearDown() throws Exception {
    try (Statement stmt = conn.createStatement()) {
      stmt.executeUpdate(DROP_STATEMENT);
    } finally {
      // Close the result set first so its underlying statement is released
      // before the connection is closed.
      if (resultSet != null) {
        resultSet.close();
        resultSet = null;
      }
      if (conn != null) {
        conn.close();
        conn = null;
      }
    }
  }

  /**
   * Test {@link JdbcToArrow#sqlToArrowVectorIterator(ResultSet, JdbcToArrowConfig)}.
   *
   * @return useless. To avoid DCE by JIT.
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  public int testJdbcToArrow() throws Exception {
    int valueCount = 0;
    try (ArrowVectorIterator iter = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config)) {
      while (iter.hasNext()) {
        VectorSchemaRoot root = iter.next();
        IntVector intVector = (IntVector) root.getFieldVectors().get(0);
        valueCount += intVector.getValueCount();
        root.close();
      }
    }
    return valueCount;
  }

  /**
   * Entry point that runs this class's benchmarks through the JMH runner.
   */
  @Test
  public void evaluate() throws RunnerException {
    Options opt = new OptionsBuilder()
        .include(JdbcAdapterBenchmarks.class.getSimpleName())
        .forks(1)
        .build();

    new Runner(opt).run();
  }
}

0 comments on commit 35b6d07

Please sign in to comment.