Skip to content

Commit

Permalink
GossipRouter now uses all command line options, such as -solinger (or…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jan 23, 2024
1 parent a31aa6a commit e0cf7b2
Showing 1 changed file with 14 additions and 61 deletions.
75 changes: 14 additions & 61 deletions src/org/jgroups/stack/GossipRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,17 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener,

@ManagedAttribute(description="Time (in ms) for setting SO_LINGER on sockets returned from accept(). 0 means do not set SO_LINGER"
,type=AttributeType.TIME)
protected long linger_timeout=2000L;

@ManagedAttribute(description="Time (in ms) for SO_TIMEOUT on sockets returned from accept(). 0 means don't set SO_TIMEOUT"
,type=AttributeType.TIME)
protected long sock_read_timeout;
protected int linger_timeout=-1;

protected ThreadFactory thread_factory=new DefaultThreadFactory("gossip", false, true);

protected SocketFactory socket_factory=new DefaultSocketFactory();

@ManagedAttribute(description="The max queue size of backlogged connections")
protected int backlog=1000;

@ManagedAttribute(description="Initial size of the TCP/NIO receive buffer (in bytes)")
protected int recv_buf_size;

@ManagedAttribute(description="Expose GossipRouter via JMX")
protected boolean jmx=true;
protected boolean jmx;

@ManagedAttribute(description="Use non-blocking IO (true) or blocking IO (false). Cannot be changed at runtime")
protected boolean use_nio;
Expand Down Expand Up @@ -133,18 +126,14 @@ public GossipRouter(InetAddress bind_addr, int local_port) throws Exception {
public GossipRouter port(int port) {this.port=port; return this;}
public long expiryTime() {return expiry_time;}
public GossipRouter expiryTime(long t) {this.expiry_time=t; return this;}
public long lingerTimeout() {return linger_timeout;}
public GossipRouter lingerTimeout(long t) {this.linger_timeout=t; return this;}
public long socketReadTimeout() {return sock_read_timeout;}
public GossipRouter socketReadTimeout(long t) {this.sock_read_timeout=t; return this;}
public int lingerTimeout() {return linger_timeout;}
public GossipRouter lingerTimeout(int t) {this.linger_timeout=t; return this;}
public int recvBufferSize() {return recv_buf_size;}
public GossipRouter recvBufferSize(int s) {recv_buf_size=s; return this;}
public ThreadFactory threadPoolFactory() {return thread_factory;}
public GossipRouter threadPoolFactory(ThreadFactory f) {this.thread_factory=f; return this;}
public SocketFactory socketFactory() {return socket_factory;}
public GossipRouter socketFactory(SocketFactory sf) {this.socket_factory=sf; return this;}
public int backlog() {return backlog;}
public GossipRouter backlog(int backlog) {this.backlog=backlog; return this;}
public boolean jmx() {return jmx;}
public GossipRouter jmx(boolean flag) {jmx=flag; return this;}
public boolean useNio() {return use_nio;}
Expand Down Expand Up @@ -215,9 +204,10 @@ public GossipRouter start() throws Exception {
recv_buf_size, "jgroups.nio.gossiprouter")
: new TcpServer(thread_factory, socket_factory, bind_addr, port, port, null, 0,
recv_buf_size, "jgroups.tcp.gossiprouter");
server.receiver(this).setMaxLength(max_length);
server.receiver(this).setMaxLength(max_length)
.addConnectionListener(this)
.connExpireTimeout(expiry_time).linger(linger_timeout);
server.start();
server.addConnectionListener(this);
Runtime.getRuntime().addShutdownHook(new Thread(GossipRouter.this::stop));
return this;
}
Expand Down Expand Up @@ -344,7 +334,6 @@ public void receive(Address sender, DataInput in) throws Exception {
// we might be concurrent traffic from *different* (senders) TcpConnections for the
// *same* target address, so we have to synchronized below in order to avoid corruption
// of data (https://issues.redhat.com/browse/JGRP-2722)
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized(out) {
out.position(0);
request.writeTo(out);
Expand Down Expand Up @@ -409,8 +398,8 @@ public Map<String,String> handleProbe(String... keys) {
StringBuilder sb=new StringBuilder();
for(DiagnosticsHandler.ProbeHandler handler : diag.getProbeHandlers()) {
String[] tmp=handler.supportedKeys();
if(tmp != null && tmp.length > 0) {
for(String s : tmp)
if(tmp != null) {
for(String s: tmp)
sb.append(s).append(" ");
}
}
Expand Down Expand Up @@ -755,32 +744,15 @@ public Entry(Address client_addr, PhysicalAddress phys_addr, String logical_name



/**
* Prints startup information.
*/
private void printStartupInfo() {
System.out.println("GossipRouter started at " + Util.utcNow());

System.out.print("Listening on port " + port);
System.out.println(" bound on address " + server.localAddress());

System.out.print("Backlog is " + backlog);
System.out.print(", linger timeout is " + linger_timeout);
System.out.println(", and read timeout is " + sock_read_timeout);
}




public static void main(String[] args) throws Exception {
int port=12001;
int backlog=0, recv_buf_size=0, max_length=0;
long soLinger=-1;
long soTimeout=-1;
long expiry_time=60000;
int recv_buf_size=0, max_length=0;
long expiry_time=0;
boolean diag_enabled=true, diag_enable_udp=true, diag_enable_tcp=false;
InetAddress diag_mcast_addr=null, diag_bind_addr=null;
int diag_port=7500, diag_port_range=50, diag_ttl=8;
int diag_port=7500, diag_port_range=50, diag_ttl=8, soLinger=-1;
List<NetworkInterface> diag_bind_interfaces=null;
String diag_passcode=null;

Expand All @@ -800,10 +772,6 @@ public static void main(String[] args) throws Exception {
bind_addr=args[++i];
continue;
}
if("-backlog".equals(arg)) {
backlog=Integer.parseInt(args[++i]);
continue;
}
if("-recv_buf_size".equals(args[i])) {
recv_buf_size=Integer.parseInt(args[++i]);
continue;
Expand All @@ -817,11 +785,7 @@ public static void main(String[] args) throws Exception {
continue;
}
if("-solinger".equals(arg)) {
soLinger=Long.parseLong(args[++i]);
continue;
}
if("-sotimeout".equals(arg)) {
soTimeout=Long.parseLong(args[++i]);
soLinger=Integer.parseInt(args[++i]);
continue;
}
if("-nio".equals(arg)) {
Expand Down Expand Up @@ -937,9 +901,7 @@ public static void main(String[] args) throws Exception {
GossipRouter router=new GossipRouter(bind_addr, port)
.jmx(jmx).expiryTime(expiry_time)
.useNio(nio)
.backlog(backlog)
.recvBufferSize(recv_buf_size)
.socketReadTimeout(soTimeout)
.lingerTimeout(soLinger)
.emitSuspectEvents(suspects)
.dumpMessages(dump_msgs)
Expand Down Expand Up @@ -994,19 +956,10 @@ static void help() {
System.out.println();
System.out.println("Options:");
System.out.println();
System.out.println(" -backlog <backlog> - Max queue size of backlogged connections. Must be");
System.out.println(" greater than zero or the default of 1000 will be used.");
System.out.println();
System.out.println(" -jmx <true|false> - Expose attributes and operations via JMX.\n");
System.out.println(" -recv_buf_size <bytes> - Sets the receive buffer");
System.out.println();
System.out.println(" -solinger <msecs> - Time for setting SO_LINGER on connections. 0");
System.out.println(" means do not set SO_LINGER. Must be greater than");
System.out.println(" or equal to zero or the default of 2000 will be used.");
System.out.println();
System.out.println(" -sotimeout <msecs> - Time for setting SO_TIMEOUT on connections. 0");
System.out.println(" means don't set SO_TIMEOUT. Must be greater than");
System.out.println(" or equal to zero or the default of 3000 will be used.");
System.out.println(" -solinger <msecs> - Time for setting SO_LINGER on connections");
System.out.println();
System.out.println(" -expiry <msecs> - Time for closing idle connections. 0 means don't expire.");
System.out.println();
Expand Down

0 comments on commit e0cf7b2

Please sign in to comment.