diff --git a/be/src/common/config.h b/be/src/common/config.h index 1f2a5468b5ba5..60266a840b7d6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -379,6 +379,10 @@ CONF_mInt64(size_tiered_level_multiple, "5"); CONF_mInt64(size_tiered_level_multiple_dupkey, "10"); CONF_mInt64(size_tiered_level_num, "7"); +// disable compaction will take affect when tablet size is larger than this value +// for those small tablet, we will always compact them +CONF_mInt64(disable_compaction_tablet_size_threshold, "52428800"); + CONF_Bool(enable_check_string_lengths, "true"); // Max row source mask memory bytes, default is 200M. diff --git a/be/src/storage/compaction_manager.cpp b/be/src/storage/compaction_manager.cpp index c67b548e7c465..6df762cbf9738 100644 --- a/be/src/storage/compaction_manager.cpp +++ b/be/src/storage/compaction_manager.cpp @@ -162,6 +162,9 @@ void CompactionManager::update_candidates(std::vector candi } } for (auto& candidate : candidates) { + if (_check_disable_compaction(candidate)) { + continue; + } if (candidate.tablet->enable_compaction()) { VLOG(2) << "update candidate " << candidate.tablet->tablet_id() << " type " << starrocks::to_string(candidate.type) << " score " << candidate.score; @@ -200,6 +203,25 @@ void CompactionManager::remove_candidate(int64_t tablet_id) { } } +bool CompactionManager::_check_disable_compaction(const CompactionCandidate& candidate) { + if (candidate.type == CompactionType::BASE_COMPACTION && + _table_to_disable_deadline_map.find(candidate.tablet->tablet_meta()->table_id()) != + _table_to_disable_deadline_map.end()) { + int64_t deadline = _table_to_disable_deadline_map[candidate.tablet->tablet_meta()->table_id()]; + if (deadline > 0 && UnixSeconds() < deadline) { + if (candidate.tablet->data_size() > config::disable_compaction_tablet_size_threshold) { + return true; + } + } else { + // disable compaction deadline has passed, remove it from map + _table_to_disable_deadline_map.erase(candidate.tablet->tablet_meta()->table_id()); + // check if the tablet should compact now after the deadline + update_tablet_async(candidate.tablet); + } + } + return false; +} + bool CompactionManager::_check_precondition(const CompactionCandidate& candidate) { if (!candidate.tablet) { LOG(WARNING) << "candidate with null tablet"; @@ -212,19 +234,6 @@ bool CompactionManager::_check_precondition(const CompactionCandidate& candidate return false; } - // check if the table base compaction is disabled - if (candidate.type == CompactionType::BASE_COMPACTION && - _table_to_disable_deadline_map.find(tablet->tablet_meta()->table_id()) != - _table_to_disable_deadline_map.end()) { - int64_t deadline = _table_to_disable_deadline_map[tablet->tablet_meta()->table_id()]; - if (deadline > 0 && UnixSeconds() < deadline) { - VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because table is disabled"; - return false; - } else { - _table_to_disable_deadline_map.erase(tablet->tablet_meta()->table_id()); - } - } - int64_t last_failure_ts = 0; DataDir* data_dir = tablet->data_dir(); if (candidate.type == CUMULATIVE_COMPACTION) { @@ -291,6 +300,10 @@ bool CompactionManager::pick_candidate(CompactionCandidate* candidate) { auto iter = _compaction_candidates.begin(); while (iter != _compaction_candidates.end()) { + if (_check_disable_compaction(*iter)) { + _compaction_candidates.erase(iter++); + continue; + } if (_check_precondition(*iter)) { *candidate = *iter; _compaction_candidates.erase(iter); diff --git a/be/src/storage/compaction_manager.h b/be/src/storage/compaction_manager.h index 7a3f97d2de9a6..6a3934600d60d 100644 --- a/be/src/storage/compaction_manager.h +++ b/be/src/storage/compaction_manager.h @@ -146,6 +146,7 @@ class CompactionManager { CompactionManager& operator=(CompactionManager&& compaction_manager) = delete; void _dispatch_worker(); + bool _check_disable_compaction(const CompactionCandidate& candidate); bool _check_precondition(const CompactionCandidate& candidate); void _schedule(); void _notify(); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionControlScheduler.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionControlScheduler.java index a6cb7d4dba3ec..806f70b545bb4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionControlScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionControlScheduler.java @@ -23,6 +23,7 @@ import com.starrocks.task.CompactionControlTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.quartz.CronExpression; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.DateBuilder; @@ -40,6 +41,7 @@ import org.quartz.impl.StdSchedulerFactory; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class CompactionControlScheduler { @@ -71,6 +73,7 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws // Check if a job with the same JobKey already exists, and delete it if found if (scheduler.checkExists(jobKey)) { scheduler.deleteJob(jobKey); + tableCompactionMap.remove(tableId); } // If crontab is empty, remove the job and return @@ -86,6 +89,9 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws } // Parse each field and build the Quartz Cron expression + if (!cronParts[0].equals("*")) { + throw new IllegalArgumentException("Invalid crontab format. The minute field must be '*'."); + } String hour = cronParts[1]; String dayOfMonth = cronParts[2]; String month = cronParts[3]; @@ -93,8 +99,18 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws // If either dayOfMonth or dayOfWeek is *, set the other to ? if (!dayOfMonth.equals("*") && !dayOfWeek.equals("*")) { - throw new IllegalArgumentException("For Quartz cron, either dayOfMonth or dayOfWeek should be '*', not both."); + throw new IllegalArgumentException("For Quartz cron, either day of month or day of week must be '*'."); + } + + Set triggers = Sets.newHashSet(); + // fully disable should trigger immediately + if (dayOfMonth.equals("*") && dayOfWeek.equals("*") && hour.equals("*") && month.equals("*")) { + triggers.add(TriggerBuilder.newTrigger() + .withIdentity("syncImmediateTrigger" + tableId, groupKeyString) + .startNow() // Start immediately + .build()); } + if (dayOfMonth.equals("*")) { dayOfMonth = "?"; } else { @@ -103,6 +119,11 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws String modifiedCron = String.format("0 0/5 %s %s %s %s", hour, dayOfMonth, month, dayOfWeek); + if (!CronExpression.isValidExpression(modifiedCron)) { + throw new IllegalArgumentException("Invalid crontab format. You can check through " + + "https://www.freeformatter.com/cron-expression-generator-quartz.html"); + } + // Build the new JobDetail JobDetail jobDetail = JobBuilder.newJob(DisableCompactionJob.class) .withIdentity(jobKey) @@ -117,15 +138,10 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws .withIdentity(triggerKey) .withSchedule(CronScheduleBuilder.cronSchedule(modifiedCron)) .build(); - - // Create a SimpleTrigger to execute the job immediately - Trigger immediateTrigger = TriggerBuilder.newTrigger() - .withIdentity("syncImmediateTrigger" + tableId, groupKeyString) - .startNow() // Start immediately - .build(); + triggers.add(cronTrigger); // Schedule the job with both triggers - scheduler.scheduleJob(jobDetail, Sets.newHashSet(cronTrigger, immediateTrigger), true); + scheduler.scheduleJob(jobDetail, triggers, true); triggerSyncJobWithDelay(); @@ -202,14 +218,13 @@ public void execute(JobExecutionContext context) throws JobExecutionException { CompactionControlTask task = new CompactionControlTask(backendId, tableCompactionMap); // add task to send batchTask.addTask(task); - LOG.debug("add compaction control task. backendId: {} data: {}", backendId, tableCompactionMap); }); if (batchTask.getTaskNum() > 0) { for (AgentTask task : batchTask.getAllTasks()) { AgentTaskQueue.addTask(task); } AgentTaskExecutor.submit(batchTask); - LOG.debug("tablet[{}] send compaction control task. num: {}", batchTask.getTaskNum()); + LOG.info("disable compaction: {}", tableCompactionMap); } } } diff --git a/test/sql/test_compaction_cron/R/test_compaction_cron b/test/sql/test_compaction_cron/R/test_compaction_cron index 46a0590a318b3..36795f55292f3 100644 --- a/test/sql/test_compaction_cron/R/test_compaction_cron +++ b/test/sql/test_compaction_cron/R/test_compaction_cron @@ -58,10 +58,6 @@ PROPERTIES ( "replication_num" = "3" ); -- !result - - - - -- name: test_disable_compaction create database test_disable_compaction; -- result: @@ -72,6 +68,9 @@ use test_disable_compaction; create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * *'); -- result: -- !result +update information_schema.be_configs set value='-1' where name='disable_compaction_tablet_size_threshold'; +-- result: +-- !result select sleep(10); -- result: 1 @@ -102,10 +101,6 @@ select min(NUM_VERSION) from information_schema.be_tablets t1, information_schem -- result: 7 -- !result - - - - -- name: test_primary_key create table t(k int, v int) primary key(k) properties('base_compaction_forbidden_time_ranges'='* * * * *'); -- result: @@ -117,4 +112,17 @@ create table t(k int, v int) primary key(k); alter table t set ('base_compaction_forbidden_time_ranges'='* 7-20 * * *'); -- result: E: (5064, 'Getting analyzing error. Detail message: Property base_compaction_forbidden_time_ranges not support primary keys table or cloud native table.') +-- !result +-- name: test_invalid_cron +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='5 8-18 * * *'); +-- result: +E: (1064, "Invalid crontab format. The minute field must be '*'.") +-- !result +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='-1 * * * *'); +-- result: +E: (1064, "Invalid crontab format. The minute field must be '*'.") +-- !result +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * 0'); +-- result: +E: (1064, 'Invalid crontab format. You can check through https://www.freeformatter.com/cron-expression-generator-quartz.html') -- !result \ No newline at end of file diff --git a/test/sql/test_compaction_cron/T/test_compaction_cron b/test/sql/test_compaction_cron/T/test_compaction_cron index 3599745153fed..458fcb861e3f9 100644 --- a/test/sql/test_compaction_cron/T/test_compaction_cron +++ b/test/sql/test_compaction_cron/T/test_compaction_cron @@ -10,6 +10,7 @@ show create table t; create database test_disable_compaction; use test_disable_compaction; create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * *'); +update information_schema.be_configs set value='-1' where name='disable_compaction_tablet_size_threshold'; select sleep(10); insert into t values(1,1),(2,2),(3,3),(4,4),(5,5); delete from t where k = 1; @@ -25,3 +26,7 @@ create table t(k int, v int) primary key(k) properties('base_compaction_forbidd create table t(k int, v int) primary key(k); alter table t set ('base_compaction_forbidden_time_ranges'='* 7-20 * * *'); +-- name: test_invalid_cron +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='5 8-18 * * *'); +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='-1 * * * *'); +create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * 0');