fix: improve debug tool performance #158

Merged · 7 commits · Nov 15, 2024

Changes from all commits
---- Changed file ----
@@ -272,9 +272,8 @@ private void queueSerialization(String sessionId,
// sense to retry the operation instantly.
CompletableFuture.runAsync(
() -> backendConnector.markSerializationStarted(clusterKey),
executorService).whenComplete((unused, error) -> {
executorService).handle((unused, error) -> {
if (error != null) {
pending.remove(sessionId);
getLogger().debug(
"Failed marking serialization start for of session {} with distributed key {}",
sessionId, clusterKey, error);
@@ -287,6 +286,13 @@ private void queueSerialization(String sessionId,
handleSessionSerialization(sessionId, attributes,
whenSerialized);
}
return null;
}).whenComplete((unused, error) -> {
pending.remove(sessionId);
if (error != null) {
getLogger().error("Serialization of session {} failed",
sessionId, error);
}
});
}

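The two hunks above replace a single whenComplete callback with a handle stage followed by a terminal whenComplete: removal of the pending entry now always runs, whether marking the serialization start fails or the serialization itself throws. Below is a minimal sketch of that pattern; the pending map, executor and logging are simplified stand-ins, and apart from pending, sessionId and executorService none of the names come from the PR.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PendingCleanupSketch {
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    void queue(String sessionId, Runnable markStarted, Runnable serialize) {
        pending.put(sessionId, Boolean.TRUE);
        CompletableFuture.runAsync(markStarted, executorService)
                .handle((unused, error) -> {
                    if (error != null) {
                        // the async "mark started" call failed: log and skip serialization
                        System.err.println("mark-started failed: " + error);
                    } else {
                        // may throw; the exception propagates to the next stage
                        serialize.run();
                    }
                    return null;
                })
                .whenComplete((unused, error) -> {
                    // runs whether the chain succeeded or failed,
                    // so the pending entry can never leak
                    pending.remove(sessionId);
                    if (error != null) {
                        System.err.println("serialization failed: " + error);
                    }
                });
    }
}

Compared with doing everything inside one whenComplete, this guarantees the cleanup runs even when the serialization step itself throws inside the handle stage.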
---- Next changed file ----
@@ -217,11 +217,12 @@ private void writeTrackingMetadata() throws IOException {
List<Track> trackList = tracking.values().stream()
.filter(Objects::nonNull)
.map(t -> t.assignHandle(this::lookupObjectHandle))
.filter(t -> t.getHandle() != -1)
.collect(Collectors.toList());
cast.markMetadata();
reset();
writeStreamHeader();
writeObject(true);
writeObject(true); // debug flag
writeObject(new ArrayList<>(trackList));
cast.copy();
}
---- Next changed file ----
@@ -11,6 +11,7 @@

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

@@ -73,10 +74,25 @@ public TransientHandler apply(String sessionId, String clusterKey) {
DebugTransientHandler::new);
}

Job newJob(String sessionId, String clusterKey) {
Job job = new Job(sessionId, clusterKey);
jobs.put(clusterKey, job);
return job;
/**
* Gets a new Job for the current session and cluster key, or an empty
* {@link Optional} if there is a job already in progress.
*
* @param sessionId
* the session id
* @param clusterKey
* the cluster key
* @return a new Job for the current session and cluster key, or an empty
* {@link Optional} if there is a job already in progress.
*/
synchronized Optional<Job> newJob(String sessionId, String clusterKey) {
if (!jobs.containsKey(clusterKey) && jobs.values().stream()
.noneMatch(j -> j.isRunning(sessionId))) {
Job job = new Job(sessionId, clusterKey);
jobs.put(clusterKey, job);
return Optional.of(job);
}
return Optional.empty();
}

void shutdown() {
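The newJob change above turns job creation into a synchronized check-then-act, so two concurrent debug requests cannot both obtain a Job for the same cluster key or session. A minimal sketch of that guard follows; SimpleJob and its finished flag are illustrative stand-ins for the PR's Job, which tracks progress with the latches shown further below.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class JobRegistrySketch {

    static final class SimpleJob {
        final String sessionId;
        final String clusterKey;
        volatile boolean finished;

        SimpleJob(String sessionId, String clusterKey) {
            this.sessionId = sessionId;
            this.clusterKey = clusterKey;
        }

        // stand-in for Job.isRunning(), which inspects the latches
        boolean isRunning(String sessionId) {
            return this.sessionId.equals(sessionId) && !finished;
        }
    }

    private final Map<String, SimpleJob> jobs = new HashMap<>();

    // synchronized: the containsKey/noneMatch check and the put must be one
    // atomic step, otherwise two requests could both pass the check
    synchronized Optional<SimpleJob> newJob(String sessionId, String clusterKey) {
        if (!jobs.containsKey(clusterKey) && jobs.values().stream()
                .noneMatch(j -> j.isRunning(sessionId))) {
            SimpleJob job = new SimpleJob(sessionId, clusterKey);
            jobs.put(clusterKey, job);
            return Optional.of(job);
        }
        return Optional.empty(); // a test is already in progress
    }
}

The request handler at the end of this diff treats the empty Optional as "already in progress" and rejects the request instead of queuing a second serialization test.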
---- Next changed file ----
@@ -14,6 +14,7 @@
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -24,10 +25,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.StringJoiner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -48,15 +49,20 @@ class Job {
private static final Pattern SERIALIZEDLAMBDA_CANNOT_CAST = Pattern.compile(
"class java.lang.invoke.SerializedLambda cannot be cast to class ([^ ]+)( |$)");

private CountDownLatch serializationLatch = new CountDownLatch(2);
private final CountDownLatch serializationCompletedLatch = new CountDownLatch(
1);
private final CountDownLatch serializationStartedLatch = new CountDownLatch(
1);
private final String sessionId;
private long startTimeNanos;
private final Set<Outcome> outcome = new LinkedHashSet<>();
private final Map<String, List<String>> messages = new LinkedHashMap<>();
private String clusterKey;
private final Map<Object, Track> tracked = new IdentityHashMap<>();
private final Map<Integer, Track> trackedById = new HashMap<>();
private final Map<Integer, Track> trackedByHandle = new HashMap<>();

private final Stack<Track> deserializingStack = new Stack<>();
private final ArrayDeque<Track> deserializingStack = new ArrayDeque<>();
private final Map<String, List<String>> unserializableDetails = new HashMap<>();

private final Map<Integer, SerializedLambda> serializedLambdaMap = new HashMap<>();
@@ -79,30 +85,54 @@ class Job {
* @return the serialized session holder.
*/
boolean waitForSerializationCompletion(int timeout, Logger logger) {
boolean completed = false;
boolean completed = true;
try {
completed = serializationLatch.await(timeout,
completed = serializationStartedLatch.await(timeout,
TimeUnit.MILLISECONDS);
if (!completed) {
timeout();
logger.error(
"Session serialization timed out because did not complete in {} ms. "
+ "Increase the serialization timeout (in milliseconds) by the "
+ "'vaadin.serialization.timeout' application or system property.",
"Session serialization timed out because it did not start in {} ms, "
+ "most likely because another attempt is already in progress.",
timeout);
return false;
}
} catch (Exception e) { // NOSONAR
logger.error("Testing of session serialization failed", e);
}
if (completed) {
try {
completed = serializationCompletedLatch.await(timeout,
TimeUnit.MILLISECONDS);
if (!completed) {
timeout();
logger.error(
"Session serialization timed out because it did not complete in {} ms. "
+ "Increase the serialization timeout (in milliseconds) using the "
+ "'vaadin.serialization.timeout' application or system property.",
timeout);
return false;
}
} catch (Exception e) { // NOSONAR
logger.error("Testing of session serialization failed", e);
}
}
return completed;
}

boolean isRunning(String sessionId) {
return this.sessionId.equals(sessionId)
&& (serializationStartedLatch.getCount() > 0
|| serializationCompletedLatch.getCount() > 0);
}

void reset() {
startTimeNanos = System.nanoTime();
outcome.clear();
messages.clear();
tracked.clear();
trackedById.clear();
trackedByHandle.clear();
deserializingStack.clear();
unserializableDetails.clear();
serializedLambdaMap.clear();
@@ -111,13 +141,11 @@ void reset() {

void cancel() {
outcome.add(Outcome.CANCELED);
while (serializationLatch.getCount() > 0) {
serializationLatch.countDown();
}
releaseLocks();
}

public void serializationStarted() {
serializationLatch.countDown();
serializationStartedLatch.countDown();
reset();
}

@@ -173,7 +201,7 @@ void serialized(SessionInfo info) {
}
}
} finally {
serializationLatch.countDown();
serializationCompletedLatch.countDown();
}
}

@@ -186,6 +214,24 @@ void serializationFailed(Exception ex) {
void timeout() {
outcome.remove(Outcome.SERIALIZATION_FAILED);
outcome.add(Outcome.SERIALIZATION_TIMEOUT);
releaseLocks();
}

private void releaseLocks() {
if (serializationStartedLatch.getCount() > 0) {
serializationStartedLatch.countDown();
}
if (serializationCompletedLatch.getCount() > 0) {
serializationCompletedLatch.countDown();
}
}

void deserializationStarted() {
trackedByHandle.clear();
trackedByHandle.putAll(tracked.values().stream()
.filter(t -> t.getHandle() != -1).collect(Collectors
.toMap(Track::getHandle, Function.identity())));

}

void deserialized() {
@@ -249,9 +295,8 @@ void pushDeserialization(Track track, Object obj) {
// get serialized instance from tracked objects
// save it and then ensure it is deserialized correctly
int trackId = track.id;
Object serializedObject = tracked.values().stream()
.filter(t -> t.id == trackId).findFirst().map(t -> t.object)
.orElse(null);
Object serializedObject = Optional.ofNullable(trackedById.get(trackId))
.map(t -> t.object).orElse(null);
// Following condition should always be true, otherwise it means the
// stream is somehow corrupted
if (serializedObject instanceof SerializedLambda) {
@@ -293,9 +338,8 @@ void popDeserialization(Track track, Object obj) {
Optional<String> dumpDeserializationStack() {
if (!deserializingStack.isEmpty()) {
return Optional.of(deserializingStack.peek())
.flatMap(stackEntry -> tracked.values().stream().filter(
t -> t.getHandle() == stackEntry.getHandle())
.findFirst())
.flatMap(stackEntry -> Optional.ofNullable(
trackedByHandle.get(stackEntry.getHandle())))
.map(track -> {
StringJoiner joiner = new StringJoiner(
System.lineSeparator());
@@ -314,6 +358,7 @@ Optional<String> dumpDeserializationStack() {
}

Result complete() {
releaseLocks();
if (outcome.isEmpty()) {
outcome.add(Outcome.SUCCESS);
}
@@ -361,7 +406,10 @@ private static String tryDetectClassCastTarget(String message) {
public void track(Object object, Track track) {
if (track == null) {
track = new Track(-1, -1, null, null);
} else {
trackedById.put(track.id, track);
}
tracked.put(object, track);

}
}
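Most of the performance gain in Job comes from indexing tracked objects by id and by handle once, instead of scanning tracked.values() with a stream filter on every lookup during deserialization. A minimal sketch of the before/after lookup follows; the simplified Track keeps only the fields the diff refers to, and everything else is illustrative.

import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;

class TrackIndexSketch {
    static final class Track {
        final int id;
        final int handle;
        Track(int id, int handle) { this.id = id; this.handle = handle; }
        int getHandle() { return handle; }
    }

    // identity map: the same object instance maps to its Track
    private final Map<Object, Track> tracked = new IdentityHashMap<>();
    // secondary indexes in ordinary HashMaps for O(1) value-based lookups
    private final Map<Integer, Track> trackedById = new HashMap<>();
    private final Map<Integer, Track> trackedByHandle = new HashMap<>();

    void track(Object object, Track track) {
        tracked.put(object, track);
        trackedById.put(track.id, track);
        if (track.getHandle() != -1) {
            trackedByHandle.put(track.getHandle(), track);
        }
    }

    // before: linear scan over every tracked value on each lookup
    Optional<Track> findByIdScanning(int trackId) {
        return tracked.values().stream().filter(t -> t.id == trackId).findFirst();
    }

    // after: constant-time lookup against the index
    Optional<Track> findByIdIndexed(int trackId) {
        return Optional.ofNullable(trackedById.get(trackId));
    }
}

The same idea drives dumpDeserializationStack, which now resolves the top of the stack via trackedByHandle instead of filtering all tracked values; the deserialization stack itself becomes an ArrayDeque, the unsynchronized replacement recommended over the legacy Stack class.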
---- Next changed file ----
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;

@@ -234,7 +235,15 @@ private void internalSerializeAndDeserialize(WrappedSession session,
// Work on a copy of the session to avoid overwriting attributes
DebugHttpSession debugHttpSession = new DebugHttpSession(session);
String clusterKey = debugHttpSession.getClusterKey();
Job job = debugBackendConnector.newJob(session.getId(), clusterKey);
Optional<Job> maybeJob = debugBackendConnector.newJob(session.getId(),
clusterKey);
if (maybeJob.isEmpty()) {
LOGGER.debug(
"A serialization test for session {} is already in progress, rejecting request.",
session.getId());
return;
}
Job job = maybeJob.get();
try {
trySerialize(sessionSerializer, debugHttpSession, job);
SessionInfo info = debugBackendConnector.waitForCompletion(job,
@@ -311,6 +320,7 @@ private static void trySerialize(SessionSerializer serializer,
private static void tryDeserialize(SessionSerializer serializer,
SessionInfo info, HttpSession debugHttpSession, Job job) {
try {
job.deserializationStarted();
serializer.deserialize(info, debugHttpSession);
job.deserialized();
} catch (Exception e) {
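Across the Job changes, the single CountDownLatch(2) is split into separate "started" and "completed" latches so the handler can report two distinct timeouts: one when the test never starts (typically because another serialization attempt holds the slot) and one when serialization starts but does not finish within the configured timeout. A minimal sketch of that two-phase wait, with illustrative names only:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TwoPhaseWaitSketch {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch completed = new CountDownLatch(1);

    void serializationStarted() { started.countDown(); }
    void serialized()           { completed.countDown(); }

    // true only if the job both started and completed within the timeouts
    boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
        if (!started.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            // never started: most likely another attempt is already in progress
            return false;
        }
        if (!completed.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            // started but did not finish: the timeout property may need raising
            return false;
        }
        return true;
    }

    // mirrors releaseLocks(): no waiter stays blocked after cancel or timeout
    void releaseLocks() {
        if (started.getCount() > 0) { started.countDown(); }
        if (completed.getCount() > 0) { completed.countDown(); }
    }
}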