From 42228178bcd148bcd590b63df9ccfbf7b0f71140 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Tue, 27 Jul 2021 18:03:13 +0800 Subject: [PATCH] feat: support session reset time windows config and init default value to 30s (#161) --- .../infra/pegasus/client/ClientOptions.java | 44 +++++++++++++++++-- .../pegasus/rpc/async/ClusterManager.java | 3 ++ .../pegasus/rpc/async/ReplicaSession.java | 4 +- .../pegasus/rpc/async/ReplicaSessionTest.java | 2 +- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index a9f1aef569..a622761b94 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -48,6 +48,8 @@ public class ClientOptions { public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol"; + public static final String PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY = + "session_reset_time_window_secs"; public static final String DEFAULT_META_SERVERS = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; @@ -59,6 +61,7 @@ public class ClientOptions { public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); public static final String DEFAULT_AUTH_PROTOCOL = ""; + public static final long DEFAULT_SESSION_RESET_SECS_WINDOW = 30; private final String metaServers; private final Duration operationTimeout; @@ -70,6 +73,7 @@ public class ClientOptions { private final Duration metaQueryTimeout; private final String authProtocol; private final Credential credential; + private final long sessionResetTimeWindowSecs; protected ClientOptions(Builder builder) { this.metaServers = builder.metaServers; @@ -82,6 +86,7 @@ protected ClientOptions(Builder builder) { this.metaQueryTimeout = builder.metaQueryTimeout; this.authProtocol = builder.authProtocol; this.credential = builder.credential; + this.sessionResetTimeWindowSecs = builder.sessionResetTimeWindowSecs; } protected ClientOptions(ClientOptions original) { @@ -95,6 +100,7 @@ protected ClientOptions(ClientOptions original) { this.metaQueryTimeout = original.getMetaQueryTimeout(); this.authProtocol = original.getAuthProtocol(); this.credential = original.getCredential(); + this.sessionResetTimeWindowSecs = original.getSessionResetTimeWindowSecs(); } /** @@ -156,6 +162,9 @@ public static ClientOptions create(String configPath) throws PException { config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis())); String authProtocol = config.getString(PEGASUS_AUTH_PROTOCOL_KEY, DEFAULT_AUTH_PROTOCOL); Credential credential = Credential.create(authProtocol, config); + long sessionResetTimeWindowSecs = + config.getLong( + PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY, DEFAULT_SESSION_RESET_SECS_WINDOW); return ClientOptions.builder() .metaServers(metaList) @@ -167,6 +176,7 @@ public static ClientOptions create(String configPath) throws PException { .metaQueryTimeout(metaQueryTimeout) .authProtocol(authProtocol) .credential(credential) + .sessionResetTimeWindowSecs(sessionResetTimeWindowSecs) .build(); } @@ -185,8 +195,9 @@ public boolean equals(Object options) { && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis() - && this.authProtocol == clientOptions.authProtocol - && this.credential == clientOptions.credential; + && this.authProtocol.equals(clientOptions.authProtocol) + && this.credential == clientOptions.credential + && this.sessionResetTimeWindowSecs == clientOptions.sessionResetTimeWindowSecs; } return false; } @@ -214,7 +225,9 @@ public String toString() { + ", metaQueryTimeout(ms)=" + metaQueryTimeout.toMillis() + ", authProtocol=" - + authProtocol; + + authProtocol + + ", sessionResetTimeWindowSecs=" + + sessionResetTimeWindowSecs; if (credential != null) { res += ", credential=" + credential.toString(); } @@ -233,6 +246,7 @@ public static class Builder { private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; private String authProtocol = DEFAULT_AUTH_PROTOCOL; private Credential credential = null; + private long sessionResetTimeWindowSecs = DEFAULT_SESSION_RESET_SECS_WINDOW; protected Builder() {} @@ -365,6 +379,20 @@ public Builder credential(Credential credential) { return this; } + /** + * session reset time window, If the timeout duration exceeds this value, the connection will be + * reset + * + * @param sessionResetTimeWindowSecs sessionResetTimeWindowSecs must >= 10s, Defaults to + * {@linkplain #DEFAULT_SESSION_RESET_SECS_WINDOW} + * @return {@code this} + */ + public Builder sessionResetTimeWindowSecs(long sessionResetTimeWindowSecs) { + assert sessionResetTimeWindowSecs >= 10 : "sessionResetTimeWindowSecs must be >= 10s"; + this.sessionResetTimeWindowSecs = sessionResetTimeWindowSecs; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -500,4 +528,14 @@ public String getAuthProtocol() { public Credential getCredential() { return credential; } + + /** + * session reset time window, If the timeout duration exceeds this value, the connection will be + * reset + * + * @return sessionResetTimeWindowSecs + */ + public long getSessionResetTimeWindowSecs() { + return sessionResetTimeWindowSecs; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 2522c80225..25e9db5e2e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -41,6 +41,7 @@ public class ClusterManager extends Cluster { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterManager.class); private int operationTimeout; + private long sessionResetTimeWindowSecs; private int retryDelay; private boolean enableCounter; @@ -63,6 +64,7 @@ public class ClusterManager extends Cluster { public ClusterManager(ClientOptions opts) throws IllegalArgumentException { setTimeout((int) opts.getOperationTimeout().toMillis()); this.enableCounter = opts.isEnablePerfCounter(); + this.sessionResetTimeWindowSecs = opts.getSessionResetTimeWindowSecs(); if (enableCounter) { MetricsManager.detectHostAndInit( opts.getFalconPerfCounterTags(), (int) opts.getFalconPushInterval().getSeconds()); @@ -103,6 +105,7 @@ public ReplicaSession getReplicaSession(rpc_address address) { address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT), + sessionResetTimeWindowSecs, sessionInterceptorManager); replicaSessions.put(address, ss); return ss; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 1175eb12a2..a8f34e7418 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -58,10 +58,12 @@ public ReplicaSession( rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, + long sessionResetTimeWindowSec, ReplicaSessionInterceptorManager interceptorManager) { this.address = address; this.rpcGroup = rpcGroup; this.interceptorManager = interceptorManager; + this.sessionResetTimeWindowMs = sessionResetTimeWindowSec * 1000; final ReplicaSession this_ = this; boot = new Bootstrap(); @@ -481,7 +483,7 @@ static final class VolatileFields { // Timestamp of the first timed out rpc. private AtomicLong firstRecentTimedOutMs; - private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s + private final long sessionResetTimeWindowMs; private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 47b31e6769..155a6180ef 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -296,7 +296,7 @@ public void testSessionConnectTimeout() throws InterruptedException { long start = System.currentTimeMillis(); EventLoopGroup rpcGroup = new NioEventLoopGroup(4); ReplicaSession rs = - new ReplicaSession(addr, rpcGroup, 1000, (ReplicaSessionInterceptorManager) null); + new ReplicaSession(addr, rpcGroup, 1000, 30, (ReplicaSessionInterceptorManager) null); rs.tryConnect().awaitUninterruptibly(); long end = System.currentTimeMillis(); Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec