Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] feat(bucket-retrieval): Dynamic Task Scheduler for updating bu…
Browse files Browse the repository at this point in the history
…cket list
  • Loading branch information
mdaudali committed Jul 13, 2024
1 parent 0018eda commit 9e296fd
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// This feels like it should already exist.
public final class DynamicTaskScheduler {
private static final SafeLogger log = SafeLoggerFactory.get(DynamicTaskScheduler.class);
private final ScheduledExecutorService scheduledExecutorService;
private final Refreshable<Duration> automaticSweepRefreshDelay;
private final Runnable task;
private final AtomicBoolean isStarted = new AtomicBoolean(false);

private DynamicTaskScheduler(
ScheduledExecutorService scheduledExecutorService,
Refreshable<Duration> automaticSweepRefreshDelay,
Runnable task) {
this.scheduledExecutorService = scheduledExecutorService;
this.automaticSweepRefreshDelay = automaticSweepRefreshDelay;
this.task = task;
}

public static DynamicTaskScheduler create(
ScheduledExecutorService scheduledExecutorService,
Refreshable<Duration> automaticSweepRefreshDelay,
Runnable task) {
return new DynamicTaskScheduler(scheduledExecutorService, automaticSweepRefreshDelay, task);
}

public void start() {
if (isStarted.compareAndSet(false, true)) {
scheduleNextIteration(automaticSweepRefreshDelay.get());
} else {
log.warn("Attempted to start an already started task");
}
}

private void runOneIteration() {
Duration delay = automaticSweepRefreshDelay.get();
try {
log.info("Running task");
task.run();
} catch (Exception e) {
log.warn("Failed to run task. Will retry in the next interval", SafeArg.of("delay", delay), e);
}
scheduleNextIteration(delay);
}

private void scheduleNextIteration(Duration delay) {
log.info("Scheduling next iteration", SafeArg.of("delay", delay));
scheduledExecutorService.schedule(this::runOneIteration, delay.toMillis(), TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import static com.palantir.logsafe.testing.Assertions.assertThat;

import com.palantir.refreshable.Refreshable;
import com.palantir.refreshable.SettableRefreshable;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jmock.lib.concurrent.DeterministicScheduler;
import org.junit.jupiter.api.Test;

public final class DynamicTaskSchedulerTest {
private final DeterministicScheduler scheduler = new DeterministicScheduler();
private final SettableRefreshable<Duration> taskDelay = Refreshable.create(Duration.ofSeconds(1));

private final AtomicInteger taskRunCount = new AtomicInteger(0);
private final Runnable task = taskRunCount::incrementAndGet;

private final DynamicTaskScheduler taskRunner = DynamicTaskScheduler.create(scheduler, taskDelay, task);

@Test
public void doesNotRunTaskBeforeStarted() {
tick(Duration.ofDays(1));
assertThat(taskRunCount.get()).isEqualTo(0);
}

@Test
public void runsFirstTaskAfterInitialDelayOnceStarted() {
taskRunner.start();
tick(Duration.ofMillis(999));
assertThat(taskRunCount.get()).isEqualTo(0);

tick(Duration.ofMillis(2));
assertThat(taskRunCount.get()).isEqualTo(1);
}

@Test
public void runsSubsequentTasksWithUpdatedDelaysOnNextSchedule() {
taskRunner.start();
taskDelay.update(Duration.ofSeconds(2));
tick(Duration.ofMillis(1001));
assertThat(taskRunCount.get()).isEqualTo(1);

tick(Duration.ofMillis(1001));
assertThat(taskRunCount.get()).isEqualTo(1);

taskDelay.update(Duration.ofMillis(100));
tick(Duration.ofSeconds(1));
assertThat(taskRunCount.get()).isEqualTo(2);

tick(Duration.ofMillis(100));
assertThat(taskRunCount.get()).isEqualTo(3);
}

private void tick(Duration duration) {
scheduler.tick(duration.toMillis(), TimeUnit.MILLISECONDS);
}
}

0 comments on commit 9e296fd

Please sign in to comment.