Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar Clients: Added a mandatory stop to the Backoff logic #747

Merged
merged 6 commits into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class AbstractReplicator {
protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration()
.setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true);

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES);
protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

protected final String replicatorPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu

private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
private volatile int totalUnackedMessages = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat

private int messageTTLInSeconds = 0;

private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

private PersistentMessageExpiryMonitor expiryMonitor;
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ lib*.so*
/system-test/SystemTest

# IDE generated files
.csettings
.cproject
.project
.settings/
Expand Down
24 changes: 20 additions & 4 deletions pulsar-client-cpp/lib/Backoff.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,40 @@
* under the License.
*/
#include "Backoff.h"
#include <algorithm>

namespace pulsar {

Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max)
Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max, const TimeDuration& mandatoryStop)
: initial_(initial),
max_(max),
next_(initial) {
next_(initial),
mandatoryStopMade_(false),
mandatoryStop_(mandatoryStop),
seed_(time(NULL)) {
}

TimeDuration Backoff::next() {
TimeDuration current = next_;
next_ = std::min(next_ * 2, max_);
return current;

// Check for mandatory stop
if (initial_ == current) {
timeElapsedSinceDisconnection_ = boost::posix_time::milliseconds(0);
}
if (!mandatoryStopMade_ && timeElapsedSinceDisconnection_ + current > mandatoryStop_) {
current = std::max(initial_, mandatoryStop_ - timeElapsedSinceDisconnection_);
mandatoryStopMade_ = true;
}
timeElapsedSinceDisconnection_ += current;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the exact current time might be slightly different than the "sleep" time, I think we should be better use an absolute time instead of timeElapsedSinceDisconnection_. That will be resistent to the multiple timeout being scheduled.


// Add Randomness
current = current - (current * (rand_r(&seed_) % 10) / 100);
return std::max(initial_, current);
}

void Backoff::reset() {
next_ = initial_;
mandatoryStopMade_ = false;
}

} //pulsar - namespace end
13 changes: 10 additions & 3 deletions pulsar-client-cpp/lib/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#ifndef _PULSAR_BACKOFF_HEADER_
#define _PULSAR_BACKOFF_HEADER_
#include <boost/date_time/posix_time/posix_time.hpp>
#include <stdlib.h> /* srand, rand */
#include <algorithm>
#include <time.h> /* time */

#pragma GCC visibility push(default)

Expand All @@ -28,13 +31,17 @@ typedef boost::posix_time::time_duration TimeDuration;

class Backoff {
public:
Backoff(const TimeDuration& intial, const TimeDuration& max);
Backoff(const TimeDuration&, const TimeDuration&, const TimeDuration&);
TimeDuration next();
void reset();
private:
TimeDuration initial_;
TimeDuration max_;
const TimeDuration initial_;
const TimeDuration max_;
TimeDuration next_;
TimeDuration mandatoryStop_;
TimeDuration timeElapsedSinceDisconnection_;
bool mandatoryStopMade_;
unsigned int seed_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to randomSeed_ ?

};
}

Expand Down
3 changes: 1 addition & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "DestinationName.h"
#include <algorithm>

using namespace pulsar;
namespace pulsar {

DECLARE_LOG_OBJECT()
Expand All @@ -37,7 +36,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const std::string& subscription, const ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */ )
: HandlerBase(client, topic),
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
waitingForZeroQueueSizeMessage(false),
config_(conf),
subscription_(subscription),
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ DECLARE_LOG_OBJECT()

