Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu committed Nov 4, 2024
1 parent 0fde79e commit 35fbc63
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,84 +4,77 @@
import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender;
import org.komamitsu.fluency.fluentd.ingester.sender.MultiSender;
import org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender;
import org.komamitsu.fluency.fluentd.ingester.sender.UnixSocketSender;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.PhiAccrualFailureDetectStrategy;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.Heartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.SSLHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.heartbeat.UnixSocketHeartbeater;
import org.komamitsu.fluency.fluentd.ingester.sender.retry.ExponentialBackOffRetryStrategy;
import org.komamitsu.fluency.flusher.Flusher;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

class FluencyExtBuilderForFluentdTest
{
// These assertMethods are copied from FluencyBuilderForFluentdTest
private void assertBuffer(Buffer buffer)
{
assertThat(buffer.getMaxBufferSize(), is(512 * 1024 * 1024L));
assertThat(buffer.getFileBackupDir(), is(nullValue()));
assertThat(buffer.bufferFormatType(), is("packed_forward"));
assertThat(buffer.getChunkExpandRatio(), is(2f));
assertThat(buffer.getChunkRetentionSize(), is(4 * 1024 * 1024));
assertThat(buffer.getChunkInitialSize(), is(1 * 1024 * 1024));
assertThat(buffer.getChunkRetentionTimeMillis(), is(1000));
assertThat(buffer.getJvmHeapBufferMode(), is(false));
assertThat(buffer.getMaxBufferSize()).isEqualTo(512 * 1024 * 1024L);
assertThat(buffer.getFileBackupDir()).isNull();
assertThat(buffer.bufferFormatType()).isEqualTo("packed_forward");
assertThat(buffer.getChunkExpandRatio()).isEqualTo(2f);
assertThat(buffer.getChunkRetentionSize()).isEqualTo(4 * 1024 * 1024);
assertThat(buffer.getChunkInitialSize()).isEqualTo(1 * 1024 * 1024);
assertThat(buffer.getChunkRetentionTimeMillis()).isEqualTo(1000);
assertThat(buffer.getJvmHeapBufferMode()).isFalse();
}

private void assertFlusher(Flusher flusher)
{
assertThat(flusher.isTerminated(), is(false));
assertThat(flusher.getFlushAttemptIntervalMillis(), is(600));
assertThat(flusher.getWaitUntilBufferFlushed(), is(60));
assertThat(flusher.getWaitUntilTerminated(), is(60));
assertThat(flusher.isTerminated()).isFalse();
assertThat(flusher.getFlushAttemptIntervalMillis()).isEqualTo(600);
assertThat(flusher.getWaitUntilBufferFlushed()).isEqualTo(60);
assertThat(flusher.getWaitUntilTerminated()).isEqualTo(60);
}

private void assertDefaultRetryableSender(RetryableSender sender, Class<? extends UnixSocketSender> expectedBaseClass)
{
assertThat(sender.getRetryStrategy(), instanceOf(ExponentialBackOffRetryStrategy.class));
assertThat(sender.getRetryStrategy()).isInstanceOf(ExponentialBackOffRetryStrategy.class);
ExponentialBackOffRetryStrategy retryStrategy = (ExponentialBackOffRetryStrategy) sender.getRetryStrategy();
assertThat(retryStrategy.getMaxRetryCount(), is(7));
assertThat(retryStrategy.getBaseIntervalMillis(), is(400));
assertThat(sender.getBaseSender(), instanceOf(expectedBaseClass));
assertThat(retryStrategy.getMaxRetryCount()).isEqualTo(7);
assertThat(retryStrategy.getBaseIntervalMillis()).isEqualTo(400);
assertThat(sender.getBaseSender()).isInstanceOf(expectedBaseClass);
}

private void assertUnixSocketSender(UnixSocketSender sender, Path expectedPath, boolean shouldHaveFailureDetector)
{
assertThat(sender.getPath(), is(expectedPath));
assertThat(sender.getConnectionTimeoutMilli(), is(5000));
assertThat(sender.getReadTimeoutMilli(), is(5000));
assertThat(sender.getPath()).isEqualTo(expectedPath);
assertThat(sender.getConnectionTimeoutMilli()).isEqualTo(5000);
assertThat(sender.getReadTimeoutMilli()).isEqualTo(5000);

FailureDetector failureDetector = sender.getFailureDetector();
if (shouldHaveFailureDetector) {
assertThat(failureDetector.getFailureIntervalMillis(), is(3 * 1000));
assertThat(failureDetector.getFailureDetectStrategy(), instanceOf(PhiAccrualFailureDetectStrategy.class));
assertThat(failureDetector.getHeartbeater(), instanceOf(UnixSocketHeartbeater.class));
assertThat(failureDetector.getFailureIntervalMillis()).isEqualTo(3 * 1000);
assertThat(failureDetector.getFailureDetectStrategy()).isInstanceOf(PhiAccrualFailureDetectStrategy.class);
assertThat(failureDetector.getHeartbeater()).isInstanceOf(UnixSocketHeartbeater.class);
{
UnixSocketHeartbeater hb = (UnixSocketHeartbeater) failureDetector.getHeartbeater();
assertThat(hb.getPath(), is(expectedPath));
assertThat(hb.getPath()).isEqualTo(expectedPath);
}
assertThat(failureDetector.getHeartbeater().getIntervalMillis(), is(1000));
assertThat(failureDetector.getHeartbeater().getIntervalMillis()).isEqualTo(1000);
}
else {
assertThat(failureDetector, is(nullValue()));
assertThat(failureDetector).isNull();
}
}

private void assertDefaultFluentdSender(FluentdSender sender, Path expectedPath)
{
assertThat(sender, instanceOf(RetryableSender.class));
assertThat(sender).isInstanceOf(RetryableSender.class);
RetryableSender retryableSender = (RetryableSender) sender;
assertDefaultRetryableSender(retryableSender, UnixSocketSender.class);
assertUnixSocketSender((UnixSocketSender) retryableSender.getBaseSender(), expectedPath, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.komamitsu.fluency.fluentd.ingester.sender;

import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.komamitsu.fluency.fluentd.MockUnixSocketServer;
import org.komamitsu.fluency.fluentd.ingester.sender.failuredetect.FailureDetector;
Expand All @@ -35,10 +34,9 @@
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

class UnixSocketSenderTest
Expand All @@ -51,35 +49,39 @@ void testSend()
throws Exception
{
testSendBase(socketPath -> {
UnixSocketSender.Config senderConfig = new UnixSocketSender.Config();
senderConfig.setPath(socketPath);
return new UnixSocketSender(senderConfig);
}, is(1), is(1));
UnixSocketSender.Config senderConfig = new UnixSocketSender.Config();
senderConfig.setPath(socketPath);
return new UnixSocketSender(senderConfig);
},
count -> assertThat(count).isEqualTo(1),
count -> assertThat(count).isEqualTo(1));
}

@Test
void testSendWithHeartbeart()
throws Exception
{
testSendBase(socketPath -> {
UnixSocketHeartbeater.Config hbConfig = new UnixSocketHeartbeater.Config();
hbConfig.setPath(socketPath);
hbConfig.setIntervalMillis(400);
UnixSocketHeartbeater.Config hbConfig = new UnixSocketHeartbeater.Config();
hbConfig.setPath(socketPath);
hbConfig.setIntervalMillis(400);

UnixSocketSender.Config senderConfig = new UnixSocketSender.Config();
senderConfig.setPath(socketPath);
UnixSocketSender.Config senderConfig = new UnixSocketSender.Config();
senderConfig.setPath(socketPath);

return new UnixSocketSender(senderConfig,
return new UnixSocketSender(senderConfig,
new FailureDetector(
new PhiAccrualFailureDetectStrategy(),
new UnixSocketHeartbeater(hbConfig)));
}, greaterThan(1), greaterThan(1));
new PhiAccrualFailureDetectStrategy(),
new UnixSocketHeartbeater(hbConfig)));
},
count -> assertThat(count).isGreaterThan(1),
count -> assertThat(count).isGreaterThan(1));
}

