Skip to content

Commit

Permalink
Fix semaphore incorrectly reset
Browse files Browse the repository at this point in the history
  • Loading branch information
borkaehw committed Dec 12, 2018
1 parent 87f4ae3 commit 33afd6b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public class WorkerMultiplexer extends Thread {
private static Semaphore semInstanceMap = new Semaphore(1);
private static Map<Integer, WorkerMultiplexer> instanceMap = new HashMap<>();
private Map<Integer, InputStream> responseMap;
private Map<Integer, Semaphore> semResponseNotEmpty;
private Map<Integer, Semaphore> semResponseMapNotEmpty;
private Semaphore semResponseMap;
private Semaphore semResponseNotEmpty;
private Semaphore semAccessProcess;

private Subprocess process;
Expand All @@ -49,7 +50,8 @@ public class WorkerMultiplexer extends Thread {
WorkerMultiplexer(Integer workerHash) {
semAccessProcess = new Semaphore(1);
semResponseMap = new Semaphore(1);
semResponseNotEmpty = new HashMap<>();
semResponseNotEmpty = new Semaphore(1);
semResponseMapNotEmpty = new HashMap<>();
responseMap = new HashMap<>();
this.workerHash = workerHash;

Expand Down Expand Up @@ -172,11 +174,16 @@ public synchronized void putRequest(byte[] request) throws IOException {
}

public InputStream getResponse(int workerId) throws Exception {
Semaphore waitForResponse;
try {
semResponseNotEmpty.get(workerId).acquire();
semResponseNotEmpty.acquire();
waitForResponse = semResponseMapNotEmpty.get(workerId);
} catch (InterruptedException e) {
throw e;
} finally {
semResponseNotEmpty.release();
}
waitForResponse.acquire();
try {
semResponseMap.acquire();
InputStream response = responseMap.get(workerId);
Expand All @@ -188,8 +195,15 @@ public InputStream getResponse(int workerId) throws Exception {
}
}

public synchronized void setResponseMap(int workerId) {
semResponseNotEmpty.put(workerId, new Semaphore(0));
public void setResponseMap(int workerId) throws InterruptedException {
try {
semResponseNotEmpty.acquire();
semResponseMapNotEmpty.put(workerId, new Semaphore(0));
} catch (InterruptedException e) {
throw e;
} finally {
semResponseNotEmpty.release();
}
}

public void waitRequest() throws InterruptedException, IOException {
Expand All @@ -210,11 +224,13 @@ public void waitRequest() throws InterruptedException, IOException {

try {
semResponseMap.acquire();
semResponseNotEmpty.acquire();
responseMap.put(workerId, new ByteArrayInputStream(tempOs.toByteArray()));
semResponseNotEmpty.get(workerId).release();
semResponseMapNotEmpty.get(workerId).release();
} catch (InterruptedException e) {
throw e;
} finally {
semResponseNotEmpty.release();
semResponseMap.release();
}
}
Expand All @@ -224,8 +240,7 @@ public void run() {
try {
waitRequest();
} catch (Exception e) {
System.out.println("Receiver killed");
// Terminate the thread
e.printStackTrace();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ InputStream getInputStream() {
byte[] requestBytes = request.toByteArray();
request.reset();
try {
workerMultiplexer.putRequest(requestBytes);
workerMultiplexer.setResponseMap(workerId);
workerMultiplexer.putRequest(requestBytes);
return workerMultiplexer.getResponse(workerId);
} catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit 33afd6b

Please sign in to comment.