Skip to content

Commit

Permalink
add MasterSlavePreparedStatement
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Aug 30, 2017
1 parent 1c68d93 commit 5b1caf5
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.rdb.sharding.hint.HintManagerHolder;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractConnectionAdapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.datasource.MasterSlaveDataSource;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.MasterSlavePreparedStatement;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.MasterSlaveStatement;
import com.dangdang.ddframe.rdb.sharding.parsing.SQLJudgeEngine;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.sql.SQLStatement;
Expand Down Expand Up @@ -77,72 +78,48 @@ public DatabaseMetaData getMetaData() throws SQLException {
}

@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql);
public Statement createStatement() throws SQLException {
return new MasterSlaveStatement(this);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql, resultSetType, resultSetConcurrency);
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql, autoGeneratedKeys);
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new MasterSlavePreparedStatement(this, sql);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql, columnIndexes);
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}

@Override
public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
Collection<Connection> connections = getConnection(sql);
if (1 != connections.size()) {
throw new UnsupportedOperationException("Cannot support DDL for prepare statement and master-slave only.");
}
return connections.iterator().next().prepareStatement(sql, columnNames);
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}

@Override
public Statement createStatement() throws SQLException {
return new MasterSlaveStatement(this);
public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys);
}

@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, columnIndexes);
}

@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, columnNames);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.rdb.sharding.jdbc.core.statement;

import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.connection.MasterSlaveConnection;
import com.google.common.base.Preconditions;
import lombok.Getter;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

