Skip to content

Commit

Permalink
fix: fix session serialization for PUSH with websocket (#120)
Browse files Browse the repository at this point in the history
When PUSH transport is websocket, the request associate to atmosphere is a stub implementation
that always return false for 'isRequestedSessionIdValid()' preventing PushSessionTracker to
serialize the session.
In addition, accessing the request from a background thread may throw IllegalStateException if
the servlet container is recycling request objects (e.g. Tomcat facade).
With this change, the session validiy is checked against the SessionListener that is already
observing session lifecycle events.

Fixes #119
  • Loading branch information
mcollovati authored Apr 19, 2024
1 parent d2f282a commit f0c58ed
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import jakarta.servlet.FilterRegistration;
import jakarta.servlet.ServletContext;

import java.util.function.Predicate;

import com.hazelcast.config.Config;
Expand Down Expand Up @@ -47,7 +48,6 @@
import com.vaadin.kubernetes.starter.sessiontracker.backend.BackendConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.HazelcastConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.RedisConnector;
import com.vaadin.kubernetes.starter.sessiontracker.push.PushSendListener;
import com.vaadin.kubernetes.starter.sessiontracker.push.PushSessionTracker;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.SpringTransientHandler;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.TransientHandler;
Expand Down Expand Up @@ -155,21 +155,26 @@ public static Predicate<Class<?>> withVaadinDefaultFilter(
@Order(Integer.MIN_VALUE + 50)
FilterRegistrationBean<SessionTrackerFilter> sessionTrackerFilterRegistration(
BackendConnector backendConnector,
SessionSerializer sessionSerializer) {
SessionSerializer sessionSerializer,
PushSessionTracker pushSessionTracker) {
SessionListener sessionListener = sessionListener(backendConnector,
sessionSerializer);
pushSessionTracker.setActiveSessionChecker(
sessionListener.activeSessionChecker());
return new FilterRegistrationBean<>(
sessionTrackerFilter(sessionSerializer)) {
@Override
protected FilterRegistration.Dynamic addRegistration(
String description, ServletContext servletContext) {
servletContext.addListener(sessionListener(backendConnector,
sessionSerializer));
servletContext.addListener(sessionListener);
return super.addRegistration(description, servletContext);
}
};
}

@Bean
PushSendListener pushSendListener(SessionSerializer sessionSerializer) {
PushSessionTracker pushSendListener(
SessionSerializer sessionSerializer) {
return new PushSessionTracker(sessionSerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import jakarta.servlet.http.HttpSessionEvent;
import jakarta.servlet.http.HttpSessionListener;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -42,6 +46,7 @@ public class SessionListener implements HttpSessionListener {

private final BackendConnector sessionBackendConnector;
private final SessionSerializer sessionSerializer;
private final Set<String> activeSessions = ConcurrentHashMap.newKeySet();

/**
* Creates a new {@link SessionListener} instance.
Expand All @@ -63,6 +68,7 @@ public SessionListener(BackendConnector sessionBackendConnector,
public void sessionCreated(HttpSessionEvent se) {
HttpSession session = se.getSession();
getLogger().debug("Session with id {} created", session.getId());
activeSessions.add(session.getId());
String clusterKey = CurrentKey.get();
if (clusterKey != null) {
SessionInfo sessionInfo = sessionBackendConnector
Expand All @@ -88,6 +94,7 @@ public void sessionCreated(HttpSessionEvent se) {
public void sessionDestroyed(HttpSessionEvent se) {
HttpSession session = se.getSession();
String sessionId = session.getId();
activeSessions.remove(sessionId);
getLogger().debug("Session with id {} destroyed", sessionId);
SessionTrackerCookie.getFromSession(session).ifPresent(clusterKey -> {
getLogger().debug(
Expand All @@ -106,6 +113,16 @@ public void sessionDestroyed(HttpSessionEvent se) {
});
}

/**
* Gets a predicate that tests if the given identifier matches an active
* HTTP session.
*
* @return a predicate to check if an HTTP session is active or not.
*/
public Predicate<String> activeSessionChecker() {
return activeSessions::contains;
}

static Logger getLogger() {
return LoggerFactory.getLogger(SessionListener.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package com.vaadin.kubernetes.starter.sessiontracker.push;

import java.util.function.Consumer;

import org.atmosphere.cpr.AtmosphereResource;

import com.vaadin.flow.component.UI;
Expand All @@ -33,13 +35,23 @@ public NotifyingPushConnection(UI ui) {
super(ui);
}

@Override
public void connect(AtmosphereResource resource) {
super.connect(resource);
notifyPushListeners(listener -> listener.onConnect(resource));
}

@Override
protected void sendMessage(String message) {
super.sendMessage(message);
AtmosphereResource resource = getResource();
notifyPushListeners(listener -> listener.onMessageSent(resource));
}

private void notifyPushListeners(Consumer<PushSendListener> action) {
getUI().getSession().getService().getContext()
.getAttribute(Lookup.class).lookupAll(PushSendListener.class)
.forEach(listener -> listener.onMessageSent(resource));
.forEach(action);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,28 @@
import org.atmosphere.cpr.AtmosphereResource;

/**
* Associates AtmosphereResource with a PushConnection identifier in order to be
* able to reattach them later on.
* Component notified when a UIDL message is sent to the client via PUSH
* mechanism.
* <p>
* </p>
* The component is also notified when the PUSH connection is established in
* order to perform initialization tasks for the connected resource.
* <p>
* </p>
* Implementation must be thread safe, since method invocation may originate in
* different threads.
*/
public interface PushSendListener {

/**
* Invoked when a new PUSH connection is established.
*
* @param resource
* the {@link AtmosphereResource} behind the PUSH connection.
*/
default void onConnect(AtmosphereResource resource) {
}

/**
* Invoked whenever a UIDL message has been sent to the client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

import jakarta.servlet.http.HttpSession;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,29 +36,93 @@ public class PushSessionTracker implements PushSendListener {

private final SessionSerializer sessionSerializer;

private Predicate<String> activeSessionChecker = id -> true;

public PushSessionTracker(SessionSerializer sessionSerializer) {
this.sessionSerializer = sessionSerializer;
}

/**
* Sets the active HTTP session checker.
*
* @param activeSessionChecker
* active HTTP session checker.
*/
public void setActiveSessionChecker(
Predicate<String> activeSessionChecker) {
this.activeSessionChecker = Objects.requireNonNull(activeSessionChecker,
"session checker must not be null");
}

@Override
public void onConnect(AtmosphereResource resource) {
// The HTTP request associate to the resource might not be available
// after connection for example because recycled by the servlet
// container.
// To be able to always get the correct cluster key, it is fetched
// during connection and stored in the atmosphere resource session.
AtmosphereResourceSession resourceSession = resource
.getAtmosphereConfig().sessionFactory().getSession(resource);
tryGetSerializationKey(resource).ifPresent(key -> resourceSession
.setAttribute(CurrentKey.COOKIE_NAME, key));
}

@Override
public void onMessageSent(AtmosphereResource resource) {
HttpSession session = resource.session(false);
if (session != null && resource.getRequest().wrappedRequest()
.isRequestedSessionIdValid()) {
SessionTrackerCookie
.getValue(resource.getRequest().wrappedRequest())
.ifPresent(CurrentKey::set);
getLogger().debug("Serializing session {} with key {}",
session.getId(), CurrentKey.get());
try {
sessionSerializer.serialize(session);
} finally {
CurrentKey.clear();
HttpSession httpSession = resource.session(false);
if (httpSession != null
&& activeSessionChecker.test(httpSession.getId())) {
tryGetSerializationKey(resource).ifPresent(CurrentKey::set);
if (CurrentKey.get() != null) {
getLogger().debug("Serializing session {} with key {}",
httpSession.getId(), CurrentKey.get());
try {
sessionSerializer.serialize(httpSession);
} finally {
CurrentKey.clear();
}
} else {
getLogger().debug(
"Skipping session serialization. Missing serialization key.");
}
} else {
getLogger().debug(
"Skipping session serialization. Session is invalidated");
"Skipping session serialization. Session not available");
}
}

private Optional<String> tryGetSerializationKey(
AtmosphereResource resource) {

AtmosphereResourceSession resourceSession = resource
.getAtmosphereConfig().sessionFactory()
.getSession(resource, false);
HttpSession httpSession = resource.session(false);
String key = null;
if (resourceSession != null) {
key = resourceSession.getAttribute(CurrentKey.COOKIE_NAME,
String.class);
}
if (key == null) {
try {
key = SessionTrackerCookie
.getValue(resource.getRequest().wrappedRequest())
.orElse(null);
} catch (Exception ex) {
getLogger().debug("Cannot get serialization key from request",
ex);
}
}
if (key == null && httpSession != null) {
try {
key = SessionTrackerCookie.getFromSession(httpSession)
.orElse(null);
} catch (Exception ex) {
getLogger().debug("Cannot get serialization key from session",
ex);
}
}
return Optional.ofNullable(key);
}

private Logger getLogger() {
Expand Down
Loading

0 comments on commit f0c58ed

Please sign in to comment.