Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: delete duplicate config and use ClientOptions uniformly #117

Merged
merged 6 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>metrics-core</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this dep be shaded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but If it hasn't compatibility problems, it maybe no need to be shaded, otherwise, should all dependencys be shaded?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid so. The deps for non-testing code generally need to be shaded in case of dep conflict.
But given the apache.common.configuration2 is not much used here, we can leave it unshaded for now.

<version>2.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import static com.xiaomi.infra.pegasus.client.PConfigUtil.loadConfiguration;

import java.time.Duration;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ConfigurationConverter;

/**
* Client Options to control the behavior of {@link PegasusClientInterface}.
Expand Down Expand Up @@ -31,6 +35,16 @@
*/
public class ClientOptions {

public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000;

public static final String PEGASUS_META_SERVERS_KEY = "meta_servers";
public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout";
public static final String PEGASUS_ASYNC_WORKERS_KEY = "async_workers";
public static final String PEGASUS_ENABLE_PERF_COUNTER_KEY = "enable_perf_counter";
public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000);
Expand Down Expand Up @@ -100,6 +114,47 @@ public static ClientOptions create() {
return builder().build();
}

public static ClientOptions create(String configPath) throws PException {
Configuration config = ConfigurationConverter.getConfiguration(loadConfiguration(configPath));

String metaList = config.getString(PEGASUS_META_SERVERS_KEY);
if (metaList == null) {
throw new IllegalArgumentException("no property set: " + PEGASUS_META_SERVERS_KEY);
}
metaList = metaList.trim();
if (metaList.isEmpty()) {
throw new IllegalArgumentException("invalid property: " + PEGASUS_META_SERVERS_KEY);
}

int asyncWorkers = config.getInt(PEGASUS_ASYNC_WORKERS_KEY, DEFAULT_ASYNC_WORKERS);
boolean enablePerfCounter =
config.getBoolean(PEGASUS_ENABLE_PERF_COUNTER_KEY, DEFAULT_ENABLE_PERF_COUNTER);
String perfCounterTags =
enablePerfCounter
? config.getString(PEGASUS_PERF_COUNTER_TAGS_KEY, DEFAULT_FALCON_PERF_COUNTER_TAGS)
: null;
Duration pushIntervalSecs =
Duration.ofSeconds(
config.getLong(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, DEFAULT_FALCON_PUSH_INTERVAL.getSeconds()));
Duration operationTimeOut =
Duration.ofMillis(
config.getLong(PEGASUS_OPERATION_TIMEOUT_KEY, DEFAULT_OPERATION_TIMEOUT.toMillis()));
Duration metaQueryTimeout =
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));

return ClientOptions.builder()
.metaServers(metaList)
.operationTimeout(operationTimeOut)
.asyncWorkers(asyncWorkers)
.enablePerfCounter(enablePerfCounter)
.falconPerfCounterTags(perfCounterTags)
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.build();
}

@Override
public boolean equals(Object options) {
if (this == options) {
Expand Down
45 changes: 10 additions & 35 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand All @@ -25,14 +23,10 @@
public class PegasusClient implements PegasusClientInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class);

public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit";
public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true";
private final ClientOptions clientOptions;

private boolean enableWriteLimit;
private final Properties config;
private final ConcurrentHashMap<String, PegasusTable> tableMap;
private final Object tableMapLock;
private final String[] metaList;
private Cluster cluster;

private static class PegasusHasher implements KeyHasher {
Expand Down Expand Up @@ -72,36 +66,28 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws
return table;
}

// pegasus client configuration keys
public static final String[] PEGASUS_CLIENT_CONFIG_KEYS =
ArrayUtils.add(ClusterOptions.allKeys(), PEGASUS_ENABLE_WRITE_LIMIT);

// configPath could be:
// - zk path: zk://host1:port1,host2:port2,host3:port3/path/to/config
// - local file path: file:///path/to/config
// - resource path: resource:///path/to/config
public PegasusClient(String configPath) throws PException {
this(PConfigUtil.loadConfiguration(configPath));
this(ClientOptions.create(configPath));
}

public PegasusClient(Properties config) throws PException {
this.config = config;
this.cluster = Cluster.createCluster(config);
public PegasusClient(ClientOptions clientOptions) throws PException {
this.clientOptions = clientOptions;
this.cluster = Cluster.createCluster(clientOptions);
this.tableMap = new ConcurrentHashMap<String, PegasusTable>();
this.tableMapLock = new Object();
this.metaList = cluster.getMetaList();
this.enableWriteLimit =
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF));
LOGGER.info(getConfigurationString());
}

public boolean isWriteLimitEnabled() {
return enableWriteLimit;
return clientOptions.isWriteLimitEnabled();
}

String getMetaList() {
return Arrays.toString(metaList);
return clientOptions.getMetaServers();
}

@Override
Expand Down Expand Up @@ -181,18 +167,7 @@ public static int bytesCompare(byte[] left, byte[] right) {
}

public String getConfigurationString() {
String configString = "PegasusClient Configuration:\n";
if (this.config == null) {
return configString;
}
for (int i = 0; i < PEGASUS_CLIENT_CONFIG_KEYS.length; ++i) {
configString +=
(PEGASUS_CLIENT_CONFIG_KEYS[i]
+ "="
+ this.config.getProperty(PEGASUS_CLIENT_CONFIG_KEYS[i], "")
+ "\n");
}
return configString;
return clientOptions.toString();
}

@Override
Expand Down Expand Up @@ -220,8 +195,8 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM
}

@Override
public Properties getConfiguration() {
return config;
public ClientOptions getConfiguration() {
return clientOptions;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -40,24 +39,12 @@ public static PegasusClientInterface createClient(String configPath) throws PExc
* Create a client instance instance with {@link ClientOptions}. After used, should call
* client.close() to release resource.
*
* @param options The client option
* @param clientOptions The client option
* @return PegasusClientInterface {@link PegasusClientInterface}
* @throws PException throws exception if any error occurs.
*/
public static PegasusClientInterface createClient(ClientOptions options) throws PException {
Properties pegasusConfig = new Properties();
pegasusConfig.setProperty("meta_servers", options.getMetaServers());
pegasusConfig.setProperty(
"operation_timeout", String.valueOf(options.getOperationTimeout().toMillis()));
pegasusConfig.setProperty("async_workers", String.valueOf(options.getAsyncWorkers()));
pegasusConfig.setProperty("enable_perf_counter", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty(
"push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds()));
pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isWriteLimitEnabled()));
pegasusConfig.setProperty(
"meta_query_timeout", String.valueOf(options.getMetaQueryTimeout().toMillis()));
return new PegasusClient(pegasusConfig);
public static PegasusClientInterface createClient(ClientOptions clientOptions) throws PException {
return new PegasusClient(clientOptions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface PegasusClientInterface {
*
* @return config
*/
public Properties getConfiguration();
public ClientOptions getConfiguration();

/** Close the client. The client can not be used again after closed. */
public void close();
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc;

import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.rpc.async.ClusterManager;
import java.util.Properties;
import org.apache.thrift.TException;

public abstract class Cluster {

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
return new ClusterManager(ClusterOptions.create(config));
public static Cluster createCluster(ClientOptions clientOptions)
throws IllegalArgumentException, PException {
return new ClusterManager(clientOptions);
}

public abstract String[] getMetaList();
Expand Down
141 changes: 0 additions & 141 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java

This file was deleted.

Loading