Skip to content

Commit

Permalink
fixed issue #218 , support target db meta reload
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Nov 10, 2016
1 parent 518002e commit a4b4ee7
Showing 1 changed file with 31 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@
*/
public class DbLoadAction implements InitializingBean, DisposableBean {

private static final Logger logger = LoggerFactory.getLogger(DbLoadAction.class);
private static final String WORKER_NAME = "DbLoadAction";
private static final Logger logger = LoggerFactory.getLogger(DbLoadAction.class);
private static final String WORKER_NAME = "DbLoadAction";
private static final String WORKER_NAME_FORMAT = "pipelineId = %s , pipelineName = %s , " + WORKER_NAME;
private static final int DEFAULT_POOL_SIZE = 5;
private int poolSize = DEFAULT_POOL_SIZE;
private int retry = 3;
private int retryWait = 3000;
private LoadInterceptor interceptor;
private ExecutorService executor;
private DbDialectFactory dbDialectFactory;
private static final int DEFAULT_POOL_SIZE = 5;
private int poolSize = DEFAULT_POOL_SIZE;
private int retry = 3;
private int retryWait = 3000;
private LoadInterceptor interceptor;
private ExecutorService executor;
private DbDialectFactory dbDialectFactory;
private ConfigClientService configClientService;
private int batchSize = 50;
private boolean useBatch = true;
private LoadStatsTracker loadStatsTracker;
private int batchSize = 50;
private boolean useBatch = true;
private LoadStatsTracker loadStatsTracker;

/**
* 返回结果为已处理成功的记录
Expand Down Expand Up @@ -512,14 +512,14 @@ enum ExecuteResult {

class DbLoadWorker implements Callable<Exception> {

private DbLoadContext context;
private DbDialect dbDialect;
private DbLoadContext context;
private DbDialect dbDialect;
private List<EventData> datas;
private boolean canBatch;
private List<EventData> allFailedDatas = new ArrayList<EventData>();
private boolean canBatch;
private List<EventData> allFailedDatas = new ArrayList<EventData>();
private List<EventData> allProcesedDatas = new ArrayList<EventData>();
private List<EventData> processedDatas = new ArrayList<EventData>();
private List<EventData> failedDatas = new ArrayList<EventData>();
private List<EventData> processedDatas = new ArrayList<EventData>();
private List<EventData> failedDatas = new ArrayList<EventData>();

public DbLoadWorker(DbLoadContext context, List<EventData> datas, boolean canBatch){
this.context = context;
Expand Down Expand Up @@ -739,9 +739,19 @@ private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobC

Boolean isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getColumnName()));
if (isRequired == null) {
throw new LoadException(String.format("column name %s is not found in Table[%s]",
column.getColumnName(),
table.toString()));
// 清理一下目标库的表结构,二次检查一下
table = dbDialect.findTable(data.getSchemaName(), data.getTableName(), false);
isRequiredMap = new HashMap<String, Boolean>();
for (Column tableColumn : table.getColumns()) {
isRequiredMap.put(StringUtils.lowerCase(tableColumn.getName()), tableColumn.isRequired());
}

isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getColumnName()));
if (isRequired == null) {
throw new LoadException(String.format("column name %s is not found in Table[%s]",
column.getColumnName(),
table.toString()));
}
}

Object param = null;
Expand Down

0 comments on commit a4b4ee7

Please sign in to comment.