forked from ReactiveX/RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue#15 circular fifo bufer optimization (ReactiveX#29)
* Issue#15 New optimized implementation of CircularFifoBuffer
- Loading branch information
1 parent
b6f3379
commit 7d9ac6c
Showing
12 changed files
with
1,117 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
src/jmh/java/io/github/robwin/circuitbreaker/CircularBufferBenchmark.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* | ||
* Copyright 2016 Robert Winkler and Bohdan Storozhuk | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* | ||
*/ | ||
package io.github.robwin.circuitbreaker; | ||
|
||
import io.github.robwin.circularbuffer.CircularFifoBuffer; | ||
import io.github.robwin.circularbuffer.ConcurrentCircularFifoBuffer; | ||
import javaslang.collection.List; | ||
import javaslang.control.Option; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Group; | ||
import org.openjdk.jmh.annotations.GroupThreads; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
import org.openjdk.jmh.annotations.Scope; | ||
import org.openjdk.jmh.annotations.Setup; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
import org.openjdk.jmh.infra.Blackhole; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* @author bstorozhuk | ||
*/ | ||
@State(Scope.Benchmark) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@BenchmarkMode(Mode.AverageTime) | ||
public class CircularBufferBenchmark { | ||
public static final int FORK_COUNT = 2; | ||
private static final int WARMUP_COUNT = 10; | ||
private static final int ITERATION_COUNT = 10; | ||
private static final int CAPACITY = 10; | ||
private CircularFifoBuffer<Object> circularFifoBuffer; | ||
private Object event; | ||
|
||
@Setup | ||
public void setUp() { | ||
event = new Object(); | ||
circularFifoBuffer = new ConcurrentCircularFifoBuffer<>(CAPACITY); | ||
} | ||
|
||
@Benchmark | ||
@Warmup(iterations = WARMUP_COUNT) | ||
@Fork(value = FORK_COUNT) | ||
@Measurement(iterations = ITERATION_COUNT) | ||
@Group("circularBuffer") | ||
@GroupThreads(1) | ||
public void circularBufferAddEvent() { | ||
circularFifoBuffer.add(event); | ||
} | ||
|
||
@Benchmark | ||
@Warmup(iterations = WARMUP_COUNT) | ||
@Fork(value = FORK_COUNT) | ||
@Measurement(iterations = ITERATION_COUNT) | ||
@Group("circularBuffer") | ||
@GroupThreads(1) | ||
public void circularBufferToList(Blackhole bh) { | ||
List<Object> events = circularFifoBuffer.toList(); | ||
bh.consume(events); | ||
} | ||
|
||
@Benchmark | ||
@Warmup(iterations = WARMUP_COUNT) | ||
@Fork(value = FORK_COUNT) | ||
@Measurement(iterations = ITERATION_COUNT) | ||
@Group("circularBuffer") | ||
@GroupThreads(1) | ||
public void circularBufferSize(Blackhole bh) { | ||
int size = circularFifoBuffer.size(); | ||
bh.consume(size); | ||
} | ||
|
||
@Benchmark | ||
@Warmup(iterations = WARMUP_COUNT) | ||
@Fork(value = FORK_COUNT) | ||
@Measurement(iterations = ITERATION_COUNT) | ||
@Group("circularBuffer") | ||
@GroupThreads(1) | ||
public void circularBufferTakeEvent(Blackhole bh) { | ||
Option<Object> event = circularFifoBuffer.take(); | ||
bh.consume(event); | ||
} | ||
} |
100 changes: 0 additions & 100 deletions
100
src/main/java/io/github/robwin/circuitbreaker/internal/CircularFifoBuffer.java
This file was deleted.
Oops, something went wrong.
73 changes: 73 additions & 0 deletions
73
src/main/java/io/github/robwin/circularbuffer/CircularFifoBuffer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* | ||
* Copyright 2016 Robert Winkler and Bohdan Storozhuk | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* | ||
*/ | ||
package io.github.robwin.circularbuffer; | ||
|
||
import javaslang.collection.List; | ||
import javaslang.control.Option; | ||
|
||
/** | ||
* A {@link CircularFifoBuffer} is a first in first out buffer with a fixed size that replaces its oldest element if full. | ||
* {@link CircularFifoBuffer} does NOT accept null elements. | ||
**/ | ||
public interface CircularFifoBuffer<T> { | ||
/** | ||
* Returns the number of elements in this {@link CircularFifoBuffer}. | ||
* | ||
* @return the number of elements in this {@link CircularFifoBuffer} | ||
*/ | ||
int size(); | ||
|
||
/** | ||
* Returns <tt>true</tt> if this {@link CircularFifoBuffer} contains no elements. | ||
* | ||
* @return <tt>true</tt> if this {@link CircularFifoBuffer} contains no elements | ||
*/ | ||
boolean isEmpty(); | ||
|
||
/** | ||
* Returns <tt>true</tt> if this {@link CircularFifoBuffer} is full. | ||
* | ||
* @return <tt>true</tt> if this {@link CircularFifoBuffer} is full | ||
*/ | ||
boolean isFull(); | ||
|
||
/** | ||
* Returns a list containing all of the elements in this {@link CircularFifoBuffer}. | ||
* The elements are copied into an array. | ||
* | ||
* @return a list containing all of the elements in this {@link CircularFifoBuffer} | ||
*/ | ||
List<T> toList(); | ||
|
||
/** | ||
* Adds element to the {@link CircularFifoBuffer} | ||
* and overwrites the oldest element when {@link CircularFifoBuffer#isFull}. | ||
* | ||
* @throws NullPointerException if the specified element is null | ||
*/ | ||
void add(T element); | ||
|
||
/** | ||
* Retrieves and removes the head of this queue, | ||
* or returns {@link Option.None} if this queue is empty. | ||
* | ||
* @return the head of this queue, or {@link Option.None} if this queue is empty | ||
*/ | ||
Option<T> take(); | ||
} |
95 changes: 95 additions & 0 deletions
95
src/main/java/io/github/robwin/circularbuffer/ConcurrentCircularFifoBuffer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* | ||
* Copyright 2016 Robert Winkler and Bohdan Storozhuk | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* | ||
*/ | ||
package io.github.robwin.circularbuffer; | ||
|
||
import javaslang.collection.List; | ||
import javaslang.control.Option; | ||
|
||
import java.util.Arrays; | ||
|
||
/** | ||
* Thread safe implementation of {@link CircularFifoBuffer} on top of {@link ConcurrentEvictingQueue} | ||
**/ | ||
public class ConcurrentCircularFifoBuffer<T> implements CircularFifoBuffer<T> { | ||
|
||
private final ConcurrentEvictingQueue<T> queue; | ||
private final int capacity; | ||
|
||
/** | ||
* Creates an {@code ConcurrentCircularFifoBuffer} with the given (fixed) | ||
* capacity | ||
* | ||
* @param capacity the capacity of this {@code ConcurrentCircularFifoBuffer} | ||
* @throws IllegalArgumentException if {@code capacity < 1} | ||
*/ | ||
public ConcurrentCircularFifoBuffer(int capacity) { | ||
this.capacity = capacity; | ||
queue = new ConcurrentEvictingQueue<>(capacity); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public int size() { | ||
return queue.size(); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public boolean isEmpty() { | ||
return queue.isEmpty(); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public boolean isFull() { | ||
return queue.size() == capacity; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
@SuppressWarnings("unchecked") | ||
public List<T> toList(){ | ||
T[] elementsArray = (T[]) queue.toArray(); | ||
return List.ofAll(Arrays.asList(elementsArray)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public void add(T element) { | ||
queue.offer(element); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public Option<T> take() { | ||
return Option.of(queue.poll()); | ||
} | ||
} |
Oops, something went wrong.