Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-617: Allow Kafka Streams State Stores to be iterated backwards #8976

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class SessionKey {

/**
* Create a new session key with the given key value and creation timestamp
* @param key the actual cryptographic key to use for request validation; may not be null
*
* @param key the actual cryptographic key to use for request validation; may not be null
* @param creationTimestamp the time at which the key was generated
*/
public SessionKey(SecretKey key, long creationTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.time.Instant;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand Down Expand Up @@ -90,11 +92,22 @@ public KeyValueIterator<K, V> range(final K from,
return wrapped().range(from, to);
}

@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
return wrapped().reverseRange(from, to);
}

@Override
public KeyValueIterator<K, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<K, V> reverseAll() {
return wrapped().reverseAll();
}

@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
Expand Down Expand Up @@ -168,6 +181,13 @@ public WindowStoreIterator<V> fetch(final K key,
return wrapped().fetch(key, timeFrom, timeTo);
}

@Override
public WindowStoreIterator<V> backwardFetch(final K key,
final Instant timeFrom,
final Instant timeTo) {
return wrapped().backwardFetch(key, timeFrom, timeTo);
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
Expand All @@ -177,17 +197,36 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
return wrapped().fetch(from, to, timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
final K to,
jeqo marked this conversation as resolved.
Show resolved Hide resolved
final Instant timeFrom,
final Instant timeTo) {
return wrapped().backwardFetch(from, to, timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
return wrapped().backwardAll();
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant timeFrom,
final Instant timeTo) {
return wrapped().backwardFetchAll(timeFrom, timeTo);
}
}

static class TimestampedWindowStoreReadOnlyDecorator<K, V>
Expand All @@ -214,6 +253,13 @@ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return wrapped().backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
Expand All @@ -222,6 +268,14 @@ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return wrapped().backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public void remove(final Windowed<K> sessionKey) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Expand All @@ -243,10 +297,21 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return wrapped().fetch(key);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
return wrapped().backwardFetch(key);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from,
final K to) {
return wrapped().backwardFetch(from, to);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
/**
* Update the value associated with this key.
*
* @param key The key to associate the value to
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
Expand All @@ -42,7 +42,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
/**
* Update the value associated with this key, unless a value is already associated with the key.
*
* @param key The key to associate the value to
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @return The old value or {@code null} if there is no such key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* A key-value store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes are expected.
*
* <p>
* Please note that this contract defines the thread-safe read functionality only; it does not
* guarantee anything about whether the actual instance is writable by another thread, or
* whether it uses some locking mechanism under the hood. For this reason, making dependencies
Expand All @@ -38,35 +38,68 @@ public interface ReadOnlyKeyValueStore<K, V> {
*
* @param key The key to fetch
* @return The value or null if no value is found.
* @throws NullPointerException If null is used for key.
* @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
V get(K key);

/**
* Get an iterator over a given range of keys. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @param from The first key that could be in the range
* @param to The last key that could be in the range
* @param to The last key that could be in the range
* @return The iterator for this range.
* @throws NullPointerException If null is used for from or to.
* @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> range(K from, K to);

/**
* Get a reverse iterator over a given range of keys. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @param from The last key that could be in the range
* @param to The first key that could be in the range
jeqo marked this conversation as resolved.
Show resolved Hide resolved
* @return The reverse iterator for this range.
* @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
default KeyValueIterator<K, V> reverseRange(K from, K to) {
throw new UnsupportedOperationException();
}

/**
* Return an iterator over all keys in this store. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @return An iterator of all key/value pairs in the store.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> all();

/**
* Return an approximate count of key-value mappings in this store.
* Return a reverse iterator over all keys in this store. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @return An reverse iterator of all key/value pairs in the store.
* @throws InvalidStateStoreException if the store is not initialized
*/
default KeyValueIterator<K, V> reverseAll() {
throw new UnsupportedOperationException();
}

/**
* Return an approximate count of key-value mappings in this store.
* <p>
* The count is not guaranteed to be exact in order to accommodate stores
* where an exact count is expensive to calculate.
*
Expand Down
Loading