Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize Runtime Updates Sending #1691

Merged
merged 7 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ public ProfilingInfo[] getProfilingInfo() {
public boolean wasCached() {
return wasCached;
}

/** @return {@code true} when the type differs from the cached value. */
public boolean isTypeChanged() {
return !Objects.equals(type, cachedType);
}

/** @return {@code true} when the function call differs from the cached value. */
public boolean isFunctionCallChanged() {
return !Objects.equals(callInfo, cachedCallInfo);
}
}

/** Information about the function call. */
Expand Down Expand Up @@ -273,6 +283,7 @@ private static class IdExecutionEventListener implements ExecutionEventListener
private final Consumer<Exception> onExceptionalCallback;
private final RuntimeCache cache;
private final MethodCallsCache callsCache;
private final UpdatesSynchronizationState syncState;
private final UUID nextExecutionItem;
private final Map<UUID, FunctionCallInfo> calls = new HashMap<>();
private final Timer timer;
Expand All @@ -284,6 +295,7 @@ private static class IdExecutionEventListener implements ExecutionEventListener
* @param entryCallTarget the call target being observed.
* @param cache the precomputed expression values.
* @param methodCallsCache the storage tracking the executed method calls.
* @param syncState the synchronization state of runtime updates.
* @param nextExecutionItem the next item scheduled for execution.
* @param functionCallCallback the consumer of function call events.
* @param onComputedCallback the consumer of the computed value events.
Expand All @@ -295,6 +307,7 @@ public IdExecutionEventListener(
CallTarget entryCallTarget,
RuntimeCache cache,
MethodCallsCache methodCallsCache,
UpdatesSynchronizationState syncState,
UUID nextExecutionItem, // The expression ID
Consumer<ExpressionCall> functionCallCallback,
Consumer<ExpressionValue> onComputedCallback,
Expand All @@ -304,6 +317,7 @@ public IdExecutionEventListener(
this.entryCallTarget = entryCallTarget;
this.cache = cache;
this.callsCache = methodCallsCache;
this.syncState = syncState;
this.nextExecutionItem = nextExecutionItem;
this.functionCallCallback = functionCallCallback;
this.onComputedCallback = onComputedCallback;
Expand Down Expand Up @@ -382,20 +396,29 @@ public void onReturnValue(EventContext context, VirtualFrame frame, Object resul
UUID nodeId = ((ExpressionNode) node).getId();
String resultType = Types.getName(result);

String cachedType = cache.getType(nodeId);
FunctionCallInfo call = calls.get(nodeId);
FunctionCallInfo cachedCall = cache.getCall(nodeId);
ProfilingInfo[] profilingInfo = new ProfilingInfo[] {new ExecutionTime(nanoTimeElapsed)};

ExpressionValue expressionValue =
new ExpressionValue(
nodeId, result, resultType, cachedType, call, cachedCall, profilingInfo, false);
if (expressionValue.isTypeChanged() || expressionValue.isFunctionCallChanged()) {
syncState.setExpressionUnsync(nodeId);
}
syncState.setVisualisationUnsync(nodeId);

// Panics are not cached because a panic can be fixed by changing seemingly unrelated code,
// like imports, and the invalidation mechanism can not always track those changes and
// appropriately invalidate all dependent expressions.
if (!isPanic) {
cache.offer(nodeId, result);
}
String cachedType = cache.putType(nodeId, resultType);
FunctionCallInfo call = calls.get(nodeId);
FunctionCallInfo cachedCall = cache.putCall(nodeId, call);
ProfilingInfo[] profilingInfo = new ProfilingInfo[] {new ExecutionTime(nanoTimeElapsed)};
cache.putType(nodeId, resultType);
cache.putCall(nodeId, call);

onComputedCallback.accept(
new ExpressionValue(
nodeId, result, resultType, cachedType, call, cachedCall, profilingInfo, false));
onComputedCallback.accept(expressionValue);
if (isPanic) {
throw context.createUnwind(result);
}
Expand Down Expand Up @@ -474,6 +497,7 @@ private UUID getNodeId(Node node) {
* @param locationFilter the location filter.
* @param cache the precomputed expression values.
* @param methodCallsCache the storage tracking the executed method calls.
* @param syncState the synchronization state of runtime updates.
* @param nextExecutionItem the next item scheduled for execution.
* @param functionCallCallback the consumer of function call events.
* @param onComputedCallback the consumer of the computed value events.
Expand All @@ -486,6 +510,7 @@ public EventBinding<ExecutionEventListener> bind(
LocationFilter locationFilter,
RuntimeCache cache,
MethodCallsCache methodCallsCache,
UpdatesSynchronizationState syncState,
UUID nextExecutionItem,
Consumer<IdExecutionInstrument.ExpressionCall> functionCallCallback,
Consumer<IdExecutionInstrument.ExpressionValue> onComputedCallback,
Expand All @@ -505,6 +530,7 @@ public EventBinding<ExecutionEventListener> bind(
entryCallTarget,
cache,
methodCallsCache,
syncState,
nextExecutionItem,
functionCallCallback,
onComputedCallback,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.enso.interpreter.instrument;

import java.lang.ref.SoftReference;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** A storage for computed values. */
public final class RuntimeCache {
Expand Down Expand Up @@ -88,6 +91,15 @@ public Set<UUID> getCalls() {
return calls.keySet();
}

/**
* Remove the function call from the cache.
*
* @param key the expression associated with the function call.
*/
public void removeCall(UUID key) {
calls.remove(key);
}

/** Clear the cached calls. */
public void clearCalls() {
calls.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package org.enso.interpreter.instrument;
4e6 marked this conversation as resolved.
Show resolved Hide resolved

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/**
* The synchronization state of runtime updates.
*
* <p>The thread executing the program can be interrupted at any moment. For example, the interrupt
* may happen when an expression is computed and the runtime state is changed, but before the update
* is sent to the user. And since the runtime state has changed, the server won't send the updates
* during the next execution. This class is supposed to address this issue keeping in sync the
* runtime state and the update messages.
*
* <h1>Implementation
*
* <p>When implementing the synchronization, keep in mind the following principles:
*
* <ul>
* <li>Remove the synchronization flag before changing the server state.
* <li>Set the synchronization flag after the update is sent to the user.
* </ul>
*
* This way the server is guaranteed to send the update message at least once, regardless of when
* the thread interrupt has occurred.
*
* <p>The state consists of the following components:
*
* <ul>
* <li>Expressions state. Tracks all message updates that are sent when the expression metadata
* (e.g. the type or the underlying method pointer) is changed.
* <li>Method pointers state. Tracks message updates containing method pointers. Messages with
* method pointers are tracked separately from the expressions state because they have
* different invalidation rules. E.g., they should always be re-sent when the execution item
* is popped from the stack.
* <li>Visualisations state. Tracks the state of visualisation updates.
* </ul>
*/
public class UpdatesSynchronizationState {

private final Set<UUID> expressionsState = new HashSet<>();
private final Set<UUID> visualisationsState = new HashSet<>();
private final Set<UUID> methodPointersState = new HashSet<>();

@Override
public String toString() {
return "UpdatesSynchronizationState{"
+ "expressionsState="
+ expressionsState
+ ", visualisationsState="
+ visualisationsState
+ ", methodPointersState="
+ methodPointersState
+ '}';
}

/**
* Invalidate the state of the given expression.
*
* @param key the expression id.
*/
public void invalidate(UUID key) {
synchronized (this) {
expressionsState.remove(key);
visualisationsState.remove(key);
methodPointersState.remove(key);
}
}

/* Expressions */

/**
* Checks if the given expression update is synchronized.
*
* @param key the expression id.
* @return {@code true} if the expression update is synchronized.
*/
public boolean isExpressionSync(UUID key) {
synchronized (expressionsState) {
return expressionsState.contains(key);
}
}

/**
* Marks the given expression update as unsynchronized.
*
* @param key the expression id.
*/
public void setExpressionUnsync(UUID key) {
synchronized (expressionsState) {
expressionsState.remove(key);
}
}

/**
* Marks the given expression update as synchronized.
*
* @param key the expression id.
*/
public void setExpressionSync(UUID key) {
synchronized (expressionsState) {
expressionsState.add(key);
}
}

/* Visualisations */

/**
* Checks if the given visualisation update is synchronized.
*
* @param key the expression id.
* @return {@code true} if the visualisation update is synchronized.
*/
public boolean isVisualisationSync(UUID key) {
synchronized (visualisationsState) {
return visualisationsState.contains(key);
}
}

/**
* Marks the given visualisation update as unsynchronized.
*
* @param key the expression id.
*/
public void setVisualisationUnsync(UUID key) {
synchronized (visualisationsState) {
visualisationsState.remove(key);
}
}

/**
* Marks the given visualisation update as synchronized.
*
* @param key the expression id.
*/
public void setVisualisationSync(UUID key) {
synchronized (visualisationsState) {
visualisationsState.add(key);
}
}

/* Method pointers */

/**
* Checks if the given method pointer is synchronized.
*
* @param key the expression id.
* @return {@code true} if the method pointer update is synchronized.
*/
public boolean isMethodPointerSync(UUID key) {
synchronized (methodPointersState) {
return methodPointersState.contains(key);
}
}

/**
* Marks the method pointer as synchronized.
*
* @param key the expression id.
*/
public void setMethodPointerSync(UUID key) {
synchronized (methodPointersState) {
methodPointersState.add(key);
}
}

/** Clears the synchronization state of all method pointers. */
public void clearMethodPointersState() {
synchronized (methodPointersState) {
methodPointersState.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.enso.interpreter.instrument.IdExecutionInstrument;
import org.enso.interpreter.instrument.MethodCallsCache;
import org.enso.interpreter.instrument.RuntimeCache;
import org.enso.interpreter.instrument.UpdatesSynchronizationState;
import org.enso.interpreter.instrument.execution.LocationFilter;
import org.enso.interpreter.node.callable.FunctionCallInstrumentationNode;
import org.enso.interpreter.node.expression.builtin.text.util.TypeToDisplayTextNodeGen;
Expand Down Expand Up @@ -92,6 +93,7 @@ private FunctionCallInstrumentationNode.FunctionCall prepareFunctionCall(
* @param call the call metadata.
* @param cache the precomputed expression values.
* @param methodCallsCache the storage tracking the executed method calls.
* @param syncState the synchronization state of runtime updates.
* @param nextExecutionItem the next item scheduled for execution.
* @param funCallCallback the consumer for function call events.
* @param onComputedCallback the consumer of the computed value events.
Expand All @@ -103,6 +105,7 @@ public void execute(
FunctionCallInstrumentationNode.FunctionCall call,
RuntimeCache cache,
MethodCallsCache methodCallsCache,
UpdatesSynchronizationState syncState,
UUID nextExecutionItem,
Consumer<IdExecutionInstrument.ExpressionCall> funCallCallback,
Consumer<IdExecutionInstrument.ExpressionValue> onComputedCallback,
Expand All @@ -122,6 +125,7 @@ public void execute(
locationFilter,
cache,
methodCallsCache,
syncState,
nextExecutionItem,
funCallCallback,
onComputedCallback,
Expand All @@ -143,6 +147,7 @@ public void execute(
* @param methodName the method name.
* @param cache the precomputed expression values.
* @param methodCallsCache the storage tracking the executed method calls.
* @param syncState the synchronization state of runtime updates.
* @param nextExecutionItem the next item scheduled for execution.
* @param funCallCallback the consumer for function call events.
* @param onComputedCallback the consumer of the computed value events.
Expand All @@ -155,6 +160,7 @@ public void execute(
String methodName,
RuntimeCache cache,
MethodCallsCache methodCallsCache,
UpdatesSynchronizationState syncState,
UUID nextExecutionItem,
Consumer<IdExecutionInstrument.ExpressionCall> funCallCallback,
Consumer<IdExecutionInstrument.ExpressionValue> onComputedCallback,
Expand All @@ -171,6 +177,7 @@ public void execute(
call,
cache,
methodCallsCache,
syncState,
nextExecutionItem,
funCallCallback,
onComputedCallback,
Expand Down
Loading