Skip to content

Commit

Permalink
Improved non-blocking algorithm of AdaptiveExecutionStrategy.
Browse files Browse the repository at this point in the history
Now only starting one pending producer also in case of CAS failures.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Nov 2, 2022
1 parent deb4f3a commit e2b00bd
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
/**
* The production state of the strategy.
*/
static final int IDLE = 0; // No tasks or producers.
static final int PRODUCING = 1; // There is an active producing thread.
static final int REPRODUCING = 2; // There is an active producing thread and demand for more production.
private static final int IDLE = 0; // No tasks or producers.
private static final int PRODUCING = 1; // There is an active producing thread.
private static final int REPRODUCING = 2; // There is an active producing thread and demand for more production.

/**
* The sub-strategies used by the strategy to consume tasks that are produced.
Expand Down Expand Up @@ -327,18 +327,17 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
return SubStrategy.PRODUCE_CONSUME;

// check if a pending producer is available.
int executed = 0;
boolean tryExecuted = false;
while (true)
{
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);

// If a pending producer is available or one can be started
pending += executed;
if (pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
if (tryExecuted || pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
{
executed++;
tryExecuted = true;
pending++;
}

Expand Down Expand Up @@ -369,18 +368,17 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
if (!nonBlocking)
{
// check if a pending producer is available.
int executed = 0;
boolean tryExecuted = false;
while (true)
{
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);

// If a pending producer is available or one can be started
pending += executed;
if (pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
if (tryExecuted || pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
{
executed++;
tryExecuted = true;
pending++;
}

Expand Down

0 comments on commit e2b00bd

Please sign in to comment.