Skip to content

Commit

Permalink
feat: support session reset time windows config and init default valu…
Browse files Browse the repository at this point in the history
…e to 30s (apache#161)
  • Loading branch information
foreverneverer authored Jul 27, 2021
1 parent 614c76d commit 4222817
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 5 deletions.
44 changes: 41 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class ClientOptions {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol";
public static final String PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY =
"session_reset_time_window_secs";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -59,6 +61,7 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final String DEFAULT_AUTH_PROTOCOL = "";
public static final long DEFAULT_SESSION_RESET_SECS_WINDOW = 30;

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -70,6 +73,7 @@ public class ClientOptions {
private final Duration metaQueryTimeout;
private final String authProtocol;
private final Credential credential;
private final long sessionResetTimeWindowSecs;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -82,6 +86,7 @@ protected ClientOptions(Builder builder) {
this.metaQueryTimeout = builder.metaQueryTimeout;
this.authProtocol = builder.authProtocol;
this.credential = builder.credential;
this.sessionResetTimeWindowSecs = builder.sessionResetTimeWindowSecs;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -95,6 +100,7 @@ protected ClientOptions(ClientOptions original) {
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.authProtocol = original.getAuthProtocol();
this.credential = original.getCredential();
this.sessionResetTimeWindowSecs = original.getSessionResetTimeWindowSecs();
}

/**
Expand Down Expand Up @@ -156,6 +162,9 @@ public static ClientOptions create(String configPath) throws PException {
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
String authProtocol = config.getString(PEGASUS_AUTH_PROTOCOL_KEY, DEFAULT_AUTH_PROTOCOL);
Credential credential = Credential.create(authProtocol, config);
long sessionResetTimeWindowSecs =
config.getLong(
PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY, DEFAULT_SESSION_RESET_SECS_WINDOW);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -167,6 +176,7 @@ public static ClientOptions create(String configPath) throws PException {
.metaQueryTimeout(metaQueryTimeout)
.authProtocol(authProtocol)
.credential(credential)
.sessionResetTimeWindowSecs(sessionResetTimeWindowSecs)
.build();
}

Expand All @@ -185,8 +195,9 @@ public boolean equals(Object options) {
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.authProtocol == clientOptions.authProtocol
&& this.credential == clientOptions.credential;
&& this.authProtocol.equals(clientOptions.authProtocol)
&& this.credential == clientOptions.credential
&& this.sessionResetTimeWindowSecs == clientOptions.sessionResetTimeWindowSecs;
}
return false;
}
Expand Down Expand Up @@ -214,7 +225,9 @@ public String toString() {
+ ", metaQueryTimeout(ms)="
+ metaQueryTimeout.toMillis()
+ ", authProtocol="
+ authProtocol;
+ authProtocol
+ ", sessionResetTimeWindowSecs="
+ sessionResetTimeWindowSecs;
if (credential != null) {
res += ", credential=" + credential.toString();
}
Expand All @@ -233,6 +246,7 @@ public static class Builder {
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private String authProtocol = DEFAULT_AUTH_PROTOCOL;
private Credential credential = null;
private long sessionResetTimeWindowSecs = DEFAULT_SESSION_RESET_SECS_WINDOW;

protected Builder() {}

Expand Down Expand Up @@ -365,6 +379,20 @@ public Builder credential(Credential credential) {
return this;
}

/**
* session reset time window, If the timeout duration exceeds this value, the connection will be
* reset
*
* @param sessionResetTimeWindowSecs sessionResetTimeWindowSecs must >= 10s, Defaults to
* {@linkplain #DEFAULT_SESSION_RESET_SECS_WINDOW}
* @return {@code this}
*/
public Builder sessionResetTimeWindowSecs(long sessionResetTimeWindowSecs) {
assert sessionResetTimeWindowSecs >= 10 : "sessionResetTimeWindowSecs must be >= 10s";
this.sessionResetTimeWindowSecs = sessionResetTimeWindowSecs;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -500,4 +528,14 @@ public String getAuthProtocol() {
public Credential getCredential() {
return credential;
}

/**
* session reset time window, If the timeout duration exceeds this value, the connection will be
* reset
*
* @return sessionResetTimeWindowSecs
*/
public long getSessionResetTimeWindowSecs() {
return sessionResetTimeWindowSecs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ClusterManager extends Cluster {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterManager.class);

private int operationTimeout;
private long sessionResetTimeWindowSecs;
private int retryDelay;
private boolean enableCounter;

Expand All @@ -63,6 +64,7 @@ public class ClusterManager extends Cluster {
public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
setTimeout((int) opts.getOperationTimeout().toMillis());
this.enableCounter = opts.isEnablePerfCounter();
this.sessionResetTimeWindowSecs = opts.getSessionResetTimeWindowSecs();
if (enableCounter) {
MetricsManager.detectHostAndInit(
opts.getFalconPerfCounterTags(), (int) opts.getFalconPushInterval().getSeconds());
Expand Down Expand Up @@ -103,6 +105,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
sessionResetTimeWindowSecs,
sessionInterceptorManager);
replicaSessions.put(address, ss);
return ss;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ public ReplicaSession(
rpc_address address,
EventLoopGroup rpcGroup,
int socketTimeout,
long sessionResetTimeWindowSec,
ReplicaSessionInterceptorManager interceptorManager) {
this.address = address;
this.rpcGroup = rpcGroup;
this.interceptorManager = interceptorManager;
this.sessionResetTimeWindowMs = sessionResetTimeWindowSec * 1000;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -481,7 +483,7 @@ static final class VolatileFields {

// Timestamp of the first timed out rpc.
private AtomicLong firstRecentTimedOutMs;
private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s
private final long sessionResetTimeWindowMs;

private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void testSessionConnectTimeout() throws InterruptedException {
long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
ReplicaSession rs =
new ReplicaSession(addr, rpcGroup, 1000, (ReplicaSessionInterceptorManager) null);
new ReplicaSession(addr, rpcGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Expand Down

0 comments on commit 4222817

Please sign in to comment.