Skip to content

Commit

Permalink
Merge branch 'dal-cluster' into 'master'
Browse files Browse the repository at this point in the history
codegen 2.1.6

See merge request framework/dal-client-opensource!1
  • Loading branch information
cc陈呈(IT) committed Jul 9, 2020
2 parents ac5eaab + e3129c1 commit 1bf106b
Show file tree
Hide file tree
Showing 144 changed files with 3,856 additions and 1,721 deletions.
2 changes: 1 addition & 1 deletion dal-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.ctrip.platform</groupId>
<artifactId>dal-client-parent</artifactId>
<version>2.0.19</version>
<version>2.1.6</version>
</parent>
<artifactId>dal-client</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public void execute() throws Exception {
config.validate();

LogEntry.init();
DalRequestExecutor.init(config.getFactory().getProperty(DalRequestExecutor.MAX_POOL_SIZE),
config.getFactory().getProperty(DalRequestExecutor.KEEP_ALIVE_TIME));
DalRequestExecutor.init(config);

DalStatusManager.initialize(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Set;

import com.ctrip.framework.dal.cluster.client.Cluster;
import com.ctrip.framework.dal.cluster.client.config.LocalizationConfig;
import com.ctrip.platform.dal.common.enums.ShardingCategory;
import com.ctrip.platform.dal.common.enums.TableParseSwitch;
import com.ctrip.platform.dal.dao.DalClientFactory;
Expand All @@ -19,12 +20,18 @@
import com.ctrip.platform.dal.dao.configure.DatabaseSet;
import com.ctrip.platform.dal.dao.configure.dalproperties.DalPropertiesLocator;
import com.ctrip.platform.dal.dao.configure.dalproperties.DalPropertiesManager;
import com.ctrip.platform.dal.dao.datasource.LocalizationValidatable;
import com.ctrip.platform.dal.dao.datasource.ValidationResult;
import com.ctrip.platform.dal.dao.helper.DalElementFactory;
import com.ctrip.platform.dal.dao.helper.EnvUtils;
import com.ctrip.platform.dal.dao.task.DalContextConfigure;
import com.ctrip.platform.dal.dao.task.DalTaskContext;
import com.ctrip.platform.dal.exceptions.DalException;
import com.ctrip.platform.dal.exceptions.TransactionSystemException;

public abstract class ConnectionAction<T> {
private static final EnvUtils envUtils = DalElementFactory.DEFAULT.getEnvUtils();

public DalEventEnum operation;
public String sql;
public String callString;
Expand Down Expand Up @@ -155,6 +162,28 @@ public void populateDbMeta() {
entry.setMaster(connHolder.isMaster());
entry.setShardId(connHolder.getShardId());
}

recordLocalizationValidation();
}

private void recordLocalizationValidation() {
Statement stmt = statement != null ? statement : preparedStatement;
stmt = stmt != null ? stmt : callableStatement;
try {
if (stmt.isWrapperFor(LocalizationValidatable.class)) {
LocalizationValidatable validatable = stmt.unwrap(LocalizationValidatable.class);
LocalizationValidatable.ValidationStatus validationStatus = validatable.getLastValidationStatus();
ValidationResult validationResult = validatable.getLastValidationResult();
if ((validationStatus == LocalizationValidatable.ValidationStatus.OK ||
validationStatus == LocalizationValidatable.ValidationStatus.FAILED) &&
validationResult != null) {
entry.setUcsValidation(validationResult.getUcsValidationMessage());
entry.setDalValidation(validationResult.getDalValidationMessage());
}
}
} catch (Throwable t) {
// ignore
}
}

public void initLogEntry(String logicDbName, DalHints hints) {
Expand All @@ -163,10 +192,14 @@ public void initLogEntry(String logicDbName, DalHints hints) {
if (databaseSet instanceof ClusterDatabaseSet) {
Cluster cluster = ((ClusterDatabaseSet) databaseSet).getCluster();
entry.setClusterName(cluster.getClusterName().toLowerCase());
LocalizationConfig localizationConfig = cluster.getLocalizationConfig();
if (localizationConfig != null)
entry.setDbZone(localizationConfig.getZoneId());
}
entry.setLogicDbName(logicDbName);
entry.setDbCategory(DalClientFactory.getDalConfigure().getDatabaseSet(logicDbName).getDatabaseCategory());
entry.setClientVersion(Version.getVersion());
entry.setClientZone(envUtils.getZone());
entry.setSensitive(hints.is(DalHintEnum.sensitive));
entry.setEvent(operation);
entry.setShardingCategory(shardingCategory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ private DalConnection getConnectionFromDSLocator(DalHints hints,
DataSourceIdentity id = new ClusterDataSourceIdentity(db);
conn = locator.getConnection(id);
meta = DbMeta.createIfAbsent(id, dbSet.getDatabaseCategory(), conn);
if (shardId == null)
shardId = String.valueOf(db.getShardIndex());
}
else if (selectedDataBase instanceof ProviderDataBase) {
ConnectionStringConfigureProvider provider = ((ProviderDataBase) selectedDataBase).getConnectionStringProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public class LogEntry implements ILogEntry{
private TableParser tableParser = new DefaultTableParser();
private static final String JSON_PATTERN = "{'Decode':'%s','Connect':'%s','Prepare':'%s','Excute':'%s','ClearUp':'%s'}";

private String clientZone;
private String dbZone;
private String ucsValidation;
private String dalValidation;

/**
* Internal performance recorder for performance cost in each stage.
* As each low level DB operation will be logged once at ConnectionAction level, this recorder will
Expand Down Expand Up @@ -453,4 +458,37 @@ public ShardingCategory getShardingCategory() {
public void setShardingCategory(ShardingCategory shardingCategory) {
this.shardingCategory = shardingCategory;
}

public String getClientZone() {
return clientZone;
}

public void setClientZone(String clientZone) {
this.clientZone = clientZone;
}

public String getDbZone() {
return dbZone;
}

public void setDbZone(String dbZone) {
this.dbZone = dbZone;
}

public String getUcsValidation() {
return ucsValidation;
}

public void setUcsValidation(String ucsValidation) {
this.ucsValidation = ucsValidation;
}

public String getDalValidation() {
return dalValidation;
}

public void setDalValidation(String dalValidation) {
this.dalValidation = dalValidation;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ctrip.platform.dal.dao.cluster;

import com.ctrip.framework.dal.cluster.client.Cluster;

/**
* @author c7ch23en
*/
public interface ClusterManager {

Cluster getOrCreateCluster(String clusterName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.ctrip.platform.dal.dao.cluster;

import com.ctrip.framework.dal.cluster.client.Cluster;
import com.ctrip.framework.dal.cluster.client.config.ClusterConfig;
import com.ctrip.framework.dal.cluster.client.util.StringUtils;
import com.ctrip.platform.dal.dao.configure.ClusterConfigProvider;
import com.ctrip.platform.dal.exceptions.DalRuntimeException;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author c7ch23en
*/
public class ClusterManagerImpl implements ClusterManager {

private final ClusterConfigProvider configProvider;
private static final Map<String, Cluster> clusters = new ConcurrentHashMap<>();

public ClusterManagerImpl(ClusterConfigProvider configProvider) {
this.configProvider = configProvider;
}

@Override
public Cluster getOrCreateCluster(String clusterName) {
if (StringUtils.isEmpty(clusterName))
throw new DalRuntimeException("cluster name is empty");
clusterName = StringUtils.toTrimmedLowerCase(clusterName);
Cluster cluster = clusters.get(clusterName);
if (cluster == null)
synchronized (clusters) {
cluster = clusters.get(clusterName);
if (cluster == null) {
cluster = createCluster(clusterName);
clusters.put(clusterName, cluster);
}
}
return cluster;
}

private Cluster createCluster(String clusterName) {
ClusterConfig config = configProvider.getClusterConfig(clusterName);
return new DynamicCluster(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.framework.dal.cluster.client.cluster.ClusterType;
import com.ctrip.framework.dal.cluster.client.cluster.DrcCluster;
import com.ctrip.framework.dal.cluster.client.config.ClusterConfig;
import com.ctrip.framework.dal.cluster.client.config.LocalizationConfig;
import com.ctrip.framework.dal.cluster.client.database.Database;
import com.ctrip.framework.dal.cluster.client.database.DatabaseCategory;
import com.ctrip.framework.dal.cluster.client.exception.ClusterRuntimeException;
Expand All @@ -29,9 +30,9 @@ public class DynamicCluster extends ListenableSupport<ClusterSwitchedEvent> impl

private static final ILogger LOGGER = DalElementFactory.DEFAULT.getILogger();
private static final String CAT_LOG_TYPE = "DAL.configure";
private static final String CAT_LOG_NAME_FORMAT = "SwitchCluster:%s";
private static final String CAT_EVENT_NAME_NORMAL_TO_DRC = "NormalToDrc:%s";
private static final String CAT_EVENT_NAME_DRC_TO_NORMAL = "DrcToNormal:%s";
private static final String CAT_LOG_NAME_FORMAT = "Cluster::switchCluster:%s";
private static final String CAT_EVENT_NAME_NORMAL_TO_DRC = "Cluster::normalToDrc:%s";
private static final String CAT_EVENT_NAME_DRC_TO_NORMAL = "Cluster::drcToNormal:%s";

private ClusterConfig clusterConfig;
private AtomicReference<Cluster> innerCluster = new AtomicReference<>();
Expand Down Expand Up @@ -112,6 +113,11 @@ public ClusterIdGeneratorConfig getIdGeneratorConfig() {
return getInnerCluster().getIdGeneratorConfig();
}

@Override
public LocalizationConfig getLocalizationConfig() {
return getInnerCluster().getLocalizationConfig();
}

private void registerListener() {
clusterConfig.addListener(new Listener<ClusterConfig>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ public String getInitSQL(){
public String getJdbcInterceptors(){
return null;
}

public Integer getSessionWaitTimeout() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class ClusterDatabaseSet extends DatabaseSet {
private ClusterIdGeneratorConfigAdapter idGeneratorConfig;

public ClusterDatabaseSet(String name, Cluster cluster, DalConnectionLocator locator) {
this(name, cluster, locator, null);
}

public ClusterDatabaseSet(String name, Cluster cluster, DalConnectionLocator locator, Map<String, String> properties) {
super(properties);
this.databaseSetName = name;
this.cluster = cluster;
this.shardStrategy = new ClusterShardStrategyAdapter(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import com.ctrip.framework.dal.cluster.client.Cluster;
import com.ctrip.framework.dal.cluster.client.config.ClusterConfig;
import com.ctrip.framework.dal.cluster.client.database.DatabaseRole;
import com.ctrip.framework.dal.cluster.client.util.StringUtils;
import com.ctrip.platform.dal.common.enums.DatabaseCategory;
import com.ctrip.platform.dal.dao.client.DalConnectionLocator;
import com.ctrip.platform.dal.dao.cluster.ClusterManager;
import com.ctrip.platform.dal.dao.cluster.ClusterManagerImpl;
import com.ctrip.platform.dal.dao.cluster.DynamicCluster;
import com.ctrip.platform.dal.dao.helper.DalElementFactory;
import com.ctrip.platform.dal.dao.helper.EnvUtils;
import com.ctrip.platform.dal.dao.log.DalLogTypes;
import com.ctrip.platform.dal.dao.log.ILogger;

Expand All @@ -19,15 +23,16 @@
public class ClusterDatabaseSetAdapter implements DatabaseSetAdapter {

private static final ILogger LOGGER = DalElementFactory.DEFAULT.getILogger();
private static final EnvUtils envUtils = DalElementFactory.DEFAULT.getEnvUtils();

private ClusterInfoProvider clusterInfoProvider;
private ClusterConfigProvider clusterConfigProvider;
private ClusterManager clusterManager;
private DalConnectionLocator connectionLocator;

public ClusterDatabaseSetAdapter(DalConnectionLocator connectionLocator) {
this.connectionLocator = connectionLocator;
this.clusterInfoProvider = connectionLocator.getIntegratedConfigProvider();
this.clusterConfigProvider = connectionLocator.getIntegratedConfigProvider();
this.clusterManager = new ClusterManagerImpl(connectionLocator.getIntegratedConfigProvider());
}

@Override
Expand All @@ -44,6 +49,7 @@ public DatabaseSet adapt(DatabaseSet original) {

private boolean adaptable(DefaultDatabaseSet defaultDatabaseSet) {
/*
* 0. no subEnv, not aws
* 1. mysql
* 2. no shard strategy
* 4. no idgen config
Expand All @@ -67,18 +73,17 @@ private boolean adaptable(DefaultDatabaseSet defaultDatabaseSet) {

private ClusterDatabaseSet tryAdapt(DefaultDatabaseSet defaultDatabaseSet) {
try {
List<DataBase> masters = defaultDatabaseSet.getMasterDbs();
if (masters != null && masters.size() == 1) {
String databaseKey = masters.iterator().next().getConnectionString();
DataBase master = defaultDatabaseSet.getMasterDbs().iterator().next();
if ((master instanceof DefaultDataBase) && !(master instanceof ProviderDataBase)) {
String databaseKey = master.getConnectionString();
Map<String, DalConnectionString> failedConnectionStrings = DataSourceConfigureLocatorManager.
getInstance().getFailedConnectionStrings();
if (failedConnectionStrings == null || !failedConnectionStrings.containsKey(databaseKey)) {
ClusterInfo clusterInfo = clusterInfoProvider.getClusterInfo(databaseKey);
if (clusterInfo != null && clusterInfo.getRole() == DatabaseRole.MASTER &&
!clusterInfo.dbSharding()) {
String clusterName = clusterInfo.getClusterName();
ClusterConfig clusterConfig = clusterConfigProvider.getClusterConfig(clusterName);
Cluster cluster = new DynamicCluster(clusterConfig);
Cluster cluster = clusterManager.getOrCreateCluster(clusterName);
LOGGER.logEvent(DalLogTypes.DAL_VALIDATION, "ClusterAdaptSucceeded",
String.format("databaseSet: %s, clusterName: %s",
defaultDatabaseSet.getName(), clusterName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.framework.dal.cluster.client.database.DatabaseRole;
import com.ctrip.platform.dal.dao.datasource.DataSourceIdentity;
import com.ctrip.platform.dal.dao.datasource.IClusterDataSourceIdentity;

public class ClusterInfo {

Expand Down Expand Up @@ -42,27 +43,44 @@ public void setClusterName(String clusterName) {
}

public DataSourceIdentity toDataSourceIdentity() {
return new SimpleClusterDataSourceIdentity(toString());
return new SimpleClusterDataSourceIdentity(this);
}

@Override
public String toString() {
return String.format(ID_FORMAT, clusterName, shardIndex, role != null ? role.getValue() : null);
}

static class SimpleClusterDataSourceIdentity implements DataSourceIdentity {
static class SimpleClusterDataSourceIdentity implements DataSourceIdentity, IClusterDataSourceIdentity {

private ClusterInfo clusterInfo;
private String id;

public SimpleClusterDataSourceIdentity(String id) {
this.id = id;
public SimpleClusterDataSourceIdentity(ClusterInfo clusterInfo) {
this.clusterInfo = clusterInfo;
this.id = clusterInfo.toString();
}

@Override
public String getId() {
return id;
}

@Override
public String getClusterName() {
return clusterInfo.getClusterName();
}

@Override
public Integer getShardIndex() {
return clusterInfo.getShardIndex();
}

@Override
public DatabaseRole getDatabaseRole() {
return clusterInfo.getRole();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof SimpleClusterDataSourceIdentity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public DalConnectionStringConfigure parse(String name, String connectionString)
String keyName = ConnectionStringKeyHelper.getKeyName(name);
config.setName(keyName);
config.setConnectionUrl(url);
config.setUserName(userName);
config.setPassword(password);
config.setUserName(userName != null ? userName : "");
config.setPassword(password != null ? password : "");
config.setDriverClass(driverClass);
config.setVersion(version);
config.setHostName(dbhost);
Expand Down
Loading

0 comments on commit 1bf106b

Please sign in to comment.