Skip to content

Commit

Permalink
[improve] [broker] Pin AppendIndexMetadataInterceptor to field in `Ma…
Browse files Browse the repository at this point in the history
…nagedLedgerInterceptorImpl` (#20112)
  • Loading branch information
lifepuzzlefun authored Apr 20, 2023
1 parent 00d09cb commit 2b41e4e
Showing 1 changed file with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,27 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
private static final String INDEX = "index";
private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

private final AppendIndexMetadataInterceptor appendIndexMetadataInterceptor;
private final Set<ManagedLedgerPayloadProcessor.Processor> inputProcessors;
private final Set<ManagedLedgerPayloadProcessor.Processor> outputProcessors;

public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;

// save appendIndexMetadataInterceptor to field
AppendIndexMetadataInterceptor appendIndexMetadataInterceptor = null;

for (BrokerEntryMetadataInterceptor interceptor : this.brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
appendIndexMetadataInterceptor = (AppendIndexMetadataInterceptor) interceptor;
break;
}
}

this.appendIndexMetadataInterceptor = appendIndexMetadataInterceptor;

if (brokerEntryPayloadProcessors != null) {
this.inputProcessors = new LinkedHashSet<>();
this.outputProcessors = new LinkedHashSet<>();
Expand All @@ -61,12 +76,11 @@ public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEn

public long getIndex() {
long index = -1;
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
index = ((AppendIndexMetadataInterceptor) interceptor).getIndex();
break;
}

if (appendIndexMetadataInterceptor != null) {
return appendIndexMetadataInterceptor.getIndex();
}

return index;
}

Expand All @@ -81,10 +95,8 @@ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {

@Override
public void afterFailedAddEntry(int numberOfMessages) {
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
}
if (appendIndexMetadataInterceptor != null) {
appendIndexMetadataInterceptor.decreaseWithNumberOfMessages(numberOfMessages);
}
}

Expand All @@ -95,21 +107,17 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
}

if (propertiesMap.containsKey(INDEX)) {
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
((AppendIndexMetadataInterceptor) interceptor)
.recoveryIndexGenerator(Long.parseLong(propertiesMap.get(INDEX)));
break;
}
if (appendIndexMetadataInterceptor != null) {
appendIndexMetadataInterceptor.recoveryIndexGenerator(
Long.parseLong(propertiesMap.get(INDEX)));
}
}
}

@Override
public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
CompletableFuture<Void> promise = new CompletableFuture<>();
boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
.anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
boolean hasAppendIndexMetadataInterceptor = appendIndexMetadataInterceptor != null;
if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
if (ex != null) {
Expand All @@ -122,14 +130,9 @@ public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name,
if (ledgerEntry != null) {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
((AppendIndexMetadataInterceptor) interceptor)
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
}
break;
}
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
appendIndexMetadataInterceptor.recoveryIndexGenerator(
brokerEntryMetadata.getIndex());
}
}
entries.close();
Expand All @@ -153,11 +156,8 @@ public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name,

@Override
public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
propertiesMap.put(INDEX, String.valueOf(((AppendIndexMetadataInterceptor) interceptor).getIndex()));
break;
}
if (appendIndexMetadataInterceptor != null) {
propertiesMap.put(INDEX, String.valueOf(appendIndexMetadataInterceptor.getIndex()));
}
}

Expand Down

0 comments on commit 2b41e4e

Please sign in to comment.