Skip to content

Commit

Permalink
KAFKA-9929: Support backward iterator on WindowStore (#9138)
Browse files Browse the repository at this point in the history
Implements KIP-617 on WindowStore that depends on #9137.

Testing strategy: extend existing tests to validate reverse operations are supported.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
jeqo authored Sep 2, 2020
1 parent 4662ed4 commit 4f06d9e
Show file tree
Hide file tree
Showing 48 changed files with 2,367 additions and 673 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand All @@ -32,6 +30,8 @@
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.List;

abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {

static final String ERROR_MESSAGE = "Global store is read only";
Expand Down Expand Up @@ -180,26 +180,52 @@ public WindowStoreIterator<V> fetch(final K key,
return wrapped().fetch(key, timeFrom, timeTo);
}

@Override
public WindowStoreIterator<V> backwardFetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().backwardFetch(key, timeFrom, timeTo);
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
return wrapped().backwardAll();
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
final long timeTo) {
return wrapped().backwardFetchAll(timeFrom, timeTo);
}
}

static class TimestampedWindowStoreReadOnlyDecorator<K, V>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,28 @@ public WindowStoreIterator<V> fetch(final K key,
return wrapped().fetch(key, timeFrom, timeTo);
}

@Override
public WindowStoreIterator<V> backwardFetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().backwardFetch(key, timeFrom, timeTo);
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
Expand All @@ -189,10 +204,21 @@ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
return wrapped().fetchAll(timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
final long timeTo) {
return wrapped().backwardFetchAll(timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
return wrapped().backwardAll();
}
}

static class TimestampedWindowStoreReadWriteDecorator<K, V>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public interface ReadOnlyWindowStore<K, V> {
/**
* Get the value of key from a window.
*
* @param key the key to fetch
* @param time start timestamp (inclusive) of the window
* @param key the key to fetch
* @param time start timestamp (inclusive) of the window
* @return The value or {@code null} if no value is found in the window
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException if {@code null} is used for any key.
*/
V fetch(K key, long time);

Expand Down Expand Up @@ -67,12 +67,12 @@ public interface ReadOnlyWindowStore<K, V> {
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
* @param key the key to fetch
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @param key the key to fetch
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over key-value pairs {@code <timestamp, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
* @throws NullPointerException if {@code null} is used for key.
* @deprecated Use {@link #fetch(Object, Instant, Instant)} instead
*/
@Deprecated
Expand Down Expand Up @@ -104,66 +104,134 @@ public interface ReadOnlyWindowStore<K, V> {
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
* @param key the key to fetch
* @param from time range start (inclusive)
* @param to time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @param key the key to fetch
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over key-value pairs {@code <timestamp, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
* @throws NullPointerException if {@code null} is used for key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
WindowStoreIterator<V> fetch(K key, Instant timeFrom, Instant timeTo) throws IllegalArgumentException;

/**
* Get all the key-value pairs with the given key and the time range from all the existing windows
* in backward order with respect to time (from end to beginning of time).
* <p>
* This iterator must be closed after use.
* <p>
* The time range is inclusive and applies to the starting timestamp of the window.
* For example, if we have the following windows:
* <pre>
* +-------------------------------+
* | key | start time | end time |
* +-------+------------+----------+
* | A | 10 | 20 |
* +-------+------------+----------+
* | A | 15 | 25 |
* +-------+------------+----------+
* | A | 20 | 30 |
* +-------+------------+----------+
* | A | 25 | 35 |
* +--------------------------------
* </pre>
* And we call {@code store.backwardFetch("A", Instant.ofEpochMilli(10), Instant.ofEpochMilli(20))} then the
* results will contain the first three windows from the table above in backward order,
* i.e., all those where 10 &lt;= start time &lt;= 20.
* <p>
* For each key, the iterator guarantees ordering of windows, starting from the newest/latest
* available window to the oldest/earliest window.
*
* @param key the key to fetch
* @param timeFrom time range start (inclusive), where iteration ends.
* @param timeTo time range end (inclusive), where iteration starts.
* @return an iterator over key-value pairs {@code <timestamp, value>}, from end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
WindowStoreIterator<V> backwardFetch(K key, Instant timeFrom, Instant timeTo) throws IllegalArgumentException;

/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
* <p>
* This iterator must be closed after use.
*
* @param from the first key in the range
* @param to the last key in the range
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @param keyFrom the first key in the range
* @param keyTo the last key in the range
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException if {@code null} is used for any key.
* @deprecated Use {@link #fetch(Object, Object, Instant, Instant)} instead
*/
@Deprecated
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, long timeFrom, long timeTo);

/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
* <p>
* This iterator must be closed after use.
*
* @param from the first key in the range
* @param to the last key in the range
* @param fromTime time range start (inclusive)
* @param toTime time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @param keyFrom the first key in the range
* @param keyTo the last key in the range
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
* @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
throws IllegalArgumentException;

/**
* Gets all the key-value pairs in the existing windows.
*
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
*/
* Get all the key-value pairs in the given key range and time range from all the existing windows
* in backward order with respect to time (from end to beginning of time).
* <p>
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
* @param keyTo the last key in the range
* @param timeFrom time range start (inclusive), where iteration ends.
* @param timeTo time range end (inclusive), where iteration starts.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
throws IllegalArgumentException;


/**
* Gets all the key-value pairs in the existing windows.
*
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<Windowed<K>, V> all();


/**
* Gets all the key-value pairs in the existing windows in backward order
* with respect to time (from end to beginning of time).
*
* @return an backward iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from the end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
*/
default KeyValueIterator<Windowed<K>, V> backwardAll() {
throw new UnsupportedOperationException();
}

/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
* @param timeFrom the beginning of the time slot from which to search (inclusive)
* @param timeTo the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @param timeFrom the beginning of the time slot from which to search (inclusive), where iteration starts.
* @param timeTo the end of the time slot from which to search (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
* @throws NullPointerException if {@code null} is used for any key
* @deprecated Use {@link #fetchAll(Instant, Instant)} instead
*/
@Deprecated
Expand All @@ -172,12 +240,25 @@ KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant t
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
* @param from the beginning of the time slot from which to search (inclusive)
* @param to the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @param timeFrom the beginning of the time slot from which to search (inclusive), where iteration starts.
* @param timeTo the end of the time slot from which to search (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetchAll(Instant timeFrom, Instant timeTo) throws IllegalArgumentException;

/**
* Gets all the key-value pairs that belong to the windows within in the given time range in backward order
* with respect to time (from end to beginning of time).
*
* @param timeFrom the beginning of the time slot from which to search (inclusive), where iteration ends.
* @param timeTo the end of the time slot from which to search (inclusive), where iteration starts.
* @return an backward iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
* @throws NullPointerException if {@code null} is used for any key
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> backwardFetchAll(Instant timeFrom, Instant timeTo) throws IllegalArgumentException;
}
Loading

0 comments on commit 4f06d9e

Please sign in to comment.