Skip to content

Commit

Permalink
[improve][offload] Extend the offload policies to allow specifying mo…
Browse files Browse the repository at this point in the history
…re conf (#20804)

### Motivation

The offload policies have limited the configurations for the offloaders.  That means if the offloader needs more configurations, we need to extend more fields in the OffloadPoliciesImpl. That doesn't make sense. We should make it extendable easily. Add a configuration map support to allow it to set more configurations.
  • Loading branch information
zymap authored and Technoboy- committed Aug 17, 2023
1 parent 3105d0a commit 25d3a86
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.Data;
Expand Down Expand Up @@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Map<String, String> managedLedgerExtraConfigurations = null;

// s3 config, set by service configuration or cli
@Configuration
Expand Down Expand Up @@ -257,6 +261,14 @@ public static OffloadPoliciesImpl create(Properties properties) {
}
}
});
Map<String, String> extraConfigurations = properties.entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
.collect(Collectors.toMap(
entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
entry -> entry.getValue().toString()));

data.setManagedLedgerExtraConfigurations(extraConfigurations);

data.compatibleWithBrokerConfigFile(properties);
return data;
}
Expand Down Expand Up @@ -347,6 +359,8 @@ public Properties toProperties() {
this.getManagedLedgerOffloadThresholdInSeconds());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());
setProperty(properties, "managedLedgerOffloadExtraConfigurations",
this.getManagedLedgerExtraConfigurations());

if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ public static Float stringToFloat(String val) {
* @return The converted list with type {@code <T>}.
*/
public static <T> List<T> stringToList(String val, Class<T> type) {
if (val == null) {
return null;
}
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
Expand All @@ -330,13 +333,19 @@ public static <T> List<T> stringToList(String val, Class<T> type) {
* @return The converted set with type {@code <T>}.
*/
public static <T> Set<T> stringToSet(String val, Class<T> type) {
if (val == null) {
return null;
}
String[] tokens = trim(val).split(",");
return Arrays.stream(tokens).map(t -> {
return convert(trim(t), type);
}).collect(Collectors.toCollection(LinkedHashSet::new));
}

private static <K, V> Map<K, V> stringToMap(String strValue, Class<K> keyType, Class<V> valueType) {
if (strValue == null) {
return null;
}
String[] tokens = trim(strValue).split(",");
Map<K, V> map = new HashMap<>();
for (String token : tokens) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException {
}
}

@Test
public void testCreateOffloadPoliciesWithExtraConfiguration() {
Properties properties = new Properties();
properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);

Map<String, String> extraConfigurations = policies.getManagedLedgerExtraConfigurations();
Assert.assertEquals(extraConfigurations.size(), 2);
Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.pulsar.common.util;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.Optional;
import java.util.Set;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -94,4 +97,46 @@ public static class MyConfig {
public Set<String> stringSet;
}

@Test
public void testNullStrValue() throws Exception {
class TestMap {
public List<String> list;
public Set<String> set;
public Map<String, String> map;
public Optional<String> optional;
}

Field listField = TestMap.class.getField("list");
Object listValue = FieldParser.value(null, listField);
assertNull(listValue);

listValue = FieldParser.value("null", listField);
assertTrue(listValue instanceof List);
assertEquals(((List) listValue).size(), 1);
assertEquals(((List) listValue).get(0), "null");


Field setField = TestMap.class.getField("set");
Object setValue = FieldParser.value(null, setField);
assertNull(setValue);

setValue = FieldParser.value("null", setField);
assertTrue(setValue instanceof Set);
assertEquals(((Set) setValue).size(), 1);
assertEquals(((Set) setValue).iterator().next(), "null");

Field mapField = TestMap.class.getField("map");
Object mapValue = FieldParser.value(null, mapField);
assertNull(mapValue);

try {
FieldParser.value("null", mapField);
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage().contains("null map-value is not in correct format key1=value,key2=value2"));
}

Field optionalField = TestMap.class.getField("optional");
Object optionalValue = FieldParser.value(null, optionalField);
assertEquals(optionalValue, Optional.empty());
}
}

0 comments on commit 25d3a86

Please sign in to comment.