From dfac3a890f1670c430763dc8e1cb111bb4449951 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 23 Jul 2020 20:42:14 +0800 Subject: [PATCH 1/3] init --- pom.xml | 5 + .../infra/pegasus/client/ClientOptions.java | 55 +++++++ .../infra/pegasus/client/PegasusClient.java | 45 ++---- .../pegasus/client/PegasusClientFactory.java | 19 +-- .../client/PegasusClientInterface.java | 2 +- .../com/xiaomi/infra/pegasus/rpc/Cluster.java | 8 +- .../infra/pegasus/rpc/ClusterOptions.java | 141 ------------------ .../pegasus/rpc/async/ClusterManager.java | 22 +-- .../infra/pegasus/client/TestPException.java | 11 +- .../pegasus/rpc/async/ClusterManagerTest.java | 18 +-- .../pegasus/rpc/async/MetaSessionTest.java | 21 +-- .../pegasus/rpc/async/ReplicaSessionTest.java | 6 +- .../pegasus/rpc/async/TableHandlerTest.java | 6 +- .../pegasus/rpc/async/TimeoutBenchmark.java | 7 +- 14 files changed, 126 insertions(+), 240 deletions(-) delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java diff --git a/pom.xml b/pom.xml index e37ee80d..2983ec80 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,11 @@ metrics-core 3.1.2 + + org.apache.commons + commons-configuration2 + 2.7 + org.slf4j slf4j-api diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 2d1db356..176013bd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -3,7 +3,11 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import static com.xiaomi.infra.pegasus.client.PConfigUtil.loadConfiguration; + import java.time.Duration; +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.ConfigurationConverter; /** * Client Options to control the behavior of {@link PegasusClientInterface}. @@ -31,6 +35,16 @@ */ public class ClientOptions { + public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000; + + public static final String PEGASUS_META_SERVERS_KEY = "meta_servers"; + public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout"; + public static final String PEGASUS_ASYNC_WORKERS_KEY = "async_workers"; + public static final String PEGASUS_ENABLE_PERF_COUNTER_KEY = "enable_perf_counter"; + public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags"; + public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; + public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; + public static final String DEFAULT_META_SERVERS = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000); @@ -100,6 +114,47 @@ public static ClientOptions create() { return builder().build(); } + public static ClientOptions create(String configPath) throws PException { + Configuration config = ConfigurationConverter.getConfiguration(loadConfiguration(configPath)); + + String metaList = config.getString(PEGASUS_META_SERVERS_KEY); + if (metaList == null) { + throw new IllegalArgumentException("no property set: " + PEGASUS_META_SERVERS_KEY); + } + metaList = metaList.trim(); + if (metaList.isEmpty()) { + throw new IllegalArgumentException("invalid property: " + PEGASUS_META_SERVERS_KEY); + } + + int asyncWorkers = config.getInt(PEGASUS_ASYNC_WORKERS_KEY, DEFAULT_ASYNC_WORKERS); + boolean enablePerfCounter = + config.getBoolean(PEGASUS_ENABLE_PERF_COUNTER_KEY, DEFAULT_ENABLE_PERF_COUNTER); + String perfCounterTags = + enablePerfCounter + ? config.getString(PEGASUS_PERF_COUNTER_TAGS_KEY, DEFAULT_FALCON_PERF_COUNTER_TAGS) + : null; + Duration pushIntervalSecs = + Duration.ofSeconds( + config.getLong( + PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, DEFAULT_FALCON_PUSH_INTERVAL.getSeconds())); + Duration operationTimeOut = + Duration.ofMillis( + config.getLong(PEGASUS_OPERATION_TIMEOUT_KEY, DEFAULT_OPERATION_TIMEOUT.toMillis())); + Duration metaQueryTimeout = + Duration.ofMillis( + config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis())); + + return ClientOptions.builder() + .metaServers(metaList) + .operationTimeout(operationTimeOut) + .asyncWorkers(asyncWorkers) + .enablePerfCounter(enablePerfCounter) + .falconPerfCounterTags(perfCounterTags) + .falconPushInterval(pushIntervalSecs) + .metaQueryTimeout(metaQueryTimeout) + .build(); + } + @Override public boolean equals(Object options) { if (this == options) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index a00e7c58..c092aee0 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -8,9 +8,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -25,14 +23,10 @@ public class PegasusClient implements PegasusClientInterface { private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class); - public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit"; - public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true"; + private ClientOptions clientOptions; - private boolean enableWriteLimit; - private final Properties config; private final ConcurrentHashMap tableMap; private final Object tableMapLock; - private final String[] metaList; private Cluster cluster; private static class PegasusHasher implements KeyHasher { @@ -72,36 +66,28 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws return table; } - // pegasus client configuration keys - public static final String[] PEGASUS_CLIENT_CONFIG_KEYS = - ArrayUtils.add(ClusterOptions.allKeys(), PEGASUS_ENABLE_WRITE_LIMIT); - // configPath could be: // - zk path: zk://host1:port1,host2:port2,host3:port3/path/to/config // - local file path: file:///path/to/config // - resource path: resource:///path/to/config public PegasusClient(String configPath) throws PException { - this(PConfigUtil.loadConfiguration(configPath)); + this(ClientOptions.create(configPath)); } - public PegasusClient(Properties config) throws PException { - this.config = config; - this.cluster = Cluster.createCluster(config); + public PegasusClient(ClientOptions clientOptions) throws PException { + this.clientOptions = clientOptions; + this.cluster = Cluster.createCluster(clientOptions); this.tableMap = new ConcurrentHashMap(); this.tableMapLock = new Object(); - this.metaList = cluster.getMetaList(); - this.enableWriteLimit = - Boolean.parseBoolean( - config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF)); LOGGER.info(getConfigurationString()); } public boolean isWriteLimitEnabled() { - return enableWriteLimit; + return clientOptions.isWriteLimitEnabled(); } String getMetaList() { - return Arrays.toString(metaList); + return clientOptions.getMetaServers(); } @Override @@ -181,18 +167,7 @@ public static int bytesCompare(byte[] left, byte[] right) { } public String getConfigurationString() { - String configString = "PegasusClient Configuration:\n"; - if (this.config == null) { - return configString; - } - for (int i = 0; i < PEGASUS_CLIENT_CONFIG_KEYS.length; ++i) { - configString += - (PEGASUS_CLIENT_CONFIG_KEYS[i] - + "=" - + this.config.getProperty(PEGASUS_CLIENT_CONFIG_KEYS[i], "") - + "\n"); - } - return configString; + return clientOptions.toString(); } @Override @@ -220,8 +195,8 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM } @Override - public Properties getConfiguration() { - return config; + public ClientOptions getConfiguration() { + return clientOptions; } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java index fb329f08..1dc21f1b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java @@ -3,7 +3,6 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; -import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,24 +39,12 @@ public static PegasusClientInterface createClient(String configPath) throws PExc * Create a client instance instance with {@link ClientOptions}. After used, should call * client.close() to release resource. * - * @param options The client option + * @param clientOptions The client option * @return PegasusClientInterface {@link PegasusClientInterface} * @throws PException throws exception if any error occurs. */ - public static PegasusClientInterface createClient(ClientOptions options) throws PException { - Properties pegasusConfig = new Properties(); - pegasusConfig.setProperty("meta_servers", options.getMetaServers()); - pegasusConfig.setProperty( - "operation_timeout", String.valueOf(options.getOperationTimeout().toMillis())); - pegasusConfig.setProperty("async_workers", String.valueOf(options.getAsyncWorkers())); - pegasusConfig.setProperty("enable_perf_counter", String.valueOf(options.isEnablePerfCounter())); - pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter())); - pegasusConfig.setProperty( - "push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds())); - pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isWriteLimitEnabled())); - pegasusConfig.setProperty( - "meta_query_timeout", String.valueOf(options.getMetaQueryTimeout().toMillis())); - return new PegasusClient(pegasusConfig); + public static PegasusClientInterface createClient(ClientOptions clientOptions) throws PException { + return new PegasusClient(clientOptions); } /** diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 598e357d..f28f9a26 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -16,7 +16,7 @@ public interface PegasusClientInterface { * * @return config */ - public Properties getConfiguration(); + public ClientOptions getConfiguration(); /** Close the client. The client can not be used again after closed. */ public void close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index aa0953a9..0ec9e59b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -3,14 +3,16 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc; +import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; -import java.util.Properties; import org.apache.thrift.TException; public abstract class Cluster { - public static Cluster createCluster(Properties config) throws IllegalArgumentException { - return new ClusterManager(ClusterOptions.create(config)); + public static Cluster createCluster(ClientOptions clientOptions) + throws IllegalArgumentException, PException { + return new ClusterManager(clientOptions); } public abstract String[] getMetaList(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java deleted file mode 100644 index f392adf9..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. -package com.xiaomi.infra.pegasus.rpc; - -import java.util.Properties; - -/** ClusterOptions is the internal options for connecting a Pegasus cluster. */ -public class ClusterOptions { - public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000; - - public static final String PEGASUS_META_SERVERS_KEY = "meta_servers"; - - public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout"; - public static final String PEGASUS_OPERATION_TIMEOUT_DEF = "1000"; - - public static final String PEGASUS_ASYNC_WORKERS_KEY = "async_workers"; - public static final String PEGASUS_ASYNC_WORKERS_DEF = "4"; - - public static final String PEGASUS_ENABLE_PERF_COUNTER_KEY = "enable_perf_counter"; - public static final String PEGASUS_ENABLE_PERF_COUNTER_DEF = "true"; - - public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags"; - public static final String PEGASUS_PERF_COUNTER_TAGS_DEF = ""; - - public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; - public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60"; - - public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; - public static final String PEGASUS_META_QUERY_TIMEOUT_DEF = "5000"; - - public static String[] allKeys() { - return new String[] { - PEGASUS_META_SERVERS_KEY, - PEGASUS_OPERATION_TIMEOUT_KEY, - PEGASUS_ASYNC_WORKERS_KEY, - PEGASUS_ENABLE_PERF_COUNTER_KEY, - PEGASUS_PERF_COUNTER_TAGS_KEY, - PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, - PEGASUS_META_QUERY_TIMEOUT_KEY - }; - } - - private final int operationTimeout; - private final String[] metaList; - private final int asyncWorkers; - private final boolean enablePerfCounter; - private final String perfCounterTags; - private final int pushCounterIntervalSecs; - private final int metaQueryTimeout; - - public int operationTimeout() { - return this.operationTimeout; - } - - public String[] metaList() { - return this.metaList == null ? null : this.metaList.clone(); - } - - public int asyncWorkers() { - return this.asyncWorkers; - } - - public boolean enablePerfCounter() { - return this.enablePerfCounter; - } - - public String perfCounterTags() { - return this.perfCounterTags; - } - - public int pushCounterIntervalSecs() { - return this.pushCounterIntervalSecs; - } - - public int metaQueryTimeout() { - return this.metaQueryTimeout; - } - - public static ClusterOptions create(Properties config) { - int operationTimeout = - Integer.parseInt( - config.getProperty(PEGASUS_OPERATION_TIMEOUT_KEY, PEGASUS_OPERATION_TIMEOUT_DEF)); - String metaList = config.getProperty(PEGASUS_META_SERVERS_KEY); - if (metaList == null) { - throw new IllegalArgumentException("no property set: " + PEGASUS_META_SERVERS_KEY); - } - metaList = metaList.trim(); - if (metaList.isEmpty()) { - throw new IllegalArgumentException("invalid property: " + PEGASUS_META_SERVERS_KEY); - } - String[] address = metaList.split(","); - - int asyncWorkers = - Integer.parseInt(config.getProperty(PEGASUS_ASYNC_WORKERS_KEY, PEGASUS_ASYNC_WORKERS_DEF)); - boolean enablePerfCounter = - Boolean.parseBoolean( - config.getProperty(PEGASUS_ENABLE_PERF_COUNTER_KEY, PEGASUS_ENABLE_PERF_COUNTER_DEF)); - String perfCounterTags = - enablePerfCounter - ? config.getProperty(PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF) - : null; - int pushIntervalSecs = - Integer.parseInt( - config.getProperty( - PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF)); - int metaQueryTimeout = - Integer.parseInt( - config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF)); - - return new ClusterOptions( - operationTimeout, - address, - asyncWorkers, - enablePerfCounter, - perfCounterTags, - pushIntervalSecs, - metaQueryTimeout); - } - - public static ClusterOptions forTest(String[] metaList) { - return new ClusterOptions(1000, metaList, 1, false, null, 60, 1000); - } - - private ClusterOptions( - int operationTimeout, - String[] metaList, - int asyncWorkers, - boolean enablePerfCounter, - String perfCounterTags, - int pushCounterIntervalSecs, - int metaQueryTimeout) { - this.operationTimeout = operationTimeout; - this.metaList = metaList; - this.asyncWorkers = asyncWorkers; - this.enablePerfCounter = enablePerfCounter; - this.perfCounterTags = perfCounterTags; - this.pushCounterIntervalSecs = pushCounterIntervalSecs; - this.metaQueryTimeout = metaQueryTimeout; - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 3c1c6963..79302b5d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -6,9 +6,9 @@ import static java.lang.Integer.max; import com.xiaomi.infra.pegasus.base.rpc_address; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.metrics.MetricsManager; import com.xiaomi.infra.pegasus.rpc.Cluster; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.TableOptions; import io.netty.channel.EventLoopGroup; @@ -43,22 +43,24 @@ public class ClusterManager extends Cluster { logger.info("operating system name: {}", osName); } - public ClusterManager(ClusterOptions opts) throws IllegalArgumentException { - setTimeout(opts.operationTimeout()); - this.enableCounter = opts.enablePerfCounter(); + public ClusterManager(ClientOptions opts) throws IllegalArgumentException { + setTimeout((int) opts.getOperationTimeout().toMillis()); + this.enableCounter = opts.isEnablePerfCounter(); if (enableCounter) { - MetricsManager.detectHostAndInit(opts.perfCounterTags(), opts.pushCounterIntervalSecs()); + MetricsManager.detectHostAndInit( + opts.getFalconPerfCounterTags(), (int) opts.getFalconPushInterval().getSeconds()); } replicaSessions = new ConcurrentHashMap(); - replicaGroup = getEventLoopGroupInstance(opts.asyncWorkers()); + replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers()); metaGroup = getEventLoopGroupInstance(1); tableGroup = getEventLoopGroupInstance(1); - metaList = opts.metaList(); + metaList = opts.getMetaServers().split(","); // the constructor of meta session is depend on the replicaSessions, // so the replicaSessions should be initialized earlier - metaSession = new MetaSession(this, opts.metaList(), opts.metaQueryTimeout(), 10, metaGroup); + metaSession = + new MetaSession(this, metaList, (int) opts.getMetaQueryTimeout().toMillis(), 10, metaGroup); } public EventExecutor getExecutor() { @@ -80,9 +82,7 @@ public ReplicaSession getReplicaSession(rpc_address address) { if (ss != null) return ss; ss = new ReplicaSession( - address, - replicaGroup, - max(operationTimeout, ClusterOptions.MIN_SOCK_CONNECT_TIMEOUT)); + address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT)); replicaSessions.put(address, ss); return ss; } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index edcaa02d..3b529b00 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -11,7 +11,6 @@ import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.client.PegasusTable.Request; import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; @@ -57,8 +56,9 @@ public void testVersion() { @Test public void testHandleReplicationException() throws Exception { - String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; - ClusterManager manager = new ClusterManager(ClusterOptions.forTest(metaList)); + String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; + ClusterManager manager = + new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); TableHandler table = manager.openTable("temp", TableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); @@ -94,8 +94,9 @@ public void testHandleReplicationException() throws Exception { public void testTimeOutIsZero() throws Exception { // ensure "PException ERR_TIMEOUT" is thrown with the real timeout value, when user given // timeout is 0. - String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; - ClusterManager manager = new ClusterManager(ClusterOptions.forTest(metaList)); + String metaList = "127.0.0.1:34601,127.0.0.1:34602, 127.0.0.1:34603"; + ClusterManager manager = + new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); TableHandler table = manager.openTable("temp", TableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index ff1be96c..8e9ec9ca 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -5,7 +5,7 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.rpc_address; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.TableOptions; import org.junit.After; @@ -29,9 +29,10 @@ public void after() throws Exception {} /** Method: getReplicaSession(rpc_address address) */ @Test public void testGetReplicaSession() throws Exception { - String[] address_list = {"127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3"}; + String address_list = "127.0.0.1:1,127.0.0.1:2,127.0.0.1:3"; - ClusterManager testManager = new ClusterManager(ClusterOptions.forTest(address_list)); + ClusterManager testManager = + new ClusterManager(ClientOptions.builder().metaServers(address_list).build()); // input an invalid rpc address rpc_address address = new rpc_address(); @@ -43,8 +44,9 @@ public void testGetReplicaSession() throws Exception { @Test public void testOpenTable() throws Exception { // test invalid meta list - String[] addr_list = {"127.0.0.1:123", "127.0.0.1:124", "127.0.0.1:125"}; - ClusterManager testManager = new ClusterManager(ClusterOptions.forTest(addr_list)); + String address_list = "127.0.0.1:123,127.0.0.1:124,127.0.0.1:125"; + ClusterManager testManager = + new ClusterManager(ClientOptions.builder().metaServers(address_list).build()); TableHandler result = null; try { @@ -57,10 +59,8 @@ public void testOpenTable() throws Exception { testManager.close(); // test partially invalid meta list - String[] addr_list2 = { - "127.0.0.1:123", "127.0.0.1:34603", "127.0.0.1:34601", "127.0.0.1:34602" - }; - testManager = new ClusterManager(ClusterOptions.forTest(addr_list2)); + String address_list2 = "127.0.0.1:123,127.0.0.1:34603,127.0.0.1:34601,127.0.0.1:34602"; + testManager = new ClusterManager(ClientOptions.builder().metaServers(address_list2).build()); try { result = testManager.openTable("hehe", TableOptions.forTest()); } catch (ReplicationException e) { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java index 2b146d48..49b60ba4 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java @@ -7,12 +7,12 @@ import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.base.rpc_address; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.operator.query_cfg_operator; import com.xiaomi.infra.pegasus.replication.partition_configuration; import com.xiaomi.infra.pegasus.replication.query_cfg_request; import com.xiaomi.infra.pegasus.replication.query_cfg_response; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.tools.Toollet; import com.xiaomi.infra.pegasus.tools.Tools; import java.util.ArrayList; @@ -64,8 +64,9 @@ public void testMetaConnect() throws Exception { // test: first connect to a wrong server // then it forward to the right server // then the wrong server crashed - String[] addr_list = {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"}; - ClusterManager manager = new ClusterManager(ClusterOptions.forTest(addr_list)); + String address_list = "127.0.0.1:34602,127.0.0.1:34603,127.0.0.1:34601"; + ClusterManager manager = + new ClusterManager(ClientOptions.builder().metaServers(address_list).build()); MetaSession session = manager.getMetaSession(); rpc_address addr = new rpc_address(); @@ -113,10 +114,10 @@ private rpc_address[] getAddressFromSession(List sessions) { @Test public void testDNSResolveHost() throws Exception { // ensure meta list keeps consistent with dns. + + String address_list = "127.0.0.1:34602, 127.0.0.1:34603, 127.0.0.1:34601"; ClusterManager manager = - new ClusterManager( - ClusterOptions.forTest( - new String[] {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"})); + new ClusterManager(ClientOptions.builder().metaServers(address_list).build()); MetaSession session = manager.getMetaSession(); MetaSession meta = Mockito.spy(session); ReplicaSession meta2 = meta.getMetaList().get(0); // 127.0.0.1:34602 @@ -156,7 +157,7 @@ public void testDNSResolveHost() throws Exception { @Test public void testDNSMetaAllChanged() throws Exception { ClusterManager manager = - new ClusterManager(ClusterOptions.forTest(new String[] {"localhost:34601"})); + new ClusterManager(ClientOptions.builder().metaServers("localhost:34601").build()); MetaSession session = manager.getMetaSession(); MetaSession meta = Mockito.spy(session); // curLeader=0, hostPort="localhost:34601" @@ -218,7 +219,7 @@ public void testMetaForwardUnknownPrimary() throws Exception { // into local meta list, and set it to current leader. ClusterManager manager = - new ClusterManager(ClusterOptions.forTest(new String[] {"localhost:34601"})); + new ClusterManager(ClientOptions.builder().metaServers("localhost:34601").build()); MetaSession session = manager.getMetaSession(); MetaSession meta = Mockito.spy(session); // curLeader=0, hostPort="localhost:34601" @@ -268,7 +269,7 @@ public void run() {} @Test public void testDNSResetMetaMaxQueryCount() { ClusterManager manager = - new ClusterManager(ClusterOptions.forTest(new String[] {"localhost:34601"})); + new ClusterManager(ClientOptions.builder().metaServers("localhost:34601").build()); MetaSession metaMock = Mockito.spy(manager.getMetaSession()); List metaList = metaMock.getMetaList(); @@ -303,7 +304,7 @@ public void testDNSResetMetaMaxQueryCount() { public void testDNSMetaUnavailable() { // Ensures when the DNS returns meta all unavailable, finally the query will timeout. ClusterManager manager = - new ClusterManager(ClusterOptions.forTest(new String[] {"localhost:34601"})); + new ClusterManager(ClientOptions.builder().metaServers("localhost:34601").build()); MetaSession metaMock = Mockito.spy(manager.getMetaSession()); List metaList = metaMock.getMetaList(); metaList.remove(0); // del the "localhost:34601" diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index a19235b3..14d0bcb6 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -8,11 +8,11 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.base.rpc_address; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.client.PegasusClient; import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.operator.rrdb_get_operator; import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.rpc.KeyHasher; import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession.ConnState; import com.xiaomi.infra.pegasus.tools.Toollet; @@ -35,13 +35,13 @@ import org.slf4j.Logger; public class ReplicaSessionTest { - private String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; + private String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; private final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSessionTest.class); private ClusterManager manager; @Before public void before() throws Exception { - manager = new ClusterManager(ClusterOptions.forTest(metaList)); + manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); } @After diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index 81a8bbdc..66a41311 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -6,8 +6,8 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.rpc_address; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.operator.client_operator; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; @@ -28,14 +28,14 @@ public class TableHandlerTest { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TableHandlerTest.class); - private String[] addr_list = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; + private String addr_list = "127.0.0.1:34601, 127.0.0.1:34602, 127.0.0.1:34603"; private String[] replica_servers = {"127.0.0.1:34801", "127.0.0.1:34802", "127.0.01:34803"}; private ClusterManager testManager; @Before public void before() throws Exception { - testManager = new ClusterManager(ClusterOptions.forTest(addr_list)); + testManager = new ClusterManager(ClientOptions.builder().metaServers(addr_list).build()); } @After diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java index 145274ee..656bb85d 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java @@ -5,7 +5,7 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; -import com.xiaomi.infra.pegasus.rpc.ClusterOptions; +import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.tools.Toollet; @@ -62,8 +62,9 @@ private void testTimeout( @Test public void timeoutChecker() { - String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; - ClusterManager manager = new ClusterManager(ClusterOptions.forTest(metaList)); + String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; + ClusterManager manager = + new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); TableHandler handle; try { From 0ba352444935986b80cbc1f2b9726ab0fac9a166 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 23 Jul 2020 20:45:59 +0800 Subject: [PATCH 2/3] fix test --- .../xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java index 49b60ba4..76ad0d66 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java @@ -15,6 +15,7 @@ import com.xiaomi.infra.pegasus.replication.query_cfg_response; import com.xiaomi.infra.pegasus.tools.Toollet; import com.xiaomi.infra.pegasus.tools.Tools; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -304,7 +305,11 @@ public void testDNSResetMetaMaxQueryCount() { public void testDNSMetaUnavailable() { // Ensures when the DNS returns meta all unavailable, finally the query will timeout. ClusterManager manager = - new ClusterManager(ClientOptions.builder().metaServers("localhost:34601").build()); + new ClusterManager( + ClientOptions.builder() + .metaServers("localhost:34601") + .metaQueryTimeout(Duration.ofMillis(1000)) + .build()); MetaSession metaMock = Mockito.spy(manager.getMetaSession()); List metaList = metaMock.getMetaList(); metaList.remove(0); // del the "localhost:34601" From 74ded22514eccf879c36cc9161704d3c3e15ff56 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 24 Jul 2020 14:38:15 +0800 Subject: [PATCH 3/3] fix --- .../java/com/xiaomi/infra/pegasus/client/PegasusClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index c092aee0..73c56b7e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -23,7 +23,7 @@ public class PegasusClient implements PegasusClientInterface { private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class); - private ClientOptions clientOptions; + private final ClientOptions clientOptions; private final ConcurrentHashMap tableMap; private final Object tableMapLock;