Skip to content

Commit

Permalink
Abstract AbstractDelayedDeliveryTracker & remove `super.getPriorityQu…
Browse files Browse the repository at this point in the history
…eue()` & rebase master
  • Loading branch information
coderzc committed Nov 4, 2022
1 parent ca7b00c commit c925582
Show file tree
Hide file tree
Showing 8 changed files with 521 additions and 414 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;

@Slf4j
public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {

protected final PersistentDispatcherMultipleConsumers dispatcher;

// Reference to the shared (per-broker) timer for delayed delivery
protected final Timer timer;

// Current timeout or null if not set
protected Timeout timeout;

// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;

// Last time the TimerTask was triggered for this class
private long lastTickRun;

protected long tickTimeMillis;

protected final Clock clock;

private final boolean isDelayedDeliveryDeliverAtTimeStrict;

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
}


/**
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
* {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
* tracker for a brief amount of time when we're already trying to dispatch to the consumer.
*
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
* can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
* {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
*
* @return the cutoff time to determine whether a message is ready to deliver to the consumer
*/
protected long getCutoffTime() {
return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
}

public void resetTickTime(long tickTime) {
if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}

protected void updateTimer() {
if (getNumberOfDelayedMessages() == 0) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
timeout = null;
}
return;
}
long timestamp = nextDeliveryTime();
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
}

if (timeout != null) {
timeout.cancel();
}

long now = clock.millis();
long delayMillis = timestamp - now;

if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
// get these messages
return;
}

// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);

if (log.isDebugEnabled()) {
log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
}

// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
if (log.isDebugEnabled()) {
log.debug("[{}] Timer triggered", dispatcher.getName());
}
if (timeout == null || timeout.isCancelled()) {
return;
}

synchronized (dispatcher) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
}
}

@Override
public void close() {
if (timeout != null) {
timeout.cancel();
timeout = null;
}
}

protected abstract long nextDeliveryTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,21 @@
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@Slf4j
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();

protected final PersistentDispatcherMultipleConsumers dispatcher;

// Reference to the shared (per-broker) timer for delayed delivery
private final Timer timer;

// Current timeout or null if not set
protected Timeout timeout;

// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;

// Last time the TimerTask was triggered for this class
private long lastTickRun;

protected long tickTimeMillis;

protected final Clock clock;

private final boolean isDelayedDeliveryDeliverAtTimeStrict;

// If we detect that all messages have fixed delay time, such that the delivery is
// always going to be in FIFO order, then we can avoid pulling all the messages in
// tracker. Instead, we use the lookahead for detection and pause the read from
Expand Down Expand Up @@ -84,29 +61,10 @@ public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers disp
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}

/**
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
* {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
* tracker for a brief amount of time when we're already trying to dispatch to the consumer.
*
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
* can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
* {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
*
* @return the cutoff time to determine whether a message is ready to deliver to the consumer
*/
protected long getCutoffTime() {
return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
}

@Override
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
Expand All @@ -119,7 +77,6 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
deliverAt - clock.millis());
}


priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();

Expand Down Expand Up @@ -189,14 +146,6 @@ public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
return positions;
}

@Override
public void resetTickTime(long tickTime) {

if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}

@Override
public void clear() {
this.priorityQueue.clear();
Expand All @@ -212,87 +161,10 @@ public long getBufferMemoryUsage() {
return priorityQueue.bytesCapacity();
}

/**
* Update the scheduled timer task such that:
* 1. If there are no delayed messages, return and do not schedule a timer task.
* 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing
* timer task in place.
* 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return
* without scheduling a timer task since the subscription is backlogged.
* 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or
* the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the
* tickTimeMillis).
*/
protected void updateTimer() {
if (getNumberOfDelayedMessages() == 0) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
timeout = null;
}
return;
}
long timestamp = nextDeliveryTime();
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
}

if (timeout != null) {
timeout.cancel();
}

long now = clock.millis();
long delayMillis = timestamp - now;

if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
// get these messages
return;
}

// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);

if (log.isDebugEnabled()) {
log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
}

// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
if (log.isDebugEnabled()) {
log.debug("[{}] Timer triggered", dispatcher.getName());
}
if (timeout == null || timeout.isCancelled()) {
return;
}

synchronized (dispatcher) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
}
}

@Override
public void close() {
super.close();
priorityQueue.close();
if (timeout != null) {
timeout.cancel();
timeout = null;
}
}

@Override
Expand All @@ -309,9 +181,6 @@ public boolean containsMessage(long ledgerId, long entryId) {
return false;
}

protected TripleLongPriorityQueue getPriorityQueue() {
return priorityQueue;
}
protected long nextDeliveryTime() {
return priorityQueue.peekN1();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Slf4j
@Data
@AllArgsConstructor
public abstract class Bucket {
abstract class Bucket {

static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.internal.delayed.bucket";
static final String DELIMITER = "_";
Expand Down
Loading

0 comments on commit c925582

Please sign in to comment.