Skip to content

Commit

Permalink
Fix wrong semaphore address
Browse files Browse the repository at this point in the history
  • Loading branch information
borkaehw committed Dec 13, 2018
1 parent 87f4ae3 commit ecc8854
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public class WorkerMultiplexer extends Thread {
private static Semaphore semInstanceMap = new Semaphore(1);
private static Map<Integer, WorkerMultiplexer> instanceMap = new HashMap<>();
private Map<Integer, InputStream> responseMap;
private Map<Integer, Semaphore> semResponseNotEmpty;
private Map<Integer, Semaphore> responseChecker;
private Semaphore semResponseMap;
private Semaphore semResponseChecker;
private Semaphore semAccessProcess;

private Subprocess process;
Expand All @@ -49,7 +50,8 @@ public class WorkerMultiplexer extends Thread {
WorkerMultiplexer(Integer workerHash) {
semAccessProcess = new Semaphore(1);
semResponseMap = new Semaphore(1);
semResponseNotEmpty = new HashMap<>();
semResponseChecker = new Semaphore(1);
responseChecker = new HashMap<>();
responseMap = new HashMap<>();
this.workerHash = workerHash;

Expand Down Expand Up @@ -171,12 +173,19 @@ public synchronized void putRequest(byte[] request) throws IOException {
stdin.flush();
}

public InputStream getResponse(int workerId) throws Exception {
public InputStream getResponse(int workerId) throws InterruptedException {
Semaphore waitForResponse;
try {
semResponseNotEmpty.get(workerId).acquire();
semResponseChecker.acquire();
waitForResponse = responseChecker.get(workerId);
} catch (InterruptedException e) {
throw e;
} finally {
semResponseChecker.release();
}

waitForResponse.acquire();

try {
semResponseMap.acquire();
InputStream response = responseMap.get(workerId);
Expand All @@ -188,8 +197,15 @@ public InputStream getResponse(int workerId) throws Exception {
}
}

public synchronized void setResponseMap(int workerId) {
semResponseNotEmpty.put(workerId, new Semaphore(0));
public void setResponseMap(int workerId) throws InterruptedException {
try {
semResponseChecker.acquire();
responseChecker.put(workerId, new Semaphore(0));
} catch (InterruptedException e) {
throw e;
} finally {
semResponseChecker.release();
}
}

public void waitRequest() throws InterruptedException, IOException {
Expand All @@ -211,21 +227,28 @@ public void waitRequest() throws InterruptedException, IOException {
try {
semResponseMap.acquire();
responseMap.put(workerId, new ByteArrayInputStream(tempOs.toByteArray()));
semResponseNotEmpty.get(workerId).release();
} catch (InterruptedException e) {
throw e;
} finally {
semResponseMap.release();
}

try {
semResponseChecker.acquire();
responseChecker.get(workerId).release();
} catch (InterruptedException e) {
throw e;
} finally {
semResponseChecker.release();
}
}

public void run() {
while (!this.interrupted()) {
try {
waitRequest();
} catch (Exception e) {
System.out.println("Receiver killed");
// Terminate the thread
e.printStackTrace();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ InputStream getInputStream() {
byte[] requestBytes = request.toByteArray();
request.reset();
try {
workerMultiplexer.putRequest(requestBytes);
workerMultiplexer.setResponseMap(workerId);
workerMultiplexer.putRequest(requestBytes);
return workerMultiplexer.getResponse(workerId);
} catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit ecc8854

Please sign in to comment.