Skip to content

Commit

Permalink
reduce synchronization, cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
ar committed Jan 2, 2025
1 parent 53b10f2 commit d4ee1e7
Showing 1 changed file with 38 additions and 28 deletions.
66 changes: 38 additions & 28 deletions jpos/src/main/java/org/jpos/util/ThroughputControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.jpos.util;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ThroughputControl limits the throughput
Expand All @@ -42,70 +43,79 @@
public class ThroughputControl {
private int[] period;
private int[] max;
private int[] cnt;
private AtomicInteger[] cnt;
private long[] start;
private long[] sleep;
private static final long MIN_SLEEP = 50L;
private static final long MAX_SLEEP = 500L;

/**
* @param maxTransactions ditto
* @param periodInMillis ditto
* @param maxTransactions Maximum number of transactions allowed.
* @param periodInMillis Duration (in milliseconds) over which the maximum is calculated.
*/
public ThroughputControl (int maxTransactions, int periodInMillis) {
this (new int[] { maxTransactions },
new int[] { periodInMillis });
}
/**
* @param maxTransactions ditto
* @param periodInMillis ditto
* @param maxTransactions Array of maximum transactions allowed for each period.
* @param periodInMillis Array of periods (in milliseconds) corresponding to each maximum.
*/
public ThroughputControl (int[] maxTransactions, int[] periodInMillis) {
super();
int l = maxTransactions.length;
period = new int[l];
max = new int[l];
cnt = new int[l];
cnt = new AtomicInteger[l];
start = new long[l];
sleep = new long[l];
for (int i=0; i<l; i++) {
this.max[i] = maxTransactions[i];
this.period[i] = periodInMillis[i];
this.sleep[i] = Math.min(Math.max (periodInMillis[i]/10, 500L),50L);
// Calculate sleep time, ensuring it is within the defined minimum and maximum bounds.
this.sleep[i] = Math.min(Math.max(periodInMillis[i] / 10, MIN_SLEEP), MAX_SLEEP);
this.start[i] = Instant.now().toEpochMilli();
this.cnt[i] = new AtomicInteger(0);
}
}

/**
* control should be called on every transaction.
* it may sleep for a while in order to control the system throughput
*
*
* @return aprox sleep time or zero if no sleep
*/
public long control() {
boolean delayed = false;
long init = Instant.now().toEpochMilli();
for (int i=0; i<cnt.length; i++) {
synchronized (this) {
cnt[i]++;
}
do {
if (cnt[i] > max[i]) {
delayed = true;
try {
Thread.sleep (sleep[i]);
} catch (InterruptedException e) { }
}
synchronized (this) {
long now = Instant.now().toEpochMilli();
if (now - start[i] > period[i]) {
long elapsed = now - start[i];
int allowed = (int) (elapsed * max[i] / period[i]);
start[i] = now;
cnt[i] = Math.max (cnt[i] - allowed, 0);
for (int i = 0; i < cnt.length; i++) {
if (cnt[i].incrementAndGet() > max[i]) {
delayed = true;
while (cnt[i].get() > max[i]) {
try {
Thread.sleep(sleep[i]);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

checkAndReset(i);
}
} while (cnt[i] > max[i]);
}
}
return delayed ? Instant.now().toEpochMilli() - init : 0L;
}
}

private synchronized void checkAndReset(int i) {
long now = Instant.now().toEpochMilli();
if (now - start[i] > period[i]) {
long elapsed = now - start[i];
int allowed = (int) (elapsed * max[i] / period[i]);
start[i] = now;
cnt[i].addAndGet(-allowed);

if (cnt[i].get() < 0) {
cnt[i].set(0);
}
}
}
}

0 comments on commit d4ee1e7

Please sign in to comment.