Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][offload] Extend the offload policies to allow specifying more conf #20804

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.Data;
Expand Down Expand Up @@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Map<String, String> managedLedgerExtraConfigurations = null;

// s3 config, set by service configuration or cli
@Configuration
Expand Down Expand Up @@ -257,6 +261,14 @@ public static OffloadPoliciesImpl create(Properties properties) {
}
}
});
Map<String, String> extraConfigurations = properties.entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toMap(
entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
entry -> entry.getValue().toString()));

data.setManagedLedgerExtraConfigurations(extraConfigurations);

data.compatibleWithBrokerConfigFile(properties);
return data;
}
Expand Down Expand Up @@ -347,6 +359,8 @@ public Properties toProperties() {
this.getManagedLedgerOffloadThresholdInSeconds());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());
setProperty(properties, "managedLedgerOffloadExtraConfigurations",
this.getManagedLedgerExtraConfigurations());

if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ public static Float stringToFloat(String val) {
* @return The converted list with type {@code <T>}.
*/
public static <T> List<T> stringToList(String val, Class<T> type) {
if (val == null) {
return null;
}
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
Expand All @@ -330,13 +333,19 @@ public static <T> List<T> stringToList(String val, Class<T> type) {
* @return The converted set with type {@code <T>}.
*/
public static <T> Set<T> stringToSet(String val, Class<T> type) {
if (val == null) {
return null;
}
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
}).collect(Collectors.toCollection(LinkedHashSet::new));
}

private static <K, V> Map<K, V> stringToMap(String strValue, Class<K> keyType, Class<V> valueType) {
if (strValue == null) {
return null;
}
String[] tokens = trim(strValue).split(",");
Map<K, V> map = new HashMap<>();
for (String token : tokens) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException {
}
}

@Test
public void testCreateOffloadPoliciesWithExtraConfiguration() {
Properties properties = new Properties();
properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved

Map<String, String> extraConfigurations = policies.getManagedLedgerExtraConfigurations();
Assert.assertEquals(extraConfigurations.size(), 2);
Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.pulsar.common.util;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.Optional;
import java.util.Set;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -94,4 +97,46 @@ public static class MyConfig {
public Set<String> stringSet;
}

@Test
public void testNullStrValue() throws Exception {
class TestMap {
public List<String> list;
public Set<String> set;
public Map<String, String> map;
public Optional<String> optional;
}

Field listField = TestMap.class.getField("list");
Object listValue = FieldParser.value(null, listField);
assertNull(listValue);

listValue = FieldParser.value("null", listField);
assertTrue(listValue instanceof List);
assertEquals(((List) listValue).size(), 1);
assertEquals(((List) listValue).get(0), "null");


Field setField = TestMap.class.getField("set");
Object setValue = FieldParser.value(null, setField);
assertNull(setValue);

setValue = FieldParser.value("null", setField);
assertTrue(setValue instanceof Set);
assertEquals(((Set) setValue).size(), 1);
assertEquals(((Set) setValue).iterator().next(), "null");

Field mapField = TestMap.class.getField("map");
Object mapValue = FieldParser.value(null, mapField);
assertNull(mapValue);

try {
FieldParser.value("null", mapField);
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage().contains("null map-value is not in correct format key1=value,key2=value2"));
}

Field optionalField = TestMap.class.getField("optional");
Object optionalValue = FieldParser.value(null, optionalField);
assertEquals(optionalValue, Optional.empty());
}
}