Skip to content

Commit

Permalink
Merge pull request #39 from rchakra3/parallel_responses
Browse files Browse the repository at this point in the history
Smooth Multi-client handling now supported
  • Loading branch information
rohancme committed May 1, 2015
2 parents 675764b + 045a983 commit aed3c73
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 79 deletions.
52 changes: 52 additions & 0 deletions src/edu/tomr/client/ClientRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package edu.tomr.client;

import network.Connection;
import network.requests.NWRequest;
import network.responses.NWResponse;
import edu.tomr.protocol.ClientInfo;
import edu.tomr.protocol.ClientRequestPayload;
import edu.tomr.protocol.ClientRequestType;
import edu.tomr.protocol.ClientServiceMessage;
import edu.tomr.protocol.DBMessage;
import edu.tomr.utils.Constants;

public class ClientRunner implements Runnable {

private String serverIP;
private int lbport;
private int servicerPort;
private String message;
private ClientRequestType requestType=null;
private int id;
private String selfIP;

@Override
public void run() {
// TODO Auto-generated method stub
ClientServiceMessage serviceMessage = getServiceMessage();
Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerPort);
DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+id, message.getBytes()), new ClientInfo(selfIP), serviceMessage.getPayloadID());
NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query);
nodeConnection.send_request(request);
//this is block wait method
NWResponse response=nodeConnection.getnextResponse();
Constants.globalLog.debug(response.getAckMsg().toString());
}

public ClientRunner(String serverIP,int lbport,int servicerPort,String message,ClientRequestType requestType,int id,String selfIP){
this.serverIP=serverIP;
this.lbport=lbport;
this.servicerPort=servicerPort;
this.message=message;
this.requestType=requestType;
this.id=id;
this.selfIP=selfIP;
}

private ClientServiceMessage getServiceMessage() {
Connection lbConnection=new Connection(serverIP,lbport);
NWResponse response=lbConnection.getnextResponse();
return response.getClientServiceMsg();
}

}
35 changes: 22 additions & 13 deletions src/edu/tomr/client/InitializeClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package edu.tomr.client;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import network.Connection;
import network.NetworkUtilities;
Expand Down Expand Up @@ -47,20 +49,23 @@ private static void generateRequests(ClientRequestType requestType, int requestL
// TODO Auto-generated catch block
e.printStackTrace();
}

String randomString = generateString(requestLength);
ExecutorService executor = Executors.newFixedThreadPool(10);
for(int i=1; i<=numOfRequests; i++){
ClientServiceMessage serviceMessage = getServiceMessage();
Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerNodePort);
String randomString = generateString(requestLength);
DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+i, randomString.getBytes()), new ClientInfo(utils.getSelfIP()), serviceMessage.getPayloadID());
NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query);
nodeConnection.send_request(request);
//ClientServiceMessage serviceMessage = getServiceMessage();
//Connection nodeConnection=new Connection(serviceMessage.getServiceIPAddress(),servicerNodePort);

//DBMessage query=new DBMessage(requestType, new ClientRequestPayload("File-"+i, randomString.getBytes()), new ClientInfo(utils.getSelfIP()), serviceMessage.getPayloadID());
//NWRequest request=new NWRequest(serviceMessage.getPayloadID(),query);
//nodeConnection.send_request(request);
//this is block wait method
NWResponse response=nodeConnection.getnextResponse();

Constants.globalLog.debug(response.getAckMsg().toString());

//NWResponse response=nodeConnection.getnextResponse();
Runnable worker = new ClientRunner(serverIP,lbPort,servicerNodePort,randomString,requestType,i,utils.getSelfIP());
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated());
Constants.globalLog.debug("All client threads finished executing");
}

