Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RandomDestination at first & round-robin #195

Merged
merged 3 commits into from
Jun 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.Formatter;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -113,7 +114,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
* Index into {@link #destinations} of the "primary" destination.
*/
private static final int PRIMARY_DESTINATION_INDEX = 0;

/**
* The host to which to connect and send events
*/
Expand Down Expand Up @@ -252,7 +253,32 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
* @see #isGetHostStringPossible()
*/
private final boolean getHostStringPossible = isGetHostStringPossible();


/**
* The first set destination is random.
*
* Defaults to disable. The first destination is considered the primary destination.
*/
private boolean randomDestination;

/**
* Object to create the first random index.
*/
private Random firstRandomIndexGenerator = new Random();

/**
* If the size is exceeded, round-robin occurs.
* Destination cycles by round-robin.
*
* When null (the default), disabled round-robin.
*/
private Integer roundRobinSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand why a size is needed. It seems like you would want to either round robbin, or not. Why would you want to not round robin for a while, and then start round robin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you know, if you do round robin, a reconnection occurs.
When reconnecting each time, a big cost was introduced, and the "roundRobinSize" was introduced as a concept of buffer.

Copy link
Collaborator

@philsttr philsttr Mar 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhhh, I thought it would just round-robin the next time a connection was needed (i.e. if a connection was broken). I didn't notice that your change forces a reconnect after every N log messages. Makes sense now. If this is the case, I think it would make more sense to use a time based round-robin, rather than message count based. That way sudden increases of load don't trigger a reconnection storm.

This is kind of similar to the secondaryConnectionTTL already implemented. However, it applies to any connection. So, maybe it would just be connectionTTL.


/**
* Count of round-robin
*/
private int roundRobinCount;

/**
* Event handler responsible for performing the TCP transmission.
*/
Expand Down Expand Up @@ -284,7 +310,7 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
* This is a buffered wrapper of the socket output stream.
*/
private volatile OutputStream outputStream;

/**
* Time at which the last event was sent.
* Used to calculate if a keep alive message
Expand Down Expand Up @@ -333,7 +359,7 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
*/
private class KeepAliveRunnable implements Runnable {

private int previousDestinationIndex = PRIMARY_DESTINATION_INDEX;
private int previousDestinationIndex = connectedDestinationIndex;

@Override
public void run() {
Expand All @@ -342,8 +368,8 @@ public void run() {
if (hasKeepAliveDurationElapsed(lastSent, currentTime)) {
/*
* Publish a keep alive message to the RingBuffer.
*
* A null event indicates that this is a keep alive message.
*
* A null event indicates that this is a keep alive message.
*/
getDisruptor().getRingBuffer().publishEvent(getEventTranslator(), null);
scheduleKeepAlive(currentTime);
Expand Down Expand Up @@ -439,6 +465,7 @@ public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch)
* Therefore, we need to send the event.
*/
encoder.doEncode(logEvent.event);
roundRobin();
} else if (hasKeepAliveDurationElapsed(lastSentTimestamp, currentTime)){
/*
* This is a keep alive event, and the keepAliveDuration has passed,
Expand Down Expand Up @@ -474,6 +501,21 @@ public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch)
}
}

/**
* If the condition is satisfied, round-robin occurs.
* The index is repeated in a circle.
*/
private void roundRobin() {
if (isRoundRobin() && (roundRobinCount++ >= roundRobinSize)) {
connectedDestinationIndex++;
if (connectedDestinationIndex >= destinations.size()) {
connectedDestinationIndex = PRIMARY_DESTINATION_INDEX;
}
roundRobinCount = 0;
reopenSocket();
}
}

private boolean hasKeepAliveDurationElapsed(long lastSent, long currentTime) {
return isKeepAliveEnabled()
&& lastSent + keepAliveDuration.getMilliseconds() < currentTime;
Expand All @@ -485,6 +527,9 @@ private boolean shouldCloseConnection(long currentTime) {

@Override
public void onStart() {
if (randomDestination) {
connectedDestinationIndex = firstRandomIndexGenerator.nextInt(destinations.size());
}
openSocket();
scheduleKeepAlive(System.currentTimeMillis());
}
Expand All @@ -509,7 +554,8 @@ private synchronized void reopenSocket() {
* then it should be able to be used to send.
*/
private synchronized void openSocket() {
int destinationIndex = 0;
int destinationIndex = randomDestination || isRoundRobin() ? connectedDestinationIndex : PRIMARY_DESTINATION_INDEX;
int retryCount = 0;
int errorCount = 0;
while (isStarted() && !Thread.currentThread().isInterrupted()) {
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -563,24 +609,25 @@ private synchronized void openSocket() {
*/
updateCurrentThreadName();
}

this.readerFuture = scheduleReaderRunnable(
new ReaderRunnable(tempSocket.getInputStream()));

return;

} catch (Exception e) {

CloseUtil.closeQuietly(tempOutputStream);
CloseUtil.closeQuietly(tempSocket);

/*
* Retry immediately with next available host if any. Otherwise, sleep and retry with primary
*/
destinationIndex++;
if (destinationIndex >= destinations.size()) {
retryCount++;
if (destinationIndex >= destinations.size() && (retryCount >= destinations.size())) {
destinationIndex = PRIMARY_DESTINATION_INDEX;

/*
* If the connection timed out, then take the elapsed time into account
* when calculating time to sleep
Expand All @@ -604,6 +651,9 @@ private synchronized void openSocket() {
}
}
else {
if (destinationIndex >= destinations.size()) {
destinationIndex = PRIMARY_DESTINATION_INDEX;
}
/*
* Avoid spamming status messages by checking the MAX_REPEAT_CONNECTION_ERROR_LOG.
*/
Expand Down Expand Up @@ -677,7 +727,11 @@ private synchronized void unscheduleKeepAlive() {
}
}
}


private boolean isRoundRobin() {
return roundRobinSize != null;
}

/**
* An extension of logback's {@link ConfigurableSSLSocketFactory}
* that supports creating unconnected sockets
Expand Down Expand Up @@ -1132,4 +1186,34 @@ public void setThreadNameFormat(String threadNameFormat) {
super.setThreadNameFormat(threadNameFormat);
}

public boolean isRandomDestination() {
return randomDestination;
}

/**
* Sets random Destination. The first set destination is random.
*
* <p>
* @param randomDestination enable random Destination.
*/
public void setRandomDestination(boolean randomDestination) {
this.randomDestination = randomDestination;
}

public Integer getRoundRobinSize() {
return roundRobinSize;
}

/**
* If the size is exceeded, round-robin occurs.
* Destination cycles by round-robin.
*
* <p>
* When null (the default), disabled round-robin.
*
* @param roundRobinSize the number of round-robin incidents.
*/
public void setRoundRobinSize(Integer roundRobinSize) {
this.roundRobinSize = roundRobinSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class LogstashTcpSocketAppenderTest {

@Mock
private Future<?> readableRunnableFuture;

@Mock
private RandomWrapper randomWrapper;

private class TestableLogstashTcpSocketAppender extends LogstashTcpSocketAppender {
@Override
Expand Down Expand Up @@ -251,6 +254,35 @@ public void testReconnectToSecondaryOnOpen() throws Exception {
// 2) Second attempt on SECONDARY
inOrder.verify(socket).connect(host("localhost", 10001), anyInt());
}

@Test
public void testRandomDestinationAndReconnectToSecondaryOnOpen() throws Exception {
appender.addDestination("localhost:10000");
appender.addDestination("localhost:10001");
appender.setRandomDestination(true);

// Make it failed to connect to second destination
doThrow(SocketTimeoutException.class)
.when(socket).connect(host("localhost", 10001), anyInt());

// The first index is second destination.
when(randomWrapper.nextInt(appender.getDestinations().size())).thenReturn(1);

// Start the appender and verify it is actually started.
// It should try to connect to second destination by random destination, fail then retry on first destination.
appender.start();
verify(encoder).start();

// TWO connection attempts must have been made (without delay)
verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt());
InOrder inOrder = inOrder(socket);

// 1) First attempt on second destination: failure
inOrder.verify(socket).connect(host("localhost", 10001), anyInt());

// 2) Second attempt on first destination
inOrder.verify(socket).connect(host("localhost", 10000), anyInt());
}


/**
Expand Down Expand Up @@ -310,12 +342,12 @@ public void testReconnectToPrimaryWhileOnSecondary() throws Exception {
appender.addDestination("localhost:10000");
appender.addDestination("localhost:10001");
appender.setSecondaryConnectionTTL(Duration.buildByMilliseconds(100));

// Primary refuses first connection to force the appender to go on the secondary.
doThrow(SocketTimeoutException.class)
.doNothing()
.when(socket).connect(host("localhost", 10000), anyInt());


// Start the appender and verify it is actually started
// At this point, it should be connected to primary.
Expand Down Expand Up @@ -494,6 +526,32 @@ public void testDestination_MixedType() throws Exception {
appender.start();
Assert.assertFalse(appender.isStarted());
}

@Test
public void testRoundRobin() throws Exception {
appender.addDestination("localhost:10000");
appender.addDestination("localhost:10001");
appender.setRoundRobinSize(2);

appender.start();

verify(encoder).start();

appender.append(event1);
appender.append(event1);
appender.append(event1);

verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt());
InOrder inOrder = inOrder(socket);

// 1) connected to primary at startup
inOrder.verify(socket).connect(host("localhost", 10000), anyInt());

// 2) connected to next destination by round-robin
inOrder.verify(socket).connect(host("localhost", 10001), anyInt());


}

private SocketAddress host(final String host, final int port) {
return argThat(hasHostAndPort(host, port));
Expand Down
46 changes: 46 additions & 0 deletions src/test/java/net/logstash/logback/appender/RandomWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.logstash.logback.appender;

import java.util.Random;

/**
* RandomWrapper
*
* @author [email protected]
* @since 2016. 12. 22.
*/
public class RandomWrapper extends Random {
int seq = 0;
public RandomWrapper() {
super();
}

@Override
public int nextInt() {
return seq++;
}

@Override
public int nextInt(int bound) {
int result = seq;
if (result < bound) {
seq++;
} else {
result = 0;
seq = 1;
}
return result;
}
}