Skip to content

Commit

Permalink
Merge pull request #1168 from BillyYccc/feature/mysql-query-pipelinin…
Browse files Browse the repository at this point in the history
…g-support

Add support for MySQL pipelining execution
  • Loading branch information
vietj authored May 9, 2022
2 parents 78ba717 + f71739f commit bef0b4b
Show file tree
Hide file tree
Showing 31 changed files with 829 additions and 155 deletions.
14 changes: 14 additions & 0 deletions vertx-mysql-client/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ scalability and low overhead.
* Complete data type support
* Stored Procedures support
* TLS/SSL support
* Query pipelining
* MySQL utilities commands support
* Working with MySQL and MariaDB
* Rich collation and charset support
Expand Down Expand Up @@ -92,6 +93,18 @@ You can easily get one from the pool:

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

== Pool versus pooled client

The {@link io.vertx.mysqlclient.MySQLPool} allows you to create a pool or a pooled client

[source,$lang]
----
{@link examples.MySQLClientExamples#poolVersusPooledClient}
----

- pool operations are not pipelined, only connection client are pipelined
- pooled client operations are pipelined

== Pool sharing

include::pool_sharing.adoc[]
Expand Down Expand Up @@ -665,6 +678,7 @@ You can also manage the lifecycle of prepared statements manually by creating a

There is time when you want to batch insert data into the database, you can use `PreparedQuery#executeBatch` which provides a simple API to handle this.
Keep in mind that MySQL does not natively support batching protocol so the API is only a sugar by executing the prepared statement one after another, which means more network round trips are required comparing to inserting multiple rows by executing one prepared statement with a list of values.
In order to gain best performance on the wire, we execute the batch in pipelining mode which means the execution request is sent before the response of previous request returns, if your server or proxy does not support this feature or you might not be interested in it, you can use the single execution API and compose them by yourself instead.

=== tricky DATE & TIME data types

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setCollation((String)member.getValue());
}
break;
case "pipeliningLimit":
if (member.getValue() instanceof Number) {
obj.setPipeliningLimit(((Number)member.getValue()).intValue());
}
break;
case "serverRsaPublicKeyPath":
if (member.getValue() instanceof String) {
obj.setServerRsaPublicKeyPath((String)member.getValue());
Expand Down Expand Up @@ -81,6 +86,7 @@ public static void toJson(MySQLConnectOptions obj, java.util.Map<String, Object>
if (obj.getCollation() != null) {
json.put("collation", obj.getCollation());
}
json.put("pipeliningLimit", obj.getPipeliningLimit());
if (obj.getServerRsaPublicKeyPath() != null) {
json.put("serverRsaPublicKeyPath", obj.getServerRsaPublicKeyPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package examples;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void gettingStarted() {
.setMaxSize(5);

// Create the client pool
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
SqlClient client = MySQLPool.client(connectOptions, poolOptions);

// A simple query
client
Expand Down Expand Up @@ -212,6 +213,22 @@ public void connecting04(Vertx vertx) {
});
}

public void poolVersusPooledClient(Vertx vertx, String sql, MySQLConnectOptions connectOptions, PoolOptions poolOptions) {

// Pooled client
connectOptions.setPipeliningLimit(64);
SqlClient pooledClient = MySQLPool.client(vertx, connectOptions, poolOptions);

// Pipelined
Future<RowSet<Row>> res1 = pooledClient.query(sql).execute();

// Connection pool
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
}

public void connectWithUnixDomainSocket(Vertx vertx) {
// Connect Options
// Socket file name /var/run/mysqld/mysqld.sock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.mysqlclient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* {@code MySQLBatchException} is thrown if an error occurs during executions when using {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)}.
* The client will try to execute with all the params no matter if one iteration of the executions fails, the iteration count is counted from zero.
*/
public class MySQLBatchException extends RuntimeException {
/**
* A mapping between the iteration count and errors, the key is consistent with the batching param list index.
*/
private final Map<Integer, Throwable> iterationError = new HashMap<>();

public MySQLBatchException() {
super("Error occurs during batch execution");
}

/**
* Get the detailed errors of all failed iterations in batching.
*
* @return the iteration count and error mapping
*/
public Map<Integer, Throwable> getIterationError() {
return iterationError;
}

public void reportError(int iteration, Throwable error) {
iterationError.put(iteration, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static MySQLConnectOptions fromUri(String connectionUri) throws IllegalAr
public static final Map<String, String> DEFAULT_CONNECTION_ATTRIBUTES;
public static final SslMode DEFAULT_SSL_MODE = SslMode.DISABLED;
public static final String DEFAULT_CHARACTER_ENCODING = "UTF-8";
public static final int DEFAULT_PIPELINING_LIMIT = 1;

static {
Map<String, String> defaultAttributes = new HashMap<>();
Expand All @@ -79,6 +80,7 @@ public static MySQLConnectOptions fromUri(String connectionUri) throws IllegalAr
private String serverRsaPublicKeyPath;
private Buffer serverRsaPublicKeyValue;
private String characterEncoding = DEFAULT_CHARACTER_ENCODING;
private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT;
private MySQLAuthenticationPlugin authenticationPlugin = MySQLAuthenticationPlugin.DEFAULT;

public MySQLConnectOptions() {
Expand Down Expand Up @@ -301,6 +303,29 @@ public Buffer getServerRsaPublicKeyValue() {
return serverRsaPublicKeyValue;
}

/**
* Get the pipelining limit count.
*
* @return the pipelining count
*/
public int getPipeliningLimit() {
return pipeliningLimit;
}

/**
* Set the pipelining limit count.
*
* @param pipeliningLimit the count to configure
* @return a reference to this, so the API can be used fluently
*/
public MySQLConnectOptions setPipeliningLimit(int pipeliningLimit) {
if (pipeliningLimit < 1) {
throw new IllegalArgumentException("pipelining limit can not be less than 1");
}
this.pipeliningLimit = pipeliningLimit;
return this;
}

@Override
public MySQLConnectOptions setHost(String host) {
return (MySQLConnectOptions) super.setHost(host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
package io.vertx.mysqlclient;

/**
* A {@link RuntimeException} signals that an error occurred.
* {@code MySQLException} is the class representing that a MySQL error packet is received from the server,
* This usually signals that an error occurred during a connection establishment or command executions.
*/
public class MySQLException extends RuntimeException {
private final int errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.impl.MySQLPoolOptions;
import io.vertx.mysqlclient.spi.MySQLDriver;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;

import java.util.Collections;
Expand Down Expand Up @@ -99,6 +101,72 @@ static MySQLPool pool(Vertx vertx, List<MySQLConnectOptions> databases, PoolOpti
return (MySQLPool) MySQLDriver.INSTANCE.createPool(vertx, databases, options);
}


/**
* Like {@link #client(String, PoolOptions)} with a default {@code poolOptions}.
*/
static SqlClient client(String connectionUri) {
return client(connectionUri, new PoolOptions());
}

/**
* Like {@link #client(MySQLConnectOptions, PoolOptions)} with {@code connectOptions} build from {@code connectionUri}.
*/
static SqlClient client(String connectionUri, PoolOptions poolOptions) {
return client(MySQLConnectOptions.fromUri(connectionUri), poolOptions);
}

/**
* Like {@link #client(Vertx, String,PoolOptions)} with a default {@code poolOptions}.
*/
static SqlClient client(Vertx vertx, String connectionUri) {
return client(vertx, MySQLConnectOptions.fromUri(connectionUri), new PoolOptions());
}

/**
* Like {@link #client(Vertx, MySQLConnectOptions, PoolOptions)} with {@code connectOptions} build from {@code connectionUri}.
*/
static SqlClient client(Vertx vertx, String connectionUri, PoolOptions poolOptions) {
return client(vertx, MySQLConnectOptions.fromUri(connectionUri), poolOptions);
}

/**
* Create a client backed by a connection pool to the database configured with the given {@code connectOptions} and {@code poolOptions}.
*
* @param poolOptions the options for creating the backing pool
* @return the client
*/
static SqlClient client(MySQLConnectOptions connectOptions, PoolOptions poolOptions) {
return client(null, Collections.singletonList(connectOptions), poolOptions);
}

/**
* Like {@link #client(MySQLConnectOptions, PoolOptions)} with a specific {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, MySQLConnectOptions connectOptions, PoolOptions poolOptions) {
return client(vertx, Collections.singletonList(connectOptions), poolOptions);
}

/**
* Like {@link #client(List, PoolOptions)} with a specific {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, List<MySQLConnectOptions> mySQLConnectOptions, PoolOptions options) {
return MySQLDriver.INSTANCE.createPool(vertx, mySQLConnectOptions, new MySQLPoolOptions(options).setPipelined(true));
}

/**
* Create a client backed by a connection pool to the MySQL {@code databases} with round-robin selection.
* Round-robin is applied when a new connection is created by the pool.
*
* @param databases the list of databases
* @param options the options for creating the pool
* @return the pooled client
*/
static SqlClient client(List<MySQLConnectOptions> databases, PoolOptions options) {
return client(null, databases, options);
}


@Override
MySQLPool connectHandler(Handler<SqlConnection> handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class MySQLConnectionFactory extends ConnectionFactoryBase {
private boolean useAffectedRows;
private SslMode sslMode;
private Buffer serverRsaPublicKey;
private int initialCapabilitiesFlags;
private int pipeliningLimit;
private MySQLAuthenticationPlugin authenticationPlugin;

public MySQLConnectionFactory(VertxInternal vertx, MySQLConnectOptions options) {
Expand Down Expand Up @@ -86,6 +88,8 @@ protected void initializeConfiguration(SqlConnectOptions connectOptions) {
}
}
this.serverRsaPublicKey = serverRsaPublicKey;
this.initialCapabilitiesFlags = initCapabilitiesFlags(database);
this.pipeliningLimit = options.getPipeliningLimit();

// check the SSLMode here
switch (sslMode) {
Expand All @@ -110,10 +114,9 @@ protected void configureNetClientOptions(NetClientOptions netClientOptions) {

@Override
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
int initialCapabilitiesFlags = initCapabilitiesFlags(database);
Future<NetSocket> fut = netClient.connect(server);
return fut.flatMap(so -> {
MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, context);
MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
return Future.future(promise -> conn.sendStartupMessage(username, password, database, collation, serverRsaPublicKey, properties, sslMode, initialCapabilitiesFlags, charsetEncoding, authenticationPlugin, promise));
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.mysqlclient.impl;

import io.vertx.sqlclient.PoolOptions;

public class MySQLPoolOptions extends PoolOptions {

public MySQLPoolOptions(PoolOptions other) {
super(other);
}

private boolean pipelined;

public boolean isPipelined() {
return pipelined;
}

public MySQLPoolOptions setPipelined(boolean pipelined) {
this.pipelined = pipelined;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public MySQLSocketConnection(NetSocketInternal socket,
boolean cachePreparedStatements,
int preparedStatementCacheSize,
Predicate<String> preparedStatementCacheSqlFilter,
int pipeliningLimit,
EventLoopContext context) {
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, 1, context);
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
}

void sendStartupMessage(String username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected final void handleAuthMoreData(byte[] password, ByteBuf payload) {
} else if (flag == FAST_AUTH_STATUS_FLAG) {
// fast auth success
} else {
completionHandler.handle(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
encoder.handleCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
}
}
}
Expand All @@ -87,7 +87,7 @@ protected final void sendEncryptedPasswordWithServerRsaPublicKey(byte[] password
byte[] passwordInput = Arrays.copyOf(password, password.length + 1); // need to append 0x00(NULL) to the password
encryptedPassword = RsaPublicKeyEncryptor.encrypt(passwordInput, authPluginData, serverRsaPublicKeyContent);
} catch (Exception e) {
completionHandler.handle(CommandResponse.failure(e));
encoder.handleCommandResponse(CommandResponse.failure(e));
return;
}
sendBytesAsPacket(encryptedPassword);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {
handleAuthMoreData(cmd.password().getBytes(StandardCharsets.UTF_8), payload);
break;
case OK_PACKET_HEADER:
completionHandler.handle(CommandResponse.success(null));
encoder.handleCommandResponse(CommandResponse.success(null));
break;
case ERROR_PACKET_HEADER:
handleErrorPacketPayload(payload);
Expand All @@ -74,7 +74,7 @@ private void handleAuthSwitchRequest(byte[] password, ByteBuf payload) {
authResponse = password;
break;
default:
completionHandler.handle(CommandResponse.failure(new UnsupportedOperationException("Unsupported authentication method: " + pluginName)));
encoder.handleCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported authentication method: " + pluginName)));
return;
}
sendBytesAsPacket(authResponse);
Expand Down
Loading

0 comments on commit bef0b4b

Please sign in to comment.