Skip to content

Commit

Permalink
[BugFix] Fix disable base compaction with minute granularity
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Nov 15, 2024
1 parent 20e446e commit b46e2b5
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 31 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ CONF_mInt64(size_tiered_level_multiple, "5");
CONF_mInt64(size_tiered_level_multiple_dupkey, "10");
CONF_mInt64(size_tiered_level_num, "7");

// disable compaction will take affect when tablet size is larger than this value
// for those small tablet, we will always compact them
CONF_mInt64(disable_compaction_tablet_size_threshold, "52428800");

CONF_Bool(enable_check_string_lengths, "true");

// Max row source mask memory bytes, default is 200M.
Expand Down
39 changes: 26 additions & 13 deletions be/src/storage/compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ void CompactionManager::update_candidates(std::vector<CompactionCandidate> candi
}
}
for (auto& candidate : candidates) {
if (_check_disable_compaction(candidate)) {
continue;
}
if (candidate.tablet->enable_compaction()) {
VLOG(2) << "update candidate " << candidate.tablet->tablet_id() << " type "
<< starrocks::to_string(candidate.type) << " score " << candidate.score;
Expand Down Expand Up @@ -200,6 +203,25 @@ void CompactionManager::remove_candidate(int64_t tablet_id) {
}
}

bool CompactionManager::_check_disable_compaction(const CompactionCandidate& candidate) {
if (candidate.type == CompactionType::BASE_COMPACTION &&
_table_to_disable_deadline_map.find(candidate.tablet->tablet_meta()->table_id()) !=
_table_to_disable_deadline_map.end()) {
int64_t deadline = _table_to_disable_deadline_map[candidate.tablet->tablet_meta()->table_id()];
if (deadline > 0 && UnixSeconds() < deadline) {
if (candidate.tablet->data_size() > config::disable_compaction_tablet_size_threshold) {
return true;
}
} else {
// disable compaction deadline has passed, remove it from map
_table_to_disable_deadline_map.erase(candidate.tablet->tablet_meta()->table_id());
// check if the tablet should compact now after the deadline
update_tablet_async(candidate.tablet);
}
}
return false;
}

bool CompactionManager::_check_precondition(const CompactionCandidate& candidate) {
if (!candidate.tablet) {
LOG(WARNING) << "candidate with null tablet";
Expand All @@ -212,19 +234,6 @@ bool CompactionManager::_check_precondition(const CompactionCandidate& candidate
return false;
}

// check if the table base compaction is disabled
if (candidate.type == CompactionType::BASE_COMPACTION &&
_table_to_disable_deadline_map.find(tablet->tablet_meta()->table_id()) !=
_table_to_disable_deadline_map.end()) {
int64_t deadline = _table_to_disable_deadline_map[tablet->tablet_meta()->table_id()];
if (deadline > 0 && UnixSeconds() < deadline) {
VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because table is disabled";
return false;
} else {
_table_to_disable_deadline_map.erase(tablet->tablet_meta()->table_id());
}
}

int64_t last_failure_ts = 0;
DataDir* data_dir = tablet->data_dir();
if (candidate.type == CUMULATIVE_COMPACTION) {
Expand Down Expand Up @@ -291,6 +300,10 @@ bool CompactionManager::pick_candidate(CompactionCandidate* candidate) {

auto iter = _compaction_candidates.begin();
while (iter != _compaction_candidates.end()) {
if (_check_disable_compaction(*iter)) {
_compaction_candidates.erase(iter++);
continue;
}
if (_check_precondition(*iter)) {
*candidate = *iter;
_compaction_candidates.erase(iter);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class CompactionManager {
CompactionManager& operator=(CompactionManager&& compaction_manager) = delete;

void _dispatch_worker();
bool _check_disable_compaction(const CompactionCandidate& candidate);
bool _check_precondition(const CompactionCandidate& candidate);
void _schedule();
void _notify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.task.CompactionControlTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.quartz.CronExpression;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.DateBuilder;
Expand All @@ -40,6 +41,7 @@
import org.quartz.impl.StdSchedulerFactory;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CompactionControlScheduler {
Expand Down Expand Up @@ -71,6 +73,7 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws
// Check if a job with the same JobKey already exists, and delete it if found
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
tableCompactionMap.remove(tableId);
}

// If crontab is empty, remove the job and return
Expand All @@ -86,15 +89,28 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws
}

// Parse each field and build the Quartz Cron expression
if (!cronParts[0].equals("*")) {
throw new IllegalArgumentException("Invalid crontab format. The minute field must be '*'.");
}
String hour = cronParts[1];
String dayOfMonth = cronParts[2];
String month = cronParts[3];
String dayOfWeek = cronParts[4];

// If either dayOfMonth or dayOfWeek is *, set the other to ?
if (!dayOfMonth.equals("*") && !dayOfWeek.equals("*")) {
throw new IllegalArgumentException("For Quartz cron, either dayOfMonth or dayOfWeek should be '*', not both.");
throw new IllegalArgumentException("For Quartz cron, either day of month or day of week must be '*'.");
}

Set<Trigger> triggers = Sets.newHashSet();
// fully disable should trigger immediately
if (dayOfMonth.equals("*") && dayOfWeek.equals("*") && hour.equals("*") && month.equals("*")) {
triggers.add(TriggerBuilder.newTrigger()
.withIdentity("syncImmediateTrigger" + tableId, groupKeyString)
.startNow() // Start immediately
.build());
}

if (dayOfMonth.equals("*")) {
dayOfMonth = "?";
} else {
Expand All @@ -103,6 +119,11 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws

String modifiedCron = String.format("0 0/5 %s %s %s %s", hour, dayOfMonth, month, dayOfWeek);

if (!CronExpression.isValidExpression(modifiedCron)) {
throw new IllegalArgumentException("Invalid crontab format. You can check through "
+ "https://www.freeformatter.com/cron-expression-generator-quartz.html");
}

// Build the new JobDetail
JobDetail jobDetail = JobBuilder.newJob(DisableCompactionJob.class)
.withIdentity(jobKey)
Expand All @@ -117,15 +138,10 @@ public void updateTableForbiddenTimeRanges(Long tableId, String crontab) throws
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(modifiedCron))
.build();

// Create a SimpleTrigger to execute the job immediately
Trigger immediateTrigger = TriggerBuilder.newTrigger()
.withIdentity("syncImmediateTrigger" + tableId, groupKeyString)
.startNow() // Start immediately
.build();
triggers.add(cronTrigger);

// Schedule the job with both triggers
scheduler.scheduleJob(jobDetail, Sets.newHashSet(cronTrigger, immediateTrigger), true);
scheduler.scheduleJob(jobDetail, triggers, true);

triggerSyncJobWithDelay();

Expand Down Expand Up @@ -202,14 +218,13 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
CompactionControlTask task = new CompactionControlTask(backendId, tableCompactionMap);
// add task to send
batchTask.addTask(task);
LOG.debug("add compaction control task. backendId: {} data: {}", backendId, tableCompactionMap);
});
if (batchTask.getTaskNum() > 0) {
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
LOG.debug("tablet[{}] send compaction control task. num: {}", batchTask.getTaskNum());
LOG.info("disable compaction: {}", tableCompactionMap);
}
}
}
Expand Down
24 changes: 16 additions & 8 deletions test/sql/test_compaction_cron/R/test_compaction_cron
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ PROPERTIES (
"replication_num" = "3"
);
-- !result




-- name: test_disable_compaction
create database test_disable_compaction;
-- result:
Expand All @@ -72,6 +68,9 @@ use test_disable_compaction;
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * *');
-- result:
-- !result
update information_schema.be_configs set value='-1' where name='disable_compaction_tablet_size_threshold';
-- result:
-- !result
select sleep(10);
-- result:
1
Expand Down Expand Up @@ -102,10 +101,6 @@ select min(NUM_VERSION) from information_schema.be_tablets t1, information_schem
-- result:
7
-- !result




-- name: test_primary_key
create table t(k int, v int) primary key(k) properties('base_compaction_forbidden_time_ranges'='* * * * *');
-- result:
Expand All @@ -117,4 +112,17 @@ create table t(k int, v int) primary key(k);
alter table t set ('base_compaction_forbidden_time_ranges'='* 7-20 * * *');
-- result:
E: (5064, 'Getting analyzing error. Detail message: Property base_compaction_forbidden_time_ranges not support primary keys table or cloud native table.')
-- !result
-- name: test_invalid_cron
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='5 8-18 * * *');
-- result:
E: (1064, "Invalid crontab format. The minute field must be '*'.")
-- !result
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='-1 * * * *');
-- result:
E: (1064, "Invalid crontab format. The minute field must be '*'.")
-- !result
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * 0');
-- result:
E: (1064, 'Invalid crontab format. You can check through https://www.freeformatter.com/cron-expression-generator-quartz.html')
-- !result
5 changes: 5 additions & 0 deletions test/sql/test_compaction_cron/T/test_compaction_cron
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ show create table t;
create database test_disable_compaction;
use test_disable_compaction;
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * *');
update information_schema.be_configs set value='-1' where name='disable_compaction_tablet_size_threshold';
select sleep(10);
insert into t values(1,1),(2,2),(3,3),(4,4),(5,5);
delete from t where k = 1;
Expand All @@ -25,3 +26,7 @@ create table t(k int, v int) primary key(k) properties('base_compaction_forbidd
create table t(k int, v int) primary key(k);
alter table t set ('base_compaction_forbidden_time_ranges'='* 7-20 * * *');

-- name: test_invalid_cron
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='5 8-18 * * *');
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='-1 * * * *');
create table t(k int, v int) properties('base_compaction_forbidden_time_ranges'='* * * * 0');

0 comments on commit b46e2b5

Please sign in to comment.