diff --git a/sharding-core/src/main/java/io/shardingsphere/core/rule/DataSourceParameter.java b/sharding-core/src/main/java/io/shardingsphere/core/rule/DataSourceParameter.java index 93ab7a047d241..0d8e7ade11526 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/rule/DataSourceParameter.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/rule/DataSourceParameter.java @@ -42,7 +42,7 @@ public final class DataSourceParameter { private static final int DEFAULT_MAX_POOL_SIZE = 50; - private static final int DEFAULT_MIN_POOL_SIZE = 5; + private static final int DEFAULT_MIN_POOL_SIZE = 1; private ProxyPoolType proxyDatasourceType = ProxyPoolType.VENDOR; diff --git a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java index 7eac853d3ca02..3e4e26265ee10 100644 --- a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java +++ b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java @@ -18,7 +18,7 @@ package io.shardingsphere.shardingjdbc.jdbc.adapter; import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Multimap; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.transaction.TransactionOperationType; @@ -28,10 +28,10 @@ import io.shardingsphere.core.event.transaction.xa.XATransactionEvent; import io.shardingsphere.core.hint.HintManagerHolder; import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager; +import io.shardingsphere.core.transaction.TransactionTypeHolder; import io.shardingsphere.shardingjdbc.jdbc.adapter.executor.ForceExecuteCallback; import io.shardingsphere.shardingjdbc.jdbc.adapter.executor.ForceExecuteTemplate; import io.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationConnection; -import io.shardingsphere.core.transaction.TransactionTypeHolder; import io.shardingsphere.spi.root.RootInvokeHook; import io.shardingsphere.spi.root.SPIRootInvokeHook; import io.shardingsphere.spi.transaction.ShardingTransactionHandler; @@ -60,7 +60,7 @@ @Getter public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection { - private final Multimap cachedConnections = HashMultimap.create(); + private final Multimap cachedConnections = LinkedHashMultimap.create(); private boolean autoCommit = true; @@ -186,10 +186,13 @@ public void execute(final Connection connection) throws SQLException { connection.setAutoCommit(autoCommit); } }); - } else if (TransactionType.XA == transactionType) { - shardingTransactionHandler.doInTransaction(new XATransactionEvent(TransactionOperationType.BEGIN)); - } else if (TransactionType.BASE == transactionType) { - shardingTransactionHandler.doInTransaction(new SagaTransactionEvent(TransactionOperationType.BEGIN, this)); + } + if (!autoCommit) { + if (TransactionType.XA == transactionType) { + shardingTransactionHandler.doInTransaction(new XATransactionEvent(TransactionOperationType.BEGIN)); + } else if (TransactionType.BASE == transactionType) { + shardingTransactionHandler.doInTransaction(new SagaTransactionEvent(TransactionOperationType.BEGIN, this)); + } } } diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java index a730b7141e276..24d0f108d4779 100644 --- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java +++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java @@ -18,10 +18,15 @@ package io.shardingsphere.shardingjdbc.jdbc.adapter; import com.google.common.collect.Multimap; +import io.shardingsphere.core.constant.transaction.TransactionType; +import io.shardingsphere.core.transaction.TransactionTypeHolder; import io.shardingsphere.shardingjdbc.common.base.AbstractShardingJDBCDatabaseAndTableTest; import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection; +import io.shardingsphere.shardingjdbc.jdbc.core.datasource.FixedBaseShardingTransactionHandler; +import io.shardingsphere.shardingjdbc.jdbc.core.datasource.FixedXAShardingTransactionHandler; import io.shardingsphere.shardingjdbc.jdbc.util.JDBCTestSQL; import lombok.SneakyThrows; +import org.junit.After; import org.junit.Test; import java.lang.reflect.Field; @@ -30,6 +35,7 @@ import java.sql.SQLException; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -38,6 +44,13 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd private String sql = JDBCTestSQL.SELECT_GROUP_BY_USER_ID_SQL; + @After + public void tearDown() { + TransactionTypeHolder.clear(); + FixedXAShardingTransactionHandler.getInvokes().clear(); + FixedBaseShardingTransactionHandler.getInvokes().clear(); + } + @Test public void assertSetAutoCommit() throws SQLException { try (ShardingConnection actual = getShardingDataSource().getConnection()) { @@ -57,6 +70,24 @@ private void assertAutoCommit(final ShardingConnection actual, final boolean aut } } + @Test + public void assertIgnoreAutoCommitForXA() throws SQLException { + TransactionTypeHolder.set(TransactionType.XA); + try (ShardingConnection actual = getShardingDataSource().getConnection()) { + actual.setAutoCommit(true); + assertNull(FixedXAShardingTransactionHandler.getInvokes().get("begin")); + } + } + + @Test + public void assertIgnoreAutoCommitForBase() throws SQLException { + TransactionTypeHolder.set(TransactionType.BASE); + try (ShardingConnection actual = getShardingDataSource().getConnection()) { + actual.setAutoCommit(true); + assertNull(FixedBaseShardingTransactionHandler.getInvokes().get("begin")); + } + } + @Test // TODO 缺少断言,做柔性事务时补充 public void assertCommit() throws SQLException { @@ -67,6 +98,15 @@ public void assertCommit() throws SQLException { } } + @Test + public void assertXACommit() throws SQLException { + TransactionTypeHolder.set(TransactionType.XA); + try (ShardingConnection actual = getShardingDataSource().getConnection()) { + actual.commit(); + assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("commit")); + } + } + @Test // TODO 缺少断言,做柔性事务时补充 public void assertRollback() throws SQLException { @@ -77,6 +117,15 @@ public void assertRollback() throws SQLException { } } + @Test + public void assertXARollback() throws SQLException { + TransactionTypeHolder.set(TransactionType.XA); + try (ShardingConnection actual = getShardingDataSource().getConnection()) { + actual.rollback(); + assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("rollback")); + } + } + @Test public void assertClose() throws SQLException { try (ShardingConnection actual = getShardingDataSource().getConnection()) { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandler.java index 153994ef41ba8..e76e4f9b3544d 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandler.java @@ -79,6 +79,9 @@ void doNotifyIfNecessary() { if (status.compareAndSet(ConnectionStatus.RUNNING, ConnectionStatus.RELEASE)) { resourceSynchronizer.doNotify(); } + if (status.compareAndSet(ConnectionStatus.TERMINATED, ConnectionStatus.RELEASE)) { + resourceSynchronizer.doNotify(); + } } /** @@ -87,7 +90,7 @@ void doNotifyIfNecessary() { * @throws InterruptedException interrupted exception */ public void waitUntilConnectionReleasedIfNecessary() throws InterruptedException { - if (ConnectionStatus.RUNNING == status.get()) { + if (ConnectionStatus.RUNNING == status.get() || ConnectionStatus.TERMINATED == status.get()) { while (!status.compareAndSet(ConnectionStatus.RELEASE, ConnectionStatus.RUNNING)) { resourceSynchronizer.doAwait(); } diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandlerTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandlerTest.java index 9e84f16c230ed..16863e35cfae1 100644 --- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandlerTest.java +++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/ConnectionStateHandlerTest.java @@ -58,4 +58,33 @@ public void run() { notifyThread.join(); assertTrue(flag.get()); } + + @Test + public void assertWaitUntilConnectionReleaseForTransaction() throws InterruptedException { + final AtomicBoolean flag = new AtomicBoolean(true); + Thread waitThread = new Thread(new Runnable() { + @Override + @SneakyThrows + public void run() { + connectionStateHandler.getAndSetStatus(ConnectionStatus.TERMINATED); + connectionStateHandler.waitUntilConnectionReleasedIfNecessary(); + if (ConnectionStatus.RUNNING != connectionStateHandler.getStatus()) { + flag.getAndSet(false); + } + } + }); + Thread notifyThread = new Thread(new Runnable() { + @Override + @SneakyThrows + public void run() { + Thread.sleep(2000); + connectionStateHandler.doNotifyIfNecessary(); + } + }); + waitThread.start(); + notifyThread.start(); + waitThread.join(); + notifyThread.join(); + assertTrue(flag.get()); + } } diff --git a/sharding-transaction/sharding-transaction-xa/src/main/resources/jta.properties b/sharding-transaction/sharding-transaction-xa/src/main/resources/jta.properties index 6fd797b0702bb..0aab601c1e471 100644 --- a/sharding-transaction/sharding-transaction-xa/src/main/resources/jta.properties +++ b/sharding-transaction/sharding-transaction-xa/src/main/resources/jta.properties @@ -1,4 +1,4 @@ com.atomikos.icatch.serial_jta_transactions = false com.atomikos.icatch.default_jta_timeout = 1000000 com.atomikos.icatch.max_actives = 10000 -com.atomikos.icatch.enable_logging = false +com.atomikos.icatch.enable_logging = true diff --git a/sharding-transaction/sharding-transaction-xa/src/test/java/io/shardingsphere/transaction/xa/convert/swap/DataSourceSwapperRegistryTest.java b/sharding-transaction/sharding-transaction-xa/src/test/java/io/shardingsphere/transaction/xa/convert/swap/DataSourceSwapperRegistryTest.java index a63309e9f041b..f8e2ce52c0717 100644 --- a/sharding-transaction/sharding-transaction-xa/src/test/java/io/shardingsphere/transaction/xa/convert/swap/DataSourceSwapperRegistryTest.java +++ b/sharding-transaction/sharding-transaction-xa/src/test/java/io/shardingsphere/transaction/xa/convert/swap/DataSourceSwapperRegistryTest.java @@ -80,7 +80,7 @@ public void assertBuildParameterFromUnsupportedDataSource() { assertNull(actual.getUsername()); assertNull(actual.getPassword()); assertThat(actual.getMaximumPoolSize(), is(50)); - assertThat(actual.getMinimumPoolSize(), is(5)); + assertThat(actual.getMinimumPoolSize(), is(1)); assertThat(actual.getConnectionTimeout(), is(30 * 1000L)); assertThat(actual.getIdleTimeout(), is(60 * 1000L)); assertThat(actual.getMaxLifetime(), is(0L));