-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy #19045
Changes from 3 commits
3e7c2bf
e348be3
bf665d3
c818108
dbbc861
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.broker.loadbalance.extensions.channel; | ||
|
||
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; | ||
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; | ||
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; | ||
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; | ||
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import org.apache.commons.lang.StringUtils; | ||
import org.apache.pulsar.client.api.Schema; | ||
import org.apache.pulsar.common.topics.TopicCompactionStrategy; | ||
|
||
public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy<ServiceUnitStateData> { | ||
|
||
private final Schema<ServiceUnitStateData> schema; | ||
|
||
private boolean checkBrokers = true; | ||
|
||
public ServiceUnitStateCompactionStrategy() { | ||
schema = Schema.JSON(ServiceUnitStateData.class); | ||
} | ||
|
||
@Override | ||
public Schema<ServiceUnitStateData> getSchema() { | ||
return schema; | ||
} | ||
|
||
@VisibleForTesting | ||
public void checkBrokers(boolean check) { | ||
Demogorgon314 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.checkBrokers = check; | ||
} | ||
|
||
@Override | ||
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { | ||
ServiceUnitState prevState = from == null ? Free : from.state(); | ||
ServiceUnitState state = to == null ? Free : to.state(); | ||
if (!ServiceUnitState.isValidTransition(prevState, state)) { | ||
return true; | ||
} | ||
|
||
if (checkBrokers) { | ||
if (prevState == Free && (state == Assigned || state == Owned)) { | ||
// Free -> Assigned || Owned broker check | ||
return StringUtils.isBlank(to.broker()); | ||
} else if (prevState == Owned && state == Assigned) { | ||
// Owned -> Assigned(transfer) broker check | ||
return !StringUtils.equals(from.broker(), to.sourceBroker()) | ||
|| StringUtils.isBlank(to.broker()) | ||
|| StringUtils.equals(from.broker(), to.broker()); | ||
} else if (prevState == Assigned && state == Released) { | ||
// Assigned -> Released(transfer) broker check | ||
return !StringUtils.equals(from.broker(), to.broker()) | ||
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); | ||
} else if (prevState == Released && state == Owned) { | ||
// Released -> Owned(transfer) broker check | ||
return !StringUtils.equals(from.broker(), to.broker()) | ||
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); | ||
} else if (prevState == Assigned && state == Owned) { | ||
// Assigned -> Owned broker check | ||
return !StringUtils.equals(from.broker(), to.broker()) | ||
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); | ||
} else if (prevState == Owned && state == Splitting) { | ||
// Owned -> Splitting broker check | ||
return !StringUtils.equals(from.broker(), to.broker()); | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,8 @@ | |
import org.apache.commons.collections4.CollectionUtils; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.pulsar.broker.PulsarServerException; | ||
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; | ||
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; | ||
import org.apache.pulsar.broker.namespace.NamespaceService; | ||
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; | ||
import org.apache.pulsar.broker.service.AbstractReplicator; | ||
|
@@ -151,6 +153,7 @@ | |
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; | ||
import org.apache.pulsar.common.protocol.Commands; | ||
import org.apache.pulsar.common.protocol.schema.SchemaData; | ||
import org.apache.pulsar.common.topics.TopicCompactionStrategy; | ||
import org.apache.pulsar.common.util.Codec; | ||
import org.apache.pulsar.common.util.DateFormatter; | ||
import org.apache.pulsar.common.util.FutureUtil; | ||
|
@@ -202,6 +205,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal | |
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); | ||
private final CompactedTopic compactedTopic; | ||
|
||
// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users. | ||
private static Map<String, TopicCompactionStrategy> strategicCompactionMap = Map.of( | ||
ServiceUnitStateChannelImpl.TOPIC, | ||
new ServiceUnitStateCompactionStrategy()); | ||
codelipenghui marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture( | ||
(MessageIdImpl) MessageId.earliest); | ||
|
||
|
@@ -1563,6 +1571,8 @@ public void checkCompaction() { | |
} | ||
|
||
if (backlogEstimate > compactionThreshold) { | ||
log.info("topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering compaction", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It should be a debug-level log. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought this topic compaction log is useful. Could you share your concerns about this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Suppose you have many topics, like 50k topics per broker. By default, we will have 50k logs per minute. And we already have logs after the compaction task started. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK. I will make this debug level. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated. |
||
topic, backlogEstimate, compactionThreshold); | ||
try { | ||
triggerCompaction(); | ||
} catch (AlreadyRunningException are) { | ||
|
@@ -2981,7 +2991,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { | |
public synchronized void triggerCompaction() | ||
throws PulsarServerException, AlreadyRunningException { | ||
if (currentCompaction.isDone()) { | ||
currentCompaction = brokerService.pulsar().getCompactor().compact(topic); | ||
|
||
if (strategicCompactionMap.containsKey(topic)) { | ||
currentCompaction = brokerService.pulsar().getStrategicCompactor() | ||
.compact(topic, strategicCompactionMap.get(topic)); | ||
} else { | ||
currentCompaction = brokerService.pulsar().getCompactor().compact(topic); | ||
} | ||
} else { | ||
throw new AlreadyRunningException("Compaction already in progress"); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -385,6 +385,7 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader, | |
promise.completeExceptionally(e); | ||
return; | ||
} | ||
outstanding.release(MAX_OUTSTANDING); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we move this after There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, I found a deadlock if we don't release this semaphore before completing the future. Deadlock example
|
||
promise.complete(null); | ||
return; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you help explain why we need to modify this?
We update
serviceUnitTombstoneErrorCnt
and serviceUnitTombstoneCnt
in the async method. These values could both be 0, which would let line 643 pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It relies on
producer.flush()
to persist all outstanding messages before returning from this call.
Yes, because these values could be 0, I updated this code path here. I also cleaned up the metrics computation code.