Skip to content

Commit

Permalink
Merge pull request #1540 from cherrylzhao/dev
Browse files Browse the repository at this point in the history
for #1238 Make connectionId binding with backendConnection
  • Loading branch information
terrymanu authored Dec 3, 2018
2 parents 0304460 + d8fabf3 commit b2bd30c
Show file tree
Hide file tree
Showing 19 changed files with 117 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.shardingsphere.shardingproxy.backend.jdbc.wrapper.PreparedStatementExecutorWrapper;
import io.shardingsphere.shardingproxy.backend.jdbc.wrapper.StatementExecutorWrapper;
import io.shardingsphere.shardingproxy.backend.netty.NettyBackendHandler;
import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler;
import io.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import io.shardingsphere.shardingproxy.runtime.schema.LogicSchema;
import lombok.AccessLevel;
Expand All @@ -53,68 +52,59 @@ public final class BackendHandlerFactory {
private static final GlobalRegistry GLOBAL_REGISTRY = GlobalRegistry.getInstance();

/**
* Create new instance of text protocol backend handler.
* Create new com query backend handler instance by SQL judge.
*
* @param connectionId connection ID of database connected
* @param sequenceId sequence ID of SQL packet
* @param sql SQL to be executed
* @param backendConnection backend connection
* @param databaseType database type
* @return instance of text protocol backend handler
* @return instance of backend handler
*/
public static BackendHandler newTextProtocolInstance(
final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) {
LogicSchema logicSchema = backendConnection.getLogicSchema();
return GLOBAL_REGISTRY.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO)
? new NettyBackendHandler(logicSchema, connectionId, sequenceId, sql, databaseType)
: new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new StatementExecutorWrapper(logicSchema)));
public static BackendHandler createBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) {
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) {
return new SchemaBroadcastBackendHandler(sequenceId, sql, backendConnection, databaseType);
}
if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) {
return new SchemaIgnoreBackendHandler(sqlStatement, backendConnection);
}
if (sqlStatement instanceof ShowOtherStatement) {
return new SchemaUnicastBackendHandler(sequenceId, sql, backendConnection, DatabaseType.MySQL);
}
return newTextProtocolInstance(sequenceId, sql, backendConnection, DatabaseType.MySQL);
}

/**
* Create new instance of text protocol backend handler.
*
* @param connectionId connection ID of database connected
* @param sequenceId sequence ID of SQL packet
* @param sql SQL to be executed
* @param parameters SQL parameters
* @param backendConnection backend connection
* @param databaseType database type
* @return instance of text protocol backend handler
*/
public static BackendHandler newBinaryProtocolInstance(final int connectionId, final int sequenceId, final String sql, final List<Object> parameters,
final BackendConnection backendConnection, final DatabaseType databaseType) {
public static BackendHandler newTextProtocolInstance(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) {
LogicSchema logicSchema = backendConnection.getLogicSchema();
return GLOBAL_REGISTRY.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO)
? new NettyBackendHandler(logicSchema, connectionId, sequenceId, sql, databaseType)
: new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new PreparedStatementExecutorWrapper(logicSchema, parameters)));
? new NettyBackendHandler(logicSchema, backendConnection.getConnectionId(), sequenceId, sql, databaseType)
: new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new StatementExecutorWrapper(logicSchema)));
}

/**
* Create new instance by sql judge.
*
* @param connectionId connection ID of database connected
* Create new instance of text protocol backend handler.
*
* @param sequenceId sequence ID of SQL packet
* @param sql SQL to be executed
* @param parameters SQL parameters
* @param backendConnection backend connection
* @param databaseType database type
* @param frontendHandler frontend handler
* @return instance of backend handler
* @return instance of text protocol backend handler
*/
public static BackendHandler createBackendHandler(
final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final FrontendHandler frontendHandler) {
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) {
return new SchemaBroadcastBackendHandler(connectionId, sequenceId, sql, backendConnection, databaseType);
}

if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) {
return new SchemaIgnoreBackendHandler(sqlStatement, frontendHandler);
}

if (sqlStatement instanceof ShowOtherStatement) {
return new SchemaUnicastBackendHandler(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL);
}

return newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL);
public static BackendHandler newBinaryProtocolInstance(final int sequenceId, final String sql, final List<Object> parameters,
final BackendConnection backendConnection, final DatabaseType databaseType) {
LogicSchema logicSchema = backendConnection.getLogicSchema();
return GLOBAL_REGISTRY.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.PROXY_BACKEND_USE_NIO)
? new NettyBackendHandler(logicSchema, backendConnection.getConnectionId(), sequenceId, sql, databaseType)
: new JDBCBackendHandler(logicSchema, sql, new JDBCExecuteEngine(backendConnection, new PreparedStatementExecutorWrapper(logicSchema, parameters)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
@RequiredArgsConstructor
public final class SchemaBroadcastBackendHandler implements BackendHandler {

private final int connectionId;

private final int sequenceId;

private final String sql;
Expand All @@ -50,9 +48,9 @@ public final class SchemaBroadcastBackendHandler implements BackendHandler {
@Override
public CommandResponsePackets execute() {
List<DatabasePacket> packets = new LinkedList<>();
for (String schema : GlobalRegistry.getInstance().getSchemaNames()) {
backendConnection.setCurrentSchema(schema);
BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType);
for (String each : GlobalRegistry.getInstance().getSchemaNames()) {
backendConnection.setCurrentSchema(each);
BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(sequenceId, sql, backendConnection, databaseType);
CommandResponsePackets commandResponsePackets = backendHandler.execute();
packets.addAll(commandResponsePackets.getPackets());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.shardingsphere.core.parsing.parser.dialect.mysql.statement.UseStatement;
import io.shardingsphere.core.parsing.parser.sql.SQLStatement;
import io.shardingsphere.core.util.SQLUtil;
import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler;
import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import io.shardingsphere.shardingproxy.transport.mysql.constant.ColumnType;
import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode;
Expand Down Expand Up @@ -54,7 +54,7 @@ public final class SchemaIgnoreBackendHandler implements BackendHandler {

private final SQLStatement sqlStatement;

private final FrontendHandler frontendHandler;
private final BackendConnection backendConnection;

private MergedResult mergedResult;

Expand All @@ -67,7 +67,7 @@ public final class SchemaIgnoreBackendHandler implements BackendHandler {
@Override
public CommandResponsePackets execute() {
if (sqlStatement instanceof UseStatement) {
return handleUseStatement((UseStatement) sqlStatement, frontendHandler);
return handleUseStatement((UseStatement) sqlStatement);
}

if (sqlStatement instanceof ShowDatabasesStatement) {
Expand All @@ -90,12 +90,12 @@ public ResultPacket getResultValue() throws SQLException {
return new ResultPacket(++currentSequenceId, data, columnCount, columnTypes);
}

private CommandResponsePackets handleUseStatement(final UseStatement useStatement, final FrontendHandler frontendHandler) {
private CommandResponsePackets handleUseStatement(final UseStatement useStatement) {
String schema = SQLUtil.getExactlyValue(useStatement.getSchema());
if (!GLOBAL_REGISTRY.schemaExists(schema)) {
return new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_BAD_DB_ERROR, schema));
}
frontendHandler.getBackendConnection().setCurrentSchema(schema);
backendConnection.setCurrentSchema(schema);
return new CommandResponsePackets(new OKPacket(1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public final class SchemaUnicastBackendHandler implements BackendHandler {

private final BackendHandler backendHandler;

public SchemaUnicastBackendHandler(final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) {
backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType);
public SchemaUnicastBackendHandler(final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType) {
backendHandler = BackendHandlerFactory.newTextProtocolInstance(sequenceId, sql, backendConnection, databaseType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import io.netty.channel.ChannelHandlerContext;
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.exception.ShardingException;
Expand Down Expand Up @@ -54,10 +53,15 @@ public final class BackendConnection implements AutoCloseable {

private static final int MAXIMUM_RETRY_COUNT = 5;

private volatile String currentSchema;
private volatile String schemaName;

private LogicSchema logicSchema;

private TransactionType transactionType;

@Setter
private int connectionId;

private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();

private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
Expand All @@ -70,11 +74,6 @@ public final class BackendConnection implements AutoCloseable {

private final ConnectionStateHandler stateHandler = new ConnectionStateHandler(resourceSynchronizer);

private TransactionType transactionType;

@Setter
private ChannelHandlerContext context;

public BackendConnection(final TransactionType transactionType) {
this.transactionType = transactionType;
}
Expand All @@ -94,14 +93,14 @@ public void setTransactionType(final TransactionType transactionType) {
/**
* Change logic schema of current channel.
*
* @param currentSchema current schema
* @param schemaName schema name
*/
public void setCurrentSchema(final String currentSchema) {
public void setCurrentSchema(final String schemaName) {
if (isSwitchFailed()) {
throw new ShardingException("Failed to switch schema, please terminate current transaction.");
}
this.currentSchema = currentSchema;
this.logicSchema = GlobalRegistry.getInstance().getLogicSchema(currentSchema);
this.schemaName = schemaName;
this.logicSchema = GlobalRegistry.getInstance().getLogicSchema(schemaName);
}

@SneakyThrows
Expand All @@ -125,7 +124,7 @@ private boolean isSwitchFailed() {
* @param connectionMode connection mode
* @param dataSourceName data source name
* @param connectionSize size of connections to be get
* @return connection
* @return connections
* @throws SQLException SQL exception
*/
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public abstract class FrontendHandler extends ChannelInboundHandlerAdapter {

@Override
public final void channelActive(final ChannelHandlerContext context) {
backendConnection.setContext(context);
ChannelThreadExecutorGroup.getInstance().register(context.channel().id());
handshake(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler;
import io.shardingsphere.shardingproxy.runtime.ChannelRegistry;
import io.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket;
import io.shardingsphere.shardingproxy.transport.mysql.constant.ServerErrorCode;
import io.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload;
Expand Down Expand Up @@ -90,8 +89,7 @@ public void run() {

private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException {
int sequenceId = payload.readInt1();
int connectionId = ChannelRegistry.getInstance().getConnectionId(context.channel().id().asShortText());
return CommandPacketFactory.newInstance(sequenceId, connectionId, payload, backendConnection, frontendHandler);
return CommandPacketFactory.newInstance(sequenceId, payload, backendConnection);
}

private void writeMoreResults(final QueryCommandPacket queryCommandPacket, final int headPacketsCount) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public final class MySQLFrontendHandler extends FrontendHandler {
protected void handshake(final ChannelHandlerContext context) {
int connectionId = ConnectionIdGenerator.getInstance().nextId();
ChannelRegistry.getInstance().putConnectionId(context.channel().id().asShortText(), connectionId);
getBackendConnection().setConnectionId(connectionId);
context.writeAndFlush(new HandshakePacket(connectionId, authenticationHandler.getAuthPluginData()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.shardingsphere.shardingproxy.transport.mysql.packet.command;

import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.shardingproxy.frontend.common.FrontendHandler;
import io.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload;
import io.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.UnsupportedCommandPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.ComInitDbPacket;
Expand Down Expand Up @@ -48,30 +47,27 @@ public final class CommandPacketFactory {
* Create new instance of command packet.
*
* @param sequenceId sequence id
* @param connectionId MySQL connection id
* @param payload MySQL packet payload
* @param backendConnection backend connection
* @param frontendHandler frontend handler
* @return command packet
* @throws SQLException SQL exception
*/
public static CommandPacket newInstance(final int sequenceId, final int connectionId, final MySQLPacketPayload payload,
final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException {
public static CommandPacket newInstance(final int sequenceId, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException {
int commandPacketTypeValue = payload.readInt1();
CommandPacketType type = CommandPacketType.valueOf(commandPacketTypeValue);
switch (type) {
case COM_QUIT:
return new ComQuitPacket(sequenceId);
case COM_INIT_DB:
return new ComInitDbPacket(sequenceId, payload, frontendHandler);
return new ComInitDbPacket(sequenceId, payload, backendConnection);
case COM_FIELD_LIST:
return new ComFieldListPacket(sequenceId, connectionId, payload, backendConnection);
return new ComFieldListPacket(sequenceId, payload, backendConnection);
case COM_QUERY:
return new ComQueryPacket(sequenceId, connectionId, payload, backendConnection, frontendHandler);
return new ComQueryPacket(sequenceId, payload, backendConnection);
case COM_STMT_PREPARE:
return new ComStmtPreparePacket(sequenceId, backendConnection, payload);
case COM_STMT_EXECUTE:
return new ComStmtExecutePacket(sequenceId, connectionId, payload, backendConnection);
return new ComStmtExecutePacket(sequenceId, payload, backendConnection);
case COM_STMT_CLOSE:
return new ComStmtClosePacket(sequenceId, payload);
case COM_PING:
Expand Down
Loading

0 comments on commit b2bd30c

Please sign in to comment.