private static String generateString(int requestLength) {
Expand All @@ -76,7 +81,11 @@ private static String generateString(int requestLength) {
}

public static void main(String[] args) {

generateRequests(ClientRequestType.ADD, 50, 5);
serverIP = ConfigParams.getProperty("LB_IP");
long startTime = System.nanoTime();
//generateRequests(ClientRequestType.ADD, 1000, 2000);
generateRequests(ClientRequestType.GET, 50, 2000);
long endTime = System.nanoTime();
System.out.println("Took "+((endTime - startTime)/1000000) + " ms");
}
}
33 changes: 18 additions & 15 deletions src/edu/tomr/loadbalancer/ClientConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class ClientConnectionHandler implements Runnable {
private Socket clientSocket;
private UUID clientUID;
private static int turnOf = 0;
private static volatile Integer turnOf = 0;

static{
//ConfigParams.loadProperties();
Expand Down Expand Up @@ -62,23 +62,26 @@ public void run() {



private synchronized String getIPAddress() {
private String getIPAddress() {
String IPAddress = null;
try{

if(turnOf > ConfigParams.getIpAddresses().size() - 1){
turnOf = 0;
}
IPAddress = ConfigParams.getIpAddresses().get(turnOf);
turnOf++;
}
catch(Exception e){
Constants.globalLog.debug("Error while trying to access the IP Addresses for scheduling");
e.printStackTrace();

synchronized(turnOf){
try{

if(turnOf > ConfigParams.getIpAddresses().size() - 1){
turnOf = 0;
}
IPAddress = ConfigParams.getIpAddresses().get(turnOf);
turnOf++;
}
catch(Exception e){
Constants.globalLog.debug("Error while trying to access the IP Addresses for scheduling");
e.printStackTrace();

}
}
return IPAddress;

}
}


}
20 changes: 11 additions & 9 deletions src/edu/tomr/queue/ClientQueueProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ public void run() {
}


public void handleMessage(DBMessage message) {
String ipAddress = ConsistentHashing.getNode(message.getPayload().getKey());

if(ipAddress.equalsIgnoreCase(parentNode.getSelfAddress())) {
parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress());
parentNode.handleRequest(message, parentNode.getSelfAddress());
} else {
parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress());
parentNode.getNetworkModule().sendOutgoingRequest(message, ipAddress);
public void handleMessage(DBMessage message) {
if(message!=null && message.getPayload()!=null && message.getPayload().getKey()!=null) {
String ipAddress = ConsistentHashing.getNode(message.getPayload().getKey());

if(ipAddress.equalsIgnoreCase(parentNode.getSelfAddress())) {
parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress());
parentNode.handleRequest(message, parentNode.getSelfAddress());
} else {
parentNode.getRequestMapper().put(message.getRequestId(), message.getClientInfo().getIpAddress());
parentNode.getNetworkModule().sendOutgoingRequest(message, ipAddress);
}
}
}

Expand Down
7 changes: 1 addition & 6 deletions src/edu/tomr/queue/NodeQueueProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,11 @@ public void run() {
boolean running = true;

while(running) {
try {
Thread.sleep(50);

if(!queue.isEmpty()){
DBMessage msg = (DBMessage)queue.dequeueMessage();
handleMessage(msg);
}
} catch (InterruptedException e) {
running = false;
e.printStackTrace();
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/edu/tomr/start/NodeStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ private void startBeatClient() {

public static void main(String[] args) {


System.setProperty("java.net.preferIPv4Stack" , "true");

NodeStarter nodeStarter = new NodeStarter();
//nodeStarter.startBeatClient();

Expand Down
43 changes: 10 additions & 33 deletions src/network/NodeNetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import network.requests.incoming.NodeClientRequestHandler;
import network.requests.incoming.StartupMessageHandler;
import network.requests.outgoing.NodeNeighborModule;
import network.responses.ClientResponseWrapper;
import network.responses.NWResponse;
import network.responses.incoming.NetworkResponseHandler;
import network.responses.outgoing.NodeResponseModule;
Expand Down Expand Up @@ -148,15 +149,20 @@ public void sendOutgoingNWResponse(NWResponse response){
//DUMMY-Waiting for ClientResponse Class
public void sendOutgoingClientResponse(AckMessage message, String clientIPAddress){
NWResponse response=new NWResponse(message);
Socket clientSocket=clientConnectionList.get(clientIPAddress);
sendResponse(clientSocket,response);
//Socket clientSocket=clientConnectionList.get(clientIPAddress);
Socket clientSocket=clientConnectionList.get(message.getRequestIdServiced());
ClientResponseWrapper clientResponse=new ClientResponseWrapper(response,clientSocket);
this.responseModule.insertOutgoingClientResponse(clientResponse);
//remove this entry from the list of clients. The socket has been closed
clientConnectionList.remove(clientIPAddress);
/*sendResponse(clientSocket,response);
try {
clientSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Constants.globalLog.error("Couldn't close the Client Socket");
}
}*/


}
Expand All @@ -170,36 +176,7 @@ private void constructorCommon(){

}

public void sendResponse(Socket socket,NWResponse response) {
// TODO Auto-generated method stub
ObjectMapper mapper = new ObjectMapper();
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
DataOutputStream output_stream=null;
try {
output_stream= new DataOutputStream(socket.getOutputStream());
} catch (IOException e) {
Constants.globalLog.debug("Unable to open output stream to host:"+socket.getInetAddress());
e.printStackTrace();
return;
}

try {
//mapper.writeValue(System.out, request);
mapper.writeValue(output_stream, response);
//end of message marker.
output_stream.writeChar('\n');
output_stream.flush();
} catch (JsonGenerationException e) {
Constants.globalLog.debug("Problem Generating JSON");
e.printStackTrace();
} catch (JsonMappingException e) {
Constants.globalLog.debug("Problem with JSON mapping");
e.printStackTrace();
} catch (IOException e) {
Constants.globalLog.debug("Problem with IO with host:"+socket.getInetAddress());
e.printStackTrace();
}
}


private NWRequest getStartUpRequest(int startUpMsgPort) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public NodeClientRequestServicer(Socket socket,Node node, ConcurrentHashMap<Stri
private void handleRequest(NWRequest request){
//currently clients are only sending DBMessage. Don't see this changing
DBMessage msg=request.getdBMessage();
clientConnectionList.put(msg.getClientInfo().getIpAddress(), mySocket);
clientConnectionList.put(msg.getRequestId(), mySocket);
myNode.handleRequest(msg);
}

Expand Down
23 changes: 23 additions & 0 deletions src/network/responses/ClientResponseWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package network.responses;

import java.net.Socket;

public class ClientResponseWrapper {

private NWResponse response=null;
private Socket socket=null;

public ClientResponseWrapper(NWResponse response,Socket socket){
this.response=response;
this.socket=socket;
}

public NWResponse getResponse() {
return response;
}

public Socket getSocket() {
return socket;
}

}
15 changes: 13 additions & 2 deletions src/network/responses/outgoing/NodeResponseModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import network.outgoing.NeighborConnection;
import network.responses.ClientResponseWrapper;
import network.responses.NWResponse;
//handles all outgoing response messages from this node
public class NodeResponseModule {

//this might need to be a copy-on-write later when the neighborConnections become dynamic
private ArrayList<NeighborConnection> outgoingNeighborConnections=null;
private ConcurrentLinkedQueue<NWResponse> outgoingResponseQueue=null;
private ConcurrentLinkedQueue<ClientResponseWrapper> outgoingClientResponseQueue=null;

public ArrayList<NeighborConnection> getOutgoingNeighborConnections() {
return outgoingNeighborConnections;
Expand All @@ -20,12 +22,17 @@ public ArrayList<NeighborConnection> getOutgoingNeighborConnections() {
public NodeResponseModule(List<String> neighborList,int neighborResponseServerPort){

this.outgoingNeighborConnections=getConnectionList(neighborList,neighborResponseServerPort);
outgoingClientResponseQueue=new ConcurrentLinkedQueue<ClientResponseWrapper>();
initializeQueue();
}

public void startServicingResponses(){
Thread servicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections));
servicer.start();
Thread nwResponseServicer=new Thread(new OutgoingResponseServicer(outgoingResponseQueue,outgoingNeighborConnections));
nwResponseServicer.start();

//Starts the thread servicing client responses
Thread clientResponseServicer=new Thread(new OutgoingClientResponseServicer(outgoingClientResponseQueue));
clientResponseServicer.start();
}

private ArrayList<NeighborConnection> getConnectionList(List<String> neighborList,int neighborServerPort) {
Expand All @@ -43,6 +50,10 @@ public void insertOutgoingNWResponse(NWResponse response){
outgoingResponseQueue.add(response);
}

public void insertOutgoingClientResponse(ClientResponseWrapper response){
outgoingClientResponseQueue.add(response);
}

private void initializeQueue(){
this.outgoingResponseQueue=new ConcurrentLinkedQueue<NWResponse>();
}
Expand Down
Loading

0 comments on commit aed3c73

Please sign in to comment.