Skip to content

Commit

Permalink
[fix][broker] Duplicate LedgerOffloader creation when namespace/topic… (
Browse files Browse the repository at this point in the history
apache#21591)

(cherry picked from commit 98bf9dd)
  • Loading branch information
shibd authored and nikhil-ctds committed Dec 20, 2023
1 parent 2c7f4e2 commit 1a55a80
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -86,6 +87,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
public static final String EXTRA_CONFIG_PREFIX = "managedLedgerOffloadExtraConfig";

public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
Expand Down Expand Up @@ -121,8 +123,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Map<String, String> managedLedgerExtraConfigurations = null;

private Map<String, String> managedLedgerExtraConfigurations = new HashMap<>();
// s3 config, set by service configuration or cli
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
Expand Down Expand Up @@ -248,8 +249,7 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu

public static OffloadPoliciesImpl create(Properties properties) {
OffloadPoliciesImpl data = new OffloadPoliciesImpl();
Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
Arrays.stream(fields).forEach(f -> {
for (Field f : CONFIGURATION_FIELDS) {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
Expand All @@ -260,14 +260,15 @@ public static OffloadPoliciesImpl create(Properties properties) {
f.getName(), properties.get(f.getName())), e);
}
}
});
}

Map<String, String> extraConfigurations = properties.entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
.collect(Collectors.toMap(
entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
entry -> entry.getValue().toString()));
.filter(entry -> entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
.collect(Collectors.toMap(
entry -> entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
entry -> entry.getValue().toString()));

data.setManagedLedgerExtraConfigurations(extraConfigurations);
data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);

data.compatibleWithBrokerConfigFile(properties);
return data;
Expand Down Expand Up @@ -346,66 +347,21 @@ public boolean bucketValid() {

public Properties toProperties() {
Properties properties = new Properties();
setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
this.getManagedLedgerOffloadMaxThreads());
setProperty(properties, "managedLedgerOffloadPrefetchRounds",
this.getManagedLedgerOffloadPrefetchRounds());
setProperty(properties, "managedLedgerOffloadThresholdInBytes",
this.getManagedLedgerOffloadThresholdInBytes());
setProperty(properties, "managedLedgerOffloadThresholdInSeconds",
this.getManagedLedgerOffloadThresholdInSeconds());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());
setProperty(properties, "managedLedgerOffloadExtraConfigurations",
this.getManagedLedgerExtraConfigurations());

if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
this.getS3ManagedLedgerOffloadRegion());
setProperty(properties, "s3ManagedLedgerOffloadBucket",
this.getS3ManagedLedgerOffloadBucket());
setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
this.getS3ManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
this.getS3ManagedLedgerOffloadCredentialId());
setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
this.getS3ManagedLedgerOffloadCredentialSecret());
setProperty(properties, "s3ManagedLedgerOffloadRole",
this.getS3ManagedLedgerOffloadRole());
setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
this.getS3ManagedLedgerOffloadRoleSessionName());
setProperty(properties, "s3ManagedLedgerOffloadReadBufferSizeInBytes",
this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
} else if (this.isGcsDriver()) {
setProperty(properties, "gcsManagedLedgerOffloadRegion",
this.getGcsManagedLedgerOffloadRegion());
setProperty(properties, "gcsManagedLedgerOffloadBucket",
this.getGcsManagedLedgerOffloadBucket());
setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes",
this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes",
this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile",
this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
} else if (this.isFileSystemDriver()) {
setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
setProperty(properties, "fileSystemURI", this.getFileSystemURI());
}

setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
setProperty(properties, "managedLedgerOffloadServiceEndpoint",
this.getManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
this.getManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
this.getManagedLedgerOffloadReadBufferSizeInBytes());

for (Field f : CONFIGURATION_FIELDS) {
try {
f.setAccessible(true);
if ("managedLedgerExtraConfigurations".equals(f.getName())) {
Map<String, String> extraConfig = (Map<String, String>) f.get(this);
extraConfig.forEach((key, value) -> {
setProperty(properties, EXTRA_CONFIG_PREFIX + key, value);
});
} else {
setProperty(properties, f.getName(), f.get(this));
}
} catch (Exception e) {
throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
}
}
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;

import static org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
import static org.testng.Assert.assertEquals;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
Expand All @@ -26,6 +28,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
Expand Down Expand Up @@ -436,13 +439,37 @@ private byte[] loadClassData(String name) throws IOException {
@Test
public void testCreateOffloadPoliciesWithExtraConfiguration() {
Properties properties = new Properties();
properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
properties.put(EXTRA_CONFIG_PREFIX + "Key1", "value1");
properties.put(EXTRA_CONFIG_PREFIX + "Key2", "value2");
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);

Map<String, String> extraConfigurations = policies.getManagedLedgerExtraConfigurations();
Assert.assertEquals(extraConfigurations.size(), 2);
Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
}

/**
* Test toProperties as well as create from properties.
* @throws Exception
*/
@Test
public void testToProperties() throws Exception {
// Base information convert.
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST);
assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));

// Set useless config to offload policies. Make sure convert conversion result is the same.
offloadPolicies.setFileSystemProfilePath("/test/file");
assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));

// Set extra config to offload policies. Make sure convert conversion result is the same.
Map<String, String> extraConfiguration = new HashMap<>();
extraConfiguration.put("key1", "value1");
extraConfiguration.put("key2", "value2");
offloadPolicies.setManagedLedgerExtraConfigurations(extraConfiguration);
assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
}
}

0 comments on commit 1a55a80

Please sign in to comment.