Fix Overflow of Batch Events in Case of Send Failure #195

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
16 changes: 14 additions & 2 deletions pom.xml
@@ -131,7 +131,7 @@
<version>2.22.2</version>
<configuration>
<includes>
<include>**/HttpEventCollectorUnitTest.class</include>
<include>**/com.splunk.logging.HttpEventCollectorUnitTest.class</include>
</includes>
</configuration>
</plugin>
@@ -148,7 +148,7 @@
<version>2.22.2</version>
<configuration>
<includes>
<include>**/HttpLoggerStressTest.class</include>
<include>**/com.splunk.logging.HttpLoggerStressTest.class</include>
</includes>
</configuration>
</plugin>
@@ -180,6 +180,18 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
HttpEventCollectorErrorHandler.java
@@ -1,16 +1,14 @@
package com.splunk.logging;

/**
* @copyright
*
* Copyright 2013-2015 Splunk, Inc.
*
* @copyright Copyright 2013-2015 Splunk, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"): you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -36,6 +34,35 @@
*/
public class HttpEventCollectorErrorHandler {


/**
* This exception is passed to error callback when Splunk fails to flush events
*/
public static class FlushException extends Exception {
private int numMsg;
private String errorText;

/**
* Create an exception with number of messages that couldn't be flushed properly
*
* @param numMsg number of messages
*/
public FlushException(final int numMsg) {
this.numMsg = numMsg;
this.errorText = "There was an exception flushing [" + numMsg + "] events!";
}

@Override
public String getMessage() {
return String.valueOf(numMsg);
}


@Override
public String toString() {
return getMessage();
}
}

/**
* This exception is passed to error callback when Splunk server replies an error
*/
@@ -88,7 +115,8 @@ public String getMessage() {
}


@Override public String toString() {
@Override
public String toString() {
return getReply();
}
}
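For context, the new FlushException is delivered through the same HttpEventCollectorErrorHandler.onError callback that the unit tests below register. The following is a minimal consumer sketch, assuming the two-argument (events, exception) callback shape used in HttpEventCollectorSenderTest; the class and logger names here are illustrative, not part of this change:

import com.splunk.logging.HttpEventCollectorErrorHandler;
import java.util.logging.Logger;

public class FlushFailureMonitor {
    // Hypothetical logger name, for illustration only.
    private static final Logger LOG = Logger.getLogger("splunk.flush.monitor");

    public static void install() {
        // Register the global error callback; FlushException is raised when a batch
        // cannot be posted, and the events list still holds the unsent batch.
        HttpEventCollectorErrorHandler.onError((events, exception) -> {
            if (exception instanceof HttpEventCollectorErrorHandler.FlushException) {
                LOG.warning("Flush failed for " + events.size() + " buffered events; "
                        + "the batch is kept and retried until the retry cap is reached.");
            } else {
                LOG.severe("HTTP Event Collector error: " + exception);
            }
        });
    }
}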
55 changes: 45 additions & 10 deletions src/main/java/com/splunk/logging/HttpEventCollectorSender.java
@@ -29,6 +29,7 @@
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;


@@ -43,9 +44,9 @@ public class HttpEventCollectorSender extends TimerTask implements HttpEventColl
private static final String HttpRawCollectorUriPath = "/services/collector/raw";
private static final String JsonHttpContentType = "application/json; profile=urn:splunk:event:1.0; charset=utf-8";
private static final String PlainTextHttpContentType = "plain/text; charset=utf-8";
private static final String SendModeSequential = "sequential";
private static final String SendModeSParallel = "parallel";
private TimeoutSettings timeoutSettings = new TimeoutSettings();
private static final int MaxFlushRetries = 5;
private static final AtomicInteger FlushRetries = new AtomicInteger(0); // allow flushing up to 5 times until messages in buffer are cleared.
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(HttpEventCollectorEventInfo.class, new EventInfoTypeAdapter())
.create();
@@ -62,7 +63,7 @@ public enum SendMode
{
Sequential,
Parallel
};
}

/**
* Recommended default values for events batching.
@@ -71,6 +72,12 @@ public enum SendMode
public static final int DefaultBatchSize = 10 * 1024; // 10KB
public static final int DefaultBatchCount = 10; // 10 events

/**
* Send modes to choose from
*/
public static final String SendModeSequential = "sequential";
public static final String SendModeSParallel = "parallel";

private HttpUrl url;
private String token;
private String channel;
@@ -176,11 +183,15 @@ public synchronized void send(
final String exception_message,
Serializable marker
) {
// create event info container and add it to the batch
HttpEventCollectorEventInfo eventInfo =
new HttpEventCollectorEventInfo(timeMsSinceEpoch, severity, message, logger_name, thread_name, properties, exception_message, marker);
eventsBatch.add(eventInfo);
eventsBatchSize += severity.length() + message.length();
// skip events whose message or severity is null
if (message != null && severity != null) {
// create event info container and add it to the batch
HttpEventCollectorEventInfo eventInfo =
new HttpEventCollectorEventInfo(timeMsSinceEpoch, severity, message, logger_name, thread_name, properties, exception_message, marker);
eventsBatch.add(eventInfo);
eventsBatchSize += severity.length() + message.length();
}
// check the batch thresholds even if nothing was added, since the last flush could have failed with an exception
if (eventsBatch.size() >= maxEventsBatchCount || eventsBatchSize > maxEventsBatchSize) {
flush();
}
@@ -199,7 +210,23 @@ public synchronized void send(final String message) {
*/
public synchronized void flush() {
if (eventsBatch.size() > 0) {
postEventsAsync(eventsBatch);
try {
postEventsAsync(eventsBatch);
}
catch (Exception e) {
// report the error and bump the retry counter
HttpEventCollectorErrorHandler.error(
eventsBatch,
new HttpEventCollectorErrorHandler.FlushException(eventsBatch.size()));
if(FlushRetries.incrementAndGet() < MaxFlushRetries) {
// do _not_ clear events list in this case since error could be network connection or some other fault
return;
}
else {
// max retries reached, reset the counter
FlushRetries.set(0);
}
}
}
// Clear the batch. A new list should be created because events are
// sending asynchronously and "previous" instance of eventsBatch object
@@ -321,7 +348,7 @@ public boolean verify(String hostname, SSLSession session) {
httpClient = builder.build();
}

private void postEventsAsync(final List<HttpEventCollectorEventInfo> events) {
protected void postEventsAsync(final List<HttpEventCollectorEventInfo> events) {
this.middleware.postEvents(events, this, new HttpEventCollectorMiddleware.IHttpSenderCallback() {

@Override
@@ -388,6 +415,14 @@ public void onFailure(Call call, IOException ex) {
});
}

public static int getMaxFlushRetries() {
return MaxFlushRetries;
}

protected int getCurrentEventsBatchSize() {
return eventsBatch.size();
}

public static class TimeoutSettings {
public static final long DEFAULT_CONNECT_TIMEOUT = 30000;
public static final long DEFAULT_WRITE_TIMEOUT = 0; // 0 means no timeout
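The flush() change above follows a simple capped-retry pattern: when postEventsAsync fails synchronously, the current batch is kept and a shared counter is incremented; once the counter reaches MaxFlushRetries the batch is dropped and the counter is reset, so the in-memory buffer cannot grow without bound. Below is a standalone sketch of that pattern, not the library's exact code; the class and method names are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Illustrative buffer that gives up on its contents after a fixed number of failed flushes.
public class CappedRetryBuffer<T> {
    private static final int MAX_FLUSH_RETRIES = 5;
    private final AtomicInteger flushRetries = new AtomicInteger(0);
    private final Consumer<List<T>> sink; // e.g. an asynchronous poster
    private List<T> batch = new ArrayList<>();

    public CappedRetryBuffer(Consumer<List<T>> sink) {
        this.sink = sink;
    }

    public synchronized void add(T item) {
        batch.add(item);
    }

    public synchronized void flush() {
        if (batch.isEmpty()) {
            return;
        }
        try {
            sink.accept(batch);
        } catch (Exception e) {
            if (flushRetries.incrementAndGet() < MAX_FLUSH_RETRIES) {
                // Keep the batch: the failure may be transient, e.g. a network fault.
                return;
            }
            // Retry cap reached: fall through, drop the batch and reset the counter.
            flushRetries.set(0);
        }
        // Start a fresh list so an in-flight asynchronous send keeps its own snapshot.
        batch = new ArrayList<>();
    }
}

As in the patch, the counter is reset only when the cap is reached, not on a successful flush, so failures spread across many flushes still accumulate toward the limit.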
62 changes: 62 additions & 0 deletions src/test/java/com/splunk/logging/HttpEventCollectorSenderTest.java
@@ -0,0 +1,62 @@
package com.splunk.logging;

import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;

public class HttpEventCollectorSenderTest {


final int maxFlushRetries = HttpEventCollectorSender.getMaxFlushRetries();
HttpEventCollectorSender httpEventCollectorSender;
final List<Exception> exceptionsThatOccured = new LinkedList<>();

@Before
public void setupHttpSender() {
httpEventCollectorSender = new HttpEventCollectorSender("", "", "", "", 0L, 3, 20000, HttpEventCollectorSender.SendModeSequential, new HashMap<>(), null);

httpEventCollectorSender = Mockito.spy(httpEventCollectorSender);
Mockito.doAnswer(invocationOnMock -> {
throw new NullPointerException("something bad happened");
}).when(httpEventCollectorSender).postEventsAsync(Mockito.anyList());

HttpEventCollectorErrorHandler.onError((data, ex) -> exceptionsThatOccured.add(ex));
}


@Test
public void testFlushRetries() {
httpEventCollectorSender.send("some random message");
httpEventCollectorSender.send("some random message");
Mockito.verify(httpEventCollectorSender, Mockito.times(0)).flush();
httpEventCollectorSender.send("some random message");
Mockito.verify(httpEventCollectorSender, Mockito.times(1)).flush();

assertThat(httpEventCollectorSender.getCurrentEventsBatchSize(), Matchers.is(3));
assertThat(exceptionsThatOccured, Matchers.hasSize(1));

// generate more until retry limit is hit
for (int i = 0; i < maxFlushRetries - 1; i++) {
httpEventCollectorSender.send("some random message");
}

assertThat(exceptionsThatOccured, Matchers.hasSize(5));
assertThat(httpEventCollectorSender.getCurrentEventsBatchSize(), Matchers.is(0));

Mockito.doNothing().when(httpEventCollectorSender).postEventsAsync(Mockito.anyList());

for (int i = 0; i < 500; i++) {
httpEventCollectorSender.send("some random message");
}

assertThat(exceptionsThatOccured, Matchers.hasSize(5));

}
}
HttpEventCollectorUnitTest.java
@@ -1,4 +1,4 @@
/**
package com.splunk.logging; /**
* @copyright
*
* Copyright 2013-2015 Splunk, Inc.
@@ -17,21 +17,14 @@
*/

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

import com.splunk.logging.HttpEventCollectorErrorHandler;
import com.splunk.logging.HttpEventCollectorEventInfo;
import org.junit.Assert;
import org.junit.Test;
import sun.rmi.runtime.Log;

import java.io.ByteArrayInputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.logging.LogManager;
import java.util.logging.Logger;

public class HttpEventCollectorUnitTest {
@Test
@@ -40,7 +33,7 @@ public void log4j_simple() throws Exception {
String loggerName = "splunk.log4jSimple";
userInputs.put("user_logger_name", loggerName);
userInputs.put("user_httpEventCollector_token", "11111111-2222-3333-4444-555555555555");
userInputs.put("user_middleware", "HttpEventCollectorUnitTestMiddleware");
userInputs.put("user_middleware", "com.splunk.logging.HttpEventCollectorUnitTestMiddleware");
userInputs.put("user_batch_size_count", "1");
userInputs.put("user_batch_size_bytes", "0");
userInputs.put("user_eventBodySerializer", "DoesNotExistButShouldNotCrashTest");
@@ -72,7 +65,7 @@ public void logback_simple() throws Exception {
final String loggerName = "splunk.logback";
userInputs.put("user_logger_name", loggerName);
userInputs.put("user_httpEventCollector_token", "11111111-2222-3333-4444-555555555555");
userInputs.put("user_middleware", "HttpEventCollectorUnitTestMiddleware");
userInputs.put("user_middleware", "com.splunk.logging.HttpEventCollectorUnitTestMiddleware");
userInputs.put("user_eventBodySerializer", "DoesNotExistButShouldNotCrashTest");
userInputs.put("user_eventHeaderSerializer", "DoesNotExistButShouldNotCrashTest");
TestUtil.resetLogbackConfiguration("logback_template.xml", "logback.xml", userInputs);
@@ -105,7 +98,7 @@ public void java_util_logger_simple() {
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_count=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_bytes=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_interval=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.eventBodySerializer=DoesNotExistButShouldNotCrashTest\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.eventHeaderSerializer=DoesNotExistButShouldNotCrashTest\n"
);
@@ -136,7 +129,7 @@ public void java_util_logger_error_handler() {
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_count=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_bytes=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_interval=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n"
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n"
);

// mimic server 404
@@ -168,7 +161,7 @@ public void java_util_logger_resend() {
"handlers=com.splunk.logging.HttpEventCollectorLoggingHandler\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.url=http://localhost:8088\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.token=TOKEN\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_count=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_bytes=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_interval=0\n" +
@@ -211,7 +204,7 @@ public void java_util_logger_resend_max_retries() {
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_count=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_bytes=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_interval=0\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.retries_on_error=2\n"
);

@@ -246,7 +239,7 @@ public void java_util_logger_batching() {
"handlers=com.splunk.logging.HttpEventCollectorLoggingHandler\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.url=http://localhost:8088\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.token=TOKEN\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.batch_size_count=3\n"
);

@@ -273,7 +266,7 @@ public void java_util_logger_batching_default_count() {
"handlers=com.splunk.logging.HttpEventCollectorLoggingHandler\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.url=http://localhost:8088\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.token=TOKEN\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n"
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n"
);
final int DefaultBatchCount = 10;
HttpEventCollectorUnitTestMiddleware.eventsReceived = 0;
@@ -296,7 +289,7 @@ public void java_util_logger_batching_default_size() {
"handlers=com.splunk.logging.HttpEventCollectorLoggingHandler\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.url=http://localhost:8088\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.token=TOKEN\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n"
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n"
);
final int DefaultBatchSize = 10 * 1024;
HttpEventCollectorUnitTestMiddleware.eventsReceived = 0;
@@ -319,7 +312,7 @@ public void java_util_logger_batching_default_interval() {
"handlers=com.splunk.logging.HttpEventCollectorLoggingHandler\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.url=http://localhost:8088\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.token=TOKEN\n" +
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=HttpEventCollectorUnitTestMiddleware\n"
"com.splunk.logging.HttpEventCollectorLoggingHandler.middleware=com.splunk.logging.HttpEventCollectorUnitTestMiddleware\n"
);
final int DefaultInterval = 10000;
HttpEventCollectorUnitTestMiddleware.eventsReceived = 0;