Skip to content

Commit

Permalink
Merge pull request #1318 from johnrobbet/obreader_read_by_partition
Browse files Browse the repository at this point in the history
Obreader read by partition
  • Loading branch information
TrafalgarLuo authored Jun 16, 2022
2 parents 8c92ce2 + e740300 commit 894b57b
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.sql.Connection;
import java.util.List;

import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,6 +53,21 @@ public void preCheck() {

@Override
public List<Configuration> split(int adviceNumber) {
String splitPk = originalConfig.getString(Key.SPLIT_PK);
List<String> quotedColumns = originalConfig.getList(Key.COLUMN_LIST, String.class);
if (splitPk != null && splitPk.length() > 0 && quotedColumns != null) {
String escapeChar = ObReaderUtils.isOracleMode(originalConfig.getString(ObReaderKey.OB_COMPATIBILITY_MODE))
? "\"" : "`";
if (!splitPk.startsWith(escapeChar) && !splitPk.endsWith(escapeChar)) {
splitPk = escapeChar + splitPk + escapeChar;
}
for (String column : quotedColumns) {
if (column.equals(splitPk)) {
LOG.info("splitPk is an ob reserved keyword, set to {}", splitPk);
originalConfig.set(Key.SPLIT_PK, splitPk);
}
}
}
return this.readerJob.split(this.originalConfig, adviceNumber);
}

Expand Down Expand Up @@ -86,6 +102,7 @@ private void setDatabaseType(Configuration config) {
String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:");
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
config.set(ObReaderKey.OB_COMPATIBILITY_MODE, compatibleMode);
if (ObReaderUtils.isOracleMode(compatibleMode)) {
ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;

/**
* @author johnrobbet
*/
public class Constant {

public static String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s ";

public static String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;

/**
* @author johnrobbet
*/
public class ObReaderKey {

public final static String READ_BY_PARTITION = "readByPartition";

public final static String PARTITION_NAME = "partitionName";

public final static String PARTITION_TYPE = "partitionType";

public final static String OB_COMPATIBILITY_MODE = "obCompatibilityMode";

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;

import java.util.Arrays;
import java.util.List;

import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,37 +30,62 @@ public void init(Configuration originalConfig) {
ObReaderUtils.escapeDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns);

List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
List<JSONObject> conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
for (int i = 0; i < conns.size(); i++) {
JSONObject conn = conns.get(i);
Configuration connConfig = Configuration.from(conn.toString());
List<String> tables = connConfig.getList(Key.TABLE, String.class);
ObReaderUtils.escapeDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);

// tables will be null when querySql is configured
if (tables != null) {
ObReaderUtils.escapeDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE),
tables);
}
}
super.init(originalConfig);
}

@Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list = super.split(originalConfig, adviceNumber);
List<Configuration> list;
// readByPartition is lower priority than splitPk.
// and readByPartition only works in table mode.
if (!isSplitPkValid(originalConfig) &&
originalConfig.getBool(Constant.IS_TABLE_MODE) &&
originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) {
LOG.info("try to split reader job by partition.");
list = PartitionSplitUtil.splitByPartition(originalConfig);
} else {
LOG.info("try to split reader job by splitPk.");
list = super.split(originalConfig, adviceNumber);
}

for (Configuration config : list) {
String jdbcUrl = config.getString(Key.JDBC_URL);
String obRegionName = getObRegionName(jdbcUrl);
config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName);
}

return list;
}

private boolean isSplitPkValid(Configuration originalConfig) {
String splitPk = originalConfig.getString(Key.SPLIT_PK);
return splitPk != null && splitPk.trim().length() > 0;
}

private String getObRegionName(String jdbcUrl) {
if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN);
final String obJdbcDelimiter = com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING;
if (jdbcUrl.startsWith(obJdbcDelimiter)) {
String[] ss = jdbcUrl.split(obJdbcDelimiter);
if (ss.length >= 2) {
String tenant = ss[1].trim();
String[] sss = tenant.split(":");
return sss[0];
}
}

return null;
}
}
Loading

0 comments on commit 894b57b

Please sign in to comment.