diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingProperties.java b/sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingProperties.java similarity index 98% rename from sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingProperties.java rename to sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingProperties.java index c56bbb8ca179f..d884d2352f979 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingProperties.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingProperties.java @@ -15,7 +15,7 @@ *

*/ -package io.shardingsphere.core.constant; +package io.shardingsphere.core.constant.properties; import com.google.common.base.Joiner; import com.google.common.base.Strings; diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingPropertiesConstant.java b/sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingPropertiesConstant.java similarity index 95% rename from sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingPropertiesConstant.java rename to sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingPropertiesConstant.java index 75c7534d2d43c..4ab5ffe23dcfd 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingPropertiesConstant.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/constant/properties/ShardingPropertiesConstant.java @@ -15,8 +15,10 @@ *

*/ -package io.shardingsphere.core.constant; +package io.shardingsphere.core.constant.properties; +import io.shardingsphere.core.constant.ConnectionMode; +import io.shardingsphere.core.constant.transaction.TransactionType; import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionOperationType.java b/sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionOperationType.java new file mode 100644 index 0000000000000..e528651fa0771 --- /dev/null +++ b/sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionOperationType.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.core.constant.transaction; + +import com.google.common.base.Optional; + +/** + * Transaction operation type. + * + * @author zhaojun + */ +public enum TransactionOperationType { + + BEGIN, COMMIT, ROLLBACK; + + /** + * Get operation type. + * + * @param sql SQL + * @return transaction operation type + */ + // TODO :hongjun move to TCLParser, need parse comment etc + public static Optional getOperationType(final String sql) { + switch (sql.toUpperCase()) { + case "BEGIN": + case "START TRANSACTION": + case "SET AUTOCOMMIT=0": + return Optional.of(TransactionOperationType.BEGIN); + case "COMMIT": + return Optional.of(TransactionOperationType.COMMIT); + case "ROLLBACK": + return Optional.of(TransactionOperationType.ROLLBACK); + default: + return Optional.absent(); + } + } +} diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/TransactionType.java b/sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionType.java similarity index 93% rename from sharding-core/src/main/java/io/shardingsphere/core/constant/TransactionType.java rename to sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionType.java index d9ab457f10d17..b1183b699e9ed 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/constant/TransactionType.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/constant/transaction/TransactionType.java @@ -15,7 +15,7 @@ *

*/ -package io.shardingsphere.core.constant; +package io.shardingsphere.core.constant.transaction; /** * Transaction type. diff --git a/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesConstantTest.java b/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesConstantTest.java index f4f6dcf2c13d7..a69db9b96eb38 100644 --- a/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesConstantTest.java +++ b/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesConstantTest.java @@ -17,6 +17,7 @@ package io.shardingsphere.core.constant; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import org.junit.Test; import static org.hamcrest.CoreMatchers.is; diff --git a/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesTest.java b/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesTest.java index f831931b59957..564e5dd5bc74b 100644 --- a/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesTest.java +++ b/sharding-core/src/test/java/io/shardingsphere/core/constant/ShardingPropertiesTest.java @@ -17,6 +17,8 @@ package io.shardingsphere.core.constant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import org.junit.Before; import org.junit.Test; diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootShardingTest.java b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootShardingTest.java index 33ed1a3e2a683..7f6b9989162c7 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootShardingTest.java +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootShardingTest.java @@ -18,8 +18,8 @@ package io.shardingsphere.jdbc.spring.boot.type; import io.shardingsphere.core.api.ConfigMapContext; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.core.ShardingContext; import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource; import io.shardingsphere.jdbc.spring.boot.util.EmbedTestingServer; diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationMasterSlaveNamespaceTest.java b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationMasterSlaveNamespaceTest.java index 3f75292caaf97..584707c73f3c9 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationMasterSlaveNamespaceTest.java +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationMasterSlaveNamespaceTest.java @@ -21,8 +21,8 @@ import io.shardingsphere.core.api.algorithm.masterslave.MasterSlaveLoadBalanceAlgorithm; import io.shardingsphere.core.api.algorithm.masterslave.RandomMasterSlaveLoadBalanceAlgorithm; import io.shardingsphere.core.api.algorithm.masterslave.RoundRobinMasterSlaveLoadBalanceAlgorithm; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.core.datasource.MasterSlaveDataSource; import io.shardingsphere.core.rule.MasterSlaveRule; import io.shardingsphere.jdbc.orchestration.internal.OrchestrationMasterSlaveDataSource; diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationShardingNamespaceTest.java b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationShardingNamespaceTest.java index 4339a76ca3610..0425fe8891e8f 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationShardingNamespaceTest.java +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-namespace/src/test/java/io/shardingsphere/jdbc/orchestration/spring/OrchestrationShardingNamespaceTest.java @@ -23,8 +23,8 @@ import io.shardingsphere.core.api.config.strategy.InlineShardingStrategyConfiguration; import io.shardingsphere.core.api.config.strategy.NoneShardingStrategyConfiguration; import io.shardingsphere.core.api.config.strategy.StandardShardingStrategyConfiguration; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource; import io.shardingsphere.core.rule.BindingTableRule; import io.shardingsphere.core.rule.DataNode; diff --git a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootShardingTest.java b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootShardingTest.java index ad9ed895a1f9e..528d20255512d 100644 --- a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootShardingTest.java +++ b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootShardingTest.java @@ -18,8 +18,8 @@ package io.shardingsphere.jdbc.spring.boot.type; import io.shardingsphere.core.api.ConfigMapContext; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.core.ShardingContext; import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource; import org.apache.commons.dbcp2.BasicDataSource; diff --git a/sharding-jdbc-spring/sharding-jdbc-spring-namespace/src/test/java/io/shardingsphere/jdbc/spring/ShardingNamespaceTest.java b/sharding-jdbc-spring/sharding-jdbc-spring-namespace/src/test/java/io/shardingsphere/jdbc/spring/ShardingNamespaceTest.java index 5f0c20489dab8..9512b87800758 100644 --- a/sharding-jdbc-spring/sharding-jdbc-spring-namespace/src/test/java/io/shardingsphere/jdbc/spring/ShardingNamespaceTest.java +++ b/sharding-jdbc-spring/sharding-jdbc-spring-namespace/src/test/java/io/shardingsphere/jdbc/spring/ShardingNamespaceTest.java @@ -23,8 +23,8 @@ import io.shardingsphere.core.api.config.strategy.InlineShardingStrategyConfiguration; import io.shardingsphere.core.api.config.strategy.NoneShardingStrategyConfiguration; import io.shardingsphere.core.api.config.strategy.StandardShardingStrategyConfiguration; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource; import io.shardingsphere.core.rule.BindingTableRule; import io.shardingsphere.core.rule.DataNode; diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java index ce2868a546288..56fef016b87d4 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java @@ -18,19 +18,15 @@ package io.shardingsphere.core.jdbc.adapter; import com.google.common.base.Preconditions; -import io.shardingsphere.core.constant.TCLType; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionOperationType; import io.shardingsphere.core.hint.HintManagerHolder; import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationConnection; import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager; import io.shardingsphere.core.util.EventBusInstance; -import io.shardingsphere.transaction.api.ShardingTransactionManagerFactory; -import io.shardingsphere.transaction.api.local.LocalTransactionManager; -import io.shardingsphere.transaction.common.TransactionContext; -import io.shardingsphere.transaction.common.TransactionContextHolder; -import io.shardingsphere.transaction.common.event.LocalTransactionEvent; -import io.shardingsphere.transaction.common.event.TransactionEvent; -import io.shardingsphere.transaction.common.event.TransactionEventFactory; +import io.shardingsphere.transaction.TransactionTypeHolder; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; +import io.shardingsphere.transaction.event.local.LocalTransactionEvent; +import io.shardingsphere.transaction.event.xa.XATransactionEvent; import javax.sql.DataSource; import java.sql.Connection; @@ -67,8 +63,6 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera * @throws SQLException SQL exception */ public final Connection getConnection(final String dataSourceName) throws SQLException { - TransactionType transactionType = TransactionContextHolder.get().getTransactionType(); - TransactionContextHolder.set(new TransactionContext(ShardingTransactionManagerFactory.getShardingTransactionManager(transactionType), transactionType)); if (cachedConnections.containsKey(dataSourceName)) { return cachedConnections.get(dataSourceName); } @@ -94,19 +88,30 @@ public final boolean getAutoCommit() { @Override public final void setAutoCommit(final boolean autoCommit) { this.autoCommit = autoCommit; - TransactionContextHolder.set(new TransactionContext(new LocalTransactionManager(), TransactionType.LOCAL)); recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit}); - EventBusInstance.getInstance().post(buildTransactionEvent(TCLType.BEGIN)); + EventBusInstance.getInstance().post(createTransactionEvent(TransactionOperationType.BEGIN)); } @Override public final void commit() { - EventBusInstance.getInstance().post(buildTransactionEvent(TCLType.COMMIT)); + EventBusInstance.getInstance().post(createTransactionEvent(TransactionOperationType.COMMIT)); } @Override public final void rollback() { - EventBusInstance.getInstance().post(buildTransactionEvent(TCLType.ROLLBACK)); + EventBusInstance.getInstance().post(createTransactionEvent(TransactionOperationType.ROLLBACK)); + } + + private ShardingTransactionEvent createTransactionEvent(final TransactionOperationType operationType) { + switch (TransactionTypeHolder.get()) { + case LOCAL: + return new LocalTransactionEvent(operationType, cachedConnections.values(), autoCommit); + case XA: + return new XATransactionEvent(operationType); + case BASE: + default: + throw new UnsupportedOperationException(TransactionTypeHolder.get().name()); + } } @Override @@ -114,7 +119,7 @@ public final void close() throws SQLException { closed = true; HintManagerHolder.clear(); MasterVisitedManager.clear(); - TransactionContextHolder.clear(); + TransactionTypeHolder.clear(); Collection exceptions = new LinkedList<>(); for (Connection each : cachedConnections.values()) { try { @@ -181,14 +186,4 @@ public final int getHoldability() { @Override public final void setHoldability(final int holdability) { } - - private TransactionEvent buildTransactionEvent(final TCLType tclType) { - TransactionEvent result = TransactionEventFactory.create(tclType); - if (result instanceof LocalTransactionEvent) { - LocalTransactionEvent localTransactionEvent = (LocalTransactionEvent) result; - localTransactionEvent.setCachedConnections(cachedConnections.values()); - localTransactionEvent.setAutoCommit(autoCommit); - } - return result; - } } diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractDataSourceAdapter.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractDataSourceAdapter.java index ce22916fa2fbc..c9c65f8b1f280 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractDataSourceAdapter.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractDataSourceAdapter.java @@ -19,12 +19,8 @@ import com.google.common.base.Preconditions; import io.shardingsphere.core.constant.DatabaseType; -import io.shardingsphere.core.constant.TransactionType; import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationDataSource; import io.shardingsphere.core.listener.JDBCListenerRegister; -import io.shardingsphere.transaction.api.ShardingTransactionManagerFactory; -import io.shardingsphere.transaction.common.TransactionContext; -import io.shardingsphere.transaction.common.TransactionContextHolder; import lombok.Getter; import javax.sql.DataSource; @@ -42,8 +38,6 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource { static { - TransactionType transactionType = TransactionContextHolder.get().getTransactionType(); - TransactionContextHolder.set(new TransactionContext(ShardingTransactionManagerFactory.getShardingTransactionManager(transactionType), transactionType)); JDBCListenerRegister.register(); } diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/MasterSlaveDataSource.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/MasterSlaveDataSource.java index ea59f90238659..ee4d11a7dbbb9 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/MasterSlaveDataSource.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/MasterSlaveDataSource.java @@ -19,8 +19,8 @@ import io.shardingsphere.core.api.ConfigMapContext; import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter; import io.shardingsphere.core.jdbc.core.connection.MasterSlaveConnection; import io.shardingsphere.core.rule.MasterSlaveRule; diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSource.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSource.java index 180ecadf9943b..d663af60af4fb 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSource.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSource.java @@ -21,8 +21,8 @@ import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration; import io.shardingsphere.core.api.config.ShardingRuleConfiguration; import io.shardingsphere.core.constant.ConnectionMode; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.type.connection.ConnectionStrictlyExecutorEngine; diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/listener/JDBCListenerRegister.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/listener/JDBCListenerRegister.java index 0b1de45a7e72d..1e94e44afc558 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/listener/JDBCListenerRegister.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/listener/JDBCListenerRegister.java @@ -17,7 +17,8 @@ package io.shardingsphere.core.listener; -import io.shardingsphere.transaction.common.listener.TransactionListener; +import io.shardingsphere.transaction.listener.local.LocalTransactionListener; +import io.shardingsphere.transaction.listener.xa.XATransactionListener; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -33,6 +34,7 @@ public final class JDBCListenerRegister { * Register all listeners. */ public static void register() { - new TransactionListener().register(); + new LocalTransactionListener().register(); + new XATransactionListener().register(); } } diff --git a/sharding-jdbc/src/test/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSourceTest.java b/sharding-jdbc/src/test/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSourceTest.java index 780b13c948a68..3392c04502664 100644 --- a/sharding-jdbc/src/test/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSourceTest.java +++ b/sharding-jdbc/src/test/java/io/shardingsphere/core/jdbc/core/datasource/ShardingDataSourceTest.java @@ -22,7 +22,8 @@ import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration; import io.shardingsphere.core.api.config.ShardingRuleConfiguration; import io.shardingsphere.core.api.config.TableRuleConfiguration; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.DatabaseType; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.rule.ShardingRule; import org.junit.Test; @@ -108,7 +109,7 @@ public void assertGetDatabaseProductNameForMasterSlave() throws SQLException { private void assertDatabaseProductName(final Map dataSourceMap, final Connection... connections) throws SQLException { try { - createShardingDataSource(dataSourceMap).getDatabaseType(); + assertThat(createShardingDataSource(dataSourceMap).getDatabaseType(), is(DatabaseType.H2)); } finally { for (Connection each : connections) { verify(each, atLeast(1)).close(); diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/JDBCBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/JDBCBackendHandler.java index 48620f1a0c14b..ef7661a889033 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/JDBCBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/JDBCBackendHandler.java @@ -19,7 +19,7 @@ import io.shardingsphere.core.constant.DatabaseType; import io.shardingsphere.core.constant.SQLType; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.merger.MergeEngineFactory; import io.shardingsphere.core.merger.MergedResult; import io.shardingsphere.core.metadata.table.executor.TableMetaDataLoader; @@ -44,6 +44,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; +import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; import lombok.RequiredArgsConstructor; import javax.transaction.Status; @@ -100,7 +101,8 @@ private CommandResponsePackets execute(final SQLRouteResult routeResult) throws } private boolean isUnsupportedXA(final SQLType sqlType) throws SQLException { - return TransactionType.XA == RULE_REGISTRY.getTransactionType() && SQLType.DDL == sqlType && Status.STATUS_NO_TRANSACTION != RULE_REGISTRY.getTransactionManager().getStatus(); + return TransactionType.XA == RULE_REGISTRY.getTransactionType() && SQLType.DDL == sqlType + && Status.STATUS_NO_TRANSACTION != ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA).getStatus(); } private CommandResponsePackets merge(final SQLStatement sqlStatement) throws SQLException { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java index 5a3dd6e71c786..08b2f811a6d3a 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java @@ -17,7 +17,8 @@ package io.shardingsphere.proxy.backend.jdbc.datasource; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.proxy.backend.BackendDataSource; import lombok.Getter; @@ -47,7 +48,13 @@ public JDBCBackendDataSource(final TransactionType transactionType, final Map createDataSourceMap(final TransactionType transactionType, final Map dataSourceParameters) { Map result = new LinkedHashMap<>(dataSourceParameters.size()); for (Entry entry : dataSourceParameters.entrySet()) { - result.put(entry.getKey(), getBackendDataSourceFactory(transactionType).build(entry.getKey(), entry.getValue())); + try { + result.put(entry.getKey(), getBackendDataSourceFactory(transactionType).build(entry.getKey(), entry.getValue())); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + throw new ShardingException(String.format("Can not build data source, name is `%s`.", entry.getKey()), ex); + } } return result; } @@ -61,16 +68,6 @@ private JDBCBackendDataSourceFactory getBackendDataSourceFactory(final Transacti } } - /** - * Get data source. - * - * @param dataSourceName data source name - * @return data source - */ - public DataSource getDataSource(final String dataSourceName) { - return dataSourceMap.get(dataSourceName); - } - /** * Get connection. * @@ -79,6 +76,6 @@ public DataSource getDataSource(final String dataSourceName) { * @throws SQLException SQL exception */ public Connection getConnection(final String dataSourceName) throws SQLException { - return getDataSource(dataSourceName).getConnection(); + return dataSourceMap.get(dataSourceName).getConnection(); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java index 9429ad1d2e730..21420848204a8 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java @@ -34,6 +34,7 @@ public interface JDBCBackendDataSourceFactory { * @param dataSourceName data source name * @param dataSourceParameter data source connection parameter * @return data source for connect backend databases + * @throws Exception when the data source can not be build */ - DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter); + DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCRawBackendDataSourceFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCRawBackendDataSourceFactory.java index f7d081a24438a..a77a1b184d165 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCRawBackendDataSourceFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCRawBackendDataSourceFactory.java @@ -31,10 +31,12 @@ */ public final class JDBCRawBackendDataSourceFactory implements JDBCBackendDataSourceFactory { + private static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + @Override public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) { HikariConfig config = new HikariConfig(); - config.setDriverClassName("com.mysql.jdbc.Driver"); + config.setDriverClassName(DRIVER_CLASS_NAME); config.setJdbcUrl(dataSourceParameter.getUrl()); config.setUsername(dataSourceParameter.getUsername()); config.setPassword(dataSourceParameter.getPassword()); @@ -43,16 +45,16 @@ public DataSource build(final String dataSourceName, final DataSourceParameter d config.setIdleTimeout(dataSourceParameter.getIdleTimeout()); config.setMaxLifetime(dataSourceParameter.getMaxLifetime()); config.setMaximumPoolSize(dataSourceParameter.getMaximumPoolSize()); - config.addDataSourceProperty("useServerPrepStmts", "true"); + config.addDataSourceProperty("useServerPrepStmts", Boolean.TRUE.toString()); config.addDataSourceProperty("cachePrepStmts", "true"); config.addDataSourceProperty("prepStmtCacheSize", 250); config.addDataSourceProperty("prepStmtCacheSqlLimit", 2048); - config.addDataSourceProperty("useLocalSessionState", "true"); - config.addDataSourceProperty("rewriteBatchedStatements", "true"); - config.addDataSourceProperty("cacheResultSetMetadata", "true"); - config.addDataSourceProperty("cacheServerConfiguration", "true"); - config.addDataSourceProperty("elideSetAutoCommits", "true"); - config.addDataSourceProperty("maintainTimeStats", "false"); + config.addDataSourceProperty("useLocalSessionState", Boolean.TRUE.toString()); + config.addDataSourceProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); + config.addDataSourceProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); + config.addDataSourceProperty("cacheServerConfiguration", Boolean.TRUE.toString()); + config.addDataSourceProperty("elideSetAutoCommits", Boolean.TRUE.toString()); + config.addDataSourceProperty("maintainTimeStats", Boolean.FALSE.toString()); config.addDataSourceProperty("netTimeoutForStreamingResults", 0); return new HikariDataSource(config); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java index df8d3fb75c6ab..f8cb01a63e710 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java @@ -17,12 +17,13 @@ package io.shardingsphere.proxy.backend.jdbc.datasource; -import com.atomikos.jdbc.AtomikosDataSourceBean; -import com.google.common.base.Optional; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.rule.DataSourceParameter; +import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; +import io.shardingsphere.transaction.manager.xa.XATransactionManager; import javax.sql.DataSource; -import java.util.Properties; +import javax.sql.XADataSource; /** * Backend data source factory using {@code AtomikosDataSourceBean} for JDBC and XA protocol. @@ -32,35 +33,12 @@ */ public final class JDBCXABackendDataSourceFactory implements JDBCBackendDataSourceFactory { - @Override - public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) { - AtomikosDataSourceBean result = new AtomikosDataSourceBean(); - result.setUniqueResourceName(dataSourceName); - result.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"); - result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); - result.setTestQuery("SELECT 1"); - result.setXaProperties(getProperties(dataSourceParameter)); - return result; - } + private static final String XA_DRIVER_CLASS_NAME = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"; - private Properties getProperties(final DataSourceParameter dataSourceParameter) { - Properties result = new Properties(); - result.setProperty("user", dataSourceParameter.getUsername()); - result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); - result.setProperty("URL", dataSourceParameter.getUrl()); - result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); - result.setProperty("autoReconnect", Boolean.TRUE.toString()); - result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); - result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); - result.setProperty("prepStmtCacheSize", "250"); - result.setProperty("prepStmtCacheSqlLimit", "2048"); - result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); - result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); - result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); - result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); - result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); - result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); - result.setProperty("netTimeoutForStreamingResults", "0"); - return result; + @Override + public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) throws Exception { + XADataSource dataSource = (XADataSource) Class.forName(XA_DRIVER_CLASS_NAME).newInstance(); + XATransactionManager xaTransactionManager = (XATransactionManager) ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA); + return xaTransactionManager.wrapDataSource(dataSource, dataSourceName, dataSourceParameter); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java index ce7f4d107cdb9..4ed7cf431b2bc 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java @@ -17,7 +17,7 @@ package io.shardingsphere.proxy.backend.jdbc.execute.memory; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLRouteResult; diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java index 6d4715b4cf876..a56eea725876b 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java @@ -18,7 +18,7 @@ package io.shardingsphere.proxy.backend.jdbc.execute.stream; import com.google.common.collect.Lists; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLExecutionUnit; diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/DefaultTransactionEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/DefaultTransactionEngine.java index fd0c8b83f21a5..49a96a0938d2a 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/DefaultTransactionEngine.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/DefaultTransactionEngine.java @@ -22,14 +22,10 @@ * * @author zhaojun */ -public final class DefaultTransactionEngine extends TransactionEngine { - - public DefaultTransactionEngine(final String sql) { - super(sql); - } +public final class DefaultTransactionEngine implements TransactionEngine { @Override public boolean execute() { - return parseSQL().isPresent(); + return false; } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngine.java index cef4ede6b7f51..165d49061f9e6 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngine.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngine.java @@ -17,41 +17,14 @@ package io.shardingsphere.proxy.backend.jdbc.transaction; -import com.google.common.base.Optional; -import io.shardingsphere.core.constant.TCLType; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.Setter; - import java.sql.SQLException; /** - * Abstract transaction engine. + * Transaction engine. * * @author zhaojun */ -@RequiredArgsConstructor -@Getter -@Setter -public abstract class TransactionEngine { - - private final String sql; - - // TODO :yonglun move to TCLParser - protected final Optional parseSQL() { - switch (sql.toUpperCase()) { - case "BEGIN": - case "START TRANSACTION": - case "SET AUTOCOMMIT=0": - return Optional.of(TCLType.BEGIN); - case "COMMIT": - return Optional.of(TCLType.COMMIT); - case "ROLLBACK": - return Optional.of(TCLType.ROLLBACK); - default: - return Optional.absent(); - } - } +public interface TransactionEngine { /** * Execute transaction with binding transaction manager. @@ -59,5 +32,5 @@ protected final Optional parseSQL() { * @return skip or not skip access backend databases * @throws SQLException SQL exception */ - public abstract boolean execute() throws SQLException; + boolean execute() throws SQLException; } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngineFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngineFactory.java index f36eda9b472ee..9a6a515e10e98 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngineFactory.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/TransactionEngineFactory.java @@ -22,27 +22,25 @@ import lombok.NoArgsConstructor; /** - * Create transaction engine based on current transaction type. + * Transaction engine factory. * * @author zhaojun */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class TransactionEngineFactory { - private static final RuleRegistry RULE_REGISTRY = RuleRegistry.getInstance(); - /** - * Create transaction engine from SQL. + * Create new instance of transaction engine. * * @param sql SQL - * @return transaction engine + * @return new instance of transaction engine */ - public static TransactionEngine create(final String sql) { - switch (RULE_REGISTRY.getTransactionType()) { + public static TransactionEngine newInstance(final String sql) { + switch (RuleRegistry.getInstance().getTransactionType()) { case XA: - return new XaTransactionEngine(sql); + return new XATransactionEngine(sql); default: - return new DefaultTransactionEngine(sql); + return new DefaultTransactionEngine(); } } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XATransactionEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XATransactionEngine.java new file mode 100644 index 0000000000000..4b1d2f32b33ad --- /dev/null +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XATransactionEngine.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.proxy.backend.jdbc.transaction; + +import com.google.common.base.Optional; +import io.shardingsphere.core.constant.transaction.TransactionOperationType; +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.core.util.EventBusInstance; +import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; +import io.shardingsphere.transaction.TransactionTypeHolder; +import io.shardingsphere.transaction.event.xa.XATransactionEvent; +import lombok.RequiredArgsConstructor; + +import javax.transaction.Status; +import java.sql.SQLException; + +/** + * Execute XA transaction intercept. + * + * @author zhaojun + */ +@RequiredArgsConstructor +public final class XATransactionEngine implements TransactionEngine { + + private final String sql; + + @Override + public boolean execute() throws SQLException { + Optional operationType = TransactionOperationType.getOperationType(sql); + if (operationType.isPresent() && isInTransaction(operationType.get())) { + TransactionTypeHolder.set(TransactionType.XA); + EventBusInstance.getInstance().post(new XATransactionEvent(operationType.get())); + return true; + } + return false; + } + + private boolean isInTransaction(final TransactionOperationType operationType) throws SQLException { + return TransactionOperationType.ROLLBACK != operationType + || Status.STATUS_NO_TRANSACTION != ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA).getStatus(); + } +} diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XaTransactionEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XaTransactionEngine.java deleted file mode 100644 index 000ec35bfc947..0000000000000 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/transaction/XaTransactionEngine.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.proxy.backend.jdbc.transaction; - -import com.google.common.base.Optional; -import io.shardingsphere.core.constant.TCLType; -import io.shardingsphere.core.constant.TransactionType; -import io.shardingsphere.core.util.EventBusInstance; -import io.shardingsphere.proxy.config.RuleRegistry; -import io.shardingsphere.transaction.common.TransactionContext; -import io.shardingsphere.transaction.common.TransactionContextHolder; -import io.shardingsphere.transaction.common.event.XaTransactionEvent; - -import javax.transaction.Status; -import java.sql.SQLException; - -/** - * Execute XA transaction intercept. - * - * @author zhaojun - */ -public final class XaTransactionEngine extends TransactionEngine { - - private static final RuleRegistry RULE_REGISTRY = RuleRegistry.getInstance(); - - public XaTransactionEngine(final String sql) { - super(sql); - } - - @Override - public boolean execute() throws SQLException { - Optional tclType = parseSQL(); - if (tclType.isPresent() && isInTransaction(tclType.get())) { - TransactionContextHolder.set(new TransactionContext(RULE_REGISTRY.getTransactionManager(), TransactionType.XA)); - EventBusInstance.getInstance().post(new XaTransactionEvent(tclType.get(), getSql())); - return true; - } - return false; - } - - private boolean isInTransaction(final TCLType tclType) throws SQLException { - return TCLType.ROLLBACK != tclType || Status.STATUS_NO_TRANSACTION != RULE_REGISTRY.getTransactionManager().getStatus(); - } -} diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/ProxyTableMetaDataConnectionManager.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/ProxyTableMetaDataConnectionManager.java index b38cf1e4182a8..1136e162e8df0 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/ProxyTableMetaDataConnectionManager.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/ProxyTableMetaDataConnectionManager.java @@ -36,6 +36,6 @@ public final class ProxyTableMetaDataConnectionManager implements TableMetaDataC @Override public Connection getConnection(final String dataSourceName) throws SQLException { - return backendDataSource.getDataSource(dataSourceName).getConnection(); + return backendDataSource.getConnection(dataSourceName); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/RuleRegistry.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/RuleRegistry.java index 615c5bb79a184..e7ab2e0cbe588 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/RuleRegistry.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/config/RuleRegistry.java @@ -22,9 +22,9 @@ import io.shardingsphere.core.api.config.ShardingRuleConfiguration; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.DatabaseType; -import io.shardingsphere.core.constant.ShardingProperties; -import io.shardingsphere.core.constant.ShardingPropertiesConstant; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.properties.ShardingProperties; +import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.metadata.ShardingMetaData; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.core.rule.MasterSlaveRule; @@ -33,10 +33,7 @@ import io.shardingsphere.jdbc.orchestration.internal.OrchestrationProxyConfiguration; import io.shardingsphere.jdbc.orchestration.internal.eventbus.ProxyEventBusEvent; import io.shardingsphere.proxy.backend.jdbc.datasource.JDBCBackendDataSource; -import io.shardingsphere.transaction.api.ShardingTransactionManager; -import io.shardingsphere.transaction.api.ShardingTransactionManagerFactory; -import io.shardingsphere.transaction.common.TransactionContext; -import io.shardingsphere.transaction.common.TransactionContextHolder; +import io.shardingsphere.transaction.TransactionTypeHolder; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -82,8 +79,6 @@ public final class RuleRegistry { private TransactionType transactionType; - private ShardingTransactionManager transactionManager; - private ProxyAuthority proxyAuthority; private ShardingMetaData metaData; @@ -108,8 +103,7 @@ public synchronized void init(final OrchestrationProxyConfiguration config) { showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW); connectionMode = ConnectionMode.valueOf(shardingProperties.getValue(ShardingPropertiesConstant.CONNECTION_MODE)); transactionType = TransactionType.valueOf(shardingProperties.getValue(ShardingPropertiesConstant.PROXY_TRANSACTION_MODE)); - transactionManager = ShardingTransactionManagerFactory.getShardingTransactionManager(transactionType); - TransactionContextHolder.set(new TransactionContext(transactionManager, transactionType)); + TransactionTypeHolder.set(transactionType); acceptorSize = shardingProperties.getValue(ShardingPropertiesConstant.ACCEPTOR_SIZE); executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); // TODO :jiaqi force off use NIO for backend, this feature is not complete yet diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/common/executor/ExecutorGroup.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/common/executor/ExecutorGroup.java index 943092ed76432..1694ea6dc0b55 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/common/executor/ExecutorGroup.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/common/executor/ExecutorGroup.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelId; import io.netty.channel.EventLoopGroup; -import io.shardingsphere.core.constant.TransactionType; +import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.proxy.config.RuleRegistry; import lombok.RequiredArgsConstructor; diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/listener/ProxyListenerRegister.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/listener/ProxyListenerRegister.java index a48cc9cbbee58..29d7446ea7742 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/listener/ProxyListenerRegister.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/listener/ProxyListenerRegister.java @@ -17,7 +17,8 @@ package io.shardingsphere.proxy.listener; -import io.shardingsphere.transaction.common.listener.TransactionListener; +import io.shardingsphere.transaction.listener.local.LocalTransactionListener; +import io.shardingsphere.transaction.listener.xa.XATransactionListener; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -33,6 +34,7 @@ public final class ProxyListenerRegister { * Register all listeners. */ public static void register() { - new TransactionListener().register(); + new LocalTransactionListener().register(); + new XATransactionListener().register(); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java index 9e53d2072283a..6e8a28dd120c4 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/query/ComQueryPacket.java @@ -62,7 +62,7 @@ public ComQueryPacket(final int sequenceId, final int connectionId, final MySQLP this.sequenceId = sequenceId; sql = payload.readStringEOF(); backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, sequenceId, sql, backendConnection, DatabaseType.MySQL); - transactionEngine = TransactionEngineFactory.create(sql); + transactionEngine = TransactionEngineFactory.newInstance(sql); } public ComQueryPacket(final int sequenceId, final String sql) { diff --git a/sharding-proxy/src/main/resources/META-INF/services/io.shardingsphere.transaction.api.xa.XATransactionManager b/sharding-proxy/src/main/resources/META-INF/services/io.shardingsphere.transaction.api.xa.XATransactionManager deleted file mode 100644 index dd784f3e58903..0000000000000 --- a/sharding-proxy/src/main/resources/META-INF/services/io.shardingsphere.transaction.api.xa.XATransactionManager +++ /dev/null @@ -1 +0,0 @@ -io.shardingsphere.transaction.api.xa.AtomikosTransactionManager diff --git a/sharding-proxy/src/main/resources/conf/config.yaml b/sharding-proxy/src/main/resources/conf/config.yaml index de0a79397de87..a7f432520eb84 100644 --- a/sharding-proxy/src/main/resources/conf/config.yaml +++ b/sharding-proxy/src/main/resources/conf/config.yaml @@ -57,12 +57,13 @@ # props: # # MEMORY_STRICTLY: Proxy holds as many connections as the count of actual tables routed in a database. # # The benefit of this approach is saving memory for Proxy by Stream ResultSet. - +# # # CONNECTION_STRICTLY: Proxy will release connections after get the overall rows from the ResultSet. # # Meanwhile, the cost of the memory will be increased. # connection.mode: MEMORY_STRICTLY # acceptor.size: 16 # The default value is available processors count * 2. # executor.size: 16 # Infinite by default. +# proxy.transaction.mode: XA # sql.show: false # #orchestration: @@ -75,4 +76,4 @@ # #proxyAuthority: # username: root -# password: root \ No newline at end of file +# password: root diff --git a/sharding-proxy/src/main/resources/conf/jta.properties b/sharding-proxy/src/main/resources/conf/jta.properties index 826de48a7843d..37591b05f25f8 100644 --- a/sharding-proxy/src/main/resources/conf/jta.properties +++ b/sharding-proxy/src/main/resources/conf/jta.properties @@ -1,20 +1,3 @@ -# -# Copyright 2016-2018 shardingsphere.io. -#

-# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#

-# - com.atomikos.icatch.serial_jta_transactions=false com.atomikos.icatch.default_jta_timeout = 1000000 com.atomikos.icatch.max_actives = 10000 diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContextHolder.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/TransactionTypeHolder.java similarity index 53% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContextHolder.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/TransactionTypeHolder.java index d14285f1f4400..8f2022a560aa0 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContextHolder.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/TransactionTypeHolder.java @@ -15,43 +15,46 @@ *

*/ -package io.shardingsphere.transaction.common; +package io.shardingsphere.transaction; + +import io.shardingsphere.core.constant.transaction.TransactionType; /** - * Hold transaction context for current thread. + * Hold transaction type for current thread. * * @author zhaojun + * @author zhangliang */ -public final class TransactionContextHolder { +public final class TransactionTypeHolder { - private static final ThreadLocal CONTEXT = new ThreadLocal() { + private static final ThreadLocal CONTEXT = new ThreadLocal() { @Override - protected TransactionContext initialValue() { - return new TransactionContext(); + protected TransactionType initialValue() { + return TransactionType.LOCAL; } }; /** - * Get transaction context for current thread. + * Get transaction type for current thread. * - * @return TransactionContext + * @return transaction type */ - public static TransactionContext get() { + public static TransactionType get() { return CONTEXT.get(); } /** - * Set transaction context for current thread. + * Set transaction type for current thread. * - * @param context Transaction context + * @param transactionType transaction type */ - public static void set(final TransactionContext context) { - CONTEXT.set(context); + public static void set(final TransactionType transactionType) { + CONTEXT.set(transactionType); } /** - * Clear transaction context for current thread. + * Clear transaction type for current thread. */ public static void clear() { CONTEXT.remove(); diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManagerFactory.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManagerFactory.java deleted file mode 100644 index a6ffd2444bd5a..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManagerFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.api; - -import io.shardingsphere.core.constant.TransactionType; -import io.shardingsphere.transaction.api.local.LocalTransactionManager; -import io.shardingsphere.transaction.api.xa.XATransactionManagerSPILoader; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; - -/** - * Sharding transaction manager factory. - * - * @author zhangliang - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class ShardingTransactionManagerFactory { - - /** - * Get sharding transaction manager. - * - * @param transactionType transaction type - * @return sharding transaction manager - */ - public static ShardingTransactionManager getShardingTransactionManager(final TransactionType transactionType) { - switch (transactionType) { - case LOCAL: - return new LocalTransactionManager(); - case XA: - return XATransactionManagerSPILoader.getInstance().getTransactionManager(); - case BASE: - default: - return null; - } - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosTransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosTransactionManager.java deleted file mode 100644 index 7977d51cc4845..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosTransactionManager.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.api.xa; - -import com.atomikos.icatch.jta.UserTransactionManager; -import io.shardingsphere.core.exception.ShardingException; -import io.shardingsphere.transaction.common.event.TransactionEvent; - -import javax.transaction.HeuristicMixedException; -import javax.transaction.HeuristicRollbackException; -import javax.transaction.NotSupportedException; -import javax.transaction.RollbackException; -import javax.transaction.SystemException; -import java.sql.SQLException; - -/** - * Atomikos XA transaction manager. - * - * @author zhaojun - */ -public final class AtomikosTransactionManager implements XATransactionManager { - - private static final UserTransactionManager TRANSACTION_MANAGER = AtomikosUserTransaction.getInstance(); - - static { - try { - TRANSACTION_MANAGER.init(); - } catch (final SystemException ex) { - throw new ShardingException(ex); - } - } - - @Override - public void begin(final TransactionEvent transactionEvent) throws SQLException { - try { - TRANSACTION_MANAGER.begin(); - } catch (final SystemException | NotSupportedException ex) { - throw new SQLException(ex); - } - } - - @Override - public void commit(final TransactionEvent transactionEvent) throws SQLException { - try { - TRANSACTION_MANAGER.commit(); - } catch (final RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException ex) { - throw new SQLException(ex); - } - } - - @Override - public void rollback(final TransactionEvent transactionEvent) throws SQLException { - try { - TRANSACTION_MANAGER.rollback(); - } catch (final SystemException ex) { - throw new SQLException(ex); - } - } - - @Override - public int getStatus() throws SQLException { - try { - return TRANSACTION_MANAGER.getStatus(); - } catch (final SystemException ex) { - throw new SQLException(ex); - } - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContext.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContext.java deleted file mode 100644 index 51606268322ae..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/TransactionContext.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.common; - -import io.shardingsphere.core.constant.TransactionType; -import io.shardingsphere.transaction.api.ShardingTransactionManager; -import lombok.Getter; -import lombok.NoArgsConstructor; - -/** - * Hold Transaction Context. - * - * @author zhaojun - */ -@NoArgsConstructor -@Getter -public final class TransactionContext { - - private ShardingTransactionManager transactionManager; - - private TransactionType transactionType = TransactionType.LOCAL; - - public TransactionContext(final ShardingTransactionManager transactionManager, final TransactionType transactionType) { - this.transactionManager = transactionManager; - this.transactionType = transactionType; - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEvent.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEvent.java deleted file mode 100644 index 5c02c1b41c74a..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.common.event; - -import com.google.common.base.Optional; -import io.shardingsphere.core.constant.TCLType; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.Setter; - -import java.util.UUID; - -/** - * Abstract Transaction Event. - * - * @author zhaojun - */ -@RequiredArgsConstructor -@Getter -public abstract class TransactionEvent { - - private final String id = UUID.randomUUID().toString(); - - private final TCLType tclType; - - @Setter - private Exception exception; - - /** - * Get exception. - * - * @return exception - */ - // TODO why not use sharding exception directly? - public Optional getException() { - return Optional.fromNullable(exception); - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEventFactory.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEventFactory.java deleted file mode 100644 index ad696c81a32f6..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/TransactionEventFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.common.event; - -import io.shardingsphere.core.constant.TCLType; -import io.shardingsphere.transaction.common.TransactionContextHolder; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; - -/** - * Transaction event factory. - * - * @author zhaojun - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class TransactionEventFactory { - - /** - * Create transaction event. - * - * @param tclType TCL type - * @return transaction event - */ - public static TransactionEvent create(final TCLType tclType) { - switch (TransactionContextHolder.get().getTransactionType()) { - case LOCAL: - return new LocalTransactionEvent(tclType); - case XA: - return new XaTransactionEvent(tclType, ""); - case BASE: - default: - return null; - } - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/XaTransactionEvent.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/XaTransactionEvent.java deleted file mode 100644 index 67c9dd5ecd15c..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/XaTransactionEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.common.event; - -import com.google.common.base.Optional; -import io.shardingsphere.core.constant.TCLType; -import io.shardingsphere.core.exception.ShardingException; -import lombok.Getter; - -/** - * XA transaction event. - * - * @author zhaojun - */ -@Getter -public final class XaTransactionEvent extends TransactionEvent { - - private final String sql; - - public XaTransactionEvent(final TCLType tclType, final String sql) { - super(tclType); - this.sql = sql; - } - - @Override - public Optional getException() { - return Optional.fromNullable((ShardingException) super.getException().orNull()); - } -} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/listener/TransactionListener.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/listener/TransactionListener.java deleted file mode 100644 index 6dfae8075ca18..0000000000000 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/listener/TransactionListener.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2016-2018 shardingsphere.io. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package io.shardingsphere.transaction.common.listener; - -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; -import io.shardingsphere.core.util.EventBusInstance; -import io.shardingsphere.transaction.api.ShardingTransactionManager; -import io.shardingsphere.transaction.common.TransactionContextHolder; -import io.shardingsphere.transaction.common.event.TransactionEvent; - -import java.sql.SQLException; - -/** - * Transaction Listener. - * - * @author zhaojun - */ -public final class TransactionListener { - - /** - * Register transaction listener into event bus. - */ - public void register() { - EventBusInstance.getInstance().register(this); - } - - /** - * Listen event. - * - * @param transactionEvent transaction event - * @throws SQLException SQL exception - */ - @Subscribe - @AllowConcurrentEvents - public void listen(final TransactionEvent transactionEvent) throws SQLException { - ShardingTransactionManager transactionManager = TransactionContextHolder.get().getTransactionManager(); - switch (transactionEvent.getTclType()) { - case BEGIN: - transactionManager.begin(transactionEvent); - break; - case COMMIT: - transactionManager.commit(transactionEvent); - break; - case ROLLBACK: - transactionManager.rollback(transactionEvent); - break; - default: - } - } -} diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/TCLType.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/ShardingTransactionEvent.java similarity index 63% rename from sharding-core/src/main/java/io/shardingsphere/core/constant/TCLType.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/event/ShardingTransactionEvent.java index 5f0bdbba2b8db..725578a81604d 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/constant/TCLType.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/ShardingTransactionEvent.java @@ -15,14 +15,22 @@ *

*/ -package io.shardingsphere.core.constant; +package io.shardingsphere.transaction.event; + +import io.shardingsphere.core.constant.transaction.TransactionOperationType; /** - * TCL Type. + * Sharding transaction event. * * @author zhaojun + * @author zhangliang */ -public enum TCLType { +public interface ShardingTransactionEvent { - SET, BEGIN, COMMIT, ROLLBACK, SAVEPOINT + /** + * Get transaction operation type. + * + * @return transaction operation type + */ + TransactionOperationType getOperationType(); } diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/LocalTransactionEvent.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/local/LocalTransactionEvent.java similarity index 54% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/LocalTransactionEvent.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/event/local/LocalTransactionEvent.java index 63cce2ab9390a..e225eaff8db8f 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/common/event/LocalTransactionEvent.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/local/LocalTransactionEvent.java @@ -15,13 +15,12 @@ *

*/ -package io.shardingsphere.transaction.common.event; +package io.shardingsphere.transaction.event.local; -import com.google.common.base.Optional; -import io.shardingsphere.core.constant.TCLType; -import io.shardingsphere.core.exception.ShardingException; +import io.shardingsphere.core.constant.transaction.TransactionOperationType; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import java.sql.Connection; import java.util.Collection; @@ -31,20 +30,13 @@ * * @author zhaojun */ +@RequiredArgsConstructor @Getter -@Setter -public final class LocalTransactionEvent extends TransactionEvent { +public final class LocalTransactionEvent implements ShardingTransactionEvent { - private Collection cachedConnections; + private final TransactionOperationType operationType; + + private final Collection cachedConnections; - private boolean autoCommit = true; - - public LocalTransactionEvent(final TCLType tclType) { - super(tclType); - } - - @Override - public Optional getException() { - return Optional.fromNullable((ShardingException) super.getException().orNull()); - } + private final boolean autoCommit; } diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/xa/XATransactionEvent.java similarity index 57% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManager.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/event/xa/XATransactionEvent.java index 6edc845f82a78..5d9af7fc8e5f7 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManager.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/event/xa/XATransactionEvent.java @@ -15,14 +15,21 @@ *

*/ -package io.shardingsphere.transaction.api.xa; +package io.shardingsphere.transaction.event.xa; -import io.shardingsphere.transaction.api.ShardingTransactionManager; +import io.shardingsphere.core.constant.transaction.TransactionOperationType; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; +import lombok.Getter; +import lombok.RequiredArgsConstructor; /** - * XA transaction manager. + * XA transaction event. * - * @author zhangliang + * @author zhaojun */ -public interface XATransactionManager extends ShardingTransactionManager { +@RequiredArgsConstructor +@Getter +public final class XATransactionEvent implements ShardingTransactionEvent { + + private final TransactionOperationType operationType; } diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosUserTransaction.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListener.java similarity index 51% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosUserTransaction.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListener.java index 137c89ad32787..43978b45b1dcc 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/AtomikosUserTransaction.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListener.java @@ -15,28 +15,31 @@ *

*/ -package io.shardingsphere.transaction.api.xa; +package io.shardingsphere.transaction.listener; -import com.atomikos.icatch.jta.UserTransactionManager; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; + +import java.sql.SQLException; /** - * Hold singleton atomikos userTransaction. + * Sharding transaction listener. * - * @author zhaojun + * @author zhangliang + * + * @param transaction event type */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class AtomikosUserTransaction { +public interface ShardingTransactionListener { - private static final UserTransactionManager TRANSACTION_MANAGER = new UserTransactionManager(); + /** + * Register sharding transaction listener into event bus. + */ + void register(); /** - * Get singleton of {@code UserTransactionManager}. + * Listen event. * - * @return {@code UserTransactionManager} + * @param transactionEvent transaction event + * @throws SQLException SQL exception */ - public static UserTransactionManager getInstance() { - return TRANSACTION_MANAGER; - } + void listen(T transactionEvent) throws SQLException; } diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListenerAdapter.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListenerAdapter.java new file mode 100644 index 0000000000000..7cbc9a685361c --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/ShardingTransactionListenerAdapter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.listener; + +import io.shardingsphere.core.util.EventBusInstance; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; + +import java.sql.SQLException; + +/** + * Sharding transaction listener adapter. + * + * @author zhangliang + * + * @param transaction event type + */ +public abstract class ShardingTransactionListenerAdapter implements ShardingTransactionListener { + + @Override + public final void register() { + EventBusInstance.getInstance().register(this); + } + + @SuppressWarnings("unchecked") + protected final void doTransaction(final ShardingTransactionManager shardingTransactionManager, final T transactionEvent) throws SQLException { + switch (transactionEvent.getOperationType()) { + case BEGIN: + shardingTransactionManager.begin(transactionEvent); + break; + case COMMIT: + shardingTransactionManager.commit(transactionEvent); + break; + case ROLLBACK: + shardingTransactionManager.rollback(transactionEvent); + break; + default: + } + } +} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/local/LocalTransactionListener.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/local/LocalTransactionListener.java new file mode 100644 index 0000000000000..7b5d587b1a79a --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/local/LocalTransactionListener.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.listener.local; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.transaction.event.local.LocalTransactionEvent; +import io.shardingsphere.transaction.listener.ShardingTransactionListenerAdapter; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; +import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; + +import java.sql.SQLException; + +/** + * Local transaction listener. + * + * @author zhangliang + */ +public final class LocalTransactionListener extends ShardingTransactionListenerAdapter { + + private final ShardingTransactionManager shardingTransactionManager = ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.LOCAL); + + @Subscribe + @AllowConcurrentEvents + @Override + public void listen(final LocalTransactionEvent transactionEvent) throws SQLException { + doTransaction(shardingTransactionManager, transactionEvent); + } +} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/xa/XATransactionListener.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/xa/XATransactionListener.java new file mode 100644 index 0000000000000..7db9fe8bd99af --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/listener/xa/XATransactionListener.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.listener.xa; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.transaction.event.xa.XATransactionEvent; +import io.shardingsphere.transaction.listener.ShardingTransactionListenerAdapter; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; +import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; + +import java.sql.SQLException; + +/** + * XA transaction listener. + * + * @author zhangliang + */ +public final class XATransactionListener extends ShardingTransactionListenerAdapter { + + private final ShardingTransactionManager shardingTransactionManager = ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA); + + @Subscribe + @AllowConcurrentEvents + @Override + public void listen(final XATransactionEvent transactionEvent) throws SQLException { + doTransaction(shardingTransactionManager, transactionEvent); + } +} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManager.java similarity index 78% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManager.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManager.java index 1a729ce98914f..fd5f4291c7bf3 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/ShardingTransactionManager.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManager.java @@ -15,9 +15,9 @@ *

*/ -package io.shardingsphere.transaction.api; +package io.shardingsphere.transaction.manager; -import io.shardingsphere.transaction.common.event.TransactionEvent; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; import java.sql.SQLException; /** @@ -25,8 +25,10 @@ * * @author zhaojun * @author zhangliang + * + * @param transaction event type */ -public interface ShardingTransactionManager { +public interface ShardingTransactionManager { /** * Begin transaction. @@ -34,7 +36,7 @@ public interface ShardingTransactionManager { * @param transactionEvent transaction event * @throws SQLException SQL exception */ - void begin(TransactionEvent transactionEvent) throws SQLException; + void begin(T transactionEvent) throws SQLException; /** * Commit transaction. @@ -42,7 +44,7 @@ public interface ShardingTransactionManager { * @param transactionEvent transaction event * @throws SQLException SQL exception */ - void commit(TransactionEvent transactionEvent) throws SQLException; + void commit(T transactionEvent) throws SQLException; /** * Rollback transaction. @@ -50,7 +52,7 @@ public interface ShardingTransactionManager { * @param transactionEvent transaction event * @throws SQLException SQL exception */ - void rollback(TransactionEvent transactionEvent) throws SQLException; + void rollback(T transactionEvent) throws SQLException; /** * Obtain the status of the transaction associated with the current thread. diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManagerRegistry.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManagerRegistry.java new file mode 100644 index 0000000000000..6e0b97519cb05 --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/ShardingTransactionManagerRegistry.java @@ -0,0 +1,74 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.manager; + +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.transaction.manager.local.LocalTransactionManager; +import io.shardingsphere.transaction.manager.xa.XATransactionManagerSPILoader; + +import java.util.HashMap; +import java.util.Map; + +/** + * Sharding transaction manager register. + * + * @author zhangliang + */ +public final class ShardingTransactionManagerRegistry { + + private static final ShardingTransactionManagerRegistry INSTANCE = new ShardingTransactionManagerRegistry(); + + private final Map shardingTransactionManagers = new HashMap<>(TransactionType.values().length, 1); + + private ShardingTransactionManagerRegistry() { + for (TransactionType each : TransactionType.values()) { + shardingTransactionManagers.put(each, loadShardingTransactionManager(each)); + } + } + + private ShardingTransactionManager loadShardingTransactionManager(final TransactionType transactionType) { + switch (transactionType) { + case LOCAL: + return new LocalTransactionManager(); + case XA: + return XATransactionManagerSPILoader.getInstance().getTransactionManager(); + case BASE: + default: + return null; + } + } + + /** + * Get instance of sharding transaction manager register. + * + * @return instance of sharding transaction manager register + */ + public static ShardingTransactionManagerRegistry getInstance() { + return INSTANCE; + } + + /** + * Get sharding transaction manager. + * + * @param transactionType transaction type + * @return sharding transaction manager + */ + public ShardingTransactionManager getShardingTransactionManager(final TransactionType transactionType) { + return shardingTransactionManagers.get(transactionType); + } +} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/base/SagaTransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/base/SagaTransactionManager.java similarity index 71% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/base/SagaTransactionManager.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/base/SagaTransactionManager.java index 16ec8b1a5aca3..6dcf179c0e2dc 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/base/SagaTransactionManager.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/base/SagaTransactionManager.java @@ -15,10 +15,10 @@ *

*/ -package io.shardingsphere.transaction.api.base; +package io.shardingsphere.transaction.manager.base; -import io.shardingsphere.transaction.api.ShardingTransactionManager; -import io.shardingsphere.transaction.common.event.TransactionEvent; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; +import io.shardingsphere.transaction.event.ShardingTransactionEvent; import javax.transaction.Status; @@ -30,15 +30,15 @@ public final class SagaTransactionManager implements ShardingTransactionManager { @Override - public void begin(final TransactionEvent transactionEvent) { + public void begin(final ShardingTransactionEvent transactionEvent) { } @Override - public void commit(final TransactionEvent transactionEvent) { + public void commit(final ShardingTransactionEvent transactionEvent) { } @Override - public void rollback(final TransactionEvent transactionEvent) { + public void rollback(final ShardingTransactionEvent transactionEvent) { } @Override diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/local/LocalTransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/local/LocalTransactionManager.java similarity index 67% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/local/LocalTransactionManager.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/local/LocalTransactionManager.java index b462445d5a131..25d1d02795b0c 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/local/LocalTransactionManager.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/local/LocalTransactionManager.java @@ -15,11 +15,10 @@ *

*/ -package io.shardingsphere.transaction.api.local; +package io.shardingsphere.transaction.manager.local; -import io.shardingsphere.transaction.api.ShardingTransactionManager; -import io.shardingsphere.transaction.common.event.TransactionEvent; -import io.shardingsphere.transaction.common.event.LocalTransactionEvent; +import io.shardingsphere.transaction.event.local.LocalTransactionEvent; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; import javax.transaction.Status; import java.sql.Connection; @@ -32,15 +31,14 @@ * * @author zhaojun */ -public final class LocalTransactionManager implements ShardingTransactionManager { +public final class LocalTransactionManager implements ShardingTransactionManager { @Override - public void begin(final TransactionEvent transactionEvent) throws SQLException { - LocalTransactionEvent localTransactionEvent = (LocalTransactionEvent) transactionEvent; + public void begin(final LocalTransactionEvent event) throws SQLException { Collection exceptions = new LinkedList<>(); - for (Connection each : localTransactionEvent.getCachedConnections()) { + for (Connection each : event.getCachedConnections()) { try { - each.setAutoCommit(localTransactionEvent.isAutoCommit()); + each.setAutoCommit(event.isAutoCommit()); } catch (final SQLException ex) { exceptions.add(ex); } @@ -49,10 +47,9 @@ public void begin(final TransactionEvent transactionEvent) throws SQLException { } @Override - public void commit(final TransactionEvent transactionEvent) throws SQLException { - LocalTransactionEvent localTransactionEvent = (LocalTransactionEvent) transactionEvent; + public void commit(final LocalTransactionEvent event) throws SQLException { Collection exceptions = new LinkedList<>(); - for (Connection each : localTransactionEvent.getCachedConnections()) { + for (Connection each : event.getCachedConnections()) { try { each.commit(); } catch (final SQLException ex) { @@ -63,10 +60,9 @@ public void commit(final TransactionEvent transactionEvent) throws SQLException } @Override - public void rollback(final TransactionEvent transactionEvent) throws SQLException { - LocalTransactionEvent localTransactionEvent = (LocalTransactionEvent) transactionEvent; + public void rollback(final LocalTransactionEvent event) throws SQLException { Collection exceptions = new LinkedList<>(); - for (Connection each : localTransactionEvent.getCachedConnections()) { + for (Connection each : event.getCachedConnections()) { try { each.rollback(); } catch (final SQLException ex) { diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManager.java new file mode 100644 index 0000000000000..ddd3f97141d19 --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManager.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.manager.xa; + +import io.shardingsphere.transaction.event.xa.XATransactionEvent; +import io.shardingsphere.core.rule.DataSourceParameter; +import io.shardingsphere.transaction.manager.ShardingTransactionManager; + +import javax.sql.DataSource; +import javax.sql.XADataSource; + +/** + * XA transaction manager. + * + * @author zhangliang + */ +public interface XATransactionManager extends ShardingTransactionManager { + + /** + * Wrap the specific {@link XADataSource} and enroll it with a JTA. + * + * @param dataSource the data source to wrap + * @param dataSourceName the data source name + * @param dataSourceParameter the data source parameter + * @return the wrapped data source + * @throws Exception if can not wrap the data source + */ + DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; +} diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManagerSPILoader.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManagerSPILoader.java similarity index 93% rename from sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManagerSPILoader.java rename to sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManagerSPILoader.java index fe94ab81b66e2..4f20561ac6139 100644 --- a/sharding-transaction/src/main/java/io/shardingsphere/transaction/api/xa/XATransactionManagerSPILoader.java +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManagerSPILoader.java @@ -15,8 +15,9 @@ *

*/ -package io.shardingsphere.transaction.api.xa; +package io.shardingsphere.transaction.manager.xa; +import io.shardingsphere.transaction.manager.xa.atomikos.AtomikosTransactionManager; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/atomikos/AtomikosTransactionManager.java b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/atomikos/AtomikosTransactionManager.java new file mode 100644 index 0000000000000..67105217f2c44 --- /dev/null +++ b/sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/atomikos/AtomikosTransactionManager.java @@ -0,0 +1,122 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.transaction.manager.xa.atomikos; + +import com.atomikos.icatch.jta.UserTransactionManager; +import com.atomikos.jdbc.AtomikosDataSourceBean; +import com.google.common.base.Optional; +import io.shardingsphere.core.exception.ShardingException; +import io.shardingsphere.transaction.event.xa.XATransactionEvent; +import io.shardingsphere.core.rule.DataSourceParameter; +import io.shardingsphere.transaction.manager.xa.XATransactionManager; + +import javax.sql.DataSource; +import javax.sql.XADataSource; +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.SystemException; +import java.sql.SQLException; +import java.util.Properties; + +/** + * Atomikos XA transaction manager. + * + * @author zhaojun + */ +public final class AtomikosTransactionManager implements XATransactionManager { + + private static final UserTransactionManager USER_TRANSACTION_MANAGER = new UserTransactionManager(); + + static { + try { + USER_TRANSACTION_MANAGER.init(); + } catch (final SystemException ex) { + throw new ShardingException(ex); + } + } + + @Override + public void begin(final XATransactionEvent event) throws SQLException { + try { + USER_TRANSACTION_MANAGER.begin(); + } catch (final SystemException | NotSupportedException ex) { + throw new SQLException(ex); + } + } + + @Override + public void commit(final XATransactionEvent event) throws SQLException { + try { + USER_TRANSACTION_MANAGER.commit(); + } catch (final RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException ex) { + throw new SQLException(ex); + } + } + + @Override + public void rollback(final XATransactionEvent event) throws SQLException { + try { + USER_TRANSACTION_MANAGER.rollback(); + } catch (final SystemException ex) { + throw new SQLException(ex); + } + } + + @Override + public int getStatus() throws SQLException { + try { + return USER_TRANSACTION_MANAGER.getStatus(); + } catch (final SystemException ex) { + throw new SQLException(ex); + } + } + + @Override + public DataSource wrapDataSource(final XADataSource dataSource, final String dataSourceName, final DataSourceParameter dataSourceParameter) { + AtomikosDataSourceBean result = new AtomikosDataSourceBean(); + result.setUniqueResourceName(dataSourceName); + result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); + result.setTestQuery("SELECT 1"); + result.setXaProperties(getProperties(dataSourceParameter)); + result.setXaDataSource(dataSource); + return result; + } + + private Properties getProperties(final DataSourceParameter dataSourceParameter) { + Properties result = new Properties(); + result.setProperty("user", dataSourceParameter.getUsername()); + result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); + result.setProperty("URL", dataSourceParameter.getUrl()); + result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); + result.setProperty("autoReconnect", Boolean.TRUE.toString()); + result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); + result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); + result.setProperty("prepStmtCacheSize", "250"); + result.setProperty("prepStmtCacheSqlLimit", "2048"); + result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); + result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); + result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); + result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); + result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); + result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); + result.setProperty("netTimeoutForStreamingResults", "0"); + return result; + } +}