From 251ddae38c62126d1f81b64f413b4eb72f2caff6 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 00:50:00 -0400 Subject: [PATCH 1/9] Added a Client Response Handler and a Client for Client Responses --- .../responses/ClientResponseWrapper.java | 23 +++++++++++++++++++ .../outgoing/NodeResponseModule.java | 7 ++++++ 2 files changed, 30 insertions(+) create mode 100644 src/network/responses/ClientResponseWrapper.java diff --git a/src/network/responses/ClientResponseWrapper.java b/src/network/responses/ClientResponseWrapper.java new file mode 100644 index 0000000..a3846be --- /dev/null +++ b/src/network/responses/ClientResponseWrapper.java @@ -0,0 +1,23 @@ +package network.responses; + +import java.net.Socket; + +public class ClientResponseWrapper { + + private NWResponse response=null; + private Socket socket=null; + + public ClientResponseWrapper(NWResponse response,Socket socket){ + this.response=response; + this.socket=socket; + } + + public NWResponse getResponse() { + return response; + } + + public Socket getSocket() { + return socket; + } + +} diff --git a/src/network/responses/outgoing/NodeResponseModule.java b/src/network/responses/outgoing/NodeResponseModule.java index 4de350a..58a6724 100644 --- a/src/network/responses/outgoing/NodeResponseModule.java +++ b/src/network/responses/outgoing/NodeResponseModule.java @@ -5,6 +5,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import network.outgoing.NeighborConnection; +import network.responses.ClientResponseWrapper; import network.responses.NWResponse; //handles all outgoing response messages from this node public class NodeResponseModule { @@ -12,6 +13,7 @@ public class NodeResponseModule { //this might need to be a copy-on-write later when the neighborConnections become dynamic private ArrayList outgoingNeighborConnections=null; private ConcurrentLinkedQueue outgoingResponseQueue=null; + private ConcurrentLinkedQueue outgoingClientResponseQueue=null; public ArrayList getOutgoingNeighborConnections() { return outgoingNeighborConnections; @@ -20,6 +22,7 @@ public ArrayList getOutgoingNeighborConnections() { public NodeResponseModule(List neighborList,int neighborResponseServerPort){ this.outgoingNeighborConnections=getConnectionList(neighborList,neighborResponseServerPort); + outgoingClientResponseQueue=new ConcurrentLinkedQueue(); initializeQueue(); } @@ -43,6 +46,10 @@ public void insertOutgoingNWResponse(NWResponse response){ outgoingResponseQueue.add(response); } + public void insertOutgoingClientResponse(ClientResponseWrapper response){ + + } + private void initializeQueue(){ this.outgoingResponseQueue=new ConcurrentLinkedQueue(); } From 4cf1be7400aee73ce5c431b3f84d2d77ebef1342 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 01:04:35 -0400 Subject: [PATCH 2/9] Changes to switch from seqential client responses to a background client response queue --- src/network/NodeNetworkModule.java | 40 ++-------- .../outgoing/NodeResponseModule.java | 10 ++- .../OutgoingClientResponseServicer.java | 77 +++++++++++++++++++ 3 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 src/network/responses/outgoing/OutgoingClientResponseServicer.java diff --git a/src/network/NodeNetworkModule.java b/src/network/NodeNetworkModule.java index 1292667..dfc43ec 100644 --- a/src/network/NodeNetworkModule.java +++ b/src/network/NodeNetworkModule.java @@ -12,6 +12,7 @@ import network.requests.incoming.NodeClientRequestHandler; import network.requests.incoming.StartupMessageHandler; import network.requests.outgoing.NodeNeighborModule; +import network.responses.ClientResponseWrapper; import network.responses.NWResponse; import network.responses.incoming.NetworkResponseHandler; import network.responses.outgoing.NodeResponseModule; @@ -149,14 +150,18 @@ public void sendOutgoingNWResponse(NWResponse response){ public void sendOutgoingClientResponse(AckMessage message, String clientIPAddress){ NWResponse response=new NWResponse(message); Socket clientSocket=clientConnectionList.get(clientIPAddress); - sendResponse(clientSocket,response); + ClientResponseWrapper clientResponse=new ClientResponseWrapper(response,clientSocket); + this.responseModule.insertOutgoingClientResponse(clientResponse); + //remove this entry from the list of clients. The socket has been closed + clientConnectionList.remove(clientIPAddress); + /*sendResponse(clientSocket,response); try { clientSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); Constants.globalLog.error("Couldn't close the Client Socket"); - } + }*/ } @@ -170,36 +175,7 @@ private void constructorCommon(){ } - public void sendResponse(Socket socket,NWResponse response) { - // TODO Auto-generated method stub - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); - DataOutputStream output_stream=null; - try { - output_stream= new DataOutputStream(socket.getOutputStream()); - } catch (IOException e) { - Constants.globalLog.debug("Unable to open output stream to host:"+socket.getInetAddress()); - e.printStackTrace(); - return; - } - - try { - //mapper.writeValue(System.out, request); - mapper.writeValue(output_stream, response); - //end of message marker. - output_stream.writeChar('\n'); - output_stream.flush(); - } catch (JsonGenerationException e) { - Constants.globalLog.debug("Problem Generating JSON"); - e.printStackTrace(); - } catch (JsonMappingException e) { - Constants.globalLog.debug("Problem with JSON mapping"); - e.printStackTrace(); - } catch (IOException e) { - Constants.globalLog.debug("Problem with IO with host:"+socket.getInetAddress()); - e.printStackTrace(); - } - } + private NWRequest getStartUpRequest(int startUpMsgPort) { diff --git a/src/network/responses/outgoing/NodeResponseModule.java b/src/network/responses/outgoing/NodeResponseModule.java index 58a6724..0761a9d 100644 --- a/src/network/responses/outgoing/NodeResponseModule.java +++ b/src/network/responses/outgoing/NodeResponseModule.java @@ -27,8 +27,12 @@ public NodeResponseModule(List neighborList,int neighborResponseServerPo } public void startServicingResponses(){ - Thread servicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections)); - servicer.start(); + Thread nwResponseServicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections)); + nwResponseServicer.start(); + + //Starts the thread servicing client responses + Thread clientResponseServicer=new Thread(new OutgoingClientResponseServicer(outgoingClientResponseQueue)); + clientResponseServicer.start(); } private ArrayList getConnectionList(List neighborList,int neighborServerPort) { @@ -47,7 +51,7 @@ public void insertOutgoingNWResponse(NWResponse response){ } public void insertOutgoingClientResponse(ClientResponseWrapper response){ - + outgoingClientResponseQueue.add(response); } private void initializeQueue(){ diff --git a/src/network/responses/outgoing/OutgoingClientResponseServicer.java b/src/network/responses/outgoing/OutgoingClientResponseServicer.java new file mode 100644 index 0000000..e6b49da --- /dev/null +++ b/src/network/responses/outgoing/OutgoingClientResponseServicer.java @@ -0,0 +1,77 @@ +package network.responses.outgoing; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +import edu.tomr.utils.Constants; +import network.responses.ClientResponseWrapper; +import network.responses.NWResponse; + +public class OutgoingClientResponseServicer implements Runnable { + + private ConcurrentLinkedQueue responseQueue=null; + + @Override + public void run() { + // TODO Auto-generated method stub + while(true){ + if(!responseQueue.isEmpty()){ + ClientResponseWrapper responseWrapper=responseQueue.poll(); + sendResponse(responseWrapper.getSocket(),responseWrapper.getResponse()); + } + } + } + + public OutgoingClientResponseServicer(ConcurrentLinkedQueue responseQueue){ + this.responseQueue=responseQueue; + } + + private void sendResponse(Socket socket,NWResponse response) { + // TODO Auto-generated method stub + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + DataOutputStream output_stream=null; + try { + output_stream= new DataOutputStream(socket.getOutputStream()); + } catch (IOException e) { + Constants.globalLog.debug("Unable to open output stream to host:"+socket.getInetAddress()); + e.printStackTrace(); + return; + } + + try { + //mapper.writeValue(System.out, request); + mapper.writeValue(output_stream, response); + //end of message marker. + output_stream.writeChar('\n'); + output_stream.flush(); + } catch (JsonGenerationException e) { + Constants.globalLog.debug("Problem Generating JSON"); + e.printStackTrace(); + } catch (JsonMappingException e) { + Constants.globalLog.debug("Problem with JSON mapping"); + e.printStackTrace(); + } catch (IOException e) { + Constants.globalLog.debug("Problem with IO with host:"+socket.getInetAddress()); + e.printStackTrace(); + } + + + //close the connection + try { + socket.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + Constants.globalLog.error("Couldn't close the Client Socket"); + } + } + +} From b5e89dbac53236f0ea1ba43c0ea3d688908762ea Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 00:50:00 -0400 Subject: [PATCH 3/9] Added a Client Response Handler and a Client for Client Responses --- .../responses/ClientResponseWrapper.java | 23 +++++++++++++++++++ .../outgoing/NodeResponseModule.java | 7 ++++++ 2 files changed, 30 insertions(+) create mode 100644 src/network/responses/ClientResponseWrapper.java diff --git a/src/network/responses/ClientResponseWrapper.java b/src/network/responses/ClientResponseWrapper.java new file mode 100644 index 0000000..a3846be --- /dev/null +++ b/src/network/responses/ClientResponseWrapper.java @@ -0,0 +1,23 @@ +package network.responses; + +import java.net.Socket; + +public class ClientResponseWrapper { + + private NWResponse response=null; + private Socket socket=null; + + public ClientResponseWrapper(NWResponse response,Socket socket){ + this.response=response; + this.socket=socket; + } + + public NWResponse getResponse() { + return response; + } + + public Socket getSocket() { + return socket; + } + +} diff --git a/src/network/responses/outgoing/NodeResponseModule.java b/src/network/responses/outgoing/NodeResponseModule.java index 4de350a..58a6724 100644 --- a/src/network/responses/outgoing/NodeResponseModule.java +++ b/src/network/responses/outgoing/NodeResponseModule.java @@ -5,6 +5,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import network.outgoing.NeighborConnection; +import network.responses.ClientResponseWrapper; import network.responses.NWResponse; //handles all outgoing response messages from this node public class NodeResponseModule { @@ -12,6 +13,7 @@ public class NodeResponseModule { //this might need to be a copy-on-write later when the neighborConnections become dynamic private ArrayList outgoingNeighborConnections=null; private ConcurrentLinkedQueue outgoingResponseQueue=null; + private ConcurrentLinkedQueue outgoingClientResponseQueue=null; public ArrayList getOutgoingNeighborConnections() { return outgoingNeighborConnections; @@ -20,6 +22,7 @@ public ArrayList getOutgoingNeighborConnections() { public NodeResponseModule(List neighborList,int neighborResponseServerPort){ this.outgoingNeighborConnections=getConnectionList(neighborList,neighborResponseServerPort); + outgoingClientResponseQueue=new ConcurrentLinkedQueue(); initializeQueue(); } @@ -43,6 +46,10 @@ public void insertOutgoingNWResponse(NWResponse response){ outgoingResponseQueue.add(response); } + public void insertOutgoingClientResponse(ClientResponseWrapper response){ + + } + private void initializeQueue(){ this.outgoingResponseQueue=new ConcurrentLinkedQueue(); } From ec40d8f8d549b24559f479b002b00f8258205ae9 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 01:04:35 -0400 Subject: [PATCH 4/9] Changes to switch from seqential client responses to a background client response queue --- src/network/NodeNetworkModule.java | 40 ++-------- .../outgoing/NodeResponseModule.java | 10 ++- .../OutgoingClientResponseServicer.java | 77 +++++++++++++++++++ 3 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 src/network/responses/outgoing/OutgoingClientResponseServicer.java diff --git a/src/network/NodeNetworkModule.java b/src/network/NodeNetworkModule.java index 1292667..dfc43ec 100644 --- a/src/network/NodeNetworkModule.java +++ b/src/network/NodeNetworkModule.java @@ -12,6 +12,7 @@ import network.requests.incoming.NodeClientRequestHandler; import network.requests.incoming.StartupMessageHandler; import network.requests.outgoing.NodeNeighborModule; +import network.responses.ClientResponseWrapper; import network.responses.NWResponse; import network.responses.incoming.NetworkResponseHandler; import network.responses.outgoing.NodeResponseModule; @@ -149,14 +150,18 @@ public void sendOutgoingNWResponse(NWResponse response){ public void sendOutgoingClientResponse(AckMessage message, String clientIPAddress){ NWResponse response=new NWResponse(message); Socket clientSocket=clientConnectionList.get(clientIPAddress); - sendResponse(clientSocket,response); + ClientResponseWrapper clientResponse=new ClientResponseWrapper(response,clientSocket); + this.responseModule.insertOutgoingClientResponse(clientResponse); + //remove this entry from the list of clients. The socket has been closed + clientConnectionList.remove(clientIPAddress); + /*sendResponse(clientSocket,response); try { clientSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); Constants.globalLog.error("Couldn't close the Client Socket"); - } + }*/ } @@ -170,36 +175,7 @@ private void constructorCommon(){ } - public void sendResponse(Socket socket,NWResponse response) { - // TODO Auto-generated method stub - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); - DataOutputStream output_stream=null; - try { - output_stream= new DataOutputStream(socket.getOutputStream()); - } catch (IOException e) { - Constants.globalLog.debug("Unable to open output stream to host:"+socket.getInetAddress()); - e.printStackTrace(); - return; - } - - try { - //mapper.writeValue(System.out, request); - mapper.writeValue(output_stream, response); - //end of message marker. - output_stream.writeChar('\n'); - output_stream.flush(); - } catch (JsonGenerationException e) { - Constants.globalLog.debug("Problem Generating JSON"); - e.printStackTrace(); - } catch (JsonMappingException e) { - Constants.globalLog.debug("Problem with JSON mapping"); - e.printStackTrace(); - } catch (IOException e) { - Constants.globalLog.debug("Problem with IO with host:"+socket.getInetAddress()); - e.printStackTrace(); - } - } + private NWRequest getStartUpRequest(int startUpMsgPort) { diff --git a/src/network/responses/outgoing/NodeResponseModule.java b/src/network/responses/outgoing/NodeResponseModule.java index 58a6724..0761a9d 100644 --- a/src/network/responses/outgoing/NodeResponseModule.java +++ b/src/network/responses/outgoing/NodeResponseModule.java @@ -27,8 +27,12 @@ public NodeResponseModule(List neighborList,int neighborResponseServerPo } public void startServicingResponses(){ - Thread servicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections)); - servicer.start(); + Thread nwResponseServicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections)); + nwResponseServicer.start(); + + //Starts the thread servicing client responses + Thread clientResponseServicer=new Thread(new OutgoingClientResponseServicer(outgoingClientResponseQueue)); + clientResponseServicer.start(); } private ArrayList getConnectionList(List neighborList,int neighborServerPort) { @@ -47,7 +51,7 @@ public void insertOutgoingNWResponse(NWResponse response){ } public void insertOutgoingClientResponse(ClientResponseWrapper response){ - + outgoingClientResponseQueue.add(response); } private void initializeQueue(){ diff --git a/src/network/responses/outgoing/OutgoingClientResponseServicer.java b/src/network/responses/outgoing/OutgoingClientResponseServicer.java new file mode 100644 index 0000000..e6b49da --- /dev/null +++ b/src/network/responses/outgoing/OutgoingClientResponseServicer.java @@ -0,0 +1,77 @@ +package network.responses.outgoing; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +import edu.tomr.utils.Constants; +import network.responses.ClientResponseWrapper; +import network.responses.NWResponse; + +public class OutgoingClientResponseServicer implements Runnable { + + private ConcurrentLinkedQueue responseQueue=null; + + @Override + public void run() { + // TODO Auto-generated method stub + while(true){ + if(!responseQueue.isEmpty()){ + ClientResponseWrapper responseWrapper=responseQueue.poll(); + sendResponse(responseWrapper.getSocket(),responseWrapper.getResponse()); + } + } + } + + public OutgoingClientResponseServicer(ConcurrentLinkedQueue responseQueue){ + this.responseQueue=responseQueue; + } + + private void sendResponse(Socket socket,NWResponse response) { + // TODO Auto-generated method stub + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + DataOutputStream output_stream=null; + try { + output_stream= new DataOutputStream(socket.getOutputStream()); + } catch (IOException e) { + Constants.globalLog.debug("Unable to open output stream to host:"+socket.getInetAddress()); + e.printStackTrace(); + return; + } + + try { + //mapper.writeValue(System.out, request); + mapper.writeValue(output_stream, response); + //end of message marker. + output_stream.writeChar('\n'); + output_stream.flush(); + } catch (JsonGenerationException e) { + Constants.globalLog.debug("Problem Generating JSON"); + e.printStackTrace(); + } catch (JsonMappingException e) { + Constants.globalLog.debug("Problem with JSON mapping"); + e.printStackTrace(); + } catch (IOException e) { + Constants.globalLog.debug("Problem with IO with host:"+socket.getInetAddress()); + e.printStackTrace(); + } + + + //close the connection + try { + socket.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + Constants.globalLog.error("Couldn't close the Client Socket"); + } + } + +} From af7283fc6807c3a748db3d3edd2ee5c7ba0c8180 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 06:22:41 -0400 Subject: [PATCH 5/9] Code for multiclients --- src/edu/tomr/client/ClientRunner.java | 52 +++++++++++++++++++++++ src/edu/tomr/client/InitializeClient.java | 27 +++++++----- 2 files changed, 68 insertions(+), 11 deletions(-) create mode 100644 src/edu/tomr/client/ClientRunner.java diff --git a/src/edu/tomr/client/ClientRunner.java b/src/edu/tomr/client/ClientRunner.java new file mode 100644 index 0000000..0376efd --- /dev/null +++ b/src/edu/tomr/client/ClientRunner.java @@ -0,0 +1,52 @@ +package edu.tomr.client; + +import network.Connection; +import network.requests.NWRequest; +import network.responses.NWResponse; +import edu.tomr.protocol.ClientInfo; +import edu.tomr.protocol.ClientRequestPayload; +import edu.tomr.protocol.ClientRequestType; +import edu.tomr.protocol.ClientServiceMessage; +import edu.tomr.protocol.DBMessage; +import edu.tomr.utils.Constants; + +public class ClientRunner implements Runnable { + + private String serverIP; + private int lbport; + private int servicerPort; + private String message; + private ClientRequestType requestType=null; + private int id; + private String selfIP; + + @Override + public void run() { + // TODO Auto-generated method stub + ClientServiceMessage serviceMessage = getServiceMessage(); + Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerPort); + DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+id, message.getBytes()), new ClientInfo(selfIP), serviceMessage.getPayloadID()); + NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query); + nodeConnection.send_request(request); + //this is block wait method + NWResponse response=nodeConnection.getnextResponse(); + Constants.globalLog.debug(response.getAckMsg().toString()); + } + + public ClientRunner(String serverIP,int lbport,int servicerPort,String message,ClientRequestType requestType,int id,String selfIP){ + this.serverIP=serverIP; + this.lbport=lbport; + this.servicerPort=servicerPort; + this.message=message; + this.requestType=requestType; + this.id=id; + this.selfIP=selfIP; + } + + private ClientServiceMessage getServiceMessage() { + Connection lbConnection=new Connection(serverIP,lbport); + NWResponse response=lbConnection.getnextResponse(); + return response.getClientServiceMsg(); + } + +} diff --git a/src/edu/tomr/client/InitializeClient.java b/src/edu/tomr/client/InitializeClient.java index 6bcff1c..ffe3766 100644 --- a/src/edu/tomr/client/InitializeClient.java +++ b/src/edu/tomr/client/InitializeClient.java @@ -1,6 +1,8 @@ package edu.tomr.client; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import network.Connection; import network.NetworkUtilities; @@ -47,20 +49,23 @@ private static void generateRequests(ClientRequestType requestType, int requestL // TODO Auto-generated catch block e.printStackTrace(); } - + String randomString = generateString(requestLength); + ExecutorService executor = Executors.newFixedThreadPool(5); for(int i=1; i<=numOfRequests; i++){ - ClientServiceMessage serviceMessage = getServiceMessage(); - Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerNodePort); - String randomString = generateString(requestLength); - DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+i, randomString.getBytes()), new ClientInfo(utils.getSelfIP()), serviceMessage.getPayloadID()); - NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query); - nodeConnection.send_request(request); + //ClientServiceMessage serviceMessage = getServiceMessage(); + //Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerNodePort); + + //DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+i, randomString.getBytes()), new ClientInfo(utils.getSelfIP()), serviceMessage.getPayloadID()); + //NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query); + //nodeConnection.send_request(request); //this is block wait method - NWResponse response=nodeConnection.getnextResponse(); - - Constants.globalLog.debug(response.getAckMsg().toString()); - + //NWResponse response=nodeConnection.getnextResponse(); + Runnable worker = new ClientRunner(serverIP,lbPort,servicerNodePort,randomString,requestType,i,utils.getSelfIP()); + executor.execute(worker); } + executor.shutdown(); + while (!executor.isTerminated()); + Constants.globalLog.debug("All client threads finished executing"); } private static String generateString(int requestLength) { From c999b127f6a72a41dc845f04e33e6d8e79995118 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 06:26:36 -0400 Subject: [PATCH 6/9] Changes to make requestID the key instead of client IP address --- src/network/NodeNetworkModule.java | 3 ++- src/network/requests/incoming/NodeClientRequestServicer.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/network/NodeNetworkModule.java b/src/network/NodeNetworkModule.java index dfc43ec..57cfbdf 100644 --- a/src/network/NodeNetworkModule.java +++ b/src/network/NodeNetworkModule.java @@ -149,7 +149,8 @@ public void sendOutgoingNWResponse(NWResponse response){ //DUMMY-Waiting for ClientResponse Class public void sendOutgoingClientResponse(AckMessage message, String clientIPAddress){ NWResponse response=new NWResponse(message); - Socket clientSocket=clientConnectionList.get(clientIPAddress); + //Socket clientSocket=clientConnectionList.get(clientIPAddress); + Socket clientSocket=clientConnectionList.get(message.getRequestIdServiced()); ClientResponseWrapper clientResponse=new ClientResponseWrapper(response,clientSocket); this.responseModule.insertOutgoingClientResponse(clientResponse); //remove this entry from the list of clients. The socket has been closed diff --git a/src/network/requests/incoming/NodeClientRequestServicer.java b/src/network/requests/incoming/NodeClientRequestServicer.java index 13aff28..cd4f74a 100644 --- a/src/network/requests/incoming/NodeClientRequestServicer.java +++ b/src/network/requests/incoming/NodeClientRequestServicer.java @@ -34,7 +34,7 @@ public NodeClientRequestServicer(Socket socket,Node node, ConcurrentHashMap Date: Fri, 1 May 2015 07:18:29 -0400 Subject: [PATCH 7/9] Debugging changes --- src/edu/tomr/queue/ClientQueueProcessor.java | 20 +++++++++++--------- src/edu/tomr/start/NodeStarter.java | 3 +++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/edu/tomr/queue/ClientQueueProcessor.java b/src/edu/tomr/queue/ClientQueueProcessor.java index 1f0089f..a2b5b06 100644 --- a/src/edu/tomr/queue/ClientQueueProcessor.java +++ b/src/edu/tomr/queue/ClientQueueProcessor.java @@ -39,15 +39,17 @@ public void run() { } - public void handleMessage(DBMessage message) { - String ipAddress = ConsistentHashing.getNode(message.getPayload().getKey()); - - if(ipAddress.equalsIgnoreCase(parentNode.getSelfAddress())) { - parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress()); - parentNode.handleRequest(message, parentNode.getSelfAddress()); - } else { - parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress()); - parentNode.getNetworkModule().sendOutgoingRequest(message, ipAddress); + public void handleMessage(DBMessage message) { + if(message!=null && message.getPayload()!=null && message.getPayload().getKey()!=null) { + String ipAddress = ConsistentHashing.getNode(message.getPayload().getKey()); + + if(ipAddress.equalsIgnoreCase(parentNode.getSelfAddress())) { + parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress()); + parentNode.handleRequest(message, parentNode.getSelfAddress()); + } else { + parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress()); + parentNode.getNetworkModule().sendOutgoingRequest(message, ipAddress); + } } } diff --git a/src/edu/tomr/start/NodeStarter.java b/src/edu/tomr/start/NodeStarter.java index a7a45db..8090b9e 100644 --- a/src/edu/tomr/start/NodeStarter.java +++ b/src/edu/tomr/start/NodeStarter.java @@ -50,6 +50,9 @@ private void startBeatClient() { public static void main(String[] args) { + + System.setProperty("java.net.preferIPv4Stack" , "true"); + NodeStarter nodeStarter = new NodeStarter(); //nodeStarter.startBeatClient(); From 6ef5666f8d17e28c72031fb1bda7a0c9cf367503 Mon Sep 17 00:00:00 2001 From: VINAY Date: Fri, 1 May 2015 07:24:12 -0400 Subject: [PATCH 8/9] volatile added --- .../loadbalancer/ClientConnectionHandler.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/edu/tomr/loadbalancer/ClientConnectionHandler.java b/src/edu/tomr/loadbalancer/ClientConnectionHandler.java index 0e7ff6a..8420d7a 100644 --- a/src/edu/tomr/loadbalancer/ClientConnectionHandler.java +++ b/src/edu/tomr/loadbalancer/ClientConnectionHandler.java @@ -13,7 +13,7 @@ public class ClientConnectionHandler implements Runnable { private Socket clientSocket; private UUID clientUID; - private static int turnOf = 0; + private static volatile Integer turnOf = 0; static{ //ConfigParams.loadProperties(); @@ -62,23 +62,26 @@ public void run() { - private synchronized String getIPAddress() { + private String getIPAddress() { String IPAddress = null; - try{ - - if(turnOf > ConfigParams.getIpAddresses().size() - 1){ - turnOf = 0; - } - IPAddress = ConfigParams.getIpAddresses().get(turnOf); - turnOf++; - } - catch(Exception e){ - Constants.globalLog.debug("Error while trying to access the IP Addresses for scheduling"); - e.printStackTrace(); - + synchronized(turnOf){ + try{ + + if(turnOf > ConfigParams.getIpAddresses().size() - 1){ + turnOf = 0; + } + IPAddress = ConfigParams.getIpAddresses().get(turnOf); + turnOf++; + } + catch(Exception e){ + Constants.globalLog.debug("Error while trying to access the IP Addresses for scheduling"); + e.printStackTrace(); + + } } return IPAddress; - } + } + } From 045a983dfc6bbeb19d2739f3c1988a1d03280230 Mon Sep 17 00:00:00 2001 From: Rohan Chakravarthy Date: Fri, 1 May 2015 11:42:55 -0400 Subject: [PATCH 9/9] Small changes to the client code --- src/edu/tomr/client/InitializeClient.java | 10 +++++++--- src/edu/tomr/queue/NodeQueueProcessor.java | 7 +------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/edu/tomr/client/InitializeClient.java b/src/edu/tomr/client/InitializeClient.java index ffe3766..0985a88 100644 --- a/src/edu/tomr/client/InitializeClient.java +++ b/src/edu/tomr/client/InitializeClient.java @@ -50,7 +50,7 @@ private static void generateRequests(ClientRequestType requestType, int requestL e.printStackTrace(); } String randomString = generateString(requestLength); - ExecutorService executor = Executors.newFixedThreadPool(5); + ExecutorService executor = Executors.newFixedThreadPool(10); for(int i=1; i<=numOfRequests; i++){ //ClientServiceMessage serviceMessage = getServiceMessage(); //Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerNodePort); @@ -81,7 +81,11 @@ private static String generateString(int requestLength) { } public static void main(String[] args) { - - generateRequests(ClientRequestType.ADD, 50, 5); + serverIP = ConfigParams.getProperty("LB_IP"); + long startTime = System.nanoTime(); + //generateRequests(ClientRequestType.ADD, 1000, 2000); + generateRequests(ClientRequestType.GET, 50, 2000); + long endTime = System.nanoTime(); + System.out.println("Took "+((endTime - startTime)/1000000) + " ms"); } } diff --git a/src/edu/tomr/queue/NodeQueueProcessor.java b/src/edu/tomr/queue/NodeQueueProcessor.java index 2d9a1d0..66c1872 100644 --- a/src/edu/tomr/queue/NodeQueueProcessor.java +++ b/src/edu/tomr/queue/NodeQueueProcessor.java @@ -27,16 +27,11 @@ public void run() { boolean running = true; while(running) { - try { - Thread.sleep(50); + if(!queue.isEmpty()){ DBMessage msg = (DBMessage)queue.dequeueMessage(); handleMessage(msg); } - } catch (InterruptedException e) { - running = false; - e.printStackTrace(); - } } }