Skip to content

Commit

Permalink
Reduce CPU usage when client idle and batch message enable
Browse files Browse the repository at this point in the history
Reduce CPU usage when client idle and batch message enable
  • Loading branch information
stillerrr committed Aug 17, 2024
1 parent 9d9b8df commit 5f70dee
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -84,6 +87,7 @@
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -126,6 +130,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private volatile long producerDeadline = 0; // gets set on first successful connection

private final BatchMessageContainerBase batchMessageContainer;
private ScheduledExecutorService batchMessageScheduleService;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
private LastSendFutureWrapper lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture);

Expand Down Expand Up @@ -168,7 +173,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// The goal is to optimize batch density while also ensuring that a producer never waits longer than the configured
// batchingMaxPublishDelayMicros to send a batch.
// Only update from within synchronized block on this producer.
private ScheduledFuture<?> batchFlushTask;
private java.util.concurrent.ScheduledFuture<?> batchFlushTask;
// The time, in nanos, of the last batch send. This field ensures that we don't deliver batches via the
// batchFlushTask before the batchingMaxPublishDelayMicros duration has passed.
private long lastBatchSendNanoTime;
Expand Down Expand Up @@ -266,6 +271,8 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
}
this.batchMessageContainer = (BatchMessageContainerBase) containerBuilder.build();
this.batchMessageContainer.setProducer(this);
ThreadFactory threadFactory = new ExecutorProvider.ExtendedThreadFactory("pulsar-batch", Thread.currentThread().isDaemon());
batchMessageScheduleService = Executors.newSingleThreadScheduledExecutor(threadFactory);
} else {
this.batchMessageContainer = null;
}
Expand Down Expand Up @@ -2226,9 +2233,8 @@ private void maybeScheduleBatchFlushTask() {

// must acquire semaphore before calling
private void scheduleBatchFlushTask(long batchingDelayMicros) {
ClientCnx cnx = cnx();
if (cnx != null && isBatchMessagingEnabled()) {
this.batchFlushTask = cnx.ctx().executor().schedule(catchingAndLoggingThrowables(this::batchFlushTask),
if (isBatchMessagingEnabled()) {
this.batchFlushTask = this.batchMessageScheduleService.schedule(catchingAndLoggingThrowables(this::batchFlushTask),
batchingDelayMicros, TimeUnit.MICROSECONDS);
}
}
Expand Down

0 comments on commit 5f70dee

Please sign in to comment.