namespace pulsar {

HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic)
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: client_(client),
topic_(topic),
connection_(),
mutex_(),
creationTimestamp_(now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
backoff_(milliseconds(100), seconds(60)),
backoff_(backoff),
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ typedef boost::shared_ptr<HandlerBase> HandlerBasePtr;
class HandlerBase {

public:
HandlerBase(const ClientImplPtr& client, const std::string& topic);
HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&);

virtual ~HandlerBase();

Expand Down
11 changes: 7 additions & 4 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <boost/bind.hpp>
#include <boost/date_time/local_time/local_time.hpp>

using namespace pulsar;
namespace pulsar {
DECLARE_LOG_OBJECT()

Expand All @@ -48,9 +47,13 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
}

ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration)
: HandlerBase(client, topic),
conf_(producerConfiguration),
const ProducerConfiguration& conf)
: HandlerBase(
client,
topic,
Backoff(milliseconds(100), seconds(60),
milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
Expand Down
85 changes: 79 additions & 6 deletions pulsar-client-cpp/tests/BackoffTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,91 @@
using namespace pulsar;
using boost::posix_time::milliseconds;
using boost::posix_time::seconds;

static bool withinTenPercent(const unsigned int& t1, const unsigned int& t2) {
return (t1 >= t2 * 0.9 && t1 <= t2);
}

TEST(BackoffTest, basicTest) {
Backoff backoff(milliseconds(5), seconds(60));
Backoff backoff(milliseconds(5), seconds(60), seconds(60));
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
ASSERT_EQ(backoff.next().total_milliseconds(), 10);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 10));
backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
}

TEST(BackoffTest, maxTest) {
Backoff backoff(milliseconds(5), milliseconds(20));
Backoff backoff(milliseconds(5), milliseconds(20), milliseconds(20));
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
ASSERT_EQ(backoff.next().total_milliseconds(), 10);
ASSERT_EQ(backoff.next().total_milliseconds(), 20);
ASSERT_EQ(backoff.next().total_milliseconds(), 20);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 10));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 5));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 20));
}

TEST(BackoffTest, mandatoryStopTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));
// would have been 1600 w/o the mandatory stop
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 3200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 6400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 12800));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 25600));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 51200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));

backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));
// would have been 1600 w/o the mandatory stop
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));

backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));

backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));
}

TEST(BackoffTest, ignoringMandatoryStopTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(0));
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 1600));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 3200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 6400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 12800));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 25600));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 51200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));

backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 800));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 1600));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 3200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 6400));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 12800));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 25600));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 51200));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));
ASSERT_TRUE(withinTenPercent(backoff.next().total_milliseconds(), 60000));
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,50 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;

// All variables are in TimeUnit millis by default
public class Backoff {
private static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
private final long initial;
private final long max;
private long next;
private long mandatoryStop;
public long timeElapsedSinceDisconnection;
private boolean mandatoryStopMade = false;

private static final Random random = new Random();

public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax) {
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop) {
this.initial = unitInitial.toMillis(initial);
this.max = unitMax.toMillis(max);
this.next = this.initial;
this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
}

public long next() {
long current = this.next;
if (current < max) {
this.next = Math.min(this.next * 2, this.max);
}

if (initial == current) {
timeElapsedSinceDisconnection = 0;
}

// Randomly increase the timeout up to 25% to avoid simultaneous retries
current += random.nextInt((int) current / 4);
return current;
if (!mandatoryStopMade && timeElapsedSinceDisconnection + current > mandatoryStop) {
current = Math.max(initial, mandatoryStop - timeElapsedSinceDisconnection);
mandatoryStopMade = true;
}
// increment timeElapsedSinceDisconnection
timeElapsedSinceDisconnection += current;

// Randomly decrease the timeout up to 10% to avoid simultaneous retries
// If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive"
if (current > 10) {
current -= random.nextInt((int) current / 10);
}
return Math.max(initial, current);
}

public void reduceToHalf() {
Expand All @@ -55,6 +75,7 @@ public void reduceToHalf() {

public void reset() {
this.next = this.initial;
this.mandatoryStopMade = false;
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ enum ConsumerType {

protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic);
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS));
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = subscription;
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ enum State {
Failed // Handler is failed
};

public HandlerBase(PulsarClientImpl client, String topic) {
public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) {
this.client = client;
this.topic = topic;
this.backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS);
this.backoff = backoff;
STATE_UPDATER.set(this, State.Uninitialized);
CLIENT_CNX_UPDATER.set(this, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
Expand All @@ -35,7 +36,7 @@ public abstract class ProducerBase extends HandlerBase implements Producer {

protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfiguration conf,
CompletableFuture<Producer> producerCreatedFuture) {
super(client, topic);
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
this.producerCreatedFuture = producerCreatedFuture;
this.conf = conf;
}
Expand Down
Loading