Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add multi-column support to UpdateBy RollingFormula() operator #6143

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
36 changes: 21 additions & 15 deletions Base/src/main/java/io/deephaven/base/ringbuffer/ByteRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* {@code long} values. Head and tail will not wrap around; instead we use storage arrays sized to 2^N to allow fast
* determination of storage indices through a mask operation.
*/
public class ByteRingBuffer implements Serializable {
public class ByteRingBuffer implements RingBuffer, Serializable {
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
byte[] storage;
Expand All @@ -32,7 +32,7 @@ public class ByteRingBuffer implements Serializable {
*
* @param capacity minimum capacity of the ring buffer
*/
public ByteRingBuffer(int capacity) {
public ByteRingBuffer(final int capacity) {
this(capacity, true);
}

Expand All @@ -42,7 +42,7 @@ public ByteRingBuffer(int capacity) {
* @param capacity minimum capacity of ring buffer
* @param growable whether to allow growth when the buffer is full.
*/
public ByteRingBuffer(int capacity, boolean growable) {
public ByteRingBuffer(final int capacity, final boolean growable) {
Assert.leq(capacity, "ByteRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;
Expand All @@ -59,7 +59,7 @@ public ByteRingBuffer(int capacity, boolean growable) {
*
* @param increase Increase amount. The ring buffer's capacity will be increased by at least this amount.
*/
protected void grow(int increase) {
protected void grow(final int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Expand All @@ -83,7 +83,7 @@ protected void grow(int increase) {
*
* @param dest The destination buffer.
*/
protected void copyRingBufferToArray(byte[] dest) {
protected void copyRingBufferToArray(final byte[] dest) {
final int size = size();
final int storageHead = (int) (head & mask);

Expand All @@ -99,26 +99,32 @@ protected void copyRingBufferToArray(byte[] dest) {
System.arraycopy(storage, 0, dest, firstCopyLen, secondCopyLen);
}

@Override
public boolean isFull() {
return size() == storage.length;
}

@Override
public boolean isEmpty() {
return tail == head;
}

@Override
public int size() {
return Math.toIntExact(tail - head);
}

@Override
public int capacity() {
return storage.length;
}

@Override
public int remaining() {
return storage.length - size();
}

@Override
public void clear() {
tail = head = 0;
}
Expand All @@ -131,7 +137,7 @@ public void clear() {
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
* @return {@code true} if the byte was added successfully
*/
public boolean add(byte e) {
public boolean add(final byte e) {
if (isFull()) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -151,7 +157,7 @@ public boolean add(byte e) {
* @param count the minimum number of empty entries in the buffer after this call
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
*/
public void ensureRemaining(int count) {
public void ensureRemaining(final int count) {
if (remaining() < count) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -168,7 +174,7 @@ public void ensureRemaining(int count) {
*
* @param e the value to add to the buffer
*/
public void addUnsafe(byte e) {
public void addUnsafe(final byte e) {
// This is an extremely paranoid wrap check that in all likelihood will never run. With FIXUP_THRESHOLD at
// 1 << 62, and the user pushing 2^32 values per second(!), it will take 68 years to wrap this counter .
if (tail >= FIXUP_THRESHOLD) {
Expand All @@ -188,7 +194,7 @@ public void addUnsafe(byte e) {
* @param notFullResult value to return is the buffer is not full
* @return the overwritten entry if the buffer is full, the provided value otherwise
*/
public byte addOverwrite(byte e, byte notFullResult) {
public byte addOverwrite(final byte e, final byte notFullResult) {
byte val = notFullResult;
if (isFull()) {
val = remove();
Expand All @@ -204,7 +210,7 @@ public byte addOverwrite(byte e, byte notFullResult) {
* @param e the byte to be added to the buffer
* @return true if the value was added successfully, false otherwise
*/
public boolean offer(byte e) {
public boolean offer(final byte e) {
if (isFull()) {
return false;
}
Expand All @@ -218,7 +224,7 @@ public boolean offer(byte e) {
* @param count The number of elements to remove.
* @throws NoSuchElementException if the buffer is empty
*/
public byte[] remove(int count) {
public byte[] remove(final int count) {
final int size = size();
if (size < count) {
throw new NoSuchElementException();
Expand Down Expand Up @@ -264,7 +270,7 @@ public byte removeUnsafe() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The removed element if the ring buffer was non-empty, otherwise the value of 'onEmpty'
*/
public byte poll(byte onEmpty) {
public byte poll(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -291,7 +297,7 @@ public byte element() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The head element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public byte peek(byte onEmpty) {
public byte peek(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -314,7 +320,7 @@ public byte front() {
* @throws NoSuchElementException if the buffer is empty
* @return The element at the specified offset
*/
public byte front(int offset) {
public byte front(final int offset) {
if (offset < 0 || offset >= size()) {
throw new NoSuchElementException();
}
Expand All @@ -341,7 +347,7 @@ public byte back() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The tail element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public byte peekBack(byte onEmpty) {
public byte peekBack(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand Down
36 changes: 21 additions & 15 deletions Base/src/main/java/io/deephaven/base/ringbuffer/CharRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* {@code long} values. Head and tail will not wrap around; instead we use storage arrays sized to 2^N to allow fast
* determination of storage indices through a mask operation.
*/
public class CharRingBuffer implements Serializable {
public class CharRingBuffer implements RingBuffer, Serializable {
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
char[] storage;
Expand All @@ -28,7 +28,7 @@ public class CharRingBuffer implements Serializable {
*
* @param capacity minimum capacity of the ring buffer
*/
public CharRingBuffer(int capacity) {
public CharRingBuffer(final int capacity) {
this(capacity, true);
}

Expand All @@ -38,7 +38,7 @@ public CharRingBuffer(int capacity) {
* @param capacity minimum capacity of ring buffer
* @param growable whether to allow growth when the buffer is full.
*/
public CharRingBuffer(int capacity, boolean growable) {
public CharRingBuffer(final int capacity, final boolean growable) {
Assert.leq(capacity, "CharRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;
Expand All @@ -55,7 +55,7 @@ public CharRingBuffer(int capacity, boolean growable) {
*
* @param increase Increase amount. The ring buffer's capacity will be increased by at least this amount.
*/
protected void grow(int increase) {
protected void grow(final int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Expand All @@ -79,7 +79,7 @@ protected void grow(int increase) {
*
* @param dest The destination buffer.
*/
protected void copyRingBufferToArray(char[] dest) {
protected void copyRingBufferToArray(final char[] dest) {
final int size = size();
final int storageHead = (int) (head & mask);

Expand All @@ -95,26 +95,32 @@ protected void copyRingBufferToArray(char[] dest) {
System.arraycopy(storage, 0, dest, firstCopyLen, secondCopyLen);
}

@Override
public boolean isFull() {
return size() == storage.length;
}

@Override
public boolean isEmpty() {
return tail == head;
}

@Override
public int size() {
return Math.toIntExact(tail - head);
}

@Override
public int capacity() {
return storage.length;
}

@Override
public int remaining() {
return storage.length - size();
}

@Override
public void clear() {
tail = head = 0;
}
Expand All @@ -127,7 +133,7 @@ public void clear() {
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
* @return {@code true} if the char was added successfully
*/
public boolean add(char e) {
public boolean add(final char e) {
if (isFull()) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -147,7 +153,7 @@ public boolean add(char e) {
* @param count the minimum number of empty entries in the buffer after this call
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
*/
public void ensureRemaining(int count) {
public void ensureRemaining(final int count) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More opportunities to add @Override to this code.

if (remaining() < count) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -164,7 +170,7 @@ public void ensureRemaining(int count) {
*
* @param e the value to add to the buffer
*/
public void addUnsafe(char e) {
public void addUnsafe(final char e) {
// This is an extremely paranoid wrap check that in all likelihood will never run. With FIXUP_THRESHOLD at
// 1 << 62, and the user pushing 2^32 values per second(!), it will take 68 years to wrap this counter .
if (tail >= FIXUP_THRESHOLD) {
Expand All @@ -184,7 +190,7 @@ public void addUnsafe(char e) {
* @param notFullResult value to return is the buffer is not full
* @return the overwritten entry if the buffer is full, the provided value otherwise
*/
public char addOverwrite(char e, char notFullResult) {
public char addOverwrite(final char e, final char notFullResult) {
char val = notFullResult;
if (isFull()) {
val = remove();
Expand All @@ -200,7 +206,7 @@ public char addOverwrite(char e, char notFullResult) {
* @param e the char to be added to the buffer
* @return true if the value was added successfully, false otherwise
*/
public boolean offer(char e) {
public boolean offer(final char e) {
if (isFull()) {
return false;
}
Expand All @@ -214,7 +220,7 @@ public boolean offer(char e) {
* @param count The number of elements to remove.
* @throws NoSuchElementException if the buffer is empty
*/
public char[] remove(int count) {
public char[] remove(final int count) {
final int size = size();
if (size < count) {
throw new NoSuchElementException();
Expand Down Expand Up @@ -260,7 +266,7 @@ public char removeUnsafe() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The removed element if the ring buffer was non-empty, otherwise the value of 'onEmpty'
*/
public char poll(char onEmpty) {
public char poll(final char onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -287,7 +293,7 @@ public char element() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The head element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public char peek(char onEmpty) {
public char peek(final char onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -310,7 +316,7 @@ public char front() {
* @throws NoSuchElementException if the buffer is empty
* @return The element at the specified offset
*/
public char front(int offset) {
public char front(final int offset) {
if (offset < 0 || offset >= size()) {
throw new NoSuchElementException();
}
Expand All @@ -337,7 +343,7 @@ public char back() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The tail element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public char peekBack(char onEmpty) {
public char peekBack(final char onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand Down
Loading