Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

add createClient API with clientOptions #49

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import java.time.Duration;

/**
* @author jiashuo1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你要想进 apache 的话是不能加 author 的

* <p>This class provides method to create an instance of {@link ClientOptions}.you can use
* <code>
* ClientOptions.create();
* </code>
* <p>to create a new instance of {@link ClientOptions} with default settings, or use such as
* <code>
* ClientOptions.builder().metaServers("127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603")
* * .operationTimeout(Duration.ofMillis(1000)).asyncWorkers(4).enablePerfCounter(false)
* * .falconPerfCounterTags("").falconPushInterval(Duration.ofSeconds(10)).build();
* </code>
* <p>to create an instance {@link ClientOptions} with custom settings.
*/
public class ClientOptions {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000);
public static final int DEFAULT_ASYNC_WORKERS = 4;
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false;
public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);

private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String falconPerfCounterTags;
private final Duration falconPushInterval;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
this.operationTimeout = builder.operationTimeout;
this.asyncWorkers = builder.asyncWorkers;
this.enablePerfCounter = builder.enablePerfCounter;
this.falconPerfCounterTags = builder.falconPerfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
}

protected ClientOptions(ClientOptions original) {
this.metaServers = original.getMetaServers();
this.operationTimeout = original.getOperationTimeout();
this.asyncWorkers = original.getAsyncWorkers();
this.enablePerfCounter = original.isEnablePerfCounter();
this.falconPerfCounterTags = original.getFalconPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
}

/**
* Create a copy of {@literal options}
*
* @param options the original
* @return A new instance of {@link ClientOptions} containing the values of {@literal options}
*/
public static ClientOptions copyOf(ClientOptions options) {
return new ClientOptions(options);
}

/**
* Returns a new {@link ClientOptions.Builder} to construct {@link ClientOptions}.
*
* @return a new {@link ClientOptions.Builder} to construct {@link ClientOptions}.
*/
public static ClientOptions.Builder builder() {
return new ClientOptions.Builder();
}

/**
* Create a new instance of {@link ClientOptions} with default settings.
*
* @return a new instance of {@link ClientOptions} with default settings
*/
public static ClientOptions create() {
return builder().build();
}

@Override
public boolean equals(Object options) {
if (this == options) {
return true;
}
if (options instanceof ClientOptions) {
ClientOptions clientOptions = (ClientOptions) options;
if (this.metaServers.equals(clientOptions.metaServers)
&& this.operationTimeout == clientOptions.operationTimeout
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
&& this.falconPushInterval == clientOptions.falconPushInterval) {
return true;
}
}
return false;
}

@Override
public String toString() {
return "ClientOptions{"
+ "metaServers='"
+ metaServers
+ '\''
+ ", operationTimeout(ms)="
+ operationTimeout.toMillis()
+ ", asyncWorkers="
+ asyncWorkers
+ ", enablePerfCounter="
+ enablePerfCounter
+ ", falconPerfCounterTags='"
+ falconPerfCounterTags
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
+ '}';
}

