Skip to content

Commit

Permalink
KAFKA-9929: Support backward iterator on SessionStore (apache#9139)
Browse files Browse the repository at this point in the history
Implements KIP-617 for `SessionStore`

Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>
  • Loading branch information
jeqo authored and javierfreire committed Oct 8, 2020
1 parent fa7661a commit 229989f
Show file tree
Hide file tree
Showing 19 changed files with 2,095 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,153 @@
* @param <AGG> the aggregated value type
*/
public interface ReadOnlySessionStore<K, AGG> {

/**
* Retrieve all aggregated sessions for the provided key.
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from earliest to latest.
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
* @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}

/**
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from latest to earliest.
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts.
* @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from earliest to latest.
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
* @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time.
* @throws NullPointerException If null is used for any key.
*/
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}


/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from latest to earliest.
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts.
* @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time.
* @throws NullPointerException If null is used for any key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
*/
default AGG fetchSession(final K key, final long startTime, final long endTime) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}

/**
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session.
* @throws NullPointerException If null is used for key.
*
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K key);

/**
* Retrieve all aggregated sessions for the given range of keys.
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest
* available session to the oldest/earliest session.
*
* @param key record key to find aggregated session values for
* @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}

/**
* Retrieve all aggregated sessions for the given range of keys.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session.
* @throws NullPointerException If null is used for any of the keys.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

/**
* Retrieve all aggregated sessions for the given range of keys.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest
* available session to the oldest/earliest session.
*
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session.
* @throws NullPointerException If null is used for any of the keys.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) {
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,6 @@
*/
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

/**
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching key and aggregated values
* @throws NullPointerException If null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching keys and aggregated values
* @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
*/
AGG fetchSession(final K key, final long startTime, final long endTime);

/**
* Remove the session aggregated with provided {@link Windowed} key from the store
* @param sessionKey key of the session to remove
Expand Down
Loading

0 comments on commit 229989f

Please sign in to comment.