Skip to content

Commit

Permalink
[branch-2.10][fix][broker] Duplicate LedgerOffloader creation when na… (
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Nov 23, 2023
1 parent d0cb05b commit f2ff9c5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
conf.setDefaultNumPartitions(3);
conf.setForceDeleteNamespaceAllowed(true);
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -222,8 +221,7 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu

public static OffloadPoliciesImpl create(Properties properties) {
OffloadPoliciesImpl data = new OffloadPoliciesImpl();
Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
Arrays.stream(fields).forEach(f -> {
for (Field f : CONFIGURATION_FIELDS) {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
Expand All @@ -234,7 +232,7 @@ public static OffloadPoliciesImpl create(Properties properties) {
f.getName(), properties.get(f.getName())), e);
}
}
});
}
data.compatibleWithBrokerConfigFile(properties);
return data;
}
Expand Down Expand Up @@ -311,62 +309,14 @@ public boolean bucketValid() {

public Properties toProperties() {
Properties properties = new Properties();
setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
this.getManagedLedgerOffloadMaxThreads());
setProperty(properties, "managedLedgerOffloadPrefetchRounds",
this.getManagedLedgerOffloadPrefetchRounds());
setProperty(properties, "managedLedgerOffloadThresholdInBytes",
this.getManagedLedgerOffloadThresholdInBytes());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());

if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
this.getS3ManagedLedgerOffloadRegion());
setProperty(properties, "s3ManagedLedgerOffloadBucket",
this.getS3ManagedLedgerOffloadBucket());
setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
this.getS3ManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
this.getS3ManagedLedgerOffloadCredentialId());
setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
this.getS3ManagedLedgerOffloadCredentialSecret());
setProperty(properties, "s3ManagedLedgerOffloadRole",
this.getS3ManagedLedgerOffloadRole());
setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
this.getS3ManagedLedgerOffloadRoleSessionName());
setProperty(properties, "s3ManagedLedgerOffloadReadBufferSizeInBytes",
this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
} else if (this.isGcsDriver()) {
setProperty(properties, "gcsManagedLedgerOffloadRegion",
this.getGcsManagedLedgerOffloadRegion());
setProperty(properties, "gcsManagedLedgerOffloadBucket",
this.getGcsManagedLedgerOffloadBucket());
setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes",
this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes",
this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile",
this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
} else if (this.isFileSystemDriver()) {
setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
setProperty(properties, "fileSystemURI", this.getFileSystemURI());
}

setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
setProperty(properties, "managedLedgerOffloadServiceEndpoint",
this.getManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
this.getManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
this.getManagedLedgerOffloadReadBufferSizeInBytes());

for (Field f : CONFIGURATION_FIELDS) {
try {
f.setAccessible(true);
setProperty(properties, f.getName(), f.get(this));
} catch (Exception e) {
throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
}
}
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,20 @@ public void mergeTest() {
Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion());
}

/**
* Test toProperties as well as create from properties.
* @throws Exception
*/
@Test
public void testToProperties() throws Exception {
// Base information convert.
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST);
Assert.assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));

// Set useless config to offload policies. Make sure convert conversion result is the same.
offloadPolicies.setFileSystemProfilePath("/test/file");
Assert.assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
}
}

0 comments on commit f2ff9c5

Please sign in to comment.