diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 843c1bde3b912..f9148ba8699fd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; import lombok.Data; @@ -118,6 +119,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY; + @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Map managedLedgerExtraConfigurations = null; // s3 config, set by service configuration or cli @Configuration @@ -257,6 +261,14 @@ public static OffloadPoliciesImpl create(Properties properties) { } } }); + Map extraConfigurations = properties.entrySet().stream() + .filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig")) + .collect(Collectors.toMap( + entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""), + entry -> entry.getValue().toString())); + + data.setManagedLedgerExtraConfigurations(extraConfigurations); + data.compatibleWithBrokerConfigFile(properties); return data; } @@ -347,6 +359,8 @@ public Properties toProperties() { this.getManagedLedgerOffloadThresholdInSeconds()); setProperty(properties, "managedLedgerOffloadDeletionLagInMillis", this.getManagedLedgerOffloadDeletionLagInMillis()); + setProperty(properties, "managedLedgerOffloadExtraConfigurations", + this.getManagedLedgerExtraConfigurations()); if (this.isS3Driver()) { setProperty(properties, "s3ManagedLedgerOffloadRegion", diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index 626a14b92eedd..8d1ae5294ff7b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -314,6 +314,9 @@ public static Float stringToFloat(String val) { * @return The converted list with type {@code }. */ public static List stringToList(String val, Class type) { + if (val == null) { + return null; + } String[] tokens = trim(val).split(","); return Arrays.stream(tokens).map(t -> { return convert(trim(t), type); @@ -330,6 +333,9 @@ public static List stringToList(String val, Class type) { * @return The converted set with type {@code }. */ public static Set stringToSet(String val, Class type) { + if (val == null) { + return null; + } String[] tokens = trim(val).split(","); return Arrays.stream(tokens).map(t -> { return convert(trim(t), type); @@ -337,6 +343,9 @@ public static Set stringToSet(String val, Class type) { } private static Map stringToMap(String strValue, Class keyType, Class valueType) { + if (strValue == null) { + return null; + } String[] tokens = trim(strValue).split(","); Map map = new HashMap<>(); for (String token : tokens) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index 00b9aab0b1591..d79d2c32ffa7f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -26,6 +26,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Map; import java.util.Properties; import org.testng.Assert; import org.testng.annotations.Test; @@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException { } } + @Test + public void testCreateOffloadPoliciesWithExtraConfiguration() { + Properties properties = new Properties(); + properties.put("managedLedgerOffloadExtraConfigKey1", "value1"); + properties.put("managedLedgerOffloadExtraConfigKey2", "value2"); + OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties); + + Map extraConfigurations = policies.getManagedLedgerExtraConfigurations(); + Assert.assertEquals(extraConfigurations.size(), 2); + Assert.assertEquals(extraConfigurations.get("Key1"), "value1"); + Assert.assertEquals(extraConfigurations.get("Key2"), "value2"); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java index e90b6cbc4a13a..b24e9ae40822a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FieldParserTest.java @@ -19,12 +19,15 @@ package org.apache.pulsar.common.util; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.testng.annotations.Test; @@ -94,4 +97,46 @@ public static class MyConfig { public Set stringSet; } + @Test + public void testNullStrValue() throws Exception { + class TestMap { + public List list; + public Set set; + public Map map; + public Optional optional; + } + + Field listField = TestMap.class.getField("list"); + Object listValue = FieldParser.value(null, listField); + assertNull(listValue); + + listValue = FieldParser.value("null", listField); + assertTrue(listValue instanceof List); + assertEquals(((List) listValue).size(), 1); + assertEquals(((List) listValue).get(0), "null"); + + + Field setField = TestMap.class.getField("set"); + Object setValue = FieldParser.value(null, setField); + assertNull(setValue); + + setValue = FieldParser.value("null", setField); + assertTrue(setValue instanceof Set); + assertEquals(((Set) setValue).size(), 1); + assertEquals(((Set) setValue).iterator().next(), "null"); + + Field mapField = TestMap.class.getField("map"); + Object mapValue = FieldParser.value(null, mapField); + assertNull(mapValue); + + try { + FieldParser.value("null", mapField); + } catch (IllegalArgumentException iae) { + assertTrue(iae.getMessage().contains("null map-value is not in correct format key1=value,key2=value2")); + } + + Field optionalField = TestMap.class.getField("optional"); + Object optionalValue = FieldParser.value(null, optionalField); + assertEquals(optionalValue, Optional.empty()); + } }