Skip to content

Commit

Permalink
[improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-f…
Browse files Browse the repository at this point in the history
…unction (apache#22331)

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Mar 25, 2024
1 parent afe4261 commit 567174f
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 115 deletions.
4 changes: 2 additions & 2 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>

<dependency>
Expand Down
6 changes: 3 additions & 3 deletions pulsar-functions/localrun-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<include>org.rocksdb:*</include>
<include>org.eclipse.jetty*:*</include>
<include>org.apache.avro:avro</include>
<include>com.beust:*</include>
<include>info.picocli:*</include>
<include>net.jodah:*</include>
<include>io.airlift:*</include>
<include>com.yahoo.datasketches:*</include>
Expand Down Expand Up @@ -385,8 +385,8 @@
<shadedPattern>org.apache.pulsar.shaded.com.yahoo.sketches</shadedPattern>
</relocation>
<relocation>
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
<pattern>info.picocli</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.info.picocli</shadedPattern>
</relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<relocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
Expand Down Expand Up @@ -87,6 +84,10 @@
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import picocli.CommandLine;
import picocli.CommandLine.ITypeConverter;
import picocli.CommandLine.Option;
import picocli.CommandLine.TypeConversionException;

@Slf4j
public class LocalRunner implements AutoCloseable {
Expand Down Expand Up @@ -115,95 +116,95 @@ private static class UserCodeClassLoader {
boolean classLoaderCreated;
}

public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
public static class FunctionConfigConverter implements ITypeConverter<FunctionConfig> {
@Override
public FunctionConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, FunctionConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse function config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
public static class SourceConfigConverter implements ITypeConverter<SourceConfig> {
@Override
public SourceConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, SourceConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse source config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
public static class SinkConfigConverter implements ITypeConverter<SinkConfig> {
@Override
public SinkConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, SinkConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse sink config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
public static class RuntimeConverter implements ITypeConverter<RuntimeEnv> {
@Override
public RuntimeEnv convert(String value) {
return RuntimeEnv.valueOf(value);
}
}

@Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig",
@Option(names = "--functionConfig", description = "The json representation of FunctionConfig",
hidden = true, converter = FunctionConfigConverter.class)
protected FunctionConfig functionConfig;
@Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig",
@Option(names = "--sourceConfig", description = "The json representation of SourceConfig",
hidden = true, converter = SourceConfigConverter.class)
protected SourceConfig sourceConfig;
@Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig",
@Option(names = "--sinkConfig", description = "The json representation of SinkConfig",
hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
@Parameter(names = "--stateStorageImplClass", description = "The implemenatation class "
@Option(names = "--stateStorageImplClass", description = "The implemenatation class "
+ "state storage service (by default Apache BookKeeper)", hidden = true, required = false)
protected String stateStorageImplClass;
@Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service "
@Option(names = "--stateStorageServiceUrl", description = "The URL for the state storage service "
+ "(by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
@Option(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String brokerServiceUrl;
@Parameter(names = "--webServiceUrl", description = "The URL for the Pulsar web service", hidden = true)
@Option(names = "--webServiceUrl", description = "The URL for the Pulsar web service", hidden = true)
protected String webServiceUrl = null;
@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which "
@Option(names = "--clientAuthPlugin", description = "Client authentication plugin using which "
+ "function-process can connect to broker", hidden = true)
protected String clientAuthPlugin;
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
@Option(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String clientAuthParams;
@Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
@Option(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = "1")
protected boolean useTls;
@Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n",
hidden = true, arity = 1)
@Option(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n",
hidden = true, arity = "1")
protected boolean tlsAllowInsecureConnection;
@Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true
, arity = 1)
@Option(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true
, arity = "1")
protected boolean tlsHostNameVerificationEnabled;
@Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
@Option(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
protected String tlsTrustCertFilePath;
@Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
@Option(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
protected int instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true,
@Option(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true,
converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;
@Parameter(names = "--secretsProviderClassName",
@Option(names = "--secretsProviderClassName",
description = "Whats the classname of secrets provider", hidden = true)
protected String secretsProviderClassName;
@Parameter(names = "--secretsProviderConfig",
@Option(names = "--secretsProviderConfig",
description = "Whats the config for the secrets provider", hidden = true)
protected String secretsProviderConfig;
@Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running "
@Option(names = "--metricsPortStart", description = "The starting port range for metrics server. When running "
+ "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected Integer metricsPortStart;
@Parameter(names = "--exitOnError", description = "The starting port range for metrics server. When running "
@Option(names = "--exitOnError", description = "The starting port range for metrics server. When running "
+ "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected boolean exitOnError;

Expand All @@ -212,11 +213,10 @@ public RuntimeEnv convert(String value) {

public static void main(String[] args) throws Exception {
LocalRunner localRunner = LocalRunner.builder().build();
JCommander jcommander = new JCommander(localRunner);
jcommander.setProgramName("LocalRunner");
CommandLine jcommander = new CommandLine(localRunner);
jcommander.setCommandName("LocalRunner");

// parse args by JCommander
jcommander.parse(args);
jcommander.parseArgs(args);
try {
localRunner.start(true);
} catch (Exception e) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-functions/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -59,104 +56,104 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import picocli.CommandLine;
import picocli.CommandLine.Option;


@Slf4j
public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--function_details", description = "Function details json\n", required = true)
@Option(names = "--function_details", description = "Function details json\n", required = true)
public String functionDetailsJsonString;
@Parameter(
@Option(
names = "--jar",
description = "Path to Jar\n",
listConverter = StringConverter.class)
description = "Path to Jar\n")
public String jarFile;

@Parameter(
@Option(
names = "--transform_function_jar",
description = "Path to Transform Function Jar\n",
listConverter = StringConverter.class)
description = "Path to Transform Function Jar\n")
public String transformFunctionJarFile;

@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
@Option(names = "--instance_id", description = "Instance Id\n", required = true)
public int instanceId;

@Parameter(names = "--function_id", description = "Function Id\n", required = true)
@Option(names = "--function_id", description = "Function Id\n", required = true)
public String functionId;

@Parameter(names = "--function_version", description = "Function Version\n", required = true)
@Option(names = "--function_version", description = "Function Version\n", required = true)
public String functionVersion;

@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
@Option(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
public String pulsarServiceUrl;

@Parameter(names = "--transform_function_id", description = "Transform Function Id\n")
@Option(names = "--transform_function_id", description = "Transform Function Id\n")
public String transformFunctionId;

@Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
@Option(names = "--client_auth_plugin", description = "Client auth plugin name\n")
public String clientAuthenticationPlugin;

@Parameter(names = "--client_auth_params", description = "Client auth param\n")
@Option(names = "--client_auth_params", description = "Client auth param\n")
public String clientAuthenticationParameters;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
@Option(names = "--use_tls", description = "Use tls connection\n")
public String useTls = Boolean.FALSE.toString();

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
@Option(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
public String tlsAllowInsecureConnection = Boolean.FALSE.toString();

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
@Option(names = "--hostname_verification_enabled", description = "Enable hostname verification")
public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
@Option(names = "--tls_trust_cert_path", description = "tls trust cert file path")
public String tlsTrustCertFilePath;

@Parameter(names = "--state_storage_impl_class", description = "State Storage Service "
@Option(names = "--state_storage_impl_class", description = "State Storage Service "
+ "Implementation class\n", required = false)
public String stateStorageImplClass;

@Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required = false)
@Option(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required = false)
public String stateStorageServiceUrl;

@Parameter(names = "--port", description = "Port to listen on\n", required = true)
@Option(names = "--port", description = "Port to listen on\n", required = true)
public int port;

@Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
@Option(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
public int metricsPort;

@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
@Option(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
public int maxBufferedTuples;

@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in "
@Option(names = "--expected_healthcheck_interval", description = "Expected interval in "
+ "seconds between healtchecks", required = true)
public int expectedHealthCheckInterval;

@Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
@Option(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
public String secretsProviderClassName;

@Parameter(names = "--secrets_provider_config", description = "The config that needs to be "
@Option(names = "--secrets_provider_config", description = "The config that needs to be "
+ "passed to secrets provider", required = false)
public String secretsProviderConfig;

@Parameter(names = "--cluster_name", description = "The name of the cluster this "
@Option(names = "--cluster_name", description = "The name of the cluster this "
+ "instance is running on", required = true)
public String clusterName;

@Parameter(names = "--nar_extraction_directory", description = "The directory where "
@Option(names = "--nar_extraction_directory", description = "The directory where "
+ "extraction of nar packages happen", required = false)
public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

@Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance",
@Option(names = "--pending_async_requests", description = "Max pending async requests per instance",
required = false)
public int maxPendingAsyncRequests = 1000;

@Parameter(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false)
@Option(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false)
public String webServiceUrl = null;

@Parameter(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client "
@Option(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client "
+ "exposed to function context, default is disabled.", required = false)
public Boolean exposePulsarAdminClientEnabled = false;

@Parameter(names = "--ignore_unknown_config_fields",
@Option(names = "--ignore_unknown_config_fields",
description = "Whether to ignore unknown properties when deserializing the connector configuration.",
required = false)
public Boolean ignoreUnknownConfigFields = false;
Expand All @@ -176,9 +173,8 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
throws Exception {
Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);

JCommander jcommander = new JCommander(this);
// parse args by JCommander
jcommander.parse(args);
CommandLine jcommander = new CommandLine(this);
jcommander.parseArgs(args);

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionId(functionId);
Expand Down
Loading

0 comments on commit 567174f

Please sign in to comment.