Skip to content

Commit

Permalink
1. Split the upgrade code into different version
Browse files Browse the repository at this point in the history
2. Log the dml/ddl sql in origin friendly format
3. Fix ddl of 3.0.0 and 3.1.0
  • Loading branch information
ruanwenjun committed Jan 20, 2023
1 parent 3b54de0 commit 69a1c55
Show file tree
Hide file tree
Showing 13 changed files with 899 additions and 650 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,17 +93,17 @@ public void runScript(Reader reader) throws IOException, SQLException {
* @throws IOException if there is an error reading from the Reader
*/
private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
StringBuffer command = null;
List<String> command = null;
try {
LineNumberReader lineReader = new LineNumberReader(reader);
String line;
while ((line = lineReader.readLine()) != null) {
if (command == null) {
command = new StringBuffer();
command = new ArrayList<>();
}
String trimmedLine = line.trim();
if (trimmedLine.startsWith("--")) {
logger.info(trimmedLine);
logger.info("\n", trimmedLine);
} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) {
// Do nothing
} else if (trimmedLine.startsWith("delimiter")) {
Expand All @@ -110,12 +112,11 @@ private void runScript(Connection conn, Reader reader) throws IOException, SQLEx

} else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
command.append(line, 0, line.lastIndexOf(getDelimiter()));
command.append(" ");
logger.info("sql: {}", command);
command.add(line.substring(0, line.lastIndexOf(getDelimiter())));
logger.info("\n{}", String.join("\n", command));

try (Statement statement = conn.createStatement()) {
statement.execute(command.toString());
statement.execute(String.join(" ", command));
try (ResultSet rs = statement.getResultSet()) {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
Expand All @@ -142,8 +143,7 @@ private void runScript(Connection conn, Reader reader) throws IOException, SQLEx
command = null;
Thread.yield();
} else {
command.append(line);
command.append(" ");
command.add(line);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ BEGIN
ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';
ALTER TABLE `t_ds_alert` MODIFY COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
END IF;
END;
d//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,41 +392,46 @@ CALL modify_t_ds_task_group_col_description;
DROP PROCEDURE modify_t_ds_task_group_col_description;

-- alter table `t_ds_worker_group` add `other_params_json` text;
-- alter table `t_ds_process_instance` add `state_history` text;
drop procedure if exists add_column_safety;
drop procedure if exists add_t_ds_task_group_other_params_json;
delimiter d//
create procedure add_column_safety(target_table_name varchar(256), target_column varchar(256),
target_column_type varchar(256), sths_else varchar(256))
begin
declare target_database varchar(256);
select database() into target_database;
IF EXISTS(SELECT *
FROM information_schema.COLUMNS
WHERE COLUMN_NAME = target_column
AND TABLE_NAME = target_table_name
)
CREATE PROCEDURE add_t_ds_task_group_other_params_json()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='other_params_json')
THEN
set @statement =
concat('alter table ', target_table_name, ' change column ', target_column, ' ', target_column, ' ',
target_column_type, ' ',
sths_else);
PREPARE STMT_c FROM @statement;
EXECUTE STMT_c;
alter table `t_ds_worker_group` add column `other_params_json` text DEFAULT NULL COMMENT "other params json";
ELSE
set @statement =
concat('alter table ', target_table_name, ' add column ', target_column, ' ', target_column_type, ' ',
sths_else);
PREPARE STMT_a FROM @statement;
EXECUTE STMT_a;
alter table `t_ds_worker_group` modify column `other_params_json` text DEFAULT NULL COMMENT "other params json";
END IF;
end;
END;
d//
delimiter ;

call add_column_safety('t_ds_worker_group','other_params_json', 'text' , "DEFAULT NULL COMMENT 'other params json'");
call add_column_safety('t_ds_process_instance','state_history', 'text' , "DEFAULT NULL COMMENT 'state history desc' AFTER `state`");
call add_t_ds_task_group_other_params_json();
drop procedure if exists add_t_ds_task_group_other_params_json;

-- alter table `t_ds_process_instance` add `state_history` text;
drop procedure if exists add_t_ds_process_instance_state_history;
delimiter d//
CREATE PROCEDURE add_t_ds_process_instance_state_history()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='state_history')
THEN
alter table `t_ds_process_instance` add column `state_history` text DEFAULT NULL COMMENT "other params json";
ELSE
alter table `t_ds_process_instance` modify column `state_history` text DEFAULT NULL COMMENT "other params json";
END IF;
END;
d//
delimiter ;
call add_t_ds_process_instance_state_history();
drop procedure if exists add_t_ds_process_instance_state_history;

drop procedure if exists add_column_safety;

alter table t_ds_process_instance alter column process_instance_priority set default 2;
alter table t_ds_schedules alter column process_instance_priority set default 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,22 @@ d//
delimiter ;

-- ALTER TABLE t_ds_worker_group ADD COLUMN description varchar(255) DEFAULT NULL COMMENT 'ds worker group description';
call add_column_safety('t_ds_worker_group','description', 'varchar(255)' , "DEFAULT NULL COMMENT 'ds worker group description'");
drop procedure if exists add_column_safety;
drop procedure if exists modify_t_ds_worker_group_description;
delimiter d//
CREATE PROCEDURE modify_t_ds_worker_group_description()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='description')
THEN
alter table `t_ds_worker_group` add column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description";
ELSE
alter table `t_ds_worker_group` modify column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description";
END IF;
END;
d//
delimiter ;

call modify_t_ds_worker_group_description();
drop procedure if exists modify_t_ds_worker_group_description;
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;

import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -45,7 +46,7 @@ public class DolphinSchedulerManager {

private final UpgradeDao upgradeDao;

private Map<String, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();
private Map<DolphinSchedulerVersion, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();

public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao> daos,
List<DolphinSchedulerUpgrader> dolphinSchedulerUpgraders) throws Exception {
Expand Down Expand Up @@ -121,21 +122,11 @@ public void upgradeDolphinScheduler() throws IOException {
logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table structure");
upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
} else if ("2.0.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
}
DolphinSchedulerUpgrader dolphinSchedulerUpgrader = upgraderMap.get(schemaVersion);
if (dolphinSchedulerUpgrader != null) {
dolphinSchedulerUpgrader.doUpgrade();
}
DolphinSchedulerVersion.getVersion(schemaVersion).ifPresent(v -> upgraderMap.get(v).doUpgrade());
version = schemaVersion;
}
}

// todo: do we need to do this in all version > 2.0.6?
if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion)
&& SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceFileSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,6 @@ public class ResourceDao {

public static final Logger logger = LoggerFactory.getLogger(ResourceDao.class);

/**
* list all resources
*
* @param conn connection
* @return map that key is full_name and value is id
*/
Map<String, Integer> listAllResources(Connection conn) {
Map<String, Integer> resourceMap = new HashMap<>();

String sql = String.format("SELECT id,full_name FROM t_ds_resources");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();

while (rs.next()) {
Integer id = rs.getInt(1);
String fullName = rs.getString(2);
resourceMap.put(fullName, id);
}

} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}

return resourceMap;
}

/**
* list all resources by the type
*
Expand Down
Loading

0 comments on commit 69a1c55

Please sign in to comment.