Skip to content

Commit

Permalink
[CONJ-1173] Bulk implementation returning individual results
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 23, 2024
1 parent da50647 commit c9786f2
Show file tree
Hide file tree
Showing 22 changed files with 493 additions and 426 deletions.
289 changes: 266 additions & 23 deletions src/main/java/org/mariadb/jdbc/BasePreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Copyright (c) 2015-2024 MariaDB Corporation Ab
package org.mariadb.jdbc;

import static org.mariadb.jdbc.util.constants.Capabilities.BULK_UNIT_RESULTS;

import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
Expand All @@ -12,16 +14,25 @@
import java.sql.ParameterMetaData;
import java.util.*;
import org.mariadb.jdbc.client.ColumnDecoder;
import org.mariadb.jdbc.client.Completion;
import org.mariadb.jdbc.client.DataType;
import org.mariadb.jdbc.client.result.CompleteResult;
import org.mariadb.jdbc.client.result.Result;
import org.mariadb.jdbc.client.util.ClosableLock;
import org.mariadb.jdbc.client.util.Parameters;
import org.mariadb.jdbc.codec.*;
import org.mariadb.jdbc.export.ExceptionFactory;
import org.mariadb.jdbc.export.Prepare;
import org.mariadb.jdbc.message.ClientMessage;
import org.mariadb.jdbc.message.client.BulkExecutePacket;
import org.mariadb.jdbc.message.client.PreparePacket;
import org.mariadb.jdbc.message.server.OkPacket;
import org.mariadb.jdbc.message.server.PrepareResultPacket;
import org.mariadb.jdbc.plugin.Codec;
import org.mariadb.jdbc.plugin.codec.*;
import org.mariadb.jdbc.util.ClientParser;
import org.mariadb.jdbc.util.ParameterList;
import org.mariadb.jdbc.util.constants.ServerStatus;
import org.mariadb.jdbc.util.constants.ColumnFlags;
import org.mariadb.jdbc.util.timeout.QueryTimeoutHandler;

