Skip to content

Commit

Permalink
[fix](dynamic partition) fix dynamic partition thread met uncatch exc…
Browse files Browse the repository at this point in the history
…eption #35778 (#35824)

cherry pick from #35778
  • Loading branch information
yujun777 authored Jun 5, 2024
1 parent 29f0856 commit 5d4be00
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ public void checkDistribution(DistributionInfo distributionInfo) throws DdlExcep
HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
// buckets num
if (info.getBucketNum() != bucketsNum) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_BUCKET_NUM, bucketsNum);
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_BUCKET_NUM,
info.getBucketNum(), bucketsNum);
}
// distribution col size
if (info.getDistributionColumns().size() != distributionColTypes.size()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_SIZE,
distributionColTypes.size());
info.getDistributionColumns().size(), distributionColTypes.size());
}
// distribution col type
for (int i = 0; i < distributionColTypes.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
Expand Down Expand Up @@ -101,9 +100,9 @@ public DynamicPartitionScheduler(String name, long intervalMs) {
this.initialize = false;
}

public void executeDynamicPartitionFirstTime(Long dbId, Long tableId) {
public void executeDynamicPartitionFirstTime(Long dbId, Long tableId) throws DdlException {
List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(dbId, tableId));
executeDynamicPartition(tempDynamicPartitionTableInfo);
executeDynamicPartition(tempDynamicPartitionTableInfo, true);
}

public void registerDynamicPartitionTable(Long dbId, Long tableId) {
Expand Down Expand Up @@ -185,8 +184,9 @@ private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize)
}
}

private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table, String nowPartitionName) {
if (!table.isAutoBucket()) {
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table,
String nowPartitionName, boolean executeFirstTime) {
if (!table.isAutoBucket() || executeFirstTime) {
return property.getBuckets();
}

Expand Down Expand Up @@ -216,23 +216,18 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta
}

private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat) {
Column partitionColumn, String partitionFormat, boolean executeFirstTime) throws DdlException {
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
ZonedDateTime now = ZonedDateTime.now(dynamicPartitionProperty.getTimeZone().toZoneId());

boolean createHistoryPartition = dynamicPartitionProperty.isCreateHistoryPartition();
int idx;
int start = dynamicPartitionProperty.getStart();
int historyPartitionNum = dynamicPartitionProperty.getHistoryPartitionNum();
// When enable create_history_partition, will check the valid value from start and history_partition_num.
if (createHistoryPartition) {
if (historyPartitionNum == DynamicPartitionProperty.NOT_SET_HISTORY_PARTITION_NUM) {
idx = start;
} else {
idx = Math.max(start, -historyPartitionNum);
}
idx = DynamicPartitionUtil.getRealStart(dynamicPartitionProperty.getStart(),
dynamicPartitionProperty.getHistoryPartitionNum());
} else {
idx = 0;
}
Expand Down Expand Up @@ -261,12 +256,14 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperValue),
Collections.singletonList(partitionColumn));
addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
} catch (AnalysisException | IllegalArgumentException e) {
} catch (Exception e) {
// AnalysisException: keys.size is always equal to column.size, cannot reach this exception
// IllegalArgumentException: lb is greater than ub
LOG.warn("Error in gen addPartitionKeyRange. Error={}, db: {}, table: {}", e.getMessage(),
db.getFullName(), olapTable.getName());
continue;
LOG.warn("Error in gen addPartitionKeyRange. db: {}, table: {}, partition idx: {}",
db.getFullName(), olapTable.getName(), idx, e);
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(),
e.getMessage(), olapTable.getId());
throw new DdlException(e.getMessage());
}
for (PartitionItem partitionItem : rangePartitionInfo.getIdToItem(false).values()) {
// only support single column partition now
Expand All @@ -276,13 +273,16 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
isPartitionExists = true;
if (addPartitionKeyRange.equals(partitionItem.getItems())) {
if (LOG.isDebugEnabled()) {
LOG.debug("partition range {} exist in table {}, clear fail msg",
addPartitionKeyRange, olapTable.getName());
LOG.debug("partition range {} exist in db {} table {} partition idx {}, clear fail msg",
addPartitionKeyRange, db.getFullName(), olapTable.getName(), idx);
}
clearCreatePartitionFailedMsg(olapTable.getId());
} else {
LOG.warn("check partition range {} in db {} table {} partiton idx {} fail",
addPartitionKeyRange, db.getFullName(), olapTable.getName(), idx, e);
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(),
e.getMessage(), olapTable.getId());
throw new DdlException(e.getMessage());
}
break;
}
Expand Down Expand Up @@ -318,7 +318,7 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab

DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName);
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName, executeFirstTime);
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
Expand Down Expand Up @@ -392,11 +392,11 @@ private Range<PartitionKey> getClosedRange(Database db, OlapTable olapTable, Col
PartitionKey upperBorderBound = PartitionKey.createPartitionKey(
Collections.singletonList(upperBorderPartitionValue), Collections.singletonList(partitionColumn));
reservedHistoryPartitionKeyRange = Range.closed(lowerBorderBound, upperBorderBound);
} catch (AnalysisException e) {
} catch (org.apache.doris.common.AnalysisException | org.apache.doris.nereids.exceptions.AnalysisException e) {
// AnalysisException: keys.size is always equal to column.size, cannot reach this exception
// IllegalArgumentException: lb is greater than ub
LOG.warn("Error in gen reservePartitionKeyRange. Error={}, db: {}, table: {}", e.getMessage(),
db.getFullName(), olapTable.getName());
LOG.warn("Error in gen reservePartitionKeyRange. {}, table: {}",
db.getFullName(), olapTable.getName(), e);
}
return reservedHistoryPartitionKeyRange;
}
Expand All @@ -414,9 +414,11 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
return dropPartitionClauses;
}

int realStart = DynamicPartitionUtil.getRealStart(dynamicPartitionProperty.getStart(),
dynamicPartitionProperty.getHistoryPartitionNum());
ZonedDateTime now = ZonedDateTime.now(dynamicPartitionProperty.getTimeZone().toZoneId());
String lowerBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
now, dynamicPartitionProperty.getStart(), partitionFormat);
now, realStart, partitionFormat);
String upperBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
now, dynamicPartitionProperty.getEnd() + 1, partitionFormat);
PartitionValue lowerPartitionValue = new PartitionValue(lowerBorder);
Expand All @@ -430,11 +432,12 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
Collections.singletonList(partitionColumn));
reservePartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
reservedHistoryPartitionKeyRangeList.add(reservePartitionKeyRange);
} catch (AnalysisException | IllegalArgumentException e) {
} catch (Exception e) {
// AnalysisException: keys.size is always equal to column.size, cannot reach this exception
// IllegalArgumentException: lb is greater than ub
LOG.warn("Error in gen reservePartitionKeyRange. Error={}, db: {}, table: {}", e.getMessage(),
db.getFullName(), olapTable.getName());
LOG.warn("Error in gen reservePartitionKeyRange. db: {}, table: {}",
db.getFullName(), olapTable.getName(), e);
recordDropPartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage(), olapTable.getId());
return dropPartitionClauses;
}

Expand All @@ -455,7 +458,8 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange);
} catch (IllegalArgumentException e) {
return dropPartitionClauses;
} catch (AnalysisException e) {
} catch (org.apache.doris.common.AnalysisException
| org.apache.doris.nereids.exceptions.AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
Expand Down Expand Up @@ -487,7 +491,8 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
return dropPartitionClauses;
}

private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol) {
private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
boolean executeFirstTime) throws DdlException {
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfoCol.iterator();
while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Expand Down Expand Up @@ -546,12 +551,16 @@ private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartiti
}