/**
* PreparedStatement that support master-slave.
*
* @author zhangliang
*/
@Getter
public final class MasterSlavePreparedStatement extends AbstractPreparedStatementAdapter {

private final MasterSlaveConnection connection;

private final Collection<PreparedStatement> routedStatements = new LinkedList<>();

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlavePreparedStatement(
final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
this.connection = connection;
for (Connection each : connection.getConnection(sql)) {
PreparedStatement preparedStatement = each.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this.connection = connection;
for (Connection each : connection.getConnection(sql)) {
PreparedStatement preparedStatement = each.prepareStatement(sql, autoGeneratedKeys);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException {
this.connection = connection;
for (Connection each : connection.getConnection(sql)) {
PreparedStatement preparedStatement = each.prepareStatement(sql, columnIndexes);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final String[] columnNames) throws SQLException {
this.connection = connection;
for (Connection each : connection.getConnection(sql)) {
PreparedStatement preparedStatement = each.prepareStatement(sql, columnNames);
routedStatements.add(preparedStatement);
}
}

@Override
public ResultSet executeQuery() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().executeQuery();
}

@Override
public int executeUpdate() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().executeUpdate();
}

@Override
public boolean execute() throws SQLException {
boolean result = false;
for (PreparedStatement each : routedStatements) {
result = each.execute();
}
return result;
}

@Override
public void clearBatch() throws SQLException {
for (PreparedStatement each : routedStatements) {
each.clearBatch();
}
}

@Override
public void addBatch() throws SQLException {
for (PreparedStatement each : routedStatements) {
each.addBatch();
}
}

@Override
public int[] executeBatch() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().executeBatch();
}

@Override
public ResultSet getResultSet() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getResultSet();
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getGeneratedKeys();
}

@Override
public int getResultSetHoldability() throws SQLException {
return routedStatements.iterator().next().getResultSetHoldability();
}

@Override
public int getResultSetConcurrency() throws SQLException {
return routedStatements.iterator().next().getResultSetConcurrency();
}

@Override
public int getResultSetType() throws SQLException {
return routedStatements.iterator().next().getResultSetType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.connection.MasterSlaveConnection;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;

/**
* Statement that support master-slave.
Expand All @@ -46,8 +45,7 @@ public final class MasterSlaveStatement extends AbstractStatementAdapter {

private final int resultSetHoldability;

@Getter(AccessLevel.NONE)
private Statement routedStatement;
private final Collection<Statement> routedStatements = new LinkedList<>();

public MasterSlaveStatement(final MasterSlaveConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
Expand All @@ -67,50 +65,44 @@ public MasterSlaveStatement(final MasterSlaveConnection connection, final int re

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
Collection<Connection> connections = connection.getConnection(sql);
Preconditions.checkState(1 == connections.size());
routedStatement = connections.iterator().next().createStatement();
return routedStatement.executeQuery(sql);
return getSingleStatement(sql).executeQuery(sql);
}

@Override
public int executeUpdate(final String sql) throws SQLException {
Collection<Connection> connections = connection.getConnection(sql);
Preconditions.checkState(1 == connections.size());
routedStatement = connections.iterator().next().createStatement();
return routedStatement.executeUpdate(sql);
return getSingleStatement(sql).executeUpdate(sql);
}

@Override
public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
Collection<Connection> connections = connection.getConnection(sql);
Preconditions.checkState(1 == connections.size());
routedStatement = connections.iterator().next().createStatement();
return routedStatement.executeUpdate(sql, autoGeneratedKeys);
return getSingleStatement(sql).executeUpdate(sql, autoGeneratedKeys);
}

@Override
public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
Collection<Connection> connections = connection.getConnection(sql);
Preconditions.checkState(1 == connections.size());
routedStatement = connections.iterator().next().createStatement();
return routedStatement.executeUpdate(sql, columnIndexes);
return getSingleStatement(sql).executeUpdate(sql, columnIndexes);
}

@Override
public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
return getSingleStatement(sql).executeUpdate(sql, columnNames);
}

private Statement getSingleStatement(final String sql) throws SQLException {
Collection<Connection> connections = connection.getConnection(sql);
Preconditions.checkState(1 == connections.size());
routedStatement = connections.iterator().next().createStatement();
return routedStatement.executeUpdate(sql, columnNames);
Statement result = connections.iterator().next().createStatement();
routedStatements.add(result);
return result;
}

@Override
public boolean execute(final String sql) throws SQLException {
boolean result = false;
for (Connection each : connection.getConnection(sql)) {
routedStatement = each.createStatement();
result = routedStatement.execute(sql);
Statement statement = each.createStatement();
routedStatements.add(statement);
result = statement.execute(sql);
}
return result;
}
Expand All @@ -119,8 +111,9 @@ public boolean execute(final String sql) throws SQLException {
public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
boolean result = false;
for (Connection each : connection.getConnection(sql)) {
routedStatement = each.createStatement();
result = routedStatement.execute(sql, autoGeneratedKeys);
Statement statement = each.createStatement();
routedStatements.add(statement);
result = statement.execute(sql, autoGeneratedKeys);
}
return result;
}
Expand All @@ -129,8 +122,9 @@ public boolean execute(final String sql, final int autoGeneratedKeys) throws SQL
public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
boolean result = false;
for (Connection each : connection.getConnection(sql)) {
routedStatement = each.createStatement();
result = routedStatement.execute(sql, columnIndexes);
Statement statement = each.createStatement();
routedStatements.add(statement);
result = statement.execute(sql, columnIndexes);
}
return result;
}
Expand All @@ -139,24 +133,22 @@ public boolean execute(final String sql, final int[] columnIndexes) throws SQLEx
public boolean execute(final String sql, final String[] columnNames) throws SQLException {
boolean result = false;
for (Connection each : connection.getConnection(sql)) {
routedStatement = each.createStatement();
result = routedStatement.execute(sql, columnNames);
Statement statement = each.createStatement();
routedStatements.add(statement);
result = statement.execute(sql, columnNames);
}
return result;
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
return routedStatement.getGeneratedKeys();
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getGeneratedKeys();
}

@Override
public ResultSet getResultSet() throws SQLException {
return routedStatement.getResultSet();
}

@Override
protected Collection<Statement> getRoutedStatements() {
return Collections.singletonList(routedStatement);
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getResultSet();
}
}
Loading

0 comments on commit 5b1caf5

Please sign in to comment.