Skip to content

Commit

Permalink
Refactor Reactor::{connect, connectMulti, canConnect}
Browse files Browse the repository at this point in the history
0. `canConnect` doesn't experience situations where throwing Error is required.
1. `canConnect` is solely for testing and should not throw.
2. `connect` and `connectMulti` should throw. Simply return result from `canConnect` and throw in functions that calls them.
  • Loading branch information
axmmisaka authored and Kagamihara Nadeshiko committed Dec 13, 2023
1 parent f1ac83b commit bfd23ec
Showing 1 changed file with 43 additions and 23 deletions.
66 changes: 43 additions & 23 deletions src/core/reactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ export interface Call<A, R> extends Write<A>, Read<R> {
invoke: (args: A) => R | undefined;
}

export enum CanConnectResult {
SUCCESS = 0,
SELF_LOOP = "Source port and destination port are the same.",
DESTINATION_OCCUPIED = "Destination port is already occupied.",
DOWNSTREAM_WRITE_CONFLICT = "Write conflict: port is already occupied.",
NOT_IN_SCOPE = "Source and destination ports are not in scope.",
RT_CONNECTION_OUTSIDE_CONTAINER = "New connection is outside of container.",
RT_DIRECT_FEED_THROUGH = "New connection introduces direct feed through.",
RT_CYCLE = "New connection introduces cycle.",
MUTATION_CAUSALITY_LOOP = "New connection will change the causal effect of the mutation that triggered this connection."
}

/**
* Abstract class for a schedulable action. It is intended as a wrapper for a
* regular action. In addition to a get method, it also has a schedule method
Expand Down Expand Up @@ -1091,18 +1103,21 @@ export abstract class Reactor extends Component {
* @param src The start point of a new connection.
* @param dst The end point of a new connection.
*/
public canConnect<R, S extends R>(src: IOPort<S>, dst: IOPort<R>): boolean {
public canConnect<R, S extends R>(
src: IOPort<S>,
dst: IOPort<R>
): CanConnectResult {
// Immediate rule out trivial self loops.
if (src === dst) {
throw Error("Source port and destination port are the same.");
return CanConnectResult.SELF_LOOP;
}

// Check the race condition
// - between reactors and reactions (NOTE: check also needs to happen
// in addReaction)
const deps = this._dependencyGraph.getUpstreamNeighbors(dst); // FIXME this will change with multiplex ports
if (deps !== undefined && deps.size > 0) {
throw Error("Destination port is already occupied.");
return CanConnectResult.DESTINATION_OCCUPIED;
}

if (!this._runtime.isRunning()) {
Expand All @@ -1114,10 +1129,13 @@ export abstract class Reactor extends Component {
// Rule out write conflicts.
// - (between reactors)
if (this._dependencyGraph.getDownstreamNeighbors(dst).size > 0) {
return false;
return CanConnectResult.DOWNSTREAM_WRITE_CONFLICT;
}

return this._isInScope(src, dst);
if (!this._isInScope(src, dst)) {
return CanConnectResult.NOT_IN_SCOPE;
}
return CanConnectResult.SUCCESS;
} else {
// Attempt to make a connection while executing.
// Check the local dependency graph to figure out whether this change
Expand All @@ -1131,7 +1149,7 @@ export abstract class Reactor extends Component {
src._isContainedBy(this) &&
dst._isContainedBy(this)
) {
throw Error("New connection is outside of container.");
return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER;
}

// Take the local graph and merge in all the causality interfaces
Expand All @@ -1148,23 +1166,21 @@ export abstract class Reactor extends Component {

// 1) check for loops
const hasCycle = graph.hasCycle();
if (hasCycle) {
return CanConnectResult.RT_CYCLE;
}

// 2) check for direct feed through.
// FIXME: This doesn't handle while direct feed thorugh cases.
let hasDirectFeedThrough = false;
if (src instanceof InPort && dst instanceof OutPort) {
hasDirectFeedThrough = dst.getContainer() === src.getContainer();
}
// Throw error cases
if (hasDirectFeedThrough && hasCycle) {
throw Error("New connection introduces direct feed through and cycle.");
} else if (hasCycle) {
throw Error("New connection introduces cycle.");
} else if (hasDirectFeedThrough) {
throw Error("New connection introduces direct feed through.");
if (
src instanceof InPort &&
dst instanceof OutPort &&
dst.getContainer() === src.getContainer()
) {
return CanConnectResult.RT_DIRECT_FEED_THROUGH;
}

return true;
return CanConnectResult.SUCCESS;
}
}

Expand Down Expand Up @@ -1258,11 +1274,14 @@ export abstract class Reactor extends Component {
if (dst === undefined || dst === null) {
throw new Error("Cannot connect unspecified destination");
}
if (this.canConnect(src, dst)) {
this._uncheckedConnect(src, dst);
} else {
throw new Error(`ERROR connecting ${src} to ${dst}`);
const canConnectResult = this.canConnect(src, dst);
// I know, this looks a bit weird. But
if (canConnectResult !== CanConnectResult.SUCCESS) {
throw new Error(
`ERROR connecting ${src} to ${dst}. Reason is ${canConnectResult.valueOf()}`
);
}
this._uncheckedConnect(src, dst);
}

protected _connectMulti<R, S extends R>(
Expand Down Expand Up @@ -1316,7 +1335,8 @@ export abstract class Reactor extends Component {
}

for (let i = 0; i < leftPorts.length && i < rightPorts.length; i++) {
if (!this.canConnect(leftPorts[i], rightPorts[i])) {
const canConnectResult = this.canConnect(leftPorts[i], rightPorts[i]);
if (canConnectResult !== CanConnectResult.SUCCESS) {
throw new Error(
`ERROR connecting ${leftPorts[i]}
to ${rightPorts[i]}
Expand Down

0 comments on commit bfd23ec

Please sign in to comment.