Skip to content

Commit

Permalink
optimize code logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Mar 27, 2024
1 parent 291d748 commit 8f8cdcc
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public interface JdbcConnectionProvider<JC extends JdbcConnection> extends AutoC
*/
JC getConnection();

/**
* Creates a new database connection.
*
* @return A new database connection.
*/
JC newConnection();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private void eventConsumer(Event event) {
public void start() throws Exception {
this.databaseDialect.start();
this.taskManagerCoordinator.start();
//start snapshot engine
this.snapshotEngine.start();
SnapshotResult<?> result = this.snapshotEngine.execute();
this.snapshotEngine.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,13 @@ protected SnapshotResult<Jc> doExecute(Jc context, SnapshotContext<Part, Offset>
//Whether to determine whether to process the table data?
if (sourceConnectorConfig.isSnapshotData()) {
connectionPool = createConnectionPool(snapshotContext);
createDataEvents(context, snapshotContext, connectionPool);
handleTableDataAndCreateDataEvents(context, snapshotContext, connectionPool);
}
log.info("Snapshot 6: Release the locks");
releaseSnapshotLocks(context, snapshotContext);
} catch (Exception e) {
throw new RuntimeException(e);
log.error("Handle snapshot tables schema or data error", e);
return new SnapshotResult<>(SnapshotResultStatus.ABORTED, context);
} finally {
//close connection pool's connection
try {
Expand Down Expand Up @@ -190,7 +191,7 @@ private Connection createMasterConnection() throws SQLException {
* @param connectionPool The connection pool.
* @throws SQLException If an error occurs.
*/
private void createDataEvents(Jc context, SnapshotContext<Part, Offset> snapshotContext, Queue<JdbcConnection> connectionPool)
private void handleTableDataAndCreateDataEvents(Jc context, SnapshotContext<Part, Offset> snapshotContext, Queue<JdbcConnection> connectionPool)
throws Exception {

int handleDataThreadNum = connectionPool.size();
Expand Down Expand Up @@ -267,7 +268,7 @@ private Queue<JdbcConnection> createConnectionPool(final SnapshotContext<Part, O
conn.connection().setTransactionIsolation(jdbcConnection.connection().getTransactionIsolation());
connectionPool.add(conn);
}
log.info("Created connection pool with {} number", snapshotMaxThreads);
log.info("Create snapshot data handle thread pool with a size of {}.", snapshotMaxThreads);
return connectionPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ protected void readStructureOfTables(MysqlJdbcContext jdbcContext, SnapshotConte
for (TableId tableId : determineTables) {
StringBuilder dropTableDdl = new StringBuilder("DROP TABLE IF EXISTS ");
dropTableDdl.append(MysqlUtils.wrapper(tableId));
addParseDdlAndEvent(jdbcContext, dropTableDdl.toString(), tableId);
parseDdlSwitch2Event(jdbcContext, dropTableDdl.toString(), tableId);
}
final HashMap<String/* database Name */, List<TableId>/* table list */> databaseMapTables = determineTables.stream()
.collect(Collectors.groupingBy(TableId::getCatalogName, HashMap::new, Collectors.toList()));
Set<String> databaseSet = databaseMapTables.keySet();
// Read all table structures, construct DDL statements
for (String database : databaseSet) {
StringBuilder dropDatabaseDdl = new StringBuilder("DROP DATABASE IF EXISTS ").append(MysqlUtils.wrapper(database));
addParseDdlAndEvent(jdbcContext, dropDatabaseDdl.toString(), new TableId(database));
parseDdlSwitch2Event(jdbcContext, dropDatabaseDdl.toString(), new TableId(database));
String databaseCreateDdl = connection.query(MysqlDialectSql.SHOW_CREATE_DATABASE.ofWrapperSQL(MysqlUtils.wrapper(database)), rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 1) {
String ddl = rs.getString(2);
Expand All @@ -199,16 +199,16 @@ protected void readStructureOfTables(MysqlJdbcContext jdbcContext, SnapshotConte
continue;
}
TableId tableId = new TableId(database);
addParseDdlAndEvent(jdbcContext, databaseCreateDdl, tableId);
addParseDdlAndEvent(jdbcContext, "USE " + database, tableId);
parseDdlSwitch2Event(jdbcContext, databaseCreateDdl, tableId);
parseDdlSwitch2Event(jdbcContext, "USE " + database, tableId);

// build create table snapshot event
List<TableId> tableIds = databaseMapTables.get(database);
createTableSnapshotEvent(tableIds, jdbcContext);
}
}

private void addParseDdlAndEvent(MysqlJdbcContext jdbcContext, String ddl, TableId tableId) {
private void parseDdlSwitch2Event(MysqlJdbcContext jdbcContext, String ddl, TableId tableId) {
jdbcContext.getParser().setCurrentDatabase(tableId.getCatalogName());
jdbcContext.getParser().setCatalogTableSet(jdbcContext.getCatalogTableSet());
jdbcContext.getParser().parse(ddl, event -> {
Expand Down Expand Up @@ -243,7 +243,7 @@ private void createTableSnapshotEvent(List<TableId> tableIds, MysqlJdbcContext j
if (resultSet.next()) {
// Get create table sql
String createTableDdl = resultSet.getString(2);
addParseDdlAndEvent(jdbcContext, createTableDdl, tableId);
parseDdlSwitch2Event(jdbcContext, createTableDdl, tableId);
}
});
}
Expand Down

0 comments on commit 8f8cdcc

Please sign in to comment.