Skip to content

Commit

Permalink
refactor: simplify JCTools usage sticking to unpadded variants
Browse files Browse the repository at this point in the history
Native compilation requires Unsafe handling which is easy in, say, Quarkus.
  • Loading branch information
jponge committed Nov 17, 2023
1 parent bc846aa commit 40f5592
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 235 deletions.
2 changes: 1 addition & 1 deletion implementation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<version>${smallrye-common-annotation.version}</version>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<groupId>io.github.jponge.jct</groupId>
<artifactId>jctools-core</artifactId>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
import java.util.Queue;
import java.util.function.Supplier;

import org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscAtomicArrayQueue;
import org.jctools.queues.atomic.SpscChunkedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue;
Expand All @@ -23,27 +18,15 @@ private Queues() {
}

public static <T> Queue<T> createSpscArrayQueue(int capacity) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnpaddedArrayQueue<>(capacity);
} else {
return new SpscAtomicArrayQueue<>(capacity);
}
return new SpscUnpaddedArrayQueue<>(capacity);
}

public static <T> Queue<T> createSpscUnboundedArrayQueue(int chunkSize) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
} else {
return new SpscUnboundedAtomicArrayQueue<>(chunkSize);
}
return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
}

public static <T> Queue<T> createSpscChunkedArrayQueue(int capacity) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscChunkedUnpaddedArrayQueue<>(capacity);
} else {
return new SpscChunkedAtomicArrayQueue<>(capacity);
}
return new SpscChunkedUnpaddedArrayQueue<>(capacity);
}

public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
Expand Down Expand Up @@ -104,26 +87,18 @@ public static <T> Supplier<Queue<T>> unbounded(int chunkSize) {
* @return the queue
*/
public static <T> Queue<T> createMpscQueue() {
if (Infrastructure.useUnsafeForQueues()) {
return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
} else {
return new MpscUnboundedAtomicArrayQueue<>(Infrastructure.getBufferSizeS());
}
return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
}

/**
* Creates an unbounded single producer / single consumer queue.
*
* @param chunkSize the chunk size
* @return the queue
* @param <T> the item type
* @return the queue
*/
public static <T> Queue<T> createSpscUnboundedQueue(int chunkSize) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
} else {
return new SpscUnboundedAtomicArrayQueue<>(chunkSize);
}
return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
}

/**
Expand All @@ -134,10 +109,6 @@ public static <T> Queue<T> createSpscUnboundedQueue(int chunkSize) {
* @return a new queue
*/
public static <T> Queue<T> createMpscArrayQueue(int capacity) {
if (Infrastructure.useUnsafeForQueues()) {
return new MpscUnpaddedArrayQueue<>(capacity);
} else {
return new MpscAtomicArrayQueue<>(capacity);
}
return new MpscUnpaddedArrayQueue<>(capacity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class Infrastructure {
private static int bufferSizeXs = 32;
private static int bufferSizeS = 256;

private static boolean useUnsafeForQueues = true;

public static void reload() {
clearInterceptors();
reloadUniInterceptors();
Expand All @@ -79,27 +77,6 @@ public static void reload() {
multiOverflowDefaultBufferSize = 128;
bufferSizeXs = 32;
bufferSizeS = 256;
useUnsafeForQueues = true;
}

/**
* Should JCTools queues use variants with {@code Unsafe}, or should they use atomic field updaters?
* Atomic field updates work across JVM and native images, while padded JCTools queues are better suited
* for JVM mode applications.
*
* @return {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
*/
public static boolean useUnsafeForQueues() {
return useUnsafeForQueues;
}

/**
* Change how JCTools queues should be created ({@code Unsafe} vs atomic field updaters).
*
* @param useUnsafeForQueues {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
*/
public static void setUseUnsafeForQueues(boolean useUnsafeForQueues) {
Infrastructure.useUnsafeForQueues = useUnsafeForQueues;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion implementation/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
open module io.smallrye.mutiny {

requires transitive io.smallrye.common.annotation;
requires jctools.core;
requires org.jctools.core;

exports io.smallrye.mutiny;
exports io.smallrye.mutiny.converters.multi;
Expand Down
77 changes: 0 additions & 77 deletions native-tests/pom.xml

This file was deleted.

This file was deleted.

7 changes: 2 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@
<module>bom</module>
<module>math</module>
<module>workshop-examples</module>
<module>native-tests</module>
</modules>

<properties>
<jctools-core.version>4.0.1</jctools-core.version>
<jctools-core.version>4.0.2-RC2</jctools-core.version>
<mutiny-zero.version>1.0.0</mutiny-zero.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<rxjava3.version>3.1.8</rxjava3.version>
Expand Down Expand Up @@ -125,8 +124,6 @@
<find-and-replace-maven-plugin.version>1.1.0</find-and-replace-maven-plugin.version>
<jreleaser-maven-plugin.version>1.9.0</jreleaser-maven-plugin.version>
<cyclonedx-maven-plugin.version>2.7.10</cyclonedx-maven-plugin.version>
<junit-platform-native.version>0.9.27</junit-platform-native.version>
<native-maven-plugin.version>0.9.7.1</native-maven-plugin.version>
</properties>

<dependencyManagement>
Expand All @@ -137,7 +134,7 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<groupId>io.github.jponge.jct</groupId>
<artifactId>jctools-core</artifactId>
<version>${jctools-core.version}</version>
</dependency>
Expand Down

0 comments on commit 40f5592

Please sign in to comment.