Skip to content

Commit

Permalink
[feat][broker] PIP-180 Part VI: Add ShadowManagedLedgerImpl (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored and lifepuzzlefun committed Jan 10, 2023
1 parent 3ebd96e commit 4f7cacc
Show file tree
Hide file tree
Showing 16 changed files with 937 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;

@Getter
@Setter
private String shadowSourceName;

public boolean isCreateIfMissing() {
return createIfMissing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final BookKeeper bookKeeper;
protected final String name;
private final Map<String, byte[]> ledgerMetadata;
private final BookKeeper.DigestType digestType;
protected final BookKeeper.DigestType digestType;

protected ManagedLedgerConfig config;
protected Map<String, String> propertiesMap;
Expand All @@ -164,7 +164,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
.concurrencyLevel(1) // number of sections
.build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;
protected volatile Stat ledgersStat;

// contains all cursors, where durable cursors are ordered by mark delete position
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
Expand Down Expand Up @@ -215,10 +215,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
private volatile long lastLedgerCreatedTimestamp = 0;
protected volatile LedgerHandle currentLedger;
protected long currentLedgerEntries = 0;
protected long currentLedgerSize = 0;
protected volatile long lastLedgerCreatedTimestamp = 0;
private volatile long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;

Expand All @@ -236,9 +236,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

volatile PositionImpl lastConfirmedEntry;

private ManagedLedgerInterceptor managedLedgerInterceptor;
protected ManagedLedgerInterceptor managedLedgerInterceptor;

private volatile long lastAddEntryTimeMs = 0;
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
Expand Down Expand Up @@ -285,7 +285,7 @@ public enum PositionBound {
startIncluded, startExcluded
}

private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
protected static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private volatile boolean migrated = false;
Expand All @@ -294,7 +294,7 @@ public enum PositionBound {
private final OrderedScheduler scheduledExecutor;

@Getter
private final OrderedExecutor executor;
protected final OrderedExecutor executor;

@Getter
private final ManagedLedgerFactoryImpl factory;
Expand Down Expand Up @@ -459,7 +459,11 @@ public void operationFailed(MetaStoreException e) {
scheduleTimeoutTask();
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
protected boolean isLedgersReadonly() {
return false;
}

protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
}
Expand Down Expand Up @@ -544,7 +548,7 @@ public void operationFailed(MetaStoreException e) {
}, ledgerMetadata);
}

private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
protected void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing cursors", name);
}
Expand Down Expand Up @@ -785,7 +789,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
}));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
Expand Down Expand Up @@ -855,7 +859,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
lastAddEntryTimeMs = System.currentTimeMillis();
}

private boolean beforeAddEntry(OpAddEntry addOperation) {
protected boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
return true;
Expand Down Expand Up @@ -1603,7 +1607,7 @@ public void operationFailed(MetaStoreException e) {
}
}

private void handleBadVersion(Throwable e) {
protected void handleBadVersion(Throwable e) {
if (e instanceof BadVersionException) {
setFenced();
}
Expand Down Expand Up @@ -1643,7 +1647,7 @@ void createNewOpAddEntryForNewLedger() {
} while (existsOp != null && --pendingSize > 0);
}

private synchronized void updateLedgersIdsComplete() {
protected synchronized void updateLedgersIdsComplete() {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

Expand Down Expand Up @@ -2610,8 +2614,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

List<LedgerInfo> ledgersToDelete = new ArrayList();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList();
List<LedgerInfo> ledgersToDelete = new ArrayList<>();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
? config.getLedgerOffloader().getOffloadPolicies()
Expand Down Expand Up @@ -2739,23 +2743,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+ "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
}
doDeleteLedgers(ledgersToDelete);

invalidateReadHandle(ls.getLedgerId());

ledgers.remove(ls.getLedgerId());
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());

entryCache.invalidateAllEntries(ls.getLedgerId());
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
Expand Down Expand Up @@ -2805,6 +2794,26 @@ public void operationFailed(MetaStoreException e) {
}
}

protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+ "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
}

invalidateReadHandle(ls.getLedgerId());

ledgers.remove(ls.getLedgerId());
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());

entryCache.invalidateAllEntries(ls.getLedgerId());
}
}

/**
* Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
Expand Down Expand Up @@ -3764,7 +3773,7 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

private ManagedLedgerInfo getManagedLedgerInfo() {
protected ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}

Expand Down Expand Up @@ -4334,7 +4343,7 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ interface MetaStoreCallback<T> {
void operationFailed(MetaStoreException e);
}

interface UpdateCallback<T> {
void onUpdate(T result, Stat stat);
}

/**
* Get the metadata used by the ManagedLedger.
*
Expand Down Expand Up @@ -71,6 +75,19 @@ default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> properties,
MetaStoreCallback<ManagedLedgerInfo> callback);

/**
* Watch the metadata used by the ManagedLedger.
* @param ledgerName
* @param callback
*/
void watchManagedLedgerInfo(String ledgerName, UpdateCallback<ManagedLedgerInfo> callback);

/**
* Unwatch the metadata changes for ledgerName.
* @param ledgerName
*/
void unwatchManagedLedgerInfo(String ledgerName);

/**
*
* @param ledgerName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -47,10 +49,12 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;

@Slf4j
public class MetaStoreImpl implements MetaStore {
public class MetaStoreImpl implements MetaStore, Consumer<Notification> {

private static final String BASE_NODE = "/managed-ledgers";
private static final String PREFIX = BASE_NODE + "/";
Expand All @@ -62,11 +66,17 @@ public class MetaStoreImpl implements MetaStore {
private final CompressionType ledgerInfoCompressionType;
private final CompressionType cursorInfoCompressionType;

private final Map<String, UpdateCallback<ManagedLedgerInfo>> managedLedgerInfoUpdateCallbackMap;

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
this.ledgerInfoCompressionType = CompressionType.NONE;
this.cursorInfoCompressionType = CompressionType.NONE;
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
}
}

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType,
Expand All @@ -75,6 +85,10 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledge
this.executor = executor;
this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType);
this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType);
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
}
}

private CompressionType parseCompressionType(String value) {
Expand Down Expand Up @@ -327,6 +341,43 @@ public CompletableFuture<Boolean> asyncExists(String path) {
return store.exists(PREFIX + path);
}

@Override
public void watchManagedLedgerInfo(String ledgerName, UpdateCallback<ManagedLedgerInfo> callback) {
managedLedgerInfoUpdateCallbackMap.put(PREFIX + ledgerName, callback);
}

@Override
public void unwatchManagedLedgerInfo(String ledgerName) {
managedLedgerInfoUpdateCallbackMap.remove(PREFIX + ledgerName);
}

@Override
public void accept(Notification notification) {
if (!notification.getPath().startsWith(PREFIX) || notification.getType() != NotificationType.Modified) {
return;
}
UpdateCallback<ManagedLedgerInfo> callback = managedLedgerInfoUpdateCallbackMap.get(notification.getPath());
if (callback == null) {
return;
}
String ledgerName = notification.getPath().substring(PREFIX.length());
store.get(notification.getPath()).thenAcceptAsync(optResult -> {
if (optResult.isPresent()) {
ManagedLedgerInfo info;
try {
info = parseManagedLedgerInfo(optResult.get().getValue());
info = updateMLInfoTimestamp(info);
callback.onUpdate(info, optResult.get().getStat());
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Error when parseManagedLedgerInfo", ledgerName, e);
}
}
}, executor.chooseThread(ledgerName)).exceptionally(ex -> {
log.error("[{}] Error when read ManagedLedgerInfo", ledgerName, ex);
return null;
});
}

//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
Expand Down
Loading

0 comments on commit 4f7cacc

Please sign in to comment.