/** Common methods for prepare statement, for client and server prepare statement. */
public abstract class BasePreparedStatement extends Statement implements PreparedStatement {
Expand All @@ -38,7 +49,7 @@ public abstract class BasePreparedStatement extends Statement implements Prepare
/** PREPARE command result */
protected Prepare prepareResult = null;

protected Boolean isCommandInsert = null;
protected final boolean canCachePrepStmts;

/**
* Constructor
Expand All @@ -59,6 +70,7 @@ public BasePreparedStatement(
ClosableLock lock,
boolean canUseServerTimeout,
boolean canUseServerMaxRows,
boolean canCachePrepStmts,
int autoGeneratedKeys,
int resultSetType,
int resultSetConcurrency,
Expand All @@ -73,6 +85,7 @@ public BasePreparedStatement(
resultSetConcurrency,
defaultFetchSize);
this.sql = sql;
this.canCachePrepStmts = canCachePrepStmts;
}

@Override
Expand All @@ -94,20 +107,6 @@ public String toString() {
return sb.toString();
}

protected void checkIfInsertCommand() {
if (isCommandInsert == null) {
if (sql == null) {
isCommandInsert = false;
} else {
ClientParser parser =
ClientParser.parameterParts(
sql, (con.getContext().getServerStatus() & ServerStatus.NO_BACKSLASH_ESCAPES) > 0);
isInsertDuplicate = parser.isInsertDuplicate();
isCommandInsert = parser.isInsert() && !isInsertDuplicate;
}
}
}

@Override
public String getLastSql() {
return sql;
Expand Down Expand Up @@ -173,12 +172,6 @@ public void setParameter(int index, org.mariadb.jdbc.client.util.Parameter param
parameters.set(index, param);
}

@Override
public abstract int[] executeBatch() throws SQLException;

@Override
public abstract long[] executeLargeBatch() throws SQLException;

// ***************************************************************************************************
// methods inherited from Statement that are disabled
// ***************************************************************************************************
Expand Down Expand Up @@ -1615,4 +1608,254 @@ public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throw
targetSqlType == null ? null : targetSqlType.getVendorTypeNumber(),
null);
}

protected abstract boolean executeInternalPreparedBatch() throws SQLException;

@Override
@SuppressWarnings("try")
public int[] executeBatch() throws SQLException {
checkNotClosed();
if (batchParameters == null || batchParameters.isEmpty()) return new int[0];
try (ClosableLock ignore = lock.closeableLock();
QueryTimeoutHandler ignore2 = this.con.handleTimeout(queryTimeout)) {
boolean wasBulk = executeInternalPreparedBatch();

int[] updates = new int[batchParameters.size()];

// server 11.5 return a result-set with all unitary results
if (wasBulk && con.getContext().hasClientCapability(BULK_UNIT_RESULTS)) {
int updateIdx = 0;
Arrays.fill(updates, Statement.SUCCESS_NO_INFO);
for (Completion completion : results) {
if (completion instanceof CompleteResult
&& ((CompleteResult) completion).isBulkResult()) {
Result unitaryResults = ((CompleteResult) completion);
if (unitaryResults.isBulkResult()) {
unitaryResults.beforeFirst();
while (unitaryResults.next()) {
updates[updateIdx++] = unitaryResults.getInt(2);
}
}
}
}
currResult = results.remove(0);
return updates;
}

// specific case for BULK INSERT
// return not Statement.SUCCESS_NO_INFO, but 1
if (wasBulk && clientParser.isInsert() && !clientParser.isInsertDuplicate()) {
int numberOfResult = 0;
for (int i = 0; i < results.size(); i++) {
numberOfResult += (int) ((OkPacket) results.get(i)).getAffectedRows();
}
if (numberOfResult == updates.length) {
Arrays.fill(updates, 1);
currResult = results.remove(0);
return updates;
}
}

if (results.size() != updates.length) {
Arrays.fill(updates, Statement.SUCCESS_NO_INFO);
} else {
for (int i = 0; i < updates.length; i++) {
if (results.get(i) instanceof OkPacket) {
updates[i] = (int) ((OkPacket) results.get(i)).getAffectedRows();
} else {
updates[i] = org.mariadb.jdbc.Statement.SUCCESS_NO_INFO;
}
}
}
currResult = results.remove(0);
return updates;

} catch (SQLException e) {
results = null;
currResult = null;
throw e;
} finally {
localInfileInputStream = null;
batchParameters.clear();
}
}

@Override
@SuppressWarnings("try")
public long[] executeLargeBatch() throws SQLException {
checkNotClosed();
if (batchParameters == null || batchParameters.isEmpty()) return new long[0];
try (ClosableLock ignore = lock.closeableLock();
QueryTimeoutHandler ignore2 = this.con.handleTimeout(queryTimeout)) {
boolean wasBulk = executeInternalPreparedBatch();

long[] updates = new long[batchParameters.size()];

// server 11.5 return a result-set with all unitary results
if (wasBulk && con.getContext().hasClientCapability(BULK_UNIT_RESULTS)) {
int updateIdx = 0;
Arrays.fill(updates, Statement.SUCCESS_NO_INFO);
for (Completion completion : results) {
if (completion instanceof CompleteResult
&& ((CompleteResult) completion).isBulkResult()) {
Result unitaryResults = ((CompleteResult) completion);
if (unitaryResults.isBulkResult()) {
unitaryResults.beforeFirst();
while (unitaryResults.next()) {
updates[updateIdx++] = unitaryResults.getLong(2);
}
}
}
}
currResult = results.remove(0);
return updates;
}

// specific case for BULK INSERT
// return not Statement.SUCCESS_NO_INFO, but 1
if (wasBulk && clientParser.isInsert() && !clientParser.isInsertDuplicate()) {
long numberOfResult = 0;
for (int i = 0; i < results.size(); i++) {
numberOfResult += ((OkPacket) results.get(i)).getAffectedRows();
}
if (numberOfResult == updates.length) {
Arrays.fill(updates, 1);
currResult = results.remove(0);
return updates;
}
}

if (results.size() != updates.length) {
Arrays.fill(updates, Statement.SUCCESS_NO_INFO);
} else {
for (int i = 0; i < updates.length; i++) {
if (results.get(i) instanceof OkPacket) {
updates[i] = ((OkPacket) results.get(i)).getAffectedRows();
} else {
updates[i] = org.mariadb.jdbc.Statement.SUCCESS_NO_INFO;
}
}
}
currResult = results.remove(0);
return updates;

} catch (SQLException e) {
results = null;
currResult = null;
throw e;
} finally {
batchParameters.clear();
}
}

/**
* Send COM_STMT_PREPARE + X * COM_STMT_BULK_EXECUTE, then read for the all answers
*
* @param cmd command
* @throws SQLException if IOException / Command error
*/
protected void executeBatchBulk(String cmd) throws SQLException {
List<Completion> res;
if (prepareResult == null && canCachePrepStmts)
prepareResult = con.getContext().getPrepareCache().get(cmd, this);
try {
if (prepareResult == null) {
ClientMessage[] packets;
packets =
new ClientMessage[] {
new PreparePacket(cmd), new BulkExecutePacket(null, batchParameters, cmd, this)
};
res =
con.getClient()
.executePipeline(
packets,
this,
0,
maxRows,
ResultSet.CONCUR_READ_ONLY,
ResultSet.TYPE_FORWARD_ONLY,
closeOnCompletion,
false);

// in case of failover, prepare is done in failover, skipping prepare result
if (res.get(0) instanceof PrepareResultPacket) {
results = res.subList(1, res.size());
} else {
results = res;
}
} else {
results =
con.getClient()
.execute(
new BulkExecutePacket(prepareResult, batchParameters, cmd, this),
this,
0,
maxRows,
ResultSet.CONCUR_READ_ONLY,
ResultSet.TYPE_FORWARD_ONLY,
closeOnCompletion,
false);
}

} catch (SQLException bue) {
results = null;
throw exceptionFactory()
.createBatchUpdate(Collections.emptyList(), batchParameters.size(), bue);
}
}

/**
* reset prepare statement in case of a failover. (Command need then to be re-prepared on server)
*/
public void reset() {
lock.lock();
try {
prepareResult = null;
} finally {
lock.unlock();
}
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
checkNotClosed();
if (autoGeneratedKeys != java.sql.Statement.RETURN_GENERATED_KEYS) {
throw new SQLException(
"Cannot return generated keys: query was not set with Statement.RETURN_GENERATED_KEYS");
}

List<String[]> insertIds = new ArrayList<>();
if (currResult != null) {
List<Completion> resList = new ArrayList<>(results);
resList.add(currResult);
for (Completion completion : resList) {
if (completion instanceof CompleteResult && ((CompleteResult) completion).isBulkResult()) {
Result unitaryResults = ((CompleteResult) completion);
if (unitaryResults.isBulkResult()) {
unitaryResults.beforeFirst();
while (unitaryResults.next()) {
int autoGeneratedId = unitaryResults.getInt(1);
if (autoGeneratedId != 0) {
insertIds.add(new String[] {"" + autoGeneratedId});
}
}
}
}
}

if (!insertIds.isEmpty()) {
// has unit results
String[][] ids = insertIds.toArray(new String[0][]);
return CompleteResult.createResultSet(
"insert_id",
DataType.BIGINT,
ids,
con.getContext(),
ColumnFlags.AUTO_INCREMENT | ColumnFlags.UNSIGNED,
resultSetType);
}
}

return super.getGeneratedKeys();
}
}
Loading

0 comments on commit c9786f2

Please sign in to comment.