private void testSendBase(
SenderCreator senderCreator,
Matcher<? super Integer> connectCountMatcher,
Matcher<? super Integer> closeCountMatcher)
Consumer<Integer> connectCountAssertion,
Consumer<Integer> closeCountAssertion)
throws Exception
{
try (MockUnixSocketServer server = new MockUnixSocketServer()) {
Expand Down Expand Up @@ -135,9 +137,9 @@ private void testSendBase(
}
LOG.debug("recvCount={}", recvCount);

assertThat(connectCount, connectCountMatcher);
assertThat(recvLen, is((long) concurrency * reqNum * 10));
assertThat(closeCount, closeCountMatcher);
connectCountAssertion.accept(connectCount);
assertThat(recvLen).isEqualTo((long) concurrency * reqNum * 10);
closeCountAssertion.accept(closeCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.komamitsu.fluency.EventTime;
import org.komamitsu.fluency.fluentd.ingester.FluentdIngester;
Expand All @@ -44,9 +43,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -108,9 +105,9 @@ static class BufferTestHelper
void baseTestMessageBuffer(final int loopCount, final boolean multiTags, final boolean syncFlush, final boolean eventTime, final Buffer buffer)
throws IOException, InterruptedException
{
assertThat(buffer.getBufferUsage(), is(0f));
assertThat(buffer.getAllocatedSize(), is(0L));
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferUsage()).isEqualTo(0f);
assertThat(buffer.getAllocatedSize()).isEqualTo(0L);
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);

final int concurrency = 4;
final CountDownLatch latch = new CountDownLatch(concurrency);
Expand Down Expand Up @@ -167,9 +164,9 @@ void baseTestMessageBuffer(final int loopCount, final boolean multiTags, final b
executorService.execute(emitTask);
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertThat(buffer.getBufferUsage(), is(greaterThan(0f)));
assertThat(buffer.getAllocatedSize(), is(greaterThan(0L)));
assertThat(buffer.getBufferedDataSize(), is(greaterThan(0L)));
assertThat(buffer.getBufferUsage()).isGreaterThan(0f);
assertThat(buffer.getAllocatedSize()).isGreaterThan(0L);
assertThat(buffer.getBufferedDataSize()).isGreaterThan(0L);

buffer.flush(ingester, true);
buffer.close();
Expand All @@ -186,9 +183,9 @@ void baseTestMessageBuffer(final int loopCount, final boolean multiTags, final b
flushService.shutdownNow();
}
buffer.close(); // Just in case
assertThat(buffer.getBufferUsage(), is(0f));
assertThat(buffer.getAllocatedSize(), is(0L));
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferUsage()).isEqualTo(0f);
assertThat(buffer.getAllocatedSize()).isEqualTo(0L);
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);

int totalLoopCount = concurrency * loopCount;

Expand Down Expand Up @@ -260,17 +257,17 @@ private void analyzeResult(String tag, ImmutableValue timestamp, Map<String, Obj
tagCounts.put(tag, count + 1);

if (eventTime) {
assertThat(timestamp.isExtensionValue(), is(true));
assertThat(timestamp.isExtensionValue()).isTrue();
ExtensionValue tsInEventTime = timestamp.asExtensionValue();
assertThat(tsInEventTime.getType(), CoreMatchers.is((byte) 0x00));
assertThat(tsInEventTime.getType()).isEqualTo((byte) 0x00);
ByteBuffer secondsAndNanoSeconds = ByteBuffer.wrap(tsInEventTime.getData());
int seconds = secondsAndNanoSeconds.getInt();
int nanoSeconds = secondsAndNanoSeconds.getInt();
assertTrue(start / 1000 <= seconds && seconds <= end / 1000);
assertThat(nanoSeconds, is(999999999));
assertThat(nanoSeconds).isEqualTo(999999999);
}
else {
assertThat(timestamp.isIntegerValue(), is(true));
assertThat(timestamp.isIntegerValue()).isTrue();
long tsInEpochMilli = timestamp.asIntegerValue().asLong();
assertTrue(start <= tsInEpochMilli && tsInEpochMilli <= end);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.komamitsu.fluency.EventTime;
import org.msgpack.jackson.dataformat.MessagePackFactory;

import java.nio.ByteBuffer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class EventTimeTest
Expand All @@ -37,28 +35,28 @@ void instantiation()
{
long now = System.currentTimeMillis();
EventTime eventTime = EventTime.fromEpoch(now / 1000);
assertThat(eventTime.getSeconds(), is(now / 1000));
assertThat(eventTime.getNanoseconds(), is(0L));
assertThat(eventTime.getSeconds()).isEqualTo(now / 1000);
assertThat(eventTime.getNanoseconds()).isEqualTo(0L);
}

{
long now = System.currentTimeMillis();
EventTime eventTime = EventTime.fromEpoch(now / 1000, 999999999L);
assertThat(eventTime.getSeconds(), is(now / 1000));
assertThat(eventTime.getNanoseconds(), is(999999999L));
assertThat(eventTime.getSeconds()).isEqualTo(now / 1000);
assertThat(eventTime.getNanoseconds()).isEqualTo(999999999L);
}

{
long now = System.currentTimeMillis();
EventTime eventTime = EventTime.fromEpochMilli(now);
assertThat(eventTime.getSeconds(), is(now / 1000));
assertThat(eventTime.getNanoseconds(), Matchers.is(now % 1000 * 1000000));
assertThat(eventTime.getSeconds()).isEqualTo(now / 1000);
assertThat(eventTime.getNanoseconds()).isEqualTo(now % 1000 * 1000000);
}

{
EventTime eventTime = EventTime.fromEpoch(0xFFFFFFFFL, 0xFFFFFFFFL);
assertThat(eventTime.getSeconds(), is(0xFFFFFFFFL));
assertThat(eventTime.getNanoseconds(), is(0xFFFFFFFFL));
assertThat(eventTime.getSeconds()).isEqualTo(0xFFFFFFFFL);
assertThat(eventTime.getNanoseconds()).isEqualTo(0xFFFFFFFFL);
}
}

Expand Down Expand Up @@ -88,29 +86,29 @@ void serialize()
ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
byte[] bytes = objectMapper.writeValueAsBytes(eventTime);
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
assertThat(byteBuffer.get(), is((byte) 0xD7));
assertThat(byteBuffer.get(), is((byte) 0x00));
assertThat(byteBuffer.getInt(), is((int) (now / 1000)));
assertThat(byteBuffer.getInt(), is(999999999));
assertThat(byteBuffer.get()).isEqualTo((byte) 0xD7);
assertThat(byteBuffer.get()).isEqualTo((byte) 0x00);
assertThat(byteBuffer.getInt()).isEqualTo((int) (now / 1000));
assertThat(byteBuffer.getInt()).isEqualTo(999999999);
}

{
EventTime eventTime = EventTime.fromEpoch(0xFFEEDDCCL, 0xFEDCBA98L);
ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
byte[] bytes = objectMapper.writeValueAsBytes(eventTime);
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
assertThat(byteBuffer.get(), is((byte) 0xD7));
assertThat(byteBuffer.get(), is((byte) 0x00));
assertThat(byteBuffer.get()).isEqualTo((byte) 0xD7);
assertThat(byteBuffer.get()).isEqualTo((byte) 0x00);

assertThat(byteBuffer.get(), is((byte) 0xFF));
assertThat(byteBuffer.get(), is((byte) 0xEE));
assertThat(byteBuffer.get(), is((byte) 0xDD));
assertThat(byteBuffer.get(), is((byte) 0xCC));
assertThat(byteBuffer.get()).isEqualTo((byte) 0xFF);
assertThat(byteBuffer.get()).isEqualTo((byte) 0xEE);
assertThat(byteBuffer.get()).isEqualTo((byte) 0xDD);
assertThat(byteBuffer.get()).isEqualTo((byte) 0xCC);

assertThat(byteBuffer.get(), is((byte) 0xFE));
assertThat(byteBuffer.get(), is((byte) 0xDC));
assertThat(byteBuffer.get(), is((byte) 0xBA));
assertThat(byteBuffer.get(), is((byte) 0x98));
assertThat(byteBuffer.get()).isEqualTo((byte) 0xFE);
assertThat(byteBuffer.get()).isEqualTo((byte) 0xDC);
assertThat(byteBuffer.get()).isEqualTo((byte) 0xBA);
assertThat(byteBuffer.get()).isEqualTo((byte) 0x98);
}
}
}
Loading

0 comments on commit 35fbc63

Please sign in to comment.