Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9929: Support backward iterator on SessionStore #9139

Merged
merged 15 commits into from
Oct 8, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class SessionKey {

/**
* Create a new session key with the given key value and creation timestamp
* @param key the actual cryptographic key to use for request validation; may not be null
*
* @param key the actual cryptographic key to use for request validation; may not be null
* @param creationTimestamp the time at which the key was generated
*/
public SessionKey(SecretKey key, long creationTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,156 @@
* Implementations should be thread-safe as concurrent reads and writes
* are expected.
*
* @param <K> the key type
* @param <K> the key type
* @param <AGG> the aggregated value type
*/
public interface ReadOnlySessionStore<K, AGG> {

/**
* Retrieve all aggregated sessions for the provided key.
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from earliest to latest.
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
* @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("Moved from SessionStore");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't matter to users whether this method was moved from another interface or not. They just need to know why they're getting the exception. I.e., we just need to tell them that the store implementation they selected didn't implement the method.

Suggested change
throw new UnsupportedOperationException("Moved from SessionStore");
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");

We should say the exact same thing in all default implementations. Right now, they're a bit inconsistent.

}

/**
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from latest to earliest.
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts.
* @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException();
}

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from earliest to latest.
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
* @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time.
* @throws NullPointerException If null is used for any key.
*/
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException("Moved from SessionStore");
}


/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime iterating from latest to earliest.
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends.
* @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts.
* @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time.
* @throws NullPointerException If null is used for any key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
throw new UnsupportedOperationException();
}

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
*/
default AGG fetchSession(final K key, final long startTime, final long endTime) {
throw new UnsupportedOperationException("Moved from SessionStore");
}

/**
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException If null is used for key.
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session.
* @throws NullPointerException If null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K key);

/**
* Retrieve all aggregated sessions for the given range of keys.
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest
* available session to the oldest/earliest session.
*
* @param key record key to find aggregated session values for
* @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session.
* @throws NullPointerException If null is used for key.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
throw new UnsupportedOperationException();
}

/**
* Retrieve all aggregated sessions for the given range of keys.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException If null is used for any of the keys.
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session.
* @throws NullPointerException If null is used for any of the keys.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

/**
* Retrieve all aggregated sessions for the given range of keys.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest
* available session to the oldest/earliest session.
*
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session.
* @throws NullPointerException If null is used for any of the keys.
*/
default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,55 +34,17 @@
*/
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

/**
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching key and aggregated values
* @throws NullPointerException If null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching keys and aggregated values
* @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
*/
AGG fetchSession(final K key, final long startTime, final long endTime);

/**
* Remove the session aggregated with provided {@link Windowed} key from the store
*
* @param sessionKey key of the session to remove
* @throws NullPointerException If null is used for sessionKey.
*/
void remove(final Windowed<K> sessionKey);

/**
* Write the aggregated value for the provided key to the store
*
* @param sessionKey key of the session to write
* @param aggregate the aggregated value for the session, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
Expand Down
Loading