Skip to content

Commit

Permalink
Merge pull request #1507 from cherrylzhao/dev-new
Browse files Browse the repository at this point in the history
#1238 Support Proxy local transaction
  • Loading branch information
terrymanu authored Nov 23, 2018
2 parents 959a447 + 624b02f commit ec67a02
Show file tree
Hide file tree
Showing 20 changed files with 862 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static BackendHandler createBackendHandler(
final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final FrontendHandler frontendHandler) {
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
if (SQLType.DCL == sqlStatement.getType() || sqlStatement instanceof SetStatement) {
return new SchemaBroadcastBackendHandler(connectionId, sequenceId, sql, databaseType);
return new SchemaBroadcastBackendHandler(connectionId, sequenceId, sql, backendConnection, databaseType);
}

if (sqlStatement instanceof UseStatement || sqlStatement instanceof ShowDatabasesStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import io.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import io.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.command.CommandResponsePackets;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.OKPacket;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -45,19 +43,17 @@ public final class SchemaBroadcastBackendHandler implements BackendHandler {

private final String sql;

private final BackendConnection backendConnection;

private final DatabaseType databaseType;

@Override
public CommandResponsePackets execute() {
List<DatabasePacket> packets = new LinkedList<>();
for (String schema : GlobalRegistry.getInstance().getSchemaNames()) {
try (BackendConnection backendConnection = new BackendConnection()) {
BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType, schema);
CommandResponsePackets commandResponsePackets = backendHandler.execute();
packets.addAll(commandResponsePackets.getPackets());
} catch (final SQLException ex) {
return new CommandResponsePackets(new ErrPacket(1, ex));
}
BackendHandler backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, databaseType, schema);
CommandResponsePackets commandResponsePackets = backendHandler.execute();
packets.addAll(commandResponsePackets.getPackets());
}
return merge(packets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.shardingsphere.shardingproxy.backend.AbstractBackendHandler;
import io.shardingsphere.shardingproxy.backend.BackendExecutorContext;
import io.shardingsphere.shardingproxy.backend.ResultPacket;
import io.shardingsphere.shardingproxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.shardingproxy.backend.jdbc.connection.ConnectionStatus;
import io.shardingsphere.shardingproxy.backend.jdbc.execute.JDBCExecuteEngine;
import io.shardingsphere.shardingproxy.backend.jdbc.execute.response.ExecuteQueryResponse;
import io.shardingsphere.shardingproxy.backend.jdbc.execute.response.ExecuteResponse;
Expand All @@ -49,10 +51,8 @@
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.EofPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
import lombok.RequiredArgsConstructor;

import javax.transaction.Status;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -111,8 +111,8 @@ private CommandResponsePackets execute(final SQLRouteResult routeResult) throws
}

private boolean isUnsupportedXA(final SQLType sqlType) {
return TransactionType.XA == GlobalRegistry.getInstance().getTransactionType() && SQLType.DDL == sqlType
&& Status.STATUS_NO_TRANSACTION != XATransactionManagerSPILoader.getInstance().getTransactionManager().getStatus();
BackendConnection connection = executeEngine.getBackendConnection();
return TransactionType.XA == connection.getTransactionType() && SQLType.DDL == sqlType && ConnectionStatus.TRANSACTION == connection.getStatus();
}

private CommandResponsePackets merge(final SQLStatement sqlStatement) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package io.shardingsphere.shardingproxy.backend.jdbc.connection;

import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager;
import io.shardingsphere.shardingproxy.runtime.schema.LogicSchema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -39,26 +43,49 @@
* @author zhaojun
* @author zhangliang
*/
@NoArgsConstructor
@Slf4j
@Getter
public final class BackendConnection implements AutoCloseable {

@Getter
@Setter
private LogicSchema logicSchema;

private final Collection<Connection> cachedConnections = new CopyOnWriteArrayList<>();
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();

private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();

private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();

private final Collection<MethodInvocation> methodInvocations = new ArrayList<>();

@Setter
private ConnectionStatus status = ConnectionStatus.INIT;

private TransactionType transactionType;

public BackendConnection(final TransactionType transactionType) {
this.transactionType = transactionType;
}

/**
* Get connection size.
*
* @return connection size
* Change transaction type of current channel.
*
* @param transactionType transaction type
*/
public int getConnectionSize() {
return cachedConnections.size();
public void setTransactionType(final TransactionType transactionType) {
if (ConnectionStatus.TRANSACTION != status) {
this.transactionType = transactionType;
}
}

/**
* Change logic schema of current channel.
*
* @param logicSchema logic schema
*/
public void setLogicSchema(final LogicSchema logicSchema) {
if (ConnectionStatus.TRANSACTION != status) {
this.logicSchema = logicSchema;
}
}

/**
Expand All @@ -71,11 +98,50 @@ public int getConnectionSize() {
* @throws SQLException SQL exception
*/
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
if (ConnectionStatus.INIT == status || ConnectionStatus.TERMINATED == status) {
status = ConnectionStatus.RUNNING;
}
Collection<Connection> connections;
synchronized (cachedConnections) {
connections = cachedConnections.get(dataSourceName);
}
List<Connection> result;
if (connections.size() >= connectionSize) {
result = new ArrayList<>(connections).subList(0, connectionSize);
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
List<Connection> newConnections = createNewConnections(connectionMode, dataSourceName, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
result = createNewConnections(connectionMode, dataSourceName, connectionSize);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
}
return result;
}

private List<Connection> createNewConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
List<Connection> result = logicSchema.getBackendDataSource().getConnections(connectionMode, dataSourceName, connectionSize);
cachedConnections.addAll(result);
for (Connection each : result) {
replayMethodsInvocation(each);
}
return result;
}

/**
* Get connection size.
*
* @return connection size
*/
public int getConnectionSize() {
return cachedConnections.values().size();
}

/**
* Add statement.
*
Expand All @@ -98,21 +164,39 @@ public void add(final ResultSet resultSet) {
* Cancel statement.
*/
public void cancel() {
Collection<SQLException> exceptions = new LinkedList<>();
for (Statement each : cachedStatements) {
try {
each.cancel();
} catch (final SQLException ignored) {
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
if (!exceptions.isEmpty()) {
log.warn("Failed to cancel statement due to {}", exceptions);
}
cachedStatements.clear();
}

@Override
public void close() throws SQLException {
close(false);
}

/**
* Close cached connection.
*
* @param forceClose force close flag
* @throws SQLException SQL exception
*/
public void close(final boolean forceClose) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
exceptions.addAll(closeResultSets());
exceptions.addAll(closeStatements());
exceptions.addAll(closeConnections());
MasterVisitedManager.clear();
exceptions.addAll(closeStatements());
exceptions.addAll(closeResultSets());
if (ConnectionStatus.TERMINATED == status || forceClose) {
exceptions.addAll(releaseConnections(forceClose));
}
throwSQLExceptionIfNecessary(exceptions);
}

Expand All @@ -125,6 +209,7 @@ private Collection<SQLException> closeResultSets() {
result.add(ex);
}
}
cachedResultSets.clear();
return result;
}

Expand All @@ -137,18 +222,24 @@ private Collection<SQLException> closeStatements() {
result.add(ex);
}
}
cachedStatements.clear();
return result;
}

private Collection<SQLException> closeConnections() {
Collection<SQLException> releaseConnections(final boolean forceRollback) {
Collection<SQLException> result = new LinkedList<>();
for (Connection each : cachedConnections) {
for (Connection each : cachedConnections.values()) {
try {
if (forceRollback && ConnectionStatus.TRANSACTION == status) {
each.rollback();
}
each.close();
} catch (SQLException ex) {
result.add(ex);
}
}
cachedConnections.clear();
methodInvocations.clear();
return result;
}

Expand All @@ -162,4 +253,10 @@ private void throwSQLExceptionIfNecessary(final Collection<SQLException> excepti
}
throw ex;
}

private void replayMethodsInvocation(final Object target) {
for (MethodInvocation each : methodInvocations) {
each.invoke(target);
}
}
}
Loading

0 comments on commit ec67a02

Please sign in to comment.