Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for MySQL pipelining execution #1168

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions vertx-mysql-client/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ scalability and low overhead.
* Complete data type support
* Stored Procedures support
* TLS/SSL support
* Query pipelining
* MySQL utilities commands support
* Working with MySQL and MariaDB
* Rich collation and charset support
Expand Down Expand Up @@ -92,6 +93,18 @@ You can easily get one from the pool:

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

== Pool versus pooled client

The {@link io.vertx.mysqlclient.MySQLPool} allows you to create a pool or a pooled client

[source,$lang]
----
{@link examples.MySQLClientExamples#poolVersusPooledClient}
----

- pool operations are not pipelined, only connection client are pipelined
- pooled client operations are pipelined

== Pool sharing

include::pool_sharing.adoc[]
Expand Down Expand Up @@ -665,6 +678,7 @@ You can also manage the lifecycle of prepared statements manually by creating a

There is time when you want to batch insert data into the database, you can use `PreparedQuery#executeBatch` which provides a simple API to handle this.
Keep in mind that MySQL does not natively support batching protocol so the API is only a sugar by executing the prepared statement one after another, which means more network round trips are required comparing to inserting multiple rows by executing one prepared statement with a list of values.
In order to gain best performance on the wire, we execute the batch in pipelining mode which means the execution request is sent before the response of previous request returns, if your server or proxy does not support this feature or you might not be interested in it, you can use the single execution API and compose them by yourself instead.

=== tricky DATE & TIME data types

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setCollation((String)member.getValue());
}
break;
case "pipeliningLimit":
if (member.getValue() instanceof Number) {
obj.setPipeliningLimit(((Number)member.getValue()).intValue());
}
break;
case "serverRsaPublicKeyPath":
if (member.getValue() instanceof String) {
obj.setServerRsaPublicKeyPath((String)member.getValue());
Expand Down Expand Up @@ -81,6 +86,7 @@ public static void toJson(MySQLConnectOptions obj, java.util.Map<String, Object>
if (obj.getCollation() != null) {
json.put("collation", obj.getCollation());
}
json.put("pipeliningLimit", obj.getPipeliningLimit());
if (obj.getServerRsaPublicKeyPath() != null) {
json.put("serverRsaPublicKeyPath", obj.getServerRsaPublicKeyPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package examples;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void gettingStarted() {
.setMaxSize(5);

// Create the client pool
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
SqlClient client = MySQLPool.client(connectOptions, poolOptions);

// A simple query
client
Expand Down Expand Up @@ -212,6 +213,22 @@ public void connecting04(Vertx vertx) {
});
}

public void poolVersusPooledClient(Vertx vertx, String sql, MySQLConnectOptions connectOptions, PoolOptions poolOptions) {

// Pooled client
connectOptions.setPipeliningLimit(64);
SqlClient pooledClient = MySQLPool.client(vertx, connectOptions, poolOptions);

// Pipelined
Future<RowSet<Row>> res1 = pooledClient.query(sql).execute();

// Connection pool
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
}

public void connectWithUnixDomainSocket(Vertx vertx) {
// Connect Options
// Socket file name /var/run/mysqld/mysqld.sock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.mysqlclient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* {@code MySQLBatchException} is thrown if an error occurs during executions when using {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)}.
* The client will try to execute with all the params no matter if one iteration of the executions fails, the iteration count is calculated from zero.
*/
public class MySQLBatchException extends RuntimeException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you think about a way to combine this type with MySQLException in a hierarchy? It can be convenient for users to identity where the issue comes from when the exception bubbles up to upper layers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using a mapping like Map<Integer, MySQLException> to represent the errors in a batching process? I think MySQL needs this because it emulates batching by sending multiple requests and might receive multiple error responses which does not behave like Postgres client batching(one request & one response)

/**
* A mapping between the iteration count and error message.
*/
private final Map<Integer, String> iterationError = new HashMap<>();

public MySQLBatchException() {
super("Error occurs during batch execution");
}

public Map<Integer, String> getIterationError() {
vietj marked this conversation as resolved.
Show resolved Hide resolved
return iterationError;
}

public void reportError(int iteration, String errorMessage) {
iterationError.put(iteration, errorMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static MySQLConnectOptions fromUri(String connectionUri) throws IllegalAr
public static final Map<String, String> DEFAULT_CONNECTION_ATTRIBUTES;
public static final SslMode DEFAULT_SSL_MODE = SslMode.DISABLED;
public static final String DEFAULT_CHARACTER_ENCODING = "UTF-8";
public static final int DEFAULT_PIPELINING_LIMIT = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use the same value than PgConnectOptions which is 256 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is for backward compatibility ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'm trying to avoid introducing breaking changes.


static {
Map<String, String> defaultAttributes = new HashMap<>();
Expand All @@ -79,6 +80,7 @@ public static MySQLConnectOptions fromUri(String connectionUri) throws IllegalAr
private String serverRsaPublicKeyPath;
private Buffer serverRsaPublicKeyValue;
private String characterEncoding = DEFAULT_CHARACTER_ENCODING;
private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT;
private MySQLAuthenticationPlugin authenticationPlugin = MySQLAuthenticationPlugin.DEFAULT;

public MySQLConnectOptions() {
Expand Down Expand Up @@ -301,6 +303,29 @@ public Buffer getServerRsaPublicKeyValue() {
return serverRsaPublicKeyValue;
}

/**
* Get the pipelining limit count.
*
* @return the pipelining count
*/
public int getPipeliningLimit() {
return pipeliningLimit;
}

/**
* Set the pipelining limit count.
*
* @param pipeliningLimit the count to configure
* @return a reference to this, so the API can be used fluently
*/
public MySQLConnectOptions setPipeliningLimit(int pipeliningLimit) {
if (pipeliningLimit < 1) {
throw new IllegalArgumentException("pipelining limit can not be less than 1");
}
this.pipeliningLimit = pipeliningLimit;
return this;
}

@Override
public MySQLConnectOptions setHost(String host) {
return (MySQLConnectOptions) super.setHost(host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
package io.vertx.mysqlclient;

/**
* A {@link RuntimeException} signals that an error occurred.
* {@code MySQLException} is the class representing that a MySQL error packet is received from the server,
* This usually signals that an error occurred during a connection establishment or command executions.
*/
public class MySQLException extends RuntimeException {
private final int errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.impl.MySQLPoolOptions;
import io.vertx.mysqlclient.spi.MySQLDriver;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;

import java.util.Collections;
Expand Down Expand Up @@ -99,6 +101,72 @@ static MySQLPool pool(Vertx vertx, List<MySQLConnectOptions> databases, PoolOpti
return (MySQLPool) MySQLDriver.INSTANCE.createPool(vertx, databases, options);
}


/**
* Like {@link #client(String, PoolOptions)} with a default {@code poolOptions}.
*/
static SqlClient client(String connectionUri) {
return client(connectionUri, new PoolOptions());
}

/**
* Like {@link #client(MySQLConnectOptions, PoolOptions)} with {@code connectOptions} build from {@code connectionUri}.
*/
static SqlClient client(String connectionUri, PoolOptions poolOptions) {
return client(MySQLConnectOptions.fromUri(connectionUri), poolOptions);
}

/**
* Like {@link #client(Vertx, String,PoolOptions)} with a default {@code poolOptions}.
*/
static SqlClient client(Vertx vertx, String connectionUri) {
return client(vertx, MySQLConnectOptions.fromUri(connectionUri), new PoolOptions());
}

/**
* Like {@link #client(Vertx, MySQLConnectOptions, PoolOptions)} with {@code connectOptions} build from {@code connectionUri}.
*/
static SqlClient client(Vertx vertx, String connectionUri, PoolOptions poolOptions) {
return client(vertx, MySQLConnectOptions.fromUri(connectionUri), poolOptions);
}

/**
* Create a client backed by a connection pool to the database configured with the given {@code connectOptions} and {@code poolOptions}.
*
* @param poolOptions the options for creating the backing pool
* @return the client
*/
static SqlClient client(MySQLConnectOptions connectOptions, PoolOptions poolOptions) {
return client(null, Collections.singletonList(connectOptions), poolOptions);
}

/**
* Like {@link #client(MySQLConnectOptions, PoolOptions)} with a specific {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, MySQLConnectOptions connectOptions, PoolOptions poolOptions) {
return client(vertx, Collections.singletonList(connectOptions), poolOptions);
}

/**
* Like {@link #client(List, PoolOptions)} with a specific {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, List<MySQLConnectOptions> mySQLConnectOptions, PoolOptions options) {
return MySQLDriver.INSTANCE.createPool(vertx, mySQLConnectOptions, new MySQLPoolOptions(options).setPipelined(true));
}

/**
* Create a client backed by a connection pool to the MySQL {@code databases} with round-robin selection.
* Round-robin is applied when a new connection is created by the pool.
*
* @param databases the list of databases
* @param options the options for creating the pool
* @return the pooled client
*/
static SqlClient client(List<MySQLConnectOptions> databases, PoolOptions options) {
return client(null, databases, options);
}


@Override
MySQLPool connectHandler(Handler<SqlConnection> handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class MySQLConnectionFactory extends ConnectionFactoryBase {
private boolean useAffectedRows;
private SslMode sslMode;
private Buffer serverRsaPublicKey;
private int initialCapabilitiesFlags;
private int pipeliningLimit;
private MySQLAuthenticationPlugin authenticationPlugin;

public MySQLConnectionFactory(VertxInternal vertx, MySQLConnectOptions options) {
Expand Down Expand Up @@ -86,6 +88,8 @@ protected void initializeConfiguration(SqlConnectOptions connectOptions) {
}
}
this.serverRsaPublicKey = serverRsaPublicKey;
this.initialCapabilitiesFlags = initCapabilitiesFlags(database);
this.pipeliningLimit = options.getPipeliningLimit();

// check the SSLMode here
switch (sslMode) {
Expand All @@ -110,10 +114,9 @@ protected void configureNetClientOptions(NetClientOptions netClientOptions) {

@Override
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
int initialCapabilitiesFlags = initCapabilitiesFlags(database);
Future<NetSocket> fut = netClient.connect(server);
return fut.flatMap(so -> {
MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, context);
MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
return Future.future(promise -> conn.sendStartupMessage(username, password, database, collation, serverRsaPublicKey, properties, sslMode, initialCapabilitiesFlags, charsetEncoding, authenticationPlugin, promise));
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2017 Julien Viet
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
vietj marked this conversation as resolved.
Show resolved Hide resolved
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.vertx.mysqlclient.impl;

import io.vertx.sqlclient.PoolOptions;

public class MySQLPoolOptions extends PoolOptions {

public MySQLPoolOptions(PoolOptions other) {
super(other);
}

private boolean pipelined;

public boolean isPipelined() {
return pipelined;
}

public MySQLPoolOptions setPipelined(boolean pipelined) {
this.pipelined = pipelined;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public MySQLSocketConnection(NetSocketInternal socket,
boolean cachePreparedStatements,
int preparedStatementCacheSize,
Predicate<String> preparedStatementCacheSqlFilter,
int pipeliningLimit,
EventLoopContext context) {
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, 1, context);
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
}

void sendStartupMessage(String username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected final void handleAuthMoreData(byte[] password, ByteBuf payload) {
} else if (flag == FAST_AUTH_STATUS_FLAG) {
// fast auth success
} else {
completionHandler.handle(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
encoder.onCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
}
}
}
Expand All @@ -87,7 +87,7 @@ protected final void sendEncryptedPasswordWithServerRsaPublicKey(byte[] password
byte[] passwordInput = Arrays.copyOf(password, password.length + 1); // need to append 0x00(NULL) to the password
encryptedPassword = RsaPublicKeyEncryptor.encrypt(passwordInput, authPluginData, serverRsaPublicKeyContent);
} catch (Exception e) {
completionHandler.handle(CommandResponse.failure(e));
encoder.onCommandResponse(CommandResponse.failure(e));
return;
}
sendBytesAsPacket(encryptedPassword);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {
handleAuthMoreData(cmd.password().getBytes(StandardCharsets.UTF_8), payload);
break;
case OK_PACKET_HEADER:
completionHandler.handle(CommandResponse.success(null));
encoder.onCommandResponse(CommandResponse.success(null));
break;
case ERROR_PACKET_HEADER:
handleErrorPacketPayload(payload);
Expand All @@ -74,7 +74,7 @@ private void handleAuthSwitchRequest(byte[] password, ByteBuf payload) {
authResponse = password;
break;
default:
completionHandler.handle(CommandResponse.failure(new UnsupportedOperationException("Unsupported authentication method: " + pluginName)));
encoder.onCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported authentication method: " + pluginName)));
return;
}
sendBytesAsPacket(authResponse);
Expand Down
Loading