Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix busy-waiting loops #2977 #3019

Closed
wants to merge 11 commits into from
27 changes: 15 additions & 12 deletions commander/src/main/java/com/iluwatar/commander/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -59,6 +62,7 @@ public interface HandleErrorIssue<T> {

private static final SecureRandom RANDOM = new SecureRandom();

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;
Expand Down Expand Up @@ -86,26 +90,25 @@ public interface HandleErrorIssue<T> {
*/

public void perform(List<Exception> list, T obj) {
do {
scheduler.schedule(() -> {
try {
op.operation(list);
return;
} catch (Exception e) {
}catch (Exception e){
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
scheduler.shutdown();
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
}
perform(list, obj);
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay(){
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
return Math.min(testDelay, this.maxDelay);
}

}
61 changes: 50 additions & 11 deletions microservices-log-aggregation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,58 @@ The `LogAggregator` collects logs from various services and stores them in the `
```java
public class LogAggregator {

private final CentralLogStore centralLogStore;
private final LogLevel minimumLogLevel;

public LogAggregator(CentralLogStore centralLogStore, LogLevel minimumLogLevel) {
this.centralLogStore = centralLogStore;
this.minimumLogLevel = minimumLogLevel;
}
private static final int BUFFER_THRESHOLD = 3;
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger logCount = new AtomicInteger(0);

public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
}

public void collectLog(LogEntry logEntry) {
if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
}

if (logEntry.getLevel().compareTo(minLogLevel) < 0) {
LOGGER.debug("Log level below threshold. Skipping.");
return;
}

buffer.offer(logEntry);

if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
}
}

public void stop() {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
flushBuffer();
}

public void collectLog(LogEntry logEntry) {
if (logEntry.getLogLevel().compareTo(minimumLogLevel) >= 0) {
centralLogStore.storeLog(logEntry);
private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
}
}

private void startBufferFlusher() {
scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS);
}
}
}
```

Expand Down Expand Up @@ -107,7 +146,7 @@ The `main` application creates services, generates logs, aggregates, and finally
```java
public class App {

public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) {
final CentralLogStore centralLogStore = new CentralLogStore();
final LogAggregator aggregator = new LogAggregator(centralLogStore, LogLevel.INFO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,7 +45,7 @@ public class LogAggregator {
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger logCount = new AtomicInteger(0);

/**
Expand Down Expand Up @@ -90,8 +90,8 @@ public void collectLog(LogEntry logEntry) {
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
flushBuffer();
Expand All @@ -106,15 +106,7 @@ private void flushBuffer() {
}

private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
//flush every 5 seconds
scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS);
}
}
75 changes: 46 additions & 29 deletions queue-based-load-leveling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,49 +99,66 @@ The `ServiceExecutor` class represents the task consumer. It retrieves tasks fro

```java
public class ServiceExecutor implements Runnable {

private MessageQueue msgQueue;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
}
private final MessageQueue msgQueue;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

@Override
public void run() {
while (true) {
Message message = msgQueue.getMessage(); // Retrieve a message from the queue
if (message != null) {
// Process the message
} else {
// No more messages to process
break;
}
public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
}

public void run() {
scheduler.scheduleWithFixedDelay(() -> {
var msg = msgQueue.retrieveMsg();

if (null != msg) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
}
}, 0, 1, TimeUnit.SECONDS);
}

public void shutdown(int shutdownTime) {
try {
if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) {
LOGGER.info("Executor was shut down and Exiting.");
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
}
}
}
}
```

Finally, we have the `App` class which sets up the `TaskGenerator` and `ServiceExecutor` threads and submits them to an `ExecutorService`.

```java
public class App {
public static void main(String[] args) {
var msgQueue = new MessageQueue();
public static void main(String[] args) {
ExecutorService executor = null;

final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);
var msgQueue = new MessageQueue();

final var srvRunnable = new ServiceExecutor(msgQueue);
final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(taskRunnable1);
executor.submit(taskRunnable2);
executor.submit(taskRunnable3);
executor.submit(srvRunnable);
final var srvRunnable = new ServiceExecutor(msgQueue);

executor.shutdown();
}
executor = Executors.newFixedThreadPool(2);
executor.submit(taskRunnable1);
executor.submit(taskRunnable2);
executor.submit(taskRunnable3);

executor.submit(srvRunnable);

executor.shutdown();

srvRunnable.shutdown(SHUTDOWN_TIME);

}
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -104,12 +103,7 @@ public static void main(String[] args) {
+ " Executor will shutdown only after all the Threads are completed.");
executor.shutdown();

// Wait for SHUTDOWN_TIME seconds for all the threads to complete
// their tasks and then shut down the executor and then exit.
if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
LOGGER.info("Executor was shut down and Exiting.");
executor.shutdownNow();
}
srvRunnable.shutdown(SHUTDOWN_TIME);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
package com.iluwatar.queue.load.leveling;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
* process them.
*/
@Slf4j
public class ServiceExecutor implements Runnable {

private final MessageQueue msgQueue;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
Expand All @@ -43,19 +46,26 @@ public ServiceExecutor(MessageQueue msgQueue) {
* The ServiceExecutor thread will retrieve each message and process it.
*/
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
var msg = msgQueue.retrieveMsg();
scheduler.scheduleWithFixedDelay(() -> {
var msg = msgQueue.retrieveMsg();

if (null != msg) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
}
if (null != msg) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
}
}, 0, 1, TimeUnit.SECONDS);
}

Thread.sleep(1000);
public void shutdown(int shutdownTime) {
// Wait for SHUTDOWN_TIME seconds for all the threads to complete
// their tasks and then shut down the executor and then exit.
try {
if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) {
LOGGER.info("Executor was shut down and Exiting.");
scheduler.shutdownNow();
}
} catch (Exception e) {
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
}
}
Expand Down
Loading
Loading