From 44ae768685f7ee4b8e19f14421cf893ba741b2b3 Mon Sep 17 00:00:00 2001 From: cherrylzhao Date: Sat, 1 Dec 2018 10:48:05 +0800 Subject: [PATCH 1/5] for #1238 Refactor currentSchema => schemaName. --- .../jdbc/connection/BackendConnection.java | 20 +++++++++---------- .../binary/prepare/ComStmtPreparePacket.java | 6 +++--- .../text/fieldlist/ComFieldListPacket.java | 8 ++++---- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java index 7e70f73d76488..29106ca50a16c 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java @@ -54,10 +54,15 @@ public final class BackendConnection implements AutoCloseable { private static final int MAXIMUM_RETRY_COUNT = 5; - private volatile String currentSchema; + private volatile String schemaName; private LogicSchema logicSchema; + private TransactionType transactionType; + + @Setter + private ChannelHandlerContext context; + private final Multimap cachedConnections = LinkedHashMultimap.create(); private final Collection cachedStatements = new CopyOnWriteArrayList<>(); @@ -70,11 +75,6 @@ public final class BackendConnection implements AutoCloseable { private final ConnectionStateHandler stateHandler = new ConnectionStateHandler(resourceSynchronizer); - private TransactionType transactionType; - - @Setter - private ChannelHandlerContext context; - public BackendConnection(final TransactionType transactionType) { this.transactionType = transactionType; } @@ -94,14 +94,14 @@ public void setTransactionType(final TransactionType transactionType) { /** * Change logic schema of current channel. * - * @param currentSchema current schema + * @param schemaName current schema */ - public void setCurrentSchema(final String currentSchema) { + public void setCurrentSchema(final String schemaName) { if (isSwitchFailed()) { throw new ShardingException("Failed to switch schema, please terminate current transaction."); } - this.currentSchema = currentSchema; - this.logicSchema = GlobalRegistry.getInstance().getLogicSchema(currentSchema); + this.schemaName = schemaName; + this.logicSchema = GlobalRegistry.getInstance().getLogicSchema(schemaName); } @SneakyThrows diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/prepare/ComStmtPreparePacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/prepare/ComStmtPreparePacket.java index 8af6994804851..1b50f548905dc 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/prepare/ComStmtPreparePacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/prepare/ComStmtPreparePacket.java @@ -53,7 +53,7 @@ public final class ComStmtPreparePacket implements CommandPacket { @Getter private final int sequenceId; - private final String currentSchema; + private final String schemaName; private final String sql; @@ -62,7 +62,7 @@ public final class ComStmtPreparePacket implements CommandPacket { public ComStmtPreparePacket(final int sequenceId, final BackendConnection backendConnection, final MySQLPacketPayload payload) { this.sequenceId = sequenceId; sql = payload.readStringEOF(); - currentSchema = backendConnection.getCurrentSchema(); + schemaName = backendConnection.getSchemaName(); LogicSchema logicSchema = backendConnection.getLogicSchema(); sqlParsingEngine = new SQLParsingEngine(DatabaseType.MySQL, sql, getShardingRule(logicSchema), logicSchema.getMetaData().getTable()); } @@ -86,7 +86,7 @@ public Optional execute() { new ComStmtPrepareOKPacket(++currentSequenceId, PREPARED_STATEMENT_REGISTRY.register(sql, parametersIndex), getNumColumns(sqlStatement), parametersIndex, 0)); for (int i = 0; i < parametersIndex; i++) { // TODO add column name - result.getPackets().add(new ColumnDefinition41Packet(++currentSequenceId, currentSchema, + result.getPackets().add(new ColumnDefinition41Packet(++currentSequenceId, schemaName, sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "", "", "", "", 100, ColumnType.MYSQL_TYPE_VARCHAR, 0)); } if (parametersIndex > 0) { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java index f8f5cd2c8f566..06f481bcee8eb 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java @@ -50,7 +50,7 @@ public final class ComFieldListPacket implements CommandPacket { @Getter private final int sequenceId; - private final String currentSchema; + private final String schemaName; private final String table; @@ -60,10 +60,10 @@ public final class ComFieldListPacket implements CommandPacket { public ComFieldListPacket(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection) { this.sequenceId = sequenceId; - this.currentSchema = backendConnection.getCurrentSchema(); + this.schemaName = backendConnection.getSchemaName(); table = payload.readStringNul(); fieldWildcard = payload.readStringEOF(); - backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, String.format(SQL, table, currentSchema), backendConnection, DatabaseType.MySQL); + backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, String.format(SQL, table, schemaName), backendConnection, DatabaseType.MySQL); } @Override @@ -86,7 +86,7 @@ private CommandResponsePackets getColumnDefinition41Packets() throws SQLExceptio int currentSequenceId = 0; while (backendHandler.next()) { String columnName = backendHandler.getResultValue().getData().get(0).toString(); - result.getPackets().add(new ColumnDefinition41Packet(++currentSequenceId, currentSchema, table, table, columnName, columnName, 100, ColumnType.MYSQL_TYPE_VARCHAR, 0)); + result.getPackets().add(new ColumnDefinition41Packet(++currentSequenceId, schemaName, table, table, columnName, columnName, 100, ColumnType.MYSQL_TYPE_VARCHAR, 0)); } result.getPackets().add(new EofPacket(++currentSequenceId)); return result; From 7136e9f5732ef428b9a10ba4f29f21827f882a11 Mon Sep 17 00:00:00 2001 From: cherrylzhao Date: Sat, 1 Dec 2018 11:04:09 +0800 Subject: [PATCH 2/5] for #1238 Refactor BackendConnection.channelHandlerContext => frontendChannel. --- .../backend/jdbc/connection/BackendConnection.java | 4 ++-- .../shardingproxy/frontend/common/FrontendHandler.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java index 29106ca50a16c..343a4101cdbc4 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java @@ -20,7 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Multimap; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.Channel; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.exception.ShardingException; @@ -61,7 +61,7 @@ public final class BackendConnection implements AutoCloseable { private TransactionType transactionType; @Setter - private ChannelHandlerContext context; + private Channel frontendChannel; private final Multimap cachedConnections = LinkedHashMultimap.create(); diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java index 01aabd93f1028..c4c3c0ebb37e3 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java @@ -40,7 +40,7 @@ public abstract class FrontendHandler extends ChannelInboundHandlerAdapter { @Override public final void channelActive(final ChannelHandlerContext context) { - backendConnection.setContext(context); + backendConnection.setFrontendChannel(context.channel()); ChannelThreadExecutorGroup.getInstance().register(context.channel().id()); handshake(context); } From c1514a4a9ed68098ac2b60476c979d076b4b3ecb Mon Sep 17 00:00:00 2001 From: cherrylzhao Date: Sat, 1 Dec 2018 11:32:33 +0800 Subject: [PATCH 3/5] for #1238 Make connectionId binding with BackendConnection. --- .../backend/BackendHandlerFactory.java | 67 +++++++++---------- .../SchemaBroadcastBackendHandler.java | 8 +-- .../backend/SchemaUnicastBackendHandler.java | 4 +- .../jdbc/connection/BackendConnection.java | 3 +- .../frontend/common/FrontendHandler.java | 1 - .../frontend/mysql/CommandExecutor.java | 4 +- .../frontend/mysql/MySQLFrontendHandler.java | 1 + .../packet/command/CommandPacketFactory.java | 9 ++- .../binary/execute/ComStmtExecutePacket.java | 4 +- .../text/fieldlist/ComFieldListPacket.java | 4 +- .../query/text/query/ComQueryPacket.java | 4 +- .../command/CommandPacketFactoryTest.java | 64 +++++++++--------- .../execute/ComStmtExecutePacketTest.java | 4 +- .../fieldlist/ComFieldListPacketTest.java | 6 +- .../query/text/query/ComQueryPacketTest.java | 8 +-- 15 files changed, 90 insertions(+), 101 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java index 5a9855ee65582..12b6e2b1219b5 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java @@ -53,68 +53,63 @@ public final class BackendHandlerFactory { private static final GlobalRegistry GLOBAL_REGISTRY = GlobalRegistry.getInstance(); /** - * Create new instance of text protocol backend handler. + * Create new instance by sql judge. * - * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param backendConnection backend connection * @param databaseType database type - * @return instance of text protocol backend handler + * @param frontendHandler frontend handler + * @return instance of backend handler */ - public static BackendHandler newTextProtocolInstance( - final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) { - LogicSchema logicSchema = backendConnection.getLogicSchema(); - return GLOBAL_REGISTRY.getShardingProperties().getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO) - ? new NettyBackendHandler(logicSchema, connectionId, sequenceId, sql, databaseType) - : new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new StatementExecutorWrapper(logicSchema))); + public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final FrontendHandler frontendHandler) { + SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); + if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) { + return new SchemaBroadcastBackendHandler(sequenceId, sql, backendConnection, databaseType); + } + + if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) { + return new SchemaIgnoreBackendHandler(sqlStatement, frontendHandler); + } + + if (sqlStatement instanceof ShowOtherStatement) { + return new SchemaUnicastBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL); + } + + return newTextProtocolInstance(sequenceId, sql, backendConnection, DatabaseType.MySQL); } /** * Create new instance of text protocol backend handler. * - * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed - * @param parameters SQL parameters * @param backendConnection backend connection * @param databaseType database type * @return instance of text protocol backend handler */ - public static BackendHandler newBinaryProtocolInstance(final int connectionId, final int sequenceId, final String sql, final List parameters, - final BackendConnection backendConnection, final DatabaseType databaseType) { + public static BackendHandler newTextProtocolInstance(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) { LogicSchema logicSchema = backendConnection.getLogicSchema(); return GLOBAL_REGISTRY.getShardingProperties().getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO) - ? new NettyBackendHandler(logicSchema, connectionId, sequenceId, sql, databaseType) - : new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new PreparedStatementExecutorWrapper(logicSchema, parameters))); + ? new NettyBackendHandler(logicSchema, backendConnection.getConnectionId(), sequenceId, sql, databaseType) + : new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new StatementExecutorWrapper(logicSchema))); } /** - * Create new instance by sql judge. - * - * @param connectionId connection ID of database connected + * Create new instance of text protocol backend handler. + * * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed + * @param parameters SQL parameters * @param backendConnection backend connection * @param databaseType database type - * @param frontendHandler frontend handler - * @return instance of backend handler + * @return instance of text protocol backend handler */ - public static BackendHandler createBackendHandler( - final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final FrontendHandler frontendHandler) { - SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); - if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) { - return new SchemaBroadcastBackendHandler(connectionId, sequenceId, sql, backendConnection, databaseType); - } - - if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) { - return new SchemaIgnoreBackendHandler(sqlStatement, frontendHandler); - } - - if (sqlStatement instanceof ShowOtherStatement) { - return new SchemaUnicastBackendHandler(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL); - } - - return newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL); + public static BackendHandler newBinaryProtocolInstance(final int sequenceId, final String sql, final List parameters, + final BackendConnection backendConnection, final DatabaseType databaseType) { + LogicSchema logicSchema = backendConnection.getLogicSchema(); + return GLOBAL_REGISTRY.getShardingProperties().getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO) + ? new NettyBackendHandler(logicSchema, backendConnection.getConnectionId(), sequenceId, sql, databaseType) + : new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new PreparedStatementExecutorWrapper(logicSchema, parameters))); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaBroadcastBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaBroadcastBackendHandler.java index e28d18f38db6b..2ab3dbc8f04bb 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaBroadcastBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaBroadcastBackendHandler.java @@ -37,8 +37,6 @@ @RequiredArgsConstructor public final class SchemaBroadcastBackendHandler implements BackendHandler { - private final int connectionId; - private final int sequenceId; private final String sql; @@ -50,9 +48,9 @@ public final class SchemaBroadcastBackendHandler implements BackendHandler { @Override public CommandResponsePackets execute() { List packets = new LinkedList<>(); - for (String schema : GlobalRegistry.getInstance().getSchemaNames()) { - backendConnection.setCurrentSchema(schema); - BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType); + for (String each : GlobalRegistry.getInstance().getSchemaNames()) { + backendConnection.setCurrentSchema(each); + BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(sequenceId, sql, backendConnection, databaseType); CommandResponsePackets commandResponsePackets = backendHandler.execute(); packets.addAll(commandResponsePackets.getPackets()); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaUnicastBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaUnicastBackendHandler.java index b8f9a195a2449..d5e62b3f6b6b9 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaUnicastBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaUnicastBackendHandler.java @@ -32,8 +32,8 @@ public final class SchemaUnicastBackendHandler implements BackendHandler { private final BackendHandler backendHandler; - public SchemaUnicastBackendHandler(final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) { - backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType); + public SchemaUnicastBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) { + backendHandler = BackendHandlerFactory.newTextProtocolInstance(sequenceId, sql, backendConnection, databaseType); } @Override diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java index 343a4101cdbc4..d58f87657ad04 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Multimap; -import io.netty.channel.Channel; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.exception.ShardingException; @@ -61,7 +60,7 @@ public final class BackendConnection implements AutoCloseable { private TransactionType transactionType; @Setter - private Channel frontendChannel; + private int connectionId; private final Multimap cachedConnections = LinkedHashMultimap.create(); diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java index c4c3c0ebb37e3..96935a5858c2b 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/common/FrontendHandler.java @@ -40,7 +40,6 @@ public abstract class FrontendHandler extends ChannelInboundHandlerAdapter { @Override public final void channelActive(final ChannelHandlerContext context) { - backendConnection.setFrontendChannel(context.channel()); ChannelThreadExecutorGroup.getInstance().register(context.channel().id()); handshake(context); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java index 8c72ab7c08674..18c560b3b52da 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandlerContext; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; -import io.shardingsphere.shardingproxy.runtime.ChannelRegistry; import io.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket; import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload; @@ -90,8 +89,7 @@ public void run() { private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException { int sequenceId = payload.readInt1(); - int connectionId = ChannelRegistry.getInstance().getConnectionId(context.channel().id().asShortText()); - return CommandPacketFactory.newInstance(sequenceId, connectionId, payload, backendConnection, frontendHandler); + return CommandPacketFactory.newInstance(sequenceId, payload, backendConnection, frontendHandler); } private void writeMoreResults(final QueryCommandPacket queryCommandPacket, final int headPacketsCount) throws SQLException { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/MySQLFrontendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/MySQLFrontendHandler.java index 20b30505d0049..37d5dfda3311e 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/MySQLFrontendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/MySQLFrontendHandler.java @@ -54,6 +54,7 @@ public final class MySQLFrontendHandler extends FrontendHandler { protected void handshake(final ChannelHandlerContext context) { int connectionId = ConnectionIdGenerator.getInstance().nextId(); ChannelRegistry.getInstance().putConnectionId(context.channel().id().asShortText(), connectionId); + getBackendConnection().setConnectionId(connectionId); context.writeAndFlush(new HandshakePacket(connectionId, authenticationHandler.getAuthPluginData())); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java index 2cf9c8b4afdda..466326eb1ad1d 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java @@ -48,14 +48,13 @@ public final class CommandPacketFactory { * Create new instance of command packet. * * @param sequenceId sequence id - * @param connectionId MySQL connection id * @param payload MySQL packet payload * @param backendConnection backend connection * @param frontendHandler frontend handler * @return command packet * @throws SQLException SQL exception */ - public static CommandPacket newInstance(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, + public static CommandPacket newInstance(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException { int commandPacketTypeValue = payload.readInt1(); CommandPacketType type = CommandPacketType.valueOf(commandPacketTypeValue); @@ -65,13 +64,13 @@ public static CommandPacket newInstance(final int sequenceId, final int connecti case COM_INIT_DB: return new ComInitDbPacket(sequenceId, payload, frontendHandler); case COM_FIELD_LIST: - return new ComFieldListPacket(sequenceId, connectionId, payload, backendConnection); + return new ComFieldListPacket(sequenceId, payload, backendConnection); case COM_QUERY: - return new ComQueryPacket(sequenceId, connectionId, payload, backendConnection, frontendHandler); + return new ComQueryPacket(sequenceId, payload, backendConnection, frontendHandler); case COM_STMT_PREPARE: return new ComStmtPreparePacket(sequenceId, backendConnection, payload); case COM_STMT_EXECUTE: - return new ComStmtExecutePacket(sequenceId, connectionId, payload, backendConnection); + return new ComStmtExecutePacket(sequenceId, payload, backendConnection); case COM_STMT_CLOSE: return new ComStmtClosePacket(sequenceId, payload); case COM_PING: diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java index e7743bca2b5ee..b711d288a9dcc 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java @@ -77,7 +77,7 @@ public final class ComStmtExecutePacket implements QueryCommandPacket { private final BackendHandler backendHandler; public ComStmtExecutePacket( - final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException { + final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException { this.sequenceId = sequenceId; statementId = payload.readInt4(); binaryStatement = BinaryStatementRegistry.getInstance().getBinaryStatement(statementId); @@ -93,7 +93,7 @@ public ComStmtExecutePacket( binaryStatement.setParameterTypes(getParameterTypes(payload, parametersCount)); } parameters = getParameters(payload, parametersCount); - backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, sequenceId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL); + backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(sequenceId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL); } private List getParameterTypes(final MySQLPacketPayload payload, final int parametersCount) { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java index 06f481bcee8eb..a360a853dc472 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java @@ -58,12 +58,12 @@ public final class ComFieldListPacket implements CommandPacket { private final BackendHandler backendHandler; - public ComFieldListPacket(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection) { + public ComFieldListPacket(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) { this.sequenceId = sequenceId; this.schemaName = backendConnection.getSchemaName(); table = payload.readStringNul(); fieldWildcard = payload.readStringEOF(); - backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, String.format(SQL, table, schemaName), backendConnection, DatabaseType.MySQL); + backendHandler = BackendHandlerFactory.newTextProtocolInstance(sequenceId, String.format(SQL, table, schemaName), backendConnection, DatabaseType.MySQL); } @Override diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java index 24e3cf955bb84..68e796b20f490 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java @@ -62,10 +62,10 @@ public final class ComQueryPacket implements QueryCommandPacket { private final BackendTransactionManager backendTransactionManager; - public ComQueryPacket(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) { + public ComQueryPacket(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) { this.sequenceId = sequenceId; sql = payload.readStringEOF(); - backendHandler = BackendHandlerFactory.createBackendHandler(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL, frontendHandler); + backendHandler = BackendHandlerFactory.createBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL, frontendHandler); backendTransactionManager = new BackendTransactionManager(backendConnection); } diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java index b69bd544e8fa9..c08da2ec186fa 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java @@ -97,32 +97,32 @@ private void setMaxConnectionsSizePerQuery() throws ReflectiveOperationException @Test public void assertNewInstanceWithComQuitPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_QUIT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComQuitPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComQuitPacket.class)); } @Test public void assertNewInstanceWithComInitDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_INIT_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComInitDbPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComInitDbPacket.class)); } @Test public void assertNewInstanceWithComFieldListPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_FIELD_LIST.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComFieldListPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComFieldListPacket.class)); } @Test public void assertNewInstanceWithComQueryPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_QUERY.getValue()); when(payload.readStringEOF()).thenReturn("SHOW TABLES"); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComQueryPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComQueryPacket.class)); } @Test public void assertNewInstanceWithComStmtPreparePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_PREPARE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComStmtPreparePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtPreparePacket.class)); } @Test @@ -130,162 +130,162 @@ public void assertNewInstanceWithComStmtExecutePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_EXECUTE.getValue(), NewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue()); when(payload.readInt4()).thenReturn(1); BinaryStatementRegistry.getInstance().register("SELECT * FROM t_order", 1); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComStmtExecutePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtExecutePacket.class)); } @Test public void assertNewInstanceWithComStmtClosePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_CLOSE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComStmtClosePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtClosePacket.class)); } @Test public void assertNewInstanceWithComPingPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PING.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(ComPingPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComPingPacket.class)); } @Test public void assertNewInstanceWithComSleepPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SLEEP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComCreateDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CREATE_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDropDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DROP_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComRefreshPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_REFRESH.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComShutDownPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SHUTDOWN.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStatisticsPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STATISTICS.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComProcessInfoPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PROCESS_INFO.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComConnectPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CONNECT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComProcessKillPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PROCESS_KILL.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDebugPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DEBUG.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComTimePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_TIME.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDelayedInsertPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DELAYED_INSERT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComChangeUserPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CHANGE_USER.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_BINLOG_DUMP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComTableDumpPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_TABLE_DUMP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComConnectOutPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CONNECT_OUT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComRegisterSlavePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_REGISTER_SLAVE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtSendLongDataPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_SEND_LONG_DATA.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtResetPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_RESET.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComSetOptionPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SET_OPTION.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtFetchPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_FETCH.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDaemonPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DAEMON.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_BINLOG_DUMP_GTID.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComResetConnectionPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_RESET_CONNECTION.getValue()); - assertThat(CommandPacketFactory.newInstance(1, 1000, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); } } diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacketTest.java index b4fbf060edba1..d88919b53bd83 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacketTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacketTest.java @@ -68,7 +68,7 @@ public void assertWrite() throws SQLException { BinaryStatementRegistry.getInstance().register("SELECT id FROM tbl WHERE id=?", 1); when(payload.readInt4()).thenReturn(1); when(payload.readInt1()).thenReturn(0, 1); - ComStmtExecutePacket actual = new ComStmtExecutePacket(1, 1000, payload, backendConnection); + ComStmtExecutePacket actual = new ComStmtExecutePacket(1, payload, backendConnection); assertThat(actual.getSequenceId(), is(1)); actual.write(payload); verify(payload, times(2)).writeInt4(1); @@ -87,7 +87,7 @@ public void assertExecute() throws SQLException { when(backendHandler.execute()).thenReturn(expectedCommandResponsePackets); when(backendHandler.next()).thenReturn(true, false); when(backendHandler.getResultValue()).thenReturn(new ResultPacket(2, Collections.singletonList(99999L), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_LONG))); - ComStmtExecutePacket packet = new ComStmtExecutePacket(1, 1000, payload, backendConnection); + ComStmtExecutePacket packet = new ComStmtExecutePacket(1, payload, backendConnection); setBackendHandler(packet, backendHandler); Optional actualCommandResponsePackets = packet.execute(); assertTrue(actualCommandResponsePackets.isPresent()); diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacketTest.java index 7ad26b68aae04..60db6ad1c78ee 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacketTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacketTest.java @@ -82,7 +82,7 @@ private void setMaxConnectionsSizePerQuery() throws ReflectiveOperationException public void assertWrite() { when(payload.readStringNul()).thenReturn("tbl"); when(payload.readStringEOF()).thenReturn("-"); - ComFieldListPacket actual = new ComFieldListPacket(1, 1000, payload, backendConnection); + ComFieldListPacket actual = new ComFieldListPacket(1, payload, backendConnection); assertThat(actual.getSequenceId(), is(1)); actual.write(payload); verify(payload).writeInt1(CommandPacketType.COM_FIELD_LIST.getValue()); @@ -97,7 +97,7 @@ public void assertExecuteWhenSuccess() throws SQLException { when(backendHandler.next()).thenReturn(true, false); when(backendHandler.getResultValue()).thenReturn(new ResultPacket(1, Collections.singletonList("id"), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_VARCHAR))); when(backendHandler.execute()).thenReturn(new CommandResponsePackets(new FieldCountPacket(1, 1))); - ComFieldListPacket packet = new ComFieldListPacket(1, 1000, payload, backendConnection); + ComFieldListPacket packet = new ComFieldListPacket(1, payload, backendConnection); setBackendHandler(packet); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -123,7 +123,7 @@ public void assertExecuteWhenFailure() throws SQLException { when(payload.readStringEOF()).thenReturn("-"); CommandResponsePackets expected = new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_STD_UNKNOWN_EXCEPTION, "unknown")); when(backendHandler.execute()).thenReturn(expected); - ComFieldListPacket packet = new ComFieldListPacket(1, 1000, payload, backendConnection); + ComFieldListPacket packet = new ComFieldListPacket(1, payload, backendConnection); setBackendHandler(packet); Optional actual = packet.execute(); assertTrue(actual.isPresent()); diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java index a0df0b2122d86..7bb7a7506a379 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java @@ -132,7 +132,7 @@ public void assertExecuteWithoutTransaction() throws SQLException { when(backendHandler.execute()).thenReturn(new CommandResponsePackets(expectedFieldCountPacket)); when(backendHandler.next()).thenReturn(true, false); when(backendHandler.getResultValue()).thenReturn(new ResultPacket(2, Collections.singletonList(99999L), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_LONG))); - ComQueryPacket packet = new ComQueryPacket(1, 1000, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); setBackendHandler(packet, backendHandler); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -154,7 +154,7 @@ private void setBackendHandler(final ComQueryPacket packet, final BackendHandler @Test public void assertExecuteTCLWithLocalTransaction() { when(payload.readStringEOF()).thenReturn("COMMIT"); - ComQueryPacket packet = new ComQueryPacket(1, 1000, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -165,7 +165,7 @@ public void assertExecuteTCLWithLocalTransaction() { public void assertExecuteTCLWithXATransaction() { backendConnection.setTransactionType(TransactionType.XA); when(payload.readStringEOF()).thenReturn("ROLLBACK"); - ComQueryPacket packet = new ComQueryPacket(1, 1000, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -177,7 +177,7 @@ public void assertExecuteTCLWithXATransaction() { public void assertExecuteRollbackWithXATransaction() { backendConnection.setTransactionType(TransactionType.XA); when(payload.readStringEOF()).thenReturn("COMMIT"); - ComQueryPacket packet = new ComQueryPacket(1, 1000, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent()); From a85ab98ec658f0bf83389a0e2da85fea855b8ed0 Mon Sep 17 00:00:00 2001 From: cherrylzhao Date: Mon, 3 Dec 2018 12:11:04 +0800 Subject: [PATCH 4/5] for #1238 for code review. --- .../shardingproxy/backend/BackendHandlerFactory.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java index 12b6e2b1219b5..138ad7429da74 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java @@ -53,7 +53,7 @@ public final class BackendHandlerFactory { private static final GlobalRegistry GLOBAL_REGISTRY = GlobalRegistry.getInstance(); /** - * Create new instance by sql judge. + * Create new com query backend handler instance by SQl judge. * * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed @@ -62,20 +62,18 @@ public final class BackendHandlerFactory { * @param frontendHandler frontend handler * @return instance of backend handler */ - public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final FrontendHandler frontendHandler) { + public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, + final DatabaseType databaseType, final FrontendHandler frontendHandler) { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) { return new SchemaBroadcastBackendHandler(sequenceId, sql, backendConnection, databaseType); } - if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) { return new SchemaIgnoreBackendHandler(sqlStatement, frontendHandler); } - if (sqlStatement instanceof ShowOtherStatement) { return new SchemaUnicastBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL); } - return newTextProtocolInstance(sequenceId, sql, backendConnection, DatabaseType.MySQL); } From d8fabf3ceb34b4c799e9e308031ea77fba9d8cbe Mon Sep 17 00:00:00 2001 From: cherrylzhao Date: Mon, 3 Dec 2018 12:22:59 +0800 Subject: [PATCH 5/5] for #1238 Simplify CommandPacketFactory. --- .../backend/BackendHandlerFactory.java | 9 +-- .../backend/SchemaIgnoreBackendHandler.java | 10 +-- .../jdbc/connection/BackendConnection.java | 4 +- .../frontend/mysql/CommandExecutor.java | 2 +- .../packet/command/CommandPacketFactory.java | 9 +-- .../command/admin/initdb/ComInitDbPacket.java | 10 +-- .../query/text/query/ComQueryPacket.java | 5 +- .../command/CommandPacketFactoryTest.java | 68 +++++++++---------- .../admin/initdb/ComInitDbPacketTest.java | 11 ++- .../query/text/query/ComQueryPacketTest.java | 12 ++-- 10 files changed, 61 insertions(+), 79 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java index 138ad7429da74..f1c431b8dd871 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/BackendHandlerFactory.java @@ -32,7 +32,6 @@ import io.shardingsphere.shardingproxy.backend.jdbc.wrapper.PreparedStatementExecutorWrapper; import io.shardingsphere.shardingproxy.backend.jdbc.wrapper.StatementExecutorWrapper; import io.shardingsphere.shardingproxy.backend.netty.NettyBackendHandler; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.runtime.schema.LogicSchema; import lombok.AccessLevel; @@ -53,23 +52,21 @@ public final class BackendHandlerFactory { private static final GlobalRegistry GLOBAL_REGISTRY = GlobalRegistry.getInstance(); /** - * Create new com query backend handler instance by SQl judge. + * Create new com query backend handler instance by SQL judge. * * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param backendConnection backend connection * @param databaseType database type - * @param frontendHandler frontend handler * @return instance of backend handler */ - public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, - final DatabaseType databaseType, final FrontendHandler frontendHandler) { + public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) { return new SchemaBroadcastBackendHandler(sequenceId, sql, backendConnection, databaseType); } if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) { - return new SchemaIgnoreBackendHandler(sqlStatement, frontendHandler); + return new SchemaIgnoreBackendHandler(sqlStatement, backendConnection); } if (sqlStatement instanceof ShowOtherStatement) { return new SchemaUnicastBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL); diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaIgnoreBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaIgnoreBackendHandler.java index 1f97cbe8b77ae..1abefda543ec8 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaIgnoreBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/SchemaIgnoreBackendHandler.java @@ -23,7 +23,7 @@ import io.shardingsphere.core.parsing.parser.dialect.mysql.statement.UseStatement; import io.shardingsphere.core.parsing.parser.sql.SQLStatement; import io.shardingsphere.core.util.SQLUtil; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; +import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.transport.mysql.constant.ColumnType; import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode; @@ -54,7 +54,7 @@ public final class SchemaIgnoreBackendHandler implements BackendHandler { private final SQLStatement sqlStatement; - private final FrontendHandler frontendHandler; + private final BackendConnection backendConnection; private MergedResult mergedResult; @@ -67,7 +67,7 @@ public final class SchemaIgnoreBackendHandler implements BackendHandler { @Override public CommandResponsePackets execute() { if (sqlStatement instanceof UseStatement) { - return handleUseStatement((UseStatement) sqlStatement, frontendHandler); + return handleUseStatement((UseStatement) sqlStatement); } if (sqlStatement instanceof ShowDatabasesStatement) { @@ -90,12 +90,12 @@ public ResultPacket getResultValue() throws SQLException { return new ResultPacket(++currentSequenceId, data, columnCount, columnTypes); } - private CommandResponsePackets handleUseStatement(final UseStatement useStatement, final FrontendHandler frontendHandler) { + private CommandResponsePackets handleUseStatement(final UseStatement useStatement) { String schema = SQLUtil.getExactlyValue(useStatement.getSchema()); if (!GLOBAL_REGISTRY.schemaExists(schema)) { return new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_BAD_DB_ERROR, schema)); } - frontendHandler.getBackendConnection().setCurrentSchema(schema); + backendConnection.setCurrentSchema(schema); return new CommandResponsePackets(new OKPacket(1)); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java index d58f87657ad04..7bd08652d58c2 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendConnection.java @@ -93,7 +93,7 @@ public void setTransactionType(final TransactionType transactionType) { /** * Change logic schema of current channel. * - * @param schemaName current schema + * @param schemaName schema name */ public void setCurrentSchema(final String schemaName) { if (isSwitchFailed()) { @@ -124,7 +124,7 @@ private boolean isSwitchFailed() { * @param connectionMode connection mode * @param dataSourceName data source name * @param connectionSize size of connections to be get - * @return connection + * @return connections * @throws SQLException SQL exception */ public List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java index 18c560b3b52da..695512ff6cbec 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/frontend/mysql/CommandExecutor.java @@ -89,7 +89,7 @@ public void run() { private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException { int sequenceId = payload.readInt1(); - return CommandPacketFactory.newInstance(sequenceId, payload, backendConnection, frontendHandler); + return CommandPacketFactory.newInstance(sequenceId, payload, backendConnection); } private void writeMoreResults(final QueryCommandPacket queryCommandPacket, final int headPacketsCount) throws SQLException { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java index 466326eb1ad1d..2d736630a7b54 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactory.java @@ -18,7 +18,6 @@ package io.shardingsphere.shardingproxy.transport.mysql.packet.command; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.UnsupportedCommandPacket; import io.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.ComInitDbPacket; @@ -50,23 +49,21 @@ public final class CommandPacketFactory { * @param sequenceId sequence id * @param payload MySQL packet payload * @param backendConnection backend connection - * @param frontendHandler frontend handler * @return command packet * @throws SQLException SQL exception */ - public static CommandPacket newInstance(final int sequenceId, final MySQLPacketPayload payload, - final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException { + public static CommandPacket newInstance(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException { int commandPacketTypeValue = payload.readInt1(); CommandPacketType type = CommandPacketType.valueOf(commandPacketTypeValue); switch (type) { case COM_QUIT: return new ComQuitPacket(sequenceId); case COM_INIT_DB: - return new ComInitDbPacket(sequenceId, payload, frontendHandler); + return new ComInitDbPacket(sequenceId, payload, backendConnection); case COM_FIELD_LIST: return new ComFieldListPacket(sequenceId, payload, backendConnection); case COM_QUERY: - return new ComQueryPacket(sequenceId, payload, backendConnection, frontendHandler); + return new ComQueryPacket(sequenceId, payload, backendConnection); case COM_STMT_PREPARE: return new ComStmtPreparePacket(sequenceId, backendConnection, payload); case COM_STMT_EXECUTE: diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacket.java index 1573b4a903999..e7a6d6d68a02b 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacket.java @@ -18,7 +18,7 @@ package io.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb; import com.google.common.base.Optional; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; +import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload; @@ -44,12 +44,12 @@ public final class ComInitDbPacket implements CommandPacket { private final String schema; - private FrontendHandler frontendHandler; + private final BackendConnection backendConnection; - public ComInitDbPacket(final int sequenceId, final MySQLPacketPayload payload, final FrontendHandler frontendHandler) { + public ComInitDbPacket(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) { this.sequenceId = sequenceId; schema = payload.readStringEOF(); - this.frontendHandler = frontendHandler; + this.backendConnection = backendConnection; } @Override @@ -62,7 +62,7 @@ public void write(final MySQLPacketPayload payload) { public Optional execute() { log.debug("Schema name received for Sharding-Proxy: {}", schema); if (GlobalRegistry.getInstance().schemaExists(schema)) { - frontendHandler.getBackendConnection().setCurrentSchema(schema); + backendConnection.setCurrentSchema(schema); return Optional.of(new CommandResponsePackets(new OKPacket(getSequenceId() + 1))); } return Optional.of(new CommandResponsePackets(new ErrPacket(getSequenceId() + 1, ServerErrorCode.ER_BAD_DB_ERROR, schema))); diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java index 68e796b20f490..2cf28eefd6779 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java @@ -25,7 +25,6 @@ import io.shardingsphere.shardingproxy.backend.ResultPacket; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendTransactionManager; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket; import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode; @@ -62,10 +61,10 @@ public final class ComQueryPacket implements QueryCommandPacket { private final BackendTransactionManager backendTransactionManager; - public ComQueryPacket(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) { + public ComQueryPacket(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) { this.sequenceId = sequenceId; sql = payload.readStringEOF(); - backendHandler = BackendHandlerFactory.createBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL, frontendHandler); + backendHandler = BackendHandlerFactory.createBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL); backendTransactionManager = new BackendTransactionManager(backendConnection); } diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java index c08da2ec186fa..d0fa65c433eb2 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/CommandPacketFactoryTest.java @@ -23,7 +23,6 @@ import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.metadata.ShardingMetaData; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.runtime.schema.ShardingSchema; import io.shardingsphere.shardingproxy.transport.mysql.constant.NewParametersBoundFlag; @@ -64,9 +63,6 @@ public final class CommandPacketFactoryTest { private BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL); - @Mock - private FrontendHandler frontendHandler; - @Before public void setUp() throws ReflectiveOperationException { setShardingSchemas(); @@ -97,32 +93,32 @@ private void setMaxConnectionsSizePerQuery() throws ReflectiveOperationException @Test public void assertNewInstanceWithComQuitPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_QUIT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComQuitPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComQuitPacket.class)); } @Test public void assertNewInstanceWithComInitDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_INIT_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComInitDbPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComInitDbPacket.class)); } @Test public void assertNewInstanceWithComFieldListPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_FIELD_LIST.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComFieldListPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComFieldListPacket.class)); } @Test public void assertNewInstanceWithComQueryPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_QUERY.getValue()); when(payload.readStringEOF()).thenReturn("SHOW TABLES"); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComQueryPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComQueryPacket.class)); } @Test public void assertNewInstanceWithComStmtPreparePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_PREPARE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtPreparePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComStmtPreparePacket.class)); } @Test @@ -130,162 +126,162 @@ public void assertNewInstanceWithComStmtExecutePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_EXECUTE.getValue(), NewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue()); when(payload.readInt4()).thenReturn(1); BinaryStatementRegistry.getInstance().register("SELECT * FROM t_order", 1); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtExecutePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComStmtExecutePacket.class)); } @Test public void assertNewInstanceWithComStmtClosePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_CLOSE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComStmtClosePacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComStmtClosePacket.class)); } @Test public void assertNewInstanceWithComPingPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PING.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(ComPingPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(ComPingPacket.class)); } @Test public void assertNewInstanceWithComSleepPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SLEEP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComCreateDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CREATE_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDropDbPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DROP_DB.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComRefreshPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_REFRESH.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComShutDownPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SHUTDOWN.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStatisticsPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STATISTICS.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComProcessInfoPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PROCESS_INFO.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComConnectPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CONNECT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComProcessKillPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_PROCESS_KILL.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDebugPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DEBUG.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComTimePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_TIME.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDelayedInsertPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DELAYED_INSERT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComChangeUserPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CHANGE_USER.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_BINLOG_DUMP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComTableDumpPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_TABLE_DUMP.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComConnectOutPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_CONNECT_OUT.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComRegisterSlavePacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_REGISTER_SLAVE.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtSendLongDataPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_SEND_LONG_DATA.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtResetPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_RESET.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComSetOptionPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_SET_OPTION.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComStmtFetchPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_STMT_FETCH.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComDaemonPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_DAEMON.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_BINLOG_DUMP_GTID.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } @Test public void assertNewInstanceWithComResetConnectionPacket() throws SQLException { when(payload.readInt1()).thenReturn(CommandPacketType.COM_RESET_CONNECTION.getValue()); - assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection, frontendHandler), instanceOf(UnsupportedCommandPacket.class)); + assertThat(CommandPacketFactory.newInstance(1, payload, backendConnection), instanceOf(UnsupportedCommandPacket.class)); } } diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacketTest.java index e810b9d6bdaa1..13ccd322946c5 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacketTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/admin/initdb/ComInitDbPacketTest.java @@ -21,7 +21,6 @@ import io.shardingsphere.core.constant.ShardingConstant; import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.runtime.schema.LogicSchema; import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode; @@ -54,8 +53,7 @@ public final class ComInitDbPacketTest { @Mock private MySQLPacketPayload payload; - @Mock - private FrontendHandler frontendHandler; + private BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL); @Before @SneakyThrows @@ -64,13 +62,12 @@ public void setUp() { Field field = GlobalRegistry.class.getDeclaredField("logicSchemas"); field.setAccessible(true); field.set(GlobalRegistry.getInstance(), logicSchemas); - when(frontendHandler.getBackendConnection()).thenReturn(new BackendConnection(TransactionType.LOCAL)); } @Test public void assertExecuteWithValidSchemaName() { when(payload.readStringEOF()).thenReturn(ShardingConstant.LOGIC_SCHEMA_NAME); - Optional actual = new ComInitDbPacket(1, payload, frontendHandler).execute(); + Optional actual = new ComInitDbPacket(1, payload, backendConnection).execute(); assertTrue(actual.isPresent()); assertThat(actual.get().getPackets().size(), is(1)); assertThat(actual.get().getHeadPacket().getSequenceId(), is(2)); @@ -84,7 +81,7 @@ public void assertExecuteWithValidSchemaName() { public void assertExecuteWithInvalidSchemaName() { String invalidSchema = "invalid_schema"; when(payload.readStringEOF()).thenReturn(invalidSchema); - Optional actual = new ComInitDbPacket(1, payload, frontendHandler).execute(); + Optional actual = new ComInitDbPacket(1, payload, backendConnection).execute(); assertTrue(actual.isPresent()); assertThat(actual.get().getPackets().size(), is(1)); assertThat(actual.get().getHeadPacket().getSequenceId(), is(2)); @@ -96,7 +93,7 @@ public void assertExecuteWithInvalidSchemaName() { @Test public void assertWrite() { when(payload.readStringEOF()).thenReturn(ShardingConstant.LOGIC_SCHEMA_NAME); - ComInitDbPacket actual = new ComInitDbPacket(1, payload, frontendHandler); + ComInitDbPacket actual = new ComInitDbPacket(1, payload, backendConnection); assertThat(actual.getSequenceId(), is(1)); actual.write(payload); verify(payload).writeInt1(CommandPacketType.COM_INIT_DB.getValue()); diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java index 7bb7a7506a379..46d34598e6dcd 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java @@ -26,7 +26,6 @@ import io.shardingsphere.shardingproxy.backend.ResultPacket; import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection; import io.shardingsphere.shardingproxy.backend.jdbc.connection.ConnectionStatus; -import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler; import io.shardingsphere.shardingproxy.runtime.GlobalRegistry; import io.shardingsphere.shardingproxy.runtime.schema.ShardingSchema; import io.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket; @@ -72,9 +71,6 @@ public final class ComQueryPacketTest { private BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL); - @Mock - private FrontendHandler frontendHandler; - @BeforeClass public static void init() { setGlobalRegistry(); @@ -132,7 +128,7 @@ public void assertExecuteWithoutTransaction() throws SQLException { when(backendHandler.execute()).thenReturn(new CommandResponsePackets(expectedFieldCountPacket)); when(backendHandler.next()).thenReturn(true, false); when(backendHandler.getResultValue()).thenReturn(new ResultPacket(2, Collections.singletonList(99999L), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_LONG))); - ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection); setBackendHandler(packet, backendHandler); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -154,7 +150,7 @@ private void setBackendHandler(final ComQueryPacket packet, final BackendHandler @Test public void assertExecuteTCLWithLocalTransaction() { when(payload.readStringEOF()).thenReturn("COMMIT"); - ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -165,7 +161,7 @@ public void assertExecuteTCLWithLocalTransaction() { public void assertExecuteTCLWithXATransaction() { backendConnection.setTransactionType(TransactionType.XA); when(payload.readStringEOF()).thenReturn("ROLLBACK"); - ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent()); @@ -177,7 +173,7 @@ public void assertExecuteTCLWithXATransaction() { public void assertExecuteRollbackWithXATransaction() { backendConnection.setTransactionType(TransactionType.XA); when(payload.readStringEOF()).thenReturn("COMMIT"); - ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection, frontendHandler); + ComQueryPacket packet = new ComQueryPacket(1, payload, backendConnection); backendConnection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION); Optional actual = packet.execute(); assertTrue(actual.isPresent());