public static class Builder {
private String metaServers = DEFAULT_META_SERVERS;
private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private int asyncWorkers = DEFAULT_ASYNC_WORKERS;
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;

protected Builder() {}

/**
* the list of meta server addresses, separated by commas, See {@link #DEFAULT_META_SERVERS}
*
* @param metaServers required field,must be the right meta_servers
* @return {@code this}
*/
public Builder metaServers(String metaServers) {
this.metaServers = metaServers;
return this;
}

/**
* The timeout for failing to finish an operation,Defaults to {@literal 1000ms}, see {@link
* #DEFAULT_OPERATION_TIMEOUT}
*
* @param operationTimeout operationTimeout
* @return {@code this}
*/
public Builder operationTimeout(Duration operationTimeout) {
this.operationTimeout = operationTimeout;
return this;
}

/**
* The number of background worker threads. Internally it is the number of Netty NIO threads for
* handling RPC events between client and Replica Servers.Defaults to {@literal 4},see {@link
* #DEFAULT_ASYNC_WORKERS}
*
* @param asyncWorkers asyncWorkers thread number
* @return {@code this}
*/
public Builder asyncWorkers(int asyncWorkers) {
this.asyncWorkers = asyncWorkers;
return this;
}

/**
* Whether to enable performance statistics. If true, the client will periodically report
* metrics to local falcon agent (currently we only support falcon as monitoring
* system).Defaults to {@literal true},see {@link #DEFAULT_ENABLE_PERF_COUNTER}
*
* @param enablePerfCounter enablePerfCounter
* @return {@code this}
*/
public Builder enablePerfCounter(boolean enablePerfCounter) {
this.enablePerfCounter = enablePerfCounter;
return this;
}

/**
* Additional tags for falcon metrics. For example:
* "cluster=c3srv-ad,job=recommend-service-history",Default is empty string,see {@link
* #DEFAULT_FALCON_PERF_COUNTER_TAGS}
*
* @param falconPerfCounterTags falconPerfCounterTags
* @return {@code this}
*/
public Builder falconPerfCounterTags(String falconPerfCounterTags) {
this.falconPerfCounterTags = falconPerfCounterTags;
return this;
}

/**
* The interval to report metrics to local falcon agent,Defaults to {@literal 10s},see {@link
* #DEFAULT_FALCON_PUSH_INTERVAL}
*
* @param falconPushInterval falconPushInterval
* @return {@code this}
*/
public Builder falconPushInterval(Duration falconPushInterval) {
this.falconPushInterval = falconPushInterval;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
* @return new instance of {@link ClientOptions}
*/
public ClientOptions build() {
return new ClientOptions(this);
}
}

/**
* Returns a builder to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are
* replicated from the current {@link ClientOptions}.
*/
public ClientOptions.Builder mutate() {
Builder builder = new Builder();
builder
.metaServers(getMetaServers())
.operationTimeout(getOperationTimeout())
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval());
return builder;
}

public String getMetaServers() {
return metaServers;
}

public Duration getOperationTimeout() {
return operationTimeout;
}

public int getAsyncWorkers() {
return asyncWorkers;
}

public boolean isEnablePerfCounter() {
return enablePerfCounter;
}

public String getFalconPerfCounterTags() {
return falconPerfCounterTags;
}

public Duration getFalconPushInterval() {
return falconPushInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,12 +18,27 @@ public class PegasusClientFactory {
private static String singletonClientConfigPath = null;
private static Object singletonClientLock = new Object();

private static ClientOptions singletonClientOptions = null;

// Create a client instance.
// After used, should call client.close() to release resource.
public static PegasusClientInterface createClient(String configPath) throws PException {
return new PegasusClient(configPath);
}

public static PegasusClientInterface createClient(ClientOptions options) throws PException {
Properties pegasusConfig = new Properties();
pegasusConfig.setProperty("meta_servers", options.getMetaServers());
pegasusConfig.setProperty(
"operation_timeout", String.valueOf(options.getOperationTimeout().toMillis()));
pegasusConfig.setProperty("async_workers", String.valueOf(options.getAsyncWorkers()));
pegasusConfig.setProperty("enable_perf_counter", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty(
"push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds()));
return new PegasusClient(pegasusConfig);
}

// Get the singleton client instance with default config path of "resource:///pegasus.properties".
public static PegasusClientInterface getSingletonClient() throws PException {
return getSingletonClient("resource:///pegasus.properties");
Expand Down Expand Up @@ -58,6 +74,29 @@ public static PegasusClientInterface getSingletonClient(String configPath) throw
}
}

public static PegasusClientInterface getSingletonClient(ClientOptions options) throws PException {
synchronized (singletonClientLock) {
if (singletonClient == null) {
try {
singletonClient = (PegasusClient) createClient(options);
singletonClientOptions = options;
LOGGER.info("Create Singleton PegasusClient with options \"" + options.toString() + "\"");
} catch (Throwable e) {
throw new PException("Create Singleton PegasusClient Failed", e);
}
} else if (!singletonClientOptions.equals(options)) {
LOGGER.error(
"Singleton PegasusClient options Conflict: \""
+ options.toString()
+ "\" VS \""
+ singletonClientOptions.toString()
+ "\"");
throw new PException("Singleton PegasusClient options Conflict");
}
return singletonClient;
}
}

// Close the singleton client instance.
public static void closeSingletonClient() throws PException {
synchronized (singletonClientLock) {
Expand Down
Loading