Skip to content

Commit

Permalink
StreamingTaskRunner: Close the rejection period updater executor serv…
Browse files Browse the repository at this point in the history
…ice (#17490)
  • Loading branch information
adithyachakilam authored Nov 19, 2024
1 parent 8853c7e commit c1d6328
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -249,6 +250,7 @@ public enum Status

private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;

public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
Expand All @@ -273,15 +275,15 @@ public SeekableStreamIndexTaskRunner(

minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
rejectionPeriodUpdaterExec = Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");

if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
rejectionPeriodUpdaterExec
.scheduleWithFixedDelay(
this::refreshMinMaxMessageTime,
ioConfig.getRefreshRejectionPeriodsInMinutes(),
ioConfig.getRefreshRejectionPeriodsInMinutes(),
TimeUnit.MINUTES
);
TimeUnit.MINUTES);
}
resetNextCheckpointTime();
}
Expand Down Expand Up @@ -940,6 +942,7 @@ public void onFailure(Throwable t)
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
rejectionPeriodUpdaterExec.shutdown();
}
catch (Throwable e) {
if (caughtExceptionOuter != null) {
Expand Down

0 comments on commit c1d6328

Please sign in to comment.