diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java new file mode 100644 index 00000000..0ed5daa3 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -0,0 +1,301 @@ +// Copyright (c) 2019, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client; + +import java.time.Duration; + +/** + * Client Options to control the behavior of {@link PegasusClientInterface}. + * + *
To create a new instance with default settings: + * + *
{@code + * ClientOptions.create(); + * }+ * + * To customize the settings: + * + *
{@code + * ClientOptions opts = + * ClientOptions.builder() + * .metaServers("127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603") + * .operationTimeout(Duration.ofMillis(1000)) + * .asyncWorkers(4) + * .enablePerfCounter(false) + * .falconPerfCounterTags("") + * .falconPushInterval(Duration.ofSeconds(10)) + * .build(); + * }+ */ +public class ClientOptions { + + public static final String DEFAULT_META_SERVERS = + "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; + public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000); + public static final int DEFAULT_ASYNC_WORKERS = 4; + public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false; + public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = ""; + public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); + + private final String metaServers; + private final Duration operationTimeout; + private final int asyncWorkers; + private final boolean enablePerfCounter; + private final String falconPerfCounterTags; + private final Duration falconPushInterval; + + protected ClientOptions(Builder builder) { + this.metaServers = builder.metaServers; + this.operationTimeout = builder.operationTimeout; + this.asyncWorkers = builder.asyncWorkers; + this.enablePerfCounter = builder.enablePerfCounter; + this.falconPerfCounterTags = builder.falconPerfCounterTags; + this.falconPushInterval = builder.falconPushInterval; + } + + protected ClientOptions(ClientOptions original) { + this.metaServers = original.getMetaServers(); + this.operationTimeout = original.getOperationTimeout(); + this.asyncWorkers = original.getAsyncWorkers(); + this.enablePerfCounter = original.isEnablePerfCounter(); + this.falconPerfCounterTags = original.getFalconPerfCounterTags(); + this.falconPushInterval = original.getFalconPushInterval(); + } + + /** + * Create a copy of {@literal options} + * + * @param options the original + * @return A new instance of {@link ClientOptions} containing the values of {@literal options} + */ + public static ClientOptions copyOf(ClientOptions options) { + return new ClientOptions(options); + } + + /** + * Returns a new {@link ClientOptions.Builder} to construct {@link ClientOptions}. + * + * @return a new {@link ClientOptions.Builder} to construct {@link ClientOptions}. + */ + public static ClientOptions.Builder builder() { + return new ClientOptions.Builder(); + } + + /** + * Create a new instance of {@link ClientOptions} with default settings. + * + * @return a new instance of {@link ClientOptions} with default settings + */ + public static ClientOptions create() { + return builder().build(); + } + + @Override + public boolean equals(Object options) { + if (this == options) { + return true; + } + if (options instanceof ClientOptions) { + ClientOptions clientOptions = (ClientOptions) options; + return this.metaServers.equals(clientOptions.metaServers) + && this.operationTimeout.toMillis() == clientOptions.operationTimeout.toMillis() + && this.asyncWorkers == clientOptions.asyncWorkers + && this.enablePerfCounter == clientOptions.enablePerfCounter + && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags) + && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis(); + } + return false; + } + + @Override + public String toString() { + return "ClientOptions{" + + "metaServers='" + + metaServers + + '\'' + + ", operationTimeout(ms)=" + + operationTimeout.toMillis() + + ", asyncWorkers=" + + asyncWorkers + + ", enablePerfCounter=" + + enablePerfCounter + + ", falconPerfCounterTags='" + + falconPerfCounterTags + + '\'' + + ", falconPushInterval(s)=" + + falconPushInterval.getSeconds() + + '}'; + } + + /** Builder for {@link ClientOptions}. */ + public static class Builder { + private String metaServers = DEFAULT_META_SERVERS; + private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT; + private int asyncWorkers = DEFAULT_ASYNC_WORKERS; + private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER; + private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS; + private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; + + protected Builder() {} + + /** + * The list of meta server addresses, separated by commas, See {@link #DEFAULT_META_SERVERS}. + * + * @param metaServers must not be {@literal null} or empty. + * @return {@code this} + */ + public Builder metaServers(String metaServers) { + this.metaServers = metaServers; + return this; + } + + /** + * The timeout for failing to finish an operation. Defaults to {@literal 1000ms}, see {@link + * #DEFAULT_OPERATION_TIMEOUT}. + * + * @param operationTimeout operationTimeout + * @return {@code this} + */ + public Builder operationTimeout(Duration operationTimeout) { + this.operationTimeout = operationTimeout; + return this; + } + + /** + * The number of background worker threads. Internally it is the number of Netty NIO threads for + * handling RPC events between client and Replica Servers. Defaults to {@literal 4}, see {@link + * #DEFAULT_ASYNC_WORKERS}. + * + * @param asyncWorkers asyncWorkers thread number + * @return {@code this} + */ + public Builder asyncWorkers(int asyncWorkers) { + this.asyncWorkers = asyncWorkers; + return this; + } + + /** + * Whether to enable performance statistics. If true, the client will periodically report + * metrics to local falcon agent (currently we only support falcon as monitoring system). + * Defaults to {@literal false}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}. + * + * @param enablePerfCounter enablePerfCounter + * @return {@code this} + */ + public Builder enablePerfCounter(boolean enablePerfCounter) { + this.enablePerfCounter = enablePerfCounter; + return this; + } + + /** + * Additional tags for falcon metrics. For example: + * "cluster=c3srv-ad,job=recommend-service-history". Defaults to empty string, see {@link + * #DEFAULT_FALCON_PERF_COUNTER_TAGS}. + * + * @param falconPerfCounterTags falconPerfCounterTags + * @return {@code this} + */ + public Builder falconPerfCounterTags(String falconPerfCounterTags) { + this.falconPerfCounterTags = falconPerfCounterTags; + return this; + } + + /** + * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}, see {@link + * #DEFAULT_FALCON_PUSH_INTERVAL}. + * + * @param falconPushInterval falconPushInterval + * @return {@code this} + */ + public Builder falconPushInterval(Duration falconPushInterval) { + this.falconPushInterval = falconPushInterval; + return this; + } + + /** + * Create a new instance of {@link ClientOptions}. + * + * @return new instance of {@link ClientOptions}. + */ + public ClientOptions build() { + return new ClientOptions(this); + } + } + + /** + * Returns a builder to create new {@link ClientOptions} whose settings are replicated from the + * current {@link ClientOptions}. + * + * @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are + * replicated from the current {@link ClientOptions}. + */ + public ClientOptions.Builder mutate() { + Builder builder = new Builder(); + builder + .metaServers(getMetaServers()) + .operationTimeout(getOperationTimeout()) + .asyncWorkers(getAsyncWorkers()) + .enablePerfCounter(isEnablePerfCounter()) + .falconPerfCounterTags(getFalconPerfCounterTags()) + .falconPushInterval(getFalconPushInterval()); + return builder; + } + + /** + * The list of meta server addresses, separated by commas. + * + * @return the list of meta server addresses. + */ + public String getMetaServers() { + return metaServers; + } + + /** + * The timeout for failing to finish an operation. Defaults to {@literal 1000ms}. + * + * @return the timeout for failing to finish an operation. + */ + public Duration getOperationTimeout() { + return operationTimeout; + } + + /** + * The number of background worker threads. Internally it is the number of Netty NIO threads for + * handling RPC events between client and Replica Servers. Defaults to {@literal 4}. + * + * @return The number of background worker threads. + */ + public int getAsyncWorkers() { + return asyncWorkers; + } + + /** + * Whether to enable performance statistics. If true, the client will periodically report metrics + * to local falcon agent (currently we only support falcon as monitoring system). Defaults to + * {@literal false}. + * + * @return whether to enable performance statistics. + */ + public boolean isEnablePerfCounter() { + return enablePerfCounter; + } + + /** + * Additional tags for falcon metrics. Defaults to empty string. + * + * @return additional tags for falcon metrics. + */ + public String getFalconPerfCounterTags() { + return falconPerfCounterTags; + } + + /** + * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}. + * + * @return the interval to report metrics to local falcon agent. + */ + public Duration getFalconPushInterval() { + return falconPushInterval; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java index 98fef09f..65e5e55a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,12 +18,27 @@ public class PegasusClientFactory { private static String singletonClientConfigPath = null; private static Object singletonClientLock = new Object(); + private static ClientOptions singletonClientOptions = null; + // Create a client instance. // After used, should call client.close() to release resource. public static PegasusClientInterface createClient(String configPath) throws PException { return new PegasusClient(configPath); } + public static PegasusClientInterface createClient(ClientOptions options) throws PException { + Properties pegasusConfig = new Properties(); + pegasusConfig.setProperty("meta_servers", options.getMetaServers()); + pegasusConfig.setProperty( + "operation_timeout", String.valueOf(options.getOperationTimeout().toMillis())); + pegasusConfig.setProperty("async_workers", String.valueOf(options.getAsyncWorkers())); + pegasusConfig.setProperty("enable_perf_counter", String.valueOf(options.isEnablePerfCounter())); + pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter())); + pegasusConfig.setProperty( + "push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds())); + return new PegasusClient(pegasusConfig); + } + // Get the singleton client instance with default config path of "resource:///pegasus.properties". public static PegasusClientInterface getSingletonClient() throws PException { return getSingletonClient("resource:///pegasus.properties"); @@ -58,6 +74,29 @@ public static PegasusClientInterface getSingletonClient(String configPath) throw } } + public static PegasusClientInterface getSingletonClient(ClientOptions options) throws PException { + synchronized (singletonClientLock) { + if (singletonClient == null) { + try { + singletonClient = (PegasusClient) createClient(options); + singletonClientOptions = options; + LOGGER.info("Create Singleton PegasusClient with options \"" + options.toString() + "\""); + } catch (Throwable e) { + throw new PException("Create Singleton PegasusClient Failed", e); + } + } else if (!singletonClientOptions.equals(options)) { + LOGGER.error( + "Singleton PegasusClient options Conflict: \"" + + options.toString() + + "\" VS \"" + + singletonClientOptions.toString() + + "\""); + throw new PException("Singleton PegasusClient options Conflict"); + } + return singletonClient; + } + } + // Close the singleton client instance. public static void closeSingletonClient() throws PException { synchronized (singletonClientLock) { diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 969935dd..206e2fda 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2250,4 +2250,111 @@ public void fullScanWithFilter() throws PException { PegasusClientFactory.closeSingletonClient(); } + + @Test + public void createClient() throws PException { + System.out.println("test createClient with clientOptions"); + ClientOptions clientOptions = ClientOptions.create(); + byte[] value = null; + + // test createClient(clientOptions) + PegasusClientInterface client = null; + try { + client = PegasusClientFactory.createClient(clientOptions); + client.set( + "temp", + "createClient".getBytes(), + "createClient_0".getBytes(), + "createClient_0".getBytes()); + value = client.get("temp", "createClient".getBytes(), "createClient_0".getBytes()); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + + Assert.assertTrue(new String(value).equals("createClient_0")); + + // test getSingletonClient(ClientOptions options) + PegasusClientInterface singletonClient = null; + try { + singletonClient = PegasusClientFactory.getSingletonClient(clientOptions); + singletonClient.set( + "temp", + "getSingletonClient".getBytes(), + "createClient_1".getBytes(), + "createClient_1".getBytes()); + value = + singletonClient.get("temp", "getSingletonClient".getBytes(), "createClient_1".getBytes()); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + + Assert.assertTrue(new String(value).equals("createClient_1")); + + // test getSingletonClient(ClientOptions options) --> same clientOptions + PegasusClientInterface singletonClient1 = null; + try { + singletonClient1 = PegasusClientFactory.getSingletonClient(clientOptions); + singletonClient1.set( + "temp", + "getSingletonClient".getBytes(), + "createClient_2".getBytes(), + "createClient_2".getBytes()); + value = + singletonClient1.get( + "temp", "getSingletonClient".getBytes(), "createClient_2".getBytes()); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + + Assert.assertTrue(new String(value).equals("createClient_2")); + Assert.assertTrue(singletonClient1 == singletonClient); + + // test getSingletonClient(ClientOptions options) --> different clientOptions,but values of + // clientOptions is same + ClientOptions clientOptions1 = ClientOptions.create(); + try { + singletonClient1 = PegasusClientFactory.getSingletonClient(clientOptions1); + singletonClient1.set( + "temp", + "getSingletonClient".getBytes(), + "createClient_3".getBytes(), + "createClient_3".getBytes()); + value = + singletonClient1.get( + "temp", "getSingletonClient".getBytes(), "createClient_3".getBytes()); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + + Assert.assertTrue(new String(value).equals("createClient_3")); + Assert.assertTrue(singletonClient1 == singletonClient); + + // test getSingletonClient(ClientOptions options) --> different clientOptions,and values of + // clientOptions is different + ClientOptions clientOptions2 = + ClientOptions.builder() + .metaServers("127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603") + .asyncWorkers(5) // default value is 4,this set different value + .build(); + try { + singletonClient1 = PegasusClientFactory.getSingletonClient(clientOptions2); + singletonClient1.set( + "temp", + "getSingletonClient".getBytes(), + "createClient_4".getBytes(), + "createClient_4".getBytes()); + value = + singletonClient1.get( + "temp", "getSingletonClient".getBytes(), "createClient_4".getBytes()); + } catch (Exception e) { + // if values of clientOptions is different,the code's right logic is "throw exception" + Assert.assertTrue(true); + } + + PegasusClientFactory.closeSingletonClient(); + } }