Skip to content

Commit

Permalink
Updated comments to reflect that throttler is not message-specific
Browse files Browse the repository at this point in the history
  • Loading branch information
Geoff Anderson committed Jun 4, 2015
1 parent 6842ed1 commit d586fb0
Showing 1 changed file with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


/**
* This class helps producers throttle their maximum message throughput.
* This class helps producers throttle throughput.
*
* The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput)
Expand All @@ -31,7 +31,9 @@
* throttler.throttle();
* }
* }
* </pre>
* </pre>
*
* Note that this can be used to throttle message throughput or data throughput.
*/
public class ThroughputThrottler {

Expand All @@ -44,25 +46,35 @@ public class ThroughputThrottler {
long targetThroughput = -1;
long startMs;

/**
* @param targetThroughput Can be messages/sec or bytes/sec
* @param startMs When the very first message is sent
*/
public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs;
this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
}

public boolean shouldThrottle(long messageNum, long sendStartMs) {
/**
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
* messages produced so far if you want to throttle message throughput.
* @param sendStartMs timestamp of the most recently sent message
* @return
*/
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
if (this.targetThroughput <= 0) {
// No throttling in this case
return false;
}

float elapsedMs = (sendStartMs - startMs) / 1000.f;
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput;
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
}

public void throttle() {
// throttle message throughput by sleeping, on average,
// (1 / this.throughput) seconds between each sent message
// throttle throughput by sleeping, on average,
// (1 / this.throughput) seconds between "things sent"
sleepDeficitNs += sleepTimeNs;

// If enough sleep deficit has accumulated, sleep a little
Expand Down

0 comments on commit d586fb0

Please sign in to comment.