if (!skipAddPartition) {
addPartitionClauses = getAddPartitionClause(db, olapTable, partitionColumn, partitionFormat);
addPartitionClauses = getAddPartitionClause(db, olapTable, partitionColumn, partitionFormat,
executeFirstTime);
}
dropPartitionClauses = getDropPartitionClause(db, olapTable, partitionColumn, partitionFormat);
tableName = olapTable.getName();
} catch (DdlException e) {
LOG.warn("should not happen", e);
} catch (Exception e) {
LOG.warn("has error", e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
} finally {
olapTable.readUnlock();
}
Expand All @@ -565,6 +574,10 @@ private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartiti
clearDropPartitionFailedMsg(olapTable.getId());
} catch (Exception e) {
recordDropPartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
LOG.warn("has error", e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
} finally {
olapTable.writeUnlock();
}
Expand All @@ -577,6 +590,10 @@ private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartiti
clearCreatePartitionFailedMsg(olapTable.getId());
} catch (Exception e) {
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
LOG.warn("has error", e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
}
}
}
Expand Down Expand Up @@ -634,7 +651,14 @@ protected void runAfterCatalogReady() {
}
setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);
if (Config.dynamic_partition_enable) {
executeDynamicPartition(dynamicPartitionTableInfo);
try {
executeDynamicPartition(dynamicPartitionTableInfo, false);
} catch (Exception e) {
// previous had log DdlException
if (LOG.isDebugEnabled()) {
LOG.debug("dynamic partition has error: ", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1113,9 +1113,9 @@ public enum ErrorCode {
ERR_COLOCATE_TABLE_MUST_HAS_SAME_REPLICATION_ALLOCATION(5063, new byte[]{'4', '2', '0', '0', '0'},
"Colocate tables must have same replication allocation: %s"),
ERR_COLOCATE_TABLE_MUST_HAS_SAME_BUCKET_NUM(5063, new byte[]{'4', '2', '0', '0', '0'},
"Colocate tables must have same bucket num: %s"),
"Colocate tables must have same bucket num: %s should be %s"),
ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_SIZE(5063, new byte[]{'4', '2', '0', '0', '0'},
"Colocate tables distribution columns size must be same : %s"),
"Colocate tables distribution columns size must be same: %s should be %s"),
ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_TYPE(5063, new byte[]{'4', '2', '0', '0', '0'},
"Colocate tables distribution columns must have the same data type: %s should be %s"),
ERR_COLOCATE_NOT_COLOCATE_TABLE(5064, new byte[]{'4', '2', '0', '0', '0'},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,21 +577,18 @@ public static Map<String, String> analyzeDynamicPartition(Map<String, String> pr
long expectCreatePartitionNum = 0;
if (!createHistoryPartition) {
start = 0;
expectCreatePartitionNum = (long) end - start;
} else {
int historyPartitionNum = Integer.parseInt(analyzedProperties.getOrDefault(
DynamicPartitionProperty.HISTORY_PARTITION_NUM,
String.valueOf(DynamicPartitionProperty.NOT_SET_HISTORY_PARTITION_NUM)));
if (historyPartitionNum != DynamicPartitionProperty.NOT_SET_HISTORY_PARTITION_NUM) {
expectCreatePartitionNum = (long) end - Math.max(start, -historyPartitionNum);
} else {
if (start == Integer.MIN_VALUE) {
throw new DdlException("Provide start or history_partition_num property"
+ " when create_history_partition=true. Otherwise set create_history_partition=false");
}
expectCreatePartitionNum = (long) end - start;
start = getRealStart(start, historyPartitionNum);
if (start == Integer.MIN_VALUE) {
throw new DdlException("Provide start or history_partition_num property"
+ " when create_history_partition=true. Otherwise set create_history_partition=false");
}
}
expectCreatePartitionNum = (long) end - start;

if (hasEnd && (expectCreatePartitionNum > Config.max_dynamic_partition_num)
&& Boolean.parseBoolean(analyzedProperties.getOrDefault(DynamicPartitionProperty.ENABLE, "true"))) {
throw new DdlException("Too many dynamic partitions: "
Expand Down Expand Up @@ -673,6 +670,14 @@ public static Map<String, String> analyzeDynamicPartition(Map<String, String> pr
return analyzedProperties;
}

public static int getRealStart(int start, int historyPartitionNum) {
if (historyPartitionNum == DynamicPartitionProperty.NOT_SET_HISTORY_PARTITION_NUM) {
return start;
} else {
return Math.max(start, -historyPartitionNum);
}
}

public static void checkAlterAllowed(OlapTable olapTable) throws DdlException {
TableProperty tableProperty = olapTable.getTableProperty();
if (tableProperty != null && tableProperty.getDynamicPartitionProperty() != null
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ public static void beforeClass() throws Exception {
+ "PROPERTIES\n"
+ "(\n"
+ "\"colocate_with\" = \"group_3\",\n"
+ "\"replication_num\" = \"1\",\n"
+ "\"dynamic_partition.enable\" = \"true\",\n"
+ "\"dynamic_partition.time_unit\" = \"DAY\",\n"
+ "\"dynamic_partition.end\" = \"3\",\n"
+ "\"dynamic_partition.prefix\" = \"p\",\n"
+ "\"dynamic_partition.buckets\" = \"32\",\n"
+ "\"dynamic_partition.buckets\" = \"3\",\n"
+ "\"dynamic_partition.replication_num\" = \"1\",\n"
+ "\"dynamic_partition.create_history_partition\"=\"true\",\n"
+ "\"dynamic_partition.start\" = \"-3\"\n"
Expand Down Expand Up @@ -229,7 +230,12 @@ public static void tearDown() {
private static void createTable(String sql) throws Exception {
Config.enable_odbc_table = true;
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().createTable(createTableStmt);
try {
Env.getCurrentEnv().createTable(createTableStmt);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}

private static void createRemoteStorageResource(String sql) throws Exception {
Expand Down
Loading

0 comments on commit 5d4be00

Please sign in to comment.