Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-631 : Schema Registry numeric properties cannot be set via a property file #629

Merged
merged 3 commits into from
Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,10 @@ public static void main(String[] args) throws Exception {
public static Map<String, Object> createConfig(String schemaRegistryUrl) {
Map<String, Object> config = new HashMap<>();
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), schemaRegistryUrl);
config.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L);
config.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L);
config.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L);
config.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L);
config.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10);
config.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000);
config.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000);
config.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ public static final class Configuration {
String.class,
"URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
"http://localhost:9090/api/v1",
ConfigEntry.StringConverter.get(),
heritamas marked this conversation as resolved.
Show resolved Hide resolved
ConfigEntry.NonEmptyStringValidator.get());

/**
Expand All @@ -1268,6 +1269,7 @@ public static final class Configuration {
String.class,
"URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
DEFAULT_LOCAL_JARS_PATH,
ConfigEntry.StringConverter.get(),
ConfigEntry.NonEmptyStringValidator.get());

/**
Expand All @@ -1289,6 +1291,7 @@ public static final class Configuration {
Integer.class,
"Maximum size of classloader cache",
DEFAULT_CLASSLOADER_CACHE_SIZE,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1300,6 +1303,7 @@ public static final class Configuration {
Integer.class,
"Expiry interval(in seconds) of an entry in classloader cache",
DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

public static final long DEFAULT_SCHEMA_CACHE_SIZE = 1024;
Expand All @@ -1313,6 +1317,7 @@ public static final class Configuration {
Integer.class,
"Maximum size of schema version cache",
DEFAULT_SCHEMA_CACHE_SIZE,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1323,6 +1328,7 @@ public static final class Configuration {
Integer.class,
"Expiry interval(in seconds) of an entry in schema version cache",
DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1333,6 +1339,7 @@ public static final class Configuration {
Integer.class,
"Maximum size of schema metadata cache",
DEFAULT_SCHEMA_CACHE_SIZE,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1343,6 +1350,7 @@ public static final class Configuration {
Integer.class,
"Expiry interval(in seconds) of an entry in schema metadata cache",
DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1354,6 +1362,7 @@ public static final class Configuration {
Integer.class,
"Maximum size of schema text cache",
DEFAULT_SCHEMA_CACHE_SIZE,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1364,6 +1373,7 @@ public static final class Configuration {
Integer.class,
"Expiry interval(in seconds) of an entry in schema text cache.",
DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
ConfigEntry.IntegerConverter.get(),
ConfigEntry.PositiveNumberValidator.get());

/**
Expand All @@ -1374,6 +1384,7 @@ public static final class Configuration {
String.class,
"Schema Registry URL selector class.",
FailoverUrlSelector.class.getName(),
ConfigEntry.StringConverter.get(),
ConfigEntry.NonEmptyStringValidator.get());

/**
Expand All @@ -1384,6 +1395,7 @@ public static final class Configuration {
String.class,
"Schema Registry Dynamic JAAS config for SASL connection.",
null,
ConfigEntry.StringConverter.get(),
ConfigEntry.NonEmptyStringValidator.get());

// connection properties
Expand Down Expand Up @@ -1411,16 +1423,18 @@ public Configuration(Map<String, ?> config) {
for (Map.Entry<String, ?> entry : config.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
Object finalValue = value;

ConfigEntry configEntry = options.get(key);
if (configEntry != null) {
if (value != null) {
configEntry.validator().validate((value));
finalValue = configEntry.converter().convert(value);
configEntry.validator().validate((finalValue));
} else {
value = configEntry.defaultValue();
finalValue = configEntry.defaultValue();
}
}
result.put(key, value);
result.put(key, finalValue);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
**/
package com.hortonworks.registries.schemaregistry;

import com.hortonworks.registries.schemaregistry.errors.ConfigTypeConversionException;

import java.io.Serializable;
import java.util.Collections;

/**
* Details about a configuration entry containing
* - name, type, description, mandatory, default value and validator.
* <p>
* Mandatory configuration entry can be created with {@link ConfigEntry#mandatory(String, Class, String, Object, Validator)}
* Optional configuration entry can be created with {@link ConfigEntry#optional(String, Class, String, Object, Validator)}
* Mandatory configuration entry can be created with {@link ConfigEntry#mandatory(String, Class, String, Object, Converter, Validator)}
* Optional configuration entry can be created with {@link ConfigEntry#optional(String, Class, String, Object, Converter, Validator)}
*/
public final class ConfigEntry<T> implements Serializable {

Expand All @@ -33,36 +35,41 @@ public final class ConfigEntry<T> implements Serializable {
private final String description;
private final boolean mandatory;
private final T defaultValue;
private final Converter<? extends T> converter;
private final Validator<? extends T> validator;

private ConfigEntry(String name,
Class<? extends T> type,
String description,
boolean mandatory,
T defaultValue,
Converter<? extends T> converter,
Validator<? extends T> validator) {
this.name = name;
this.type = type;
this.description = description;
this.mandatory = mandatory;
this.defaultValue = defaultValue;
this.converter = converter;
this.validator = validator;
}

public static <T> ConfigEntry<T> mandatory(String name,
Class<? extends T> type,
String description,
T defaultValue,
Converter<? extends T> converter,
Validator<? extends T> validator) {
return new ConfigEntry<T>(name, type, description, true, defaultValue, validator);
return new ConfigEntry<T>(name, type, description, true, defaultValue, converter, validator);
}

public static <T> ConfigEntry<T> optional(String name,
Class<? extends T> type,
String description,
T defaultValue,
Converter<? extends T> converter,
Validator<? extends T> validator) {
return new ConfigEntry<T>(name, type, description, false, defaultValue, validator);
return new ConfigEntry<T>(name, type, description, false, defaultValue, converter, validator);
}

public String name() {
Expand All @@ -85,6 +92,10 @@ public Validator<? extends T> validator() {
return validator;
}

public Converter<? extends T> converter() {
return converter;
}

public boolean isMandatory() {
return mandatory;
}
Expand Down Expand Up @@ -128,6 +139,59 @@ public String toString() {
'}';
}

/**
* Converts an Object to a typed value.
*
* @param <T> type of the resulting value.
*/
public interface Converter<T> {
T convert(Object obj);
}

public static class IntegerConverter implements Converter<Integer> {

private static final Converter<Integer> instance = new IntegerConverter();

@Override
public Integer convert(Object obj) {
if (obj instanceof String) {
try {
return Integer.valueOf((String)obj);
} catch (NumberFormatException e) {
throw new ConfigTypeConversionException(String.format("Value: %s can't be parsed as an Integer", obj), e);
}
} else if (obj instanceof Integer){
return ((Integer) obj);
heritamas marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new ConfigTypeConversionException(String.format("Value: %s (type: %s) is expected to be convertible to an Integer", obj, obj.getClass().getCanonicalName()));
}
}

public static Converter<Integer> get() {
return instance;
}
}

public static class StringConverter implements Converter<String> {

private static final Converter<String> instance = new StringConverter();

@Override
public String convert(Object obj) {

if (obj instanceof String) {
return (String)obj;
} else {
return obj.toString();
}
}

public static Converter<String> get() {
return instance;
}
}


/**
* Validates the given value for a given {@link ConfigEntry}
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Copyright 2017-2019 Cloudera, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package com.hortonworks.registries.schemaregistry.errors;
heritamas marked this conversation as resolved.
Show resolved Hide resolved

public class ConfigTypeConversionException extends RuntimeException {

public ConfigTypeConversionException(String message) { super(message); }

public ConfigTypeConversionException(String message, Throwable cause) { super(message, cause); }
}