From ecc8854026a6976c38270ec7b69fe2303e5b0be1 Mon Sep 17 00:00:00 2001 From: Bor Kae Hwang Date: Wed, 12 Dec 2018 14:05:43 -0700 Subject: [PATCH] Fix wrong semaphore address --- .../build/lib/worker/WorkerMultiplexer.java | 41 +++++++++++++++---- .../build/lib/worker/WorkerProxy.java | 2 +- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 7145c5ca084841..015395c1b02505 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -37,8 +37,9 @@ public class WorkerMultiplexer extends Thread { private static Semaphore semInstanceMap = new Semaphore(1); private static Map instanceMap = new HashMap<>(); private Map responseMap; - private Map semResponseNotEmpty; + private Map responseChecker; private Semaphore semResponseMap; + private Semaphore semResponseChecker; private Semaphore semAccessProcess; private Subprocess process; @@ -49,7 +50,8 @@ public class WorkerMultiplexer extends Thread { WorkerMultiplexer(Integer workerHash) { semAccessProcess = new Semaphore(1); semResponseMap = new Semaphore(1); - semResponseNotEmpty = new HashMap<>(); + semResponseChecker = new Semaphore(1); + responseChecker = new HashMap<>(); responseMap = new HashMap<>(); this.workerHash = workerHash; @@ -171,12 +173,19 @@ public synchronized void putRequest(byte[] request) throws IOException { stdin.flush(); } - public InputStream getResponse(int workerId) throws Exception { + public InputStream getResponse(int workerId) throws InterruptedException { + Semaphore waitForResponse; try { - semResponseNotEmpty.get(workerId).acquire(); + semResponseChecker.acquire(); + waitForResponse = responseChecker.get(workerId); } catch (InterruptedException e) { throw e; + } finally { + semResponseChecker.release(); } + + waitForResponse.acquire(); + try { semResponseMap.acquire(); InputStream response = responseMap.get(workerId); @@ -188,8 +197,15 @@ public InputStream getResponse(int workerId) throws Exception { } } - public synchronized void setResponseMap(int workerId) { - semResponseNotEmpty.put(workerId, new Semaphore(0)); + public void setResponseMap(int workerId) throws InterruptedException { + try { + semResponseChecker.acquire(); + responseChecker.put(workerId, new Semaphore(0)); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseChecker.release(); + } } public void waitRequest() throws InterruptedException, IOException { @@ -211,12 +227,20 @@ public void waitRequest() throws InterruptedException, IOException { try { semResponseMap.acquire(); responseMap.put(workerId, new ByteArrayInputStream(tempOs.toByteArray())); - semResponseNotEmpty.get(workerId).release(); } catch (InterruptedException e) { throw e; } finally { semResponseMap.release(); } + + try { + semResponseChecker.acquire(); + responseChecker.get(workerId).release(); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseChecker.release(); + } } public void run() { @@ -224,8 +248,7 @@ public void run() { try { waitRequest(); } catch (Exception e) { - System.out.println("Receiver killed"); - // Terminate the thread + e.printStackTrace(); } } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java index 6c010a0f6ac3e1..3bcc915645307e 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -86,8 +86,8 @@ InputStream getInputStream() { byte[] requestBytes = request.toByteArray(); request.reset(); try { - workerMultiplexer.putRequest(requestBytes); workerMultiplexer.setResponseMap(workerId); + workerMultiplexer.putRequest(requestBytes); return workerMultiplexer.getResponse(workerId); } catch (Exception e) { e.printStackTrace();