diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java index 52c7f4306e2a7..a3655cad207d3 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java @@ -115,9 +115,9 @@ public boolean execute() throws SQLException { } } - private void clearRouteContext() throws SQLException { + protected void clearRouteContext() throws SQLException { + super.clearRouteContext(); clearParameters(); - setCurrentResultSet(null); } @Override diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java index aa38ad1dde6ab..d3a3c6842e120 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java @@ -24,9 +24,8 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext; import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit; import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult; -import com.google.common.base.Charsets; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -37,9 +36,10 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import java.util.Map; /** * 支持分片的静态语句对象. @@ -61,8 +61,7 @@ public class ShardingStatement extends AbstractStatementAdapter { @Getter private final int resultSetHoldability; - @Getter(AccessLevel.PROTECTED) - private final Map cachedRoutedStatements = new HashMap<>(); + private final Deque> cachedRoutedStatements = Lists.newLinkedList(); @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) @@ -71,11 +70,11 @@ public class ShardingStatement extends AbstractStatementAdapter { @Setter(AccessLevel.PROTECTED) private ResultSet currentResultSet; - public ShardingStatement(final ShardingConnection shardingConnection) { + ShardingStatement(final ShardingConnection shardingConnection) { this(shardingConnection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } - public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) { + ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) { this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } @@ -85,6 +84,8 @@ public ShardingStatement(final ShardingConnection shardingConnection, final int this.resultSetType = resultSetType; this.resultSetConcurrency = resultSetConcurrency; this.resultSetHoldability = resultSetHoldability; + cachedRoutedStatements.add(new LinkedList()); + cachedRoutedStatements.add(new LinkedList()); } @Override @@ -94,51 +95,94 @@ public Connection getConnection() throws SQLException { @Override public ResultSet executeQuery(final String sql) throws SQLException { - if (null != currentResultSet && !currentResultSet.isClosed()) { - currentResultSet.close(); + ResultSet rs; + try { + rs = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), mergeContext); + } finally { + clearRouteContext(); } - currentResultSet = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), mergeContext); - return currentResultSet; + setCurrentResultSet(rs); + return rs; } @Override public int executeUpdate(final String sql) throws SQLException { - return generateExecutor(sql).executeUpdate(); + try { + return generateExecutor(sql).executeUpdate(); + } finally { + clearRouteContext(); + } } @Override public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException { - return generateExecutor(sql).executeUpdate(autoGeneratedKeys); + try { + return generateExecutor(sql).executeUpdate(autoGeneratedKeys); + } finally { + clearRouteContext(); + } } @Override public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException { - return generateExecutor(sql).executeUpdate(columnIndexes); + try { + return generateExecutor(sql).executeUpdate(columnIndexes); + } finally { + clearRouteContext(); + } } @Override public int executeUpdate(final String sql, final String[] columnNames) throws SQLException { - return generateExecutor(sql).executeUpdate(columnNames); + try { + return generateExecutor(sql).executeUpdate(columnNames); + } finally { + clearRouteContext(); + } } @Override public boolean execute(final String sql) throws SQLException { - return generateExecutor(sql).execute(); + try { + return generateExecutor(sql).execute(); + } finally { + clearRouteContext(); + } } @Override public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException { - return generateExecutor(sql).execute(autoGeneratedKeys); + try { + return generateExecutor(sql).execute(autoGeneratedKeys); + } finally { + clearRouteContext(); + } } @Override public boolean execute(final String sql, final int[] columnIndexes) throws SQLException { - return generateExecutor(sql).execute(columnIndexes); + try { + return generateExecutor(sql).execute(columnIndexes); + } finally { + clearRouteContext(); + } } @Override public boolean execute(final String sql, final String[] columnNames) throws SQLException { - return generateExecutor(sql).execute(columnNames); + try { + return generateExecutor(sql).execute(columnNames); + } finally { + clearRouteContext(); + } + } + + protected void clearRouteContext() throws SQLException { + setCurrentResultSet(null); + List firstList = cachedRoutedStatements.pollFirst(); + cachedRoutedStatements.getFirst().addAll(firstList); + firstList.clear(); + cachedRoutedStatements.addLast(firstList); } private StatementExecutor generateExecutor(final String sql) throws SQLException { @@ -154,12 +198,18 @@ private StatementExecutor generateExecutor(final String sql) throws SQLException } protected Statement getStatement(final Connection connection, final String sql) throws SQLException { - HashCode hashCode = Hashing.md5().newHasher().putInt(connection.hashCode()).putString(sql, Charsets.UTF_8).hash(); - if (cachedRoutedStatements.containsKey(hashCode)) { - return cachedRoutedStatements.get(hashCode); + Statement statement = null; + for (Iterator iterator = cachedRoutedStatements.getFirst().iterator(); iterator.hasNext();) { + Statement each = iterator.next(); + if (each.getConnection() == connection) { + statement = each; + iterator.remove(); + } } - Statement statement = generateStatement(connection, sql); - cachedRoutedStatements.put(hashCode, statement); + if (null == statement) { + statement = generateStatement(connection, sql); + } + cachedRoutedStatements.getLast().add(statement); return statement; } @@ -191,7 +241,13 @@ public ResultSet getResultSet() throws SQLException { } @Override - public Collection getRoutedStatements() throws SQLException { - return cachedRoutedStatements.values(); + protected void clearRouteStatements() { + cachedRoutedStatements.getFirst().clear(); + cachedRoutedStatements.getLast().clear(); + } + + @Override + public Collection getRoutedStatements() { + return Lists.newArrayList(Iterators.concat(cachedRoutedStatements.getFirst().iterator(), cachedRoutedStatements.getLast().iterator())); } } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractStatementAdapter.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractStatementAdapter.java index 7de8c683b0a05..7b65bd8d6931c 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractStatementAdapter.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractStatementAdapter.java @@ -42,13 +42,15 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat private int fetchSize; + protected abstract void clearRouteStatements(); + @Override public final void close() throws SQLException { for (Statement each : getRoutedStatements()) { each.close(); } closed = true; - getRoutedStatements().clear(); + clearRouteStatements(); } @Override @@ -218,7 +220,6 @@ public final ResultSet getGeneratedKeys() throws SQLException { * 获取路由的静态语句对象集合. * * @return 路由的静态语句对象集合 - * @throws SQLException SQL执行异常 */ - protected abstract Collection getRoutedStatements() throws SQLException; + protected abstract Collection getRoutedStatements(); } diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/AllJDBCTests.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/AllJDBCTests.java index dada010c4dcce..2cb74da215d6c 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/AllJDBCTests.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/AllJDBCTests.java @@ -56,7 +56,8 @@ MasterSlaveDataSourceTest.class, ParameterListTest.class, AbstractPreparedStatementAdapterTest.class, - ShardingConnectionTest.class, + ShardingConnectionTest.class, + ShardingPreparedStatementTableOnlyTest.class, }) public class AllJDBCTests { } diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTableOnlyTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTableOnlyTest.java new file mode 100644 index 0000000000000..352be8c952103 --- /dev/null +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTableOnlyTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.rdb.sharding.jdbc; + +import com.dangdang.ddframe.rdb.integrate.tbl.AbstractShardingTablesOnlyDBUnitTest; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public final class ShardingPreparedStatementTableOnlyTest extends AbstractShardingTablesOnlyDBUnitTest { + + private ShardingDataSource shardingDataSource; + + @Before + public void init() throws SQLException { + shardingDataSource = getShardingDataSource(); + } + + @Test + public void assertExecuteQueryWithParameter() throws SQLException { + String sql = "SELECT COUNT(*) AS `orders_count` FROM `t_order` WHERE `status` = ?"; + try ( + Connection connection = shardingDataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, "init"); + ResultSet resultSet = preparedStatement.executeQuery(); + assertTrue(resultSet.next()); + assertThat(resultSet.getLong(1), is(20L)); + ShardingPreparedStatement sps = (ShardingPreparedStatement) preparedStatement; + assertThat(sps.getRoutedStatements().size(), is(10)); + } + } +} diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/StatementAdapterTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/StatementAdapterTest.java index b2cfcd7fd6cb9..3151ccd7afe9d 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/StatementAdapterTest.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/StatementAdapterTest.java @@ -154,7 +154,12 @@ public void assertOverMaxUpdateRow() throws SQLException { AbstractStatementAdapter statement = new AbstractStatementAdapter(Statement.class) { @Override - protected Collection getRoutedStatements() throws SQLException { + protected void clearRouteStatements() { + + } + + @Override + protected Collection getRoutedStatements() { return Lists.newArrayList(st1, st2); } diff --git a/sharding-jdbc-doc/content/post/release_notes.md b/sharding-jdbc-doc/content/post/release_notes.md index d6c74b4fd75c2..0fd599fcfa0b3 100644 --- a/sharding-jdbc-doc/content/post/release_notes.md +++ b/sharding-jdbc-doc/content/post/release_notes.md @@ -20,6 +20,7 @@ weight = 1 1. [ISSUE #149](https://github.com/dangdangdotcom/sharding-jdbc/issues/149) INSERT IGNORE INTO时如果数据重了忽略时返回的成-1了,应该返回0 1. [ISSUE #118](https://github.com/dangdangdotcom/sharding-jdbc/issues/118) 同一个线程内先执行DQL后执行DML,DML操作在从库上执行 1. [ISSUE #122](https://github.com/dangdangdotcom/sharding-jdbc/issues/122) bed的fail重试问题 +1. [ISSUE #152](https://github.com/dangdangdotcom/sharding-jdbc/issues/152) 可能同一个connection多线程导致问题 ## 1.3.2