From 4f7cacc3e7a76b3b1c4ffe994b79269921627635 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Tue, 29 Nov 2022 12:17:31 +0800 Subject: [PATCH] [feat][broker] PIP-180 Part VI: Add ShadowManagedLedgerImpl (#18265) --- .../mledger/ManagedLedgerConfig.java | 4 + .../mledger/impl/ManagedLedgerImpl.java | 81 ++-- .../bookkeeper/mledger/impl/MetaStore.java | 17 + .../mledger/impl/MetaStoreImpl.java | 53 ++- .../bookkeeper/mledger/impl/OpAddEntry.java | 38 +- .../mledger/impl/ShadowManagedLedgerImpl.java | 372 +++++++++++++++++- .../impl/ShadowManagedLedgerImplTest.java | 158 ++++++++ .../pulsar/broker/service/AbstractTopic.java | 17 +- .../pulsar/broker/service/BrokerService.java | 19 +- .../pulsar/broker/service/Producer.java | 67 +++- .../pulsar/broker/service/ServerCnx.java | 8 +- .../service/persistent/PersistentTopic.java | 13 +- .../persistent/ShadowReplicatorTest.java | 15 +- .../service/persistent/ShadowTopicTest.java | 164 +++++++- .../pulsar/client/impl/ProducerImpl.java | 11 +- .../pulsar/common/naming/TopicName.java | 8 + 16 files changed, 937 insertions(+), 108 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index c02a781fda354f..6e88a8e650d58c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -86,6 +86,10 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + @Getter + @Setter + private String shadowSourceName; + public boolean isCreateIfMissing() { return createIfMissing; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 54736cc8249b53..19d1749790d236 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -152,7 +152,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final BookKeeper bookKeeper; protected final String name; private final Map ledgerMetadata; - private final BookKeeper.DigestType digestType; + protected final BookKeeper.DigestType digestType; protected ManagedLedgerConfig config; protected Map propertiesMap; @@ -164,7 +164,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { .concurrencyLevel(1) // number of sections .build(); protected final NavigableMap ledgers = new ConcurrentSkipListMap<>(); - private volatile Stat ledgersStat; + protected volatile Stat ledgersStat; // contains all cursors, where durable cursors are ordered by mark delete position private final ManagedCursorContainer cursors = new ManagedCursorContainer(); @@ -215,10 +215,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); - private volatile LedgerHandle currentLedger; - private long currentLedgerEntries = 0; - private long currentLedgerSize = 0; - private volatile long lastLedgerCreatedTimestamp = 0; + protected volatile LedgerHandle currentLedger; + protected long currentLedgerEntries = 0; + protected long currentLedgerSize = 0; + protected volatile long lastLedgerCreatedTimestamp = 0; private volatile long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; @@ -236,9 +236,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { volatile PositionImpl lastConfirmedEntry; - private ManagedLedgerInterceptor managedLedgerInterceptor; + protected ManagedLedgerInterceptor managedLedgerInterceptor; - private volatile long lastAddEntryTimeMs = 0; + protected volatile long lastAddEntryTimeMs = 0; private long inactiveLedgerRollOverTimeMs = 0; protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; @@ -285,7 +285,7 @@ public enum PositionBound { startIncluded, startExcluded } - private static final AtomicReferenceFieldUpdater STATE_UPDATER = + protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); protected volatile State state = null; private volatile boolean migrated = false; @@ -294,7 +294,7 @@ public enum PositionBound { private final OrderedScheduler scheduledExecutor; @Getter - private final OrderedExecutor executor; + protected final OrderedExecutor executor; @Getter private final ManagedLedgerFactoryImpl factory; @@ -459,7 +459,11 @@ public void operationFailed(MetaStoreException e) { scheduleTimeoutTask(); } - private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { + protected boolean isLedgersReadonly() { + return false; + } + + protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { if (log.isDebugEnabled()) { log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers); } @@ -544,7 +548,7 @@ public void operationFailed(MetaStoreException e) { }, ledgerMetadata); } - private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { + protected void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { if (log.isDebugEnabled()) { log.debug("[{}] initializing cursors", name); } @@ -785,7 +789,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback })); } - private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { if (!beforeAddEntry(addOperation)) { return; } @@ -855,7 +859,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { lastAddEntryTimeMs = System.currentTimeMillis(); } - private boolean beforeAddEntry(OpAddEntry addOperation) { + protected boolean beforeAddEntry(OpAddEntry addOperation) { // if no interceptor, just return true to make sure addOperation will be initiate() if (managedLedgerInterceptor == null) { return true; @@ -1603,7 +1607,7 @@ public void operationFailed(MetaStoreException e) { } } - private void handleBadVersion(Throwable e) { + protected void handleBadVersion(Throwable e) { if (e instanceof BadVersionException) { setFenced(); } @@ -1643,7 +1647,7 @@ void createNewOpAddEntryForNewLedger() { } while (existsOp != null && --pendingSize > 0); } - private synchronized void updateLedgersIdsComplete() { + protected synchronized void updateLedgersIdsComplete() { STATE_UPDATER.set(this, State.LedgerOpened); updateLastLedgerCreatedTimeAndScheduleRolloverTask(); @@ -2610,8 +2614,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - List ledgersToDelete = new ArrayList(); - List offloadedLedgersToDelete = new ArrayList(); + List ledgersToDelete = new ArrayList<>(); + List offloadedLedgersToDelete = new ArrayList<>(); Optional optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE ? config.getLedgerOffloader().getOffloadPolicies() @@ -2739,23 +2743,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; - // Update metadata - for (LedgerInfo ls : ledgersToDelete) { - if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { - // this info is relevant because the lastMessageId won't be available anymore - log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " - + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); - } + doDeleteLedgers(ledgersToDelete); - invalidateReadHandle(ls.getLedgerId()); - - ledgers.remove(ls.getLedgerId()); - NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); - TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); - - entryCache.invalidateAllEntries(ls.getLedgerId()); - } for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); @@ -2805,6 +2794,26 @@ public void operationFailed(MetaStoreException e) { } } + protected void doDeleteLedgers(List ledgersToDelete) { + PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; + // Update metadata + for (LedgerInfo ls : ledgersToDelete) { + if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { + // this info is relevant because the lastMessageId won't be available anymore + log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " + + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); + } + + invalidateReadHandle(ls.getLedgerId()); + + ledgers.remove(ls.getLedgerId()); + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); + + entryCache.invalidateAllEntries(ls.getLedgerId()); + } + } + /** * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data. * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped @@ -3764,7 +3773,7 @@ public NavigableMap getLedgersInfo() { return ledgers; } - private ManagedLedgerInfo getManagedLedgerInfo() { + protected ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } @@ -4334,7 +4343,7 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { }); } - private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { this.lastLedgerCreatedTimestamp = clock.millis(); if (config.getMaximumRolloverTimeMs() > 0) { if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index b4c1383b772732..21e12d81a727da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -43,6 +43,10 @@ interface MetaStoreCallback { void operationFailed(MetaStoreException e); } + interface UpdateCallback { + void onUpdate(T result, Stat stat); + } + /** * Get the metadata used by the ManagedLedger. * @@ -71,6 +75,19 @@ default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map properties, MetaStoreCallback callback); + /** + * Watch the metadata used by the ManagedLedger. + * @param ledgerName + * @param callback + */ + void watchManagedLedgerInfo(String ledgerName, UpdateCallback callback); + + /** + * Unwatch the metadata changes for ledgerName. + * @param ledgerName + */ + void unwatchManagedLedgerInfo(String ledgerName); + /** * * @param ledgerName diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 94f332bd931d5d..bcb73553324dd0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -30,7 +30,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -47,10 +49,12 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; @Slf4j -public class MetaStoreImpl implements MetaStore { +public class MetaStoreImpl implements MetaStore, Consumer { private static final String BASE_NODE = "/managed-ledgers"; private static final String PREFIX = BASE_NODE + "/"; @@ -62,11 +66,17 @@ public class MetaStoreImpl implements MetaStore { private final CompressionType ledgerInfoCompressionType; private final CompressionType cursorInfoCompressionType; + private final Map> managedLedgerInfoUpdateCallbackMap; + public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.store = store; this.executor = executor; this.ledgerInfoCompressionType = CompressionType.NONE; this.cursorInfoCompressionType = CompressionType.NONE; + managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); + if (store != null) { + store.registerListener(this); + } } public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType, @@ -75,6 +85,10 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledge this.executor = executor; this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType); this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType); + managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); + if (store != null) { + store.registerListener(this); + } } private CompressionType parseCompressionType(String value) { @@ -327,6 +341,43 @@ public CompletableFuture asyncExists(String path) { return store.exists(PREFIX + path); } + @Override + public void watchManagedLedgerInfo(String ledgerName, UpdateCallback callback) { + managedLedgerInfoUpdateCallbackMap.put(PREFIX + ledgerName, callback); + } + + @Override + public void unwatchManagedLedgerInfo(String ledgerName) { + managedLedgerInfoUpdateCallbackMap.remove(PREFIX + ledgerName); + } + + @Override + public void accept(Notification notification) { + if (!notification.getPath().startsWith(PREFIX) || notification.getType() != NotificationType.Modified) { + return; + } + UpdateCallback callback = managedLedgerInfoUpdateCallbackMap.get(notification.getPath()); + if (callback == null) { + return; + } + String ledgerName = notification.getPath().substring(PREFIX.length()); + store.get(notification.getPath()).thenAcceptAsync(optResult -> { + if (optResult.isPresent()) { + ManagedLedgerInfo info; + try { + info = parseManagedLedgerInfo(optResult.get().getValue()); + info = updateMLInfoTimestamp(info); + callback.onUpdate(info, optResult.get().getStat()); + } catch (InvalidProtocolBufferException e) { + log.error("[{}] Error when parseManagedLedgerInfo", ledgerName, e); + } + } + }, executor.chooseThread(ledgerName)).exceptionally(ex -> { + log.error("[{}] Error when read ManagedLedgerInfo", ledgerName, ex); + return null; + }); + } + // // update timestamp if missing or 0 // 3 cases - timestamp does not exist for ledgers serialized before diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 43ad349ca84b9c..34d94efb4942b3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.util.SafeRunnable; @@ -143,6 +144,17 @@ public void initiate() { } } + public void initiateShadowWrite() { + if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { + addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); + lastInitTime = System.nanoTime(); + //Use entryId in PublishContext and call addComplete directly. + this.addComplete(BKException.Code.OK, ledger, ((Position) ctx).getEntryId(), addOpCount); + } else { + log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); + } + } + public void failed(ManagedLedgerException e) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { @@ -157,7 +169,6 @@ public void failed(ManagedLedgerException e) { @Override public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) { - if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) { log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId); @@ -165,11 +176,14 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) return; } - if (ledger.getId() != lh.getId()) { - log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId()); + if (ledger != null && lh != null) { + if (ledger.getId() != lh.getId()) { + log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), + lh.getId()); + } + checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", + ledger.getId(), lh.getId()); } - checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(), - lh.getId()); if (!checkAndCompleteOp(ctx)) { // means callback might have been completed by different thread (timeout task thread).. so do nothing @@ -179,7 +193,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) this.entryId = entryId; if (log.isDebugEnabled()) { log.debug("[{}] [{}] write-complete: ledger-id={} entry-id={} size={} rc={}", this, ml.getName(), - lh.getId(), entryId, dataLength, rc); + lh == null ? -1 : lh.getId(), entryId, dataLength, rc); } if (rc != BKException.Code.OK) { @@ -208,23 +222,27 @@ public void safeRun() { ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml); ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength); + + long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId(); if (ml.hasActiveCursors()) { // Avoid caching entries if no cursor has been created - EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data); + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data); // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling // insert ml.entryCache.insert(entry); entry.release(); } - PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId); + PositionImpl lastEntry = PositionImpl.get(ledgerId, entryId); ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml); ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { - log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId()); + log.info("[{}] Closing ledger {} for being full", ml.getName(), ledgerId); // `data` will be released in `closeComplete` - ledger.asyncClose(this, ctx); + if (ledger != null) { + ledger.asyncClose(this, ctx); + } } else { updateLatency(); AddEntryCallback cb = callbackUpdater.getAndSet(this, null); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 4bc0b1c87251ef..6831d8680be93d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -18,40 +18,388 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.pulsar.metadata.api.Stat; /** - * Working in progress until PIP-180 is finished. - * Currently, it works nothing different with ManagedLedgerImpl. + * Detailed design can be found in PIP-180. */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { - - private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, String name, final Supplier mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - this.shadowSource = TopicName.get(config.getShadowSource()); - this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + this.sourceMLName = config.getShadowSourceName(); } + /** + * ShadowManagedLedger init steps: + * 1. this.initialize : read source managedLedgerInfo + * 2. super.initialize : read its own read source managedLedgerInfo + * 3. this.initializeBookKeeper + * 4. super.initializeCursors + */ @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { - // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger, - // which is complicated and will be implemented in the next PRs. - super.initialize(callback, ctx); + log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); + executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); + } + + private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { + // Fetch the list of existing ledgers in the source managed ledger + store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) -> + executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) + ); + store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); + } + if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) { + log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}", + sourceLedgersStat, stat); + return; + } + sourceLedgersStat = stat; + if (mlInfo.getLedgerInfoCount() == 0) { + // Small chance here, since shadow topic is created after source topic exists. + log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName, + mlInfo, stat); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + return; + } + + if (mlInfo.hasTerminatedPosition()) { + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}][{}] Recovering managed ledger terminated at {}", name, sourceMLName, + lastConfirmedEntry); + } + + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + ledgers.put(ls.getLedgerId(), ls); + } + + final long lastLedgerId = ledgers.lastKey(); + mbean.startDataLedgerOpenOp(); + AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = + LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + + //Always consider the last ledger is opened in source. + STATE_UPDATER.set(ShadowManagedLedgerImpl.this, State.LedgerOpened); + currentLedger = lh; + + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh) + .thenRun(() -> ShadowManagedLedgerImpl.super.initialize(callback, ctx)) + .exceptionally(ex -> { + callback.initializeFailed( + new ManagedLedgerException.ManagedLedgerInterceptException( + ex.getCause())); + return null; + }); + } else { + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + callback.initializeFailed(createManagedLedgerException(rc)); + } + })); + //open ledger in readonly mode. + bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), opencb, null); + + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + if (e instanceof ManagedLedgerException.MetadataNotFoundException) { + callback.initializeFailed(new ManagedLedgerException.ManagedLedgerNotFoundException(e)); + } else { + callback.initializeFailed(new ManagedLedgerException(e)); + } + } + }); + } + + @Override + protected boolean isLedgersReadonly() { + return true; + } + + @Override + protected synchronized void initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) { + if (log.isDebugEnabled()) { + log.debug("[{}] initializing bookkeeper for shadowManagedLedger; ledgers {}", name, ledgers); + } + + // Calculate total entries and size + Iterator iterator = ledgers.values().iterator(); + while (iterator.hasNext()) { + LedgerInfo li = iterator.next(); + if (li.getEntries() > 0) { + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); + } else if (li.getLedgerId() != currentLedger.getId()) { + //do not remove the last empty ledger. + iterator.remove(); + } + } + + initLastConfirmedEntry(); + // Save it back to ensure all nodes exist and properties are persisted. + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + ledgersStat = stat; + initializeCursors(callback); + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + handleBadVersion(e); + callback.initializeFailed(new ManagedLedgerException(e)); + } + }); + } + + private void initLastConfirmedEntry() { + if (currentLedger == null) { + return; + } + lastConfirmedEntry = new PositionImpl(currentLedger.getId(), currentLedger.getLastAddConfirmed()); + // bypass empty ledgers, find last ledger with Message if possible. + while (lastConfirmedEntry.getEntryId() == -1) { + Map.Entry formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); + if (formerLedger != null) { + LedgerInfo ledgerInfo = formerLedger.getValue(); + lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } else { + break; + } + } } - public TopicName getShadowSource() { - return shadowSource; + @Override + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; + } + if (state != State.LedgerOpened) { + addOperation.failed(new ManagedLedgerException("Managed ledger is not opened")); + return; + } + + if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) { + addOperation.failed(new ManagedLedgerException("Illegal addOperation context object.")); + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})", + name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId()); + } + pendingAddEntries.add(addOperation); + if (position.getLedgerId() <= currentLedger.getId()) { + // Write into lastLedger + if (position.getLedgerId() == currentLedger.getId()) { + addOperation.setLedger(currentLedger); + } + currentLedgerEntries = position.getEntryId(); + currentLedgerSize += addOperation.data.readableBytes(); + addOperation.initiateShadowWrite(); + } // for addOperation with ledgerId > currentLedger, will be processed in `updateLedgersIdsComplete` + lastAddEntryTimeMs = System.currentTimeMillis(); + } + + /** + * terminate is not allowed on shadow topic. + * @param callback + * @param ctx + */ + @Override + public synchronized void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) { + callback.terminateFailed(new ManagedLedgerException("Terminate is not allowed on shadow topic."), ctx); + } + + /** + * Handle source ManagedLedgerInfo updates. + * Update types: + * 1. new ledgers. + * 2. old ledgers deleted. + * 3. old ledger offload info updated (including ledger deleted from bookie by offloader) + */ + private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, prevStat={},stat={}", name, sourceMLName, mlInfo, + sourceLedgersStat, stat); + } + if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) { + log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}", + sourceLedgersStat, stat); + return; + } + sourceLedgersStat = stat; + + if (mlInfo.hasTerminatedPosition()) { + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}][{}] Process managed ledger terminated at {}", name, sourceMLName, lastConfirmedEntry); + } + + TreeMap newLedgerInfos = new TreeMap<>(); + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + newLedgerInfos.put(ls.getLedgerId(), ls); + } + + for (Map.Entry ledgerInfoEntry : newLedgerInfos.entrySet()) { + Long ledgerId = ledgerInfoEntry.getKey(); + LedgerInfo ledgerInfo = ledgerInfoEntry.getValue(); + if (ledgerInfo.getEntries() > 0) { + LedgerInfo oldLedgerInfo = ledgers.put(ledgerId, ledgerInfo); + if (oldLedgerInfo == null) { + log.info("[{}]Read new ledger info from source,ledgerId={}", name, ledgerId); + } else { + if (!oldLedgerInfo.equals(ledgerInfo)) { + log.info("[{}] Old ledger info updated in source,ledgerId={}", name, ledgerId); + // ledger deleted from bookkeeper by offloader. + if (ledgerInfo.hasOffloadContext() + && ledgerInfo.getOffloadContext().getBookkeeperDeleted() + && (!oldLedgerInfo.hasOffloadContext() || !oldLedgerInfo.getOffloadContext() + .getBookkeeperDeleted())) { + log.info("[{}] Old ledger removed from bookkeeper by offloader in source,ledgerId={}", name, + ledgerId); + invalidateReadHandle(ledgerId); + } + } + } + } + } + Long lastLedgerId = newLedgerInfos.lastKey(); + // open the last ledger. + if (lastLedgerId != null && !(currentLedger != null && currentLedger.getId() == lastLedgerId)) { + ledgers.put(lastLedgerId, newLedgerInfos.get(lastLedgerId)); + mbean.startDataLedgerOpenOp(); + //open ledger in readonly mode. + bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), + (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened new source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; + initLastConfirmedEntry(); + updateLedgersIdsComplete(); + maybeUpdateCursorBeforeTrimmingConsumedLedger(); + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + } + })), null); + } + + //handle old ledgers deleted. + List ledgersToDelete = new ArrayList<>(ledgers.headMap(newLedgerInfos.firstKey(), false).values()); + if (!ledgersToDelete.isEmpty()) { + log.info("[{}]ledgers deleted in source, size={}", name, ledgersToDelete.size()); + try { + advanceCursorsIfNecessary(ledgersToDelete); + } catch (ManagedLedgerException.LedgerNotExistException e) { + log.info("[{}] First non deleted Ledger is not found, advanceCursors fails", name); + } + doDeleteLedgers(ledgersToDelete); + } + } + + + @Override + public synchronized void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { + store.unwatchManagedLedgerInfo(sourceMLName); + super.asyncClose(callback, ctx); + } + + @Override + protected synchronized void updateLedgersIdsComplete() { + STATE_UPDATER.set(this, State.LedgerOpened); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); + } + + createNewOpAddEntryForNewLedger(); + + // Process all the pending addEntry requests + for (OpAddEntry op : pendingAddEntries) { + Position position = (Position) op.getCtx(); + if (position.getLedgerId() <= currentLedger.getId()) { + if (position.getLedgerId() == currentLedger.getId()) { + op.setLedger(currentLedger); + } else { + op.setLedger(null); + } + currentLedgerEntries = position.getEntryId(); + currentLedgerSize += op.data.readableBytes(); + op.initiateShadowWrite(); + } else { + break; + } + } + } + + @Override + protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java new file mode 100644 index 00000000000000..4482e9944c0ceb --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.*; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +@Slf4j +public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase { + + private ShadowManagedLedgerImpl openShadowManagedLedger(String name, String sourceName) + throws ManagedLedgerException, InterruptedException { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setShadowSourceName(sourceName); + Map properties = new HashMap<>(); + properties.put(ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY, "source_topic"); + config.setProperties(properties); + ManagedLedger shadowML = factory.open(name, config); + assertTrue(shadowML instanceof ShadowManagedLedgerImpl); + return (ShadowManagedLedgerImpl) shadowML; + } + + @Test + public void testShadowWrites() throws Exception { + ManagedLedgerImpl sourceML = (ManagedLedgerImpl) factory.open("source_ML", new ManagedLedgerConfig() + .setMaxEntriesPerLedger(2) + .setRetentionTime(-1, TimeUnit.DAYS) + .setRetentionSizeInMB(-1)); + byte[] data = new byte[10]; + List positions = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Position pos = sourceML.addEntry(data); + log.info("pos={}", pos); + positions.add(pos); + } + log.info("currentLedgerId:{}", sourceML.currentLedger.getId()); + assertEquals(sourceML.ledgers.size(), 3); + + ShadowManagedLedgerImpl shadowML = openShadowManagedLedger("shadow_ML", "source_ML"); + //After init, the state should be the same. + assertEquals(shadowML.ledgers.size(), 3); + assertEquals(sourceML.currentLedger.getId(), shadowML.currentLedger.getId()); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + //Add new data to source ML + Position newPos = sourceML.addEntry(data); + + // The state should not be the same. + log.info("Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + //Add new data to source ML, and a new ledger rolled + newPos = sourceML.addEntry(data); + assertEquals(sourceML.ledgers.size(), 4); + Awaitility.await().untilAsserted(()->assertEquals(shadowML.ledgers.size(), 4)); + log.info("Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + Awaitility.await().untilAsserted(()->assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry)); + + {// test write entry with ledgerId < currentLedger + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, positions.get(2)); + assertEquals(future.get(), positions.get(2)); + // LCE is not updated. + log.info("1.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + + {// test write entry with ledgerId == currentLedger + newPos = sourceML.addEntry(data); + assertEquals(sourceML.ledgers.size(), 4); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, newPos); + assertEquals(future.get(), newPos); + // LCE should be updated. + log.info("2.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + + {// test write entry with ledgerId > currentLedger + PositionImpl fakePos = PositionImpl.get(newPos.getLedgerId() + 1, newPos.getEntryId()); + + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, fakePos); + //This write will be queued unit new ledger is rolled in source. + + newPos = sourceML.addEntry(data); // new ledger rolled. + newPos = sourceML.addEntry(data); + Awaitility.await().untilAsserted(() -> assertEquals(shadowML.ledgers.size(), 5)); + assertEquals(future.get(), fakePos); + // LCE should be updated. + log.info("3.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 02e9adcca3722f..c9f95ab524f55d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -620,13 +620,15 @@ public String getReplicatorPrefix() { return replicatorPrefix; } + protected String getSchemaId() { + String base = TopicName.get(getName()).getPartitionedTopicName(); + return TopicName.get(base).getSchemaName(); + } @Override public CompletableFuture hasSchema() { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); return brokerService.pulsar() .getSchemaRegistryService() - .getSchema(id).thenApply(Objects::nonNull); + .getSchema(getSchemaId()).thenApply(Objects::nonNull); } @Override @@ -635,8 +637,7 @@ public CompletableFuture addSchema(SchemaData schema) { return CompletableFuture.completedFuture(SchemaVersion.Empty); } - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); if (allowAutoUpdateSchema()) { @@ -667,8 +668,7 @@ private boolean allowAutoUpdateSchema() { @Override public CompletableFuture deleteSchema() { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) .thenCompose(schema -> { @@ -687,8 +687,7 @@ public CompletableFuture deleteSchema() { @Override public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schema) { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); return brokerService.pulsar() .getSchemaRegistryService() .checkConsumerCompatibility(id, schema, getSchemaCompatibilityStrategy()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2015128b9e800f..c5f3d508a56db2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -85,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; @@ -1452,8 +1454,17 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { return managedLedgerFactory.getManagedLedgerPropertiesAsync( topicName.getPersistenceNamingEncoding()); + } else { + // Check if the partitioned topic is a ShadowTopic + if (MapUtils.getString(metadata.properties, PROPERTY_SOURCE_TOPIC_KEY) != null) { + String sourceTopic = metadata.properties.get(PROPERTY_SOURCE_TOPIC_KEY); + Map result = new HashMap<>(); + result.put(PROPERTY_SOURCE_TOPIC_KEY, TopicName.getTopicPartitionNameString( + sourceTopic, topicName.getPartitionIndex())); + return CompletableFuture.completedFuture(result); + } + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(metadata.properties); }); } } @@ -1473,6 +1484,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean propertiesFuture = CompletableFuture.completedFuture(properties); } propertiesFuture.thenAccept(finalProperties -> + //TODO add topicName in properties? createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); @@ -1528,6 +1540,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } managedLedgerConfig.setCreateIfMissing(createIfMissing); managedLedgerConfig.setProperties(properties); + String shadowSource = managedLedgerConfig.getShadowSource(); + if (shadowSource != null) { + managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding()); + } // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, @@ -1779,7 +1795,6 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); managedLedgerConfig.setNewEntriesCheckDelayInMillis( serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); - return managedLedgerConfig; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index d7b99f9e8146e6..fe2249679a84cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -85,6 +86,7 @@ public class Producer { private final boolean isRemote; private final String remoteCluster; private final boolean isNonPersistentTopic; + private final boolean isShadowTopic; private final boolean isEncrypted; private final ProducerAccessMode accessMode; @@ -116,6 +118,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.chunkedMessageRate = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; this.msgDrop = this.isNonPersistentTopic ? new Rate() : null; + this.isShadowTopic = + topic instanceof PersistentTopic && ((PersistentTopic) topic).getShadowSourceTopic().isPresent(); this.metadata = metadata != null ? metadata : Collections.emptyMap(); @@ -182,14 +186,14 @@ public boolean isSuccessorTo(Producer other) { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked, boolean isMarker) { - if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker); + boolean isChunked, boolean isMarker, Position position) { + if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) { if (lowestSequenceId > highestSequenceId) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, @@ -198,13 +202,22 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS }); return; } - if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) { + if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) { publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, - isMarker); + isMarker, position); } } - public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { + public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, + Position position) { + if (isShadowTopic && position == null || !isShadowTopic && position != null) { + cnx.execute(() -> { + cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, + "Only shadow topic supports sending messages with messageId"); + cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); + }); + return false; + } if (isClosed) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError, @@ -246,20 +259,20 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, - boolean isMarker) { + boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker); + batchSize, isChunked, System.nanoTime(), isMarker, position); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishMessage(headersAndPayload, messagePublishContext); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - long batchSize, boolean isChunked, boolean isMarker) { + long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker); + isChunked, System.nanoTime(), isMarker, position); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishMessage(headersAndPayload, messagePublishContext); @@ -335,7 +348,10 @@ public TransportCnx getCnx() { return this.cnx; } - private static final class MessagePublishContext implements PublishContext, Runnable { + /** + * MessagePublishContext implements Position because that ShadowManagedLedger need to know the source position info. + */ + private static final class MessagePublishContext implements PublishContext, Runnable, Position { /* * To store context information built by message payload * processors (time duration, size etc), if any configured @@ -361,6 +377,21 @@ private static final class MessagePublishContext implements PublishContext, Runn private long entryTimestamp; + @Override + public Position getNext() { + return null; + } + + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public long getEntryId() { + return entryId; + } + public String getProducerName() { return producer.getProducerName(); } @@ -505,7 +536,7 @@ public void run() { } static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -517,6 +548,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.ledgerId = position == null ? -1 : position.getLedgerId(); + callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -524,7 +557,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -537,6 +570,8 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.ledgerId = position == null ? -1 : position.getLedgerId(); + callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -757,10 +792,10 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { - checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); + checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null); MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3ce02e23489edb..15842f45985499 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1596,13 +1596,17 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { return; } + // This position is only used for shadow replicator + Position position = send.hasMessageId() + ? PositionImpl.get(send.getMessageId().getLedgerId(), send.getMessageId().getEntryId()) : null; + // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), - headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker()); + headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, - send.getNumMessages(), send.isIsChunk(), send.isMarker()); + send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ea20d413484cf4..7cf9ad722676a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -306,7 +306,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); if (ledger instanceof ShadowManagedLedgerImpl) { - shadowSourceTopic = ((ShadowManagedLedgerImpl) ledger).getShadowSource(); + shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { shadowSourceTopic = null; } @@ -1767,6 +1767,17 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) { return getNumberOfSameAddressConsumers(clientAddress, subscriptions.values()); } + @Override + protected String getSchemaId() { + if (shadowSourceTopic == null) { + return super.getSchemaId(); + } else { + //reuse schema from shadow source. + String base = shadowSourceTopic.getPartitionedTopicName(); + return TopicName.get(base).getSchemaName(); + } + } + @Override public ConcurrentOpenHashMap getSubscriptions() { return subscriptions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 0ab645f0890486..cc169f355f0233 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -30,7 +31,6 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.assertj.core.util.Lists; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -62,10 +62,14 @@ public void testShadowReplication() throws Exception { String shadowTopicName = "persistent://prop1/ns-shadow/shadow-topic"; String shadowTopicName2 = "persistent://prop1/ns-shadow/shadow-topic-2"; + admin.topics().createNonPartitionedTopic(sourceTopicName); + admin.topics().createShadowTopic(shadowTopicName, sourceTopicName); + admin.topics().createShadowTopic(shadowTopicName2, sourceTopicName); + admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2)); + @Cleanup Producer producer = pulsarClient.newProducer().topic(sourceTopicName).create(); - // NOTE: shadow topic is not ready yet. So use normal topic instead. - // The only difference for consumer should be that the message id is changed. + @Cleanup Consumer shadowConsumer = pulsarClient.newConsumer().topic(shadowTopicName).subscriptionName("shadow-sub") @@ -78,8 +82,6 @@ public void testShadowReplication() throws Exception { PersistentTopic sourceTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get(); - admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2)); - Awaitility.await().untilAsserted(()->Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 2)); ShadowReplicator @@ -129,7 +131,6 @@ public void testShadowReplication() throws Exception { //`replicatedFrom` is set as localClusterName in shadow topic. Assert.assertNotEquals(shadowMessage.getReplicatedFrom(), sourceMessage.getReplicatedFrom()); - //Currently, msg is copied in BK. So the message id is not the same. - Assert.assertNotEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId()); + Assert.assertEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId()); } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java index 9026f406b4312a..5f8e8f6ffd9b99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java @@ -19,10 +19,25 @@ package org.apache.pulsar.broker.service.persistent; +import com.google.common.collect.Lists; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Cleanup; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -43,10 +58,14 @@ protected void cleanup() throws Exception { internalCleanup(); } - @Test() + private String newShadowSourceTopicName() { + return "persistent://" + newTopicName(); + } + + @Test public void testNonPartitionedShadowTopicSetup() throws Exception { - String sourceTopic = "persistent://prop/ns-abc/source"; - String shadowTopic = "persistent://prop/ns-abc/shadow"; + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; //1. test shadow topic setting in topic creation. admin.topics().createNonPartitionedTopic(sourceTopic); admin.topics().createShadowTopic(shadowTopic, sourceTopic); @@ -65,20 +84,22 @@ public void testNonPartitionedShadowTopicSetup() throws Exception { Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); } - @Test() + @Test public void testPartitionedShadowTopicSetup() throws Exception { - String sourceTopic = "persistent://prop/ns-abc/source-p"; - String shadowTopic = "persistent://prop/ns-abc/shadow-p"; + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + String sourceTopicPartition = TopicName.get(sourceTopic).getPartition(0).toString(); String shadowTopicPartition = TopicName.get(shadowTopic).getPartition(0).toString(); //1. test shadow topic setting in topic creation. admin.topics().createPartitionedTopic(sourceTopic, 2); admin.topics().createShadowTopic(shadowTopic, sourceTopic); pulsarClient.newProducer().topic(shadowTopic).create().close();//trigger loading partitions. + PersistentTopic brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService() .getTopicIfExists(shadowTopicPartition).get().get(); Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); - Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition); Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); //2. test shadow topic could be properly loaded after unload. @@ -89,8 +110,135 @@ public void testPartitionedShadowTopicSetup() throws Exception { brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get(); Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); - Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition); + } + + @Test + public void testShadowTopicNotWritable() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(shadowTopic).create(); + Assert.expectThrows(PulsarClientException.NotAllowedException.class, ()-> producer.send(new byte[]{1,2,3})); + } + + private void awaitUntilShadowReplicatorReady(String sourceTopic, String shadowTopic) { + Awaitility.await().untilAsserted(()->{ + PersistentTopic sourcePersistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopic).get().get(); + ShadowReplicator + replicator = (ShadowReplicator) sourcePersistentTopic.getShadowReplicators().get(shadowTopic); + if (replicator == null) { + return; + } + Assert.assertEquals(String.valueOf(replicator.getState()), "Started"); + }); + } + @Test + public void testShadowTopicConsuming() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = pulsarClient.newProducer().topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer().topic(shadowTopic).subscriptionName("sub").subscribe(); + byte[] content = "Hello Shadow Topic".getBytes(StandardCharsets.UTF_8); + MessageId id = producer.send(content); + log.info("msg send to source topic, id={}", id); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + } + + + @Test + public void testShadowTopicConsumingWithStringSchema() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + String content = "Hello Shadow Topic"; + MessageId id = producer.send(content); + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + + for (int i = 0; i < 10; i++) { + producer.send(content + i); + } + for (int i = 0; i < 10; i++) { + Assert.assertEquals(consumer.receive().getValue(), content + i); + } } + @AllArgsConstructor + @NoArgsConstructor + @Data + private static class Point { + int x; + int y; + } + @Test + public void testShadowTopicConsumingWithJsonSchema() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = + pulsarClient.newProducer(Schema.JSON(Point.class)).topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.JSON(Point.class)).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + Point content = new Point(1, 2); + MessageId id = producer.send(content); + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + } + + @Test + public void testConsumeShadowMessageWithoutCache() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + String content = "Hello Shadow Topic"; + MessageId id = producer.send(content); + for (int i = 0; i < 10; i++) { + producer.send(content + i); + } + + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + // disable shadow replicator + // admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(consumer.receive().getValue(), content + i); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e82bcb7cad6e73..dceaf502141821 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -533,6 +533,8 @@ public void sendAsync(Message message, SendCallback callback) { ? msg.getMessageBuilder().getSchemaVersion() : null; byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey() ? msg.getMessageBuilder().getOrderingKey() : null; + // msg.messageId will be reset if previous message chunk is sent successfully. + final MessageId messageId = msg.getMessageId(); for (int chunkId = 0; chunkId < totalChunks; chunkId++) { // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`, @@ -555,7 +557,7 @@ public void sendAsync(Message message, SendCallback callback) { synchronized (this) { serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex, payloadChunkSize, compressedPayload, compressed, - compressedPayload.readableBytes(), callback, chunkedMessageCtx); + compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId); readStartIndex = ((chunkId + 1) * payloadChunkSize); } } @@ -617,7 +619,8 @@ private void serializeAndSendMessage(MessageImpl msg, boolean compressed, int compressedPayloadSize, SendCallback callback, - ChunkedMessageCtx chunkedMessageCtx) throws IOException { + ChunkedMessageCtx chunkedMessageCtx, + MessageId messageId) throws IOException { ByteBuf chunkPayload = compressedPayload; MessageMetadata msgMetadata = msg.getMessageBuilder(); if (totalChunks > 1 && TopicName.get(topic).isPersistent()) { @@ -686,14 +689,14 @@ private void serializeAndSendMessage(MessageImpl msg, : 1; final OpSendMsg op; if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) { - ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), msgMetadata, + ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata, encryptedPayload); op = OpSendMsg.create(msg, cmd, sequenceId, callback); } else { op = OpSendMsg.create(msg, null, sequenceId, callback); final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { - op.cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), finalMsgMetadata, + op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId, finalMsgMetadata, encryptedPayload); }; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 52c3ef3a3cada7..7987a54630442d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -290,6 +290,14 @@ public static int getPartitionIndex(String topic) { return partitionIndex; } + /** + * A helper method to get a partition name of a topic in String. + * @return topic + "-partition-" + partition. + */ + public static String getTopicPartitionNameString(String topic, int partitionIndex) { + return topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex; + } + /** * Returns the http rest path for use in the admin web service. * Eg: