Skip to content

Commit

Permalink
Fix busy-waiting loops iluwatar#2977
Browse files Browse the repository at this point in the history
  • Loading branch information
shree243 committed Jun 18, 2024
1 parent 5dce3d8 commit 3c79c9c
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 158 deletions.
45 changes: 27 additions & 18 deletions commander/src/main/java/com/iluwatar/commander/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -66,46 +67,54 @@ public interface HandleErrorIssue<T> {
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService scheduler;

Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
long maxDelay, Predicate<Exception>... ignoreTests) {
long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
this.scheduler = Executors.newScheduledThreadPool(1);
}

/**
* Performing the operation with retries.
*
* @param list is the exception list
* @param obj is the parameter to be passed into handleIsuue method
* @param obj is the parameter to be passed into handleIssue method
*/

public void perform(List<Exception> list, T obj) {
do {
attempts.set(0); // reset attempts before starting
executeWithRetry(list, obj);
}

private void executeWithRetry(List<Exception> list, T obj) {
scheduler.schedule(() -> {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
errors.add(e);
if (attempts.incrementAndGet() >= maxAttempts || !test.test(e)) {
handleError.handleIssue(obj, e);
scheduler.shutdown();
} else {
long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, maxDelay);
executeWithRetry(list, obj);
}
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay() {
if (attempts.get() == 0) {
return 0;
}
long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
return Math.min(testDelay, maxDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Responsible for collecting and buffering logs from different services.
* Once the logs reach a certain threshold or after a certain time interval,
* they are flushed to the central log store. This class ensures logs are collected
* and processed asynchronously and efficiently, providing both an immediate collection
* they are flushed to the central log store. This class ensures logs are
* collected
* and processed asynchronously and efficiently, providing both an immediate
* collection
* and periodic flushing.
*/
@Slf4j
Expand All @@ -45,14 +45,14 @@ public class LogAggregator {
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);

/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
Expand Down Expand Up @@ -86,15 +86,23 @@ public void collectLog(LogEntry logEntry) {
/**
* Stops the log aggregator service and flushes any remaining logs to
* the central log store.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
public void stop() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
LOGGER.error("Log aggregator thread interrupted.", e);
} finally {
flushBuffer();
}
flushBuffer();
}

private void flushBuffer() {
Expand All @@ -106,15 +114,6 @@ private void flushBuffer() {
}

private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
scheduler.scheduleAtFixedRate(this::flushBuffer, 5, 5, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.iluwatar.queue.load.leveling;

import lombok.extern.slf4j.Slf4j;

/**
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
* ServiceExecutor class. This class will pick up Messages one by one from the
* Blocking Queue and
* process them.
*/
@Slf4j
public class ServiceExecutor implements Runnable {

private final MessageQueue msgQueue;
private volatile boolean isRunning = true;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
Expand All @@ -42,21 +45,31 @@ public ServiceExecutor(MessageQueue msgQueue) {
/**
* The ServiceExecutor thread will retrieve each message and process it.
*/
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
while (isRunning) {
var msg = msgQueue.retrieveMsg();

if (null != msg) {
if (msg != null) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
}

// Simulating processing time
Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Reset interrupt status
LOGGER.error("ServiceExecutor thread interrupted", e);
}
}

/**
* Stops the execution of the ServiceExecutor thread.
*/
public void stop() {
isRunning = false;
}
}
56 changes: 39 additions & 17 deletions retry/src/main/java/com/iluwatar/retry/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/**
* Decorates {@link BusinessOperation business operation} with "retry" capabilities.
* Decorates {@link BusinessOperation business operation} with "retry"
* capabilities.
*
* @param <T> the remote op's return type
*/
Expand All @@ -43,29 +48,31 @@ public final class Retry<T> implements BusinessOperation<T> {
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService scheduler;

/**
* Ctor.
*
* @param op the {@link BusinessOperation} to retry
* @param maxAttempts number of times to retry
* @param delay delay (in milliseconds) between attempts
* @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions
* @param ignoreTests tests to check whether the remote exception can be
* ignored. No exceptions
* will be ignored if no tests are given
*/
@SafeVarargs
public Retry(
BusinessOperation<T> op,
int maxAttempts,
long delay,
Predicate<Exception>... ignoreTests
) {
Predicate<Exception>... ignoreTests) {
this.op = op;
this.maxAttempts = maxAttempts;
this.delay = delay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
this.scheduler = Executors.newScheduledThreadPool(1);
}

/**
Expand All @@ -88,22 +95,37 @@ public int attempts() {

@Override
public T perform() throws BusinessException {
do {
try {
return this.op.perform();
} catch (BusinessException e) {
this.errors.add(e);
CompletableFuture<T> future = new CompletableFuture<>();
performWithRetry(future);
try {
return future.get();
} catch (Exception e) {
throw new BusinessException("Operation failed after retries");
} finally {
scheduler.shutdown();
}
}

private void performWithRetry(CompletableFuture<T> future) {
scheduler.schedule(() -> {
try {
future.complete(this.op.perform());
} catch (Exception e) {
this.errors.add((Exception) e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
throw e;
}

try {
Thread.sleep(this.delay);
} catch (InterruptedException f) {
//ignore
future.completeExceptionally(e);
scheduler.shutdown();
} else {
performWithRetry(future);
}
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay() {
if (attempts.get() == 0) {
return 0;
}
return delay;
}
}
Loading

0 comments on commit 3c79c9c

Please sign in to comment.