diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 92c9c91198134..0efd1ca2a823c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.commons.collections4.MapUtils; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; /** @@ -742,4 +743,10 @@ public int getMaxBacklogBetweenCursorsForCaching() { public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) { this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching; } + + public String getShadowSource() { + return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); + } + + public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index e42c2581ba101..667d641ac9ae0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.bookkeeper.common.annotation.InterfaceAudience; @@ -197,4 +198,8 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M * */ long getCacheEvictionTimeThreshold(); + /** + * @return properties of this managedLedger. + */ + CompletableFuture> getManagedLedgerPropertiesAsync(String name); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index d7596a7468a40..e0af1cc632612 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -367,11 +367,13 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final ledgers.computeIfAbsent(name, (mlName) -> { // Create the managed ledger CompletableFuture future = new CompletableFuture<>(); - final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, - bookkeeperFactory.get( - new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), - config.getBookKeeperEnsemblePlacementPolicyProperties())), - store, config, scheduledExecutor, name, mlOwnershipChecker); + BookKeeper bk = bookkeeperFactory.get( + new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), + config.getBookKeeperEnsemblePlacementPolicyProperties())); + final ManagedLedgerImpl newledger = config.getShadowSource() == null + ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) + : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, + mlOwnershipChecker); PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @@ -954,6 +956,11 @@ public void operationFailed(MetaStoreException e) { return future; } + @Override + public CompletableFuture> getManagedLedgerPropertiesAsync(String name) { + return store.getManagedLedgerPropertiesAsync(name); + } + public MetaStore getMetaStore() { return store; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 35f109b21dc5c..3d60066782af2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -158,4 +158,13 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn * if the operation succeeds. */ CompletableFuture asyncExists(String ledgerName); + + + /** + * Get managed ledger properties from meta store. + * + * @param name ledgerName + * @return a future represents the result of the operation. + */ + CompletableFuture> getManagedLedgerPropertiesAsync(String name); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index a501b9e43dc0f..2949902ac353f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -23,6 +23,8 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -148,6 +150,33 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map }); } + public CompletableFuture> getManagedLedgerPropertiesAsync(String name) { + CompletableFuture> result = new CompletableFuture<>(); + getManagedLedgerInfo(name, false, new MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + HashMap propertiesMap = new HashMap<>(mlInfo.getPropertiesCount()); + if (mlInfo.getPropertiesCount() > 0) { + for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { + MLDataFormats.KeyValue property = mlInfo.getProperties(i); + propertiesMap.put(property.getKey(), property.getValue()); + } + } + result.complete(propertiesMap); + } + + @Override + public void operationFailed(MetaStoreException e) { + if (e instanceof MetadataNotFoundException) { + result.complete(Collections.emptyMap()); + } else { + result.completeExceptionally(e); + } + } + }); + return result; + } + @Override public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat, MetaStoreCallback callback) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java new file mode 100644 index 0000000000000..346780a228349 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.mledger.impl; + +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Working in progress until PIP-180 is finished. + * Currently, it works nothing different with ManagedLedgerImpl. + */ +@Slf4j +public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { + + private final TopicName shadowSource; + private final String sourceMLName; + + public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, + MetaStore store, ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, + String name, final Supplier mlOwnershipChecker) { + super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); + this.shadowSource = TopicName.get(config.getShadowSource()); + this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + } + + @Override + synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { + // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger, + // which is complicated and will be implemented in the next PRs. + super.initialize(callback, ctx); + } + + public TopicName getShadowSource() { + return shadowSource; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8491615448aae..2f6cb020b7085 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1405,6 +1405,22 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return topicFuture; } + CompletableFuture> fetchTopicPropertiesAsync(TopicName topicName) { + if (!topicName.isPartitioned()) { + return managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding()); + } else { + TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(partitionedTopicName) + .thenCompose(metadata -> { + if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { + return managedLedgerFactory.getManagedLedgerPropertiesAsync( + topicName.getPersistenceNamingEncoding()); + } + return CompletableFuture.completedFuture(metadata.properties); + }); + } + } + private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { @@ -1412,7 +1428,21 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) .thenAccept(isActive -> { if (isActive) { - createPersistentTopic(topic, createIfMissing, topicFuture, properties); + CompletableFuture> propertiesFuture; + if (properties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName); + } else { + propertiesFuture = CompletableFuture.completedFuture(properties); + } + propertiesFuture.thenAccept(finalProperties -> + createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties) + ).exceptionally(throwable -> { + log.warn("[{}] Read topic property failed", topic, throwable); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(throwable); + return null; + }); } else { // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fb1c521bb00b0..a3193935e5325 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -176,6 +177,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ConcurrentOpenHashMap shadowReplicators; @Getter private volatile List shadowTopics; + private final TopicName shadowSourceTopic; static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch"; @@ -302,6 +304,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.transactionBuffer = new TransactionBufferDisable(); } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); + if (ledger instanceof ShadowManagedLedgerImpl) { + shadowSourceTopic = ((ShadowManagedLedgerImpl) ledger).getShadowSource(); + } else { + shadowSourceTopic = null; + } } @Override @@ -381,6 +388,7 @@ public CompletableFuture initialize() { } else { this.transactionBuffer = new TransactionBufferDisable(); } + shadowSourceTopic = null; } private void initializeDispatchRateLimiterIfNeeded() { @@ -3330,4 +3338,8 @@ private CompletableFuture transactionBufferCleanupAndClose() { public long getLastDataMessagePublishedTimestamp() { return lastDataMessagePublishedTimestamp; } + + public Optional getShadowSourceTopic() { + return Optional.ofNullable(shadowSourceTopic); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 970bfd763a4e5..2771808a9fa0b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -201,6 +201,8 @@ public void setup() throws Exception { doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient(); + doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null)) + .when(mlFactoryMock).getManagedLedgerPropertiesAsync(any()); doAnswer(invocation -> { DeleteLedgerCallback deleteLedgerCallback = invocation.getArgument(1); deleteLedgerCallback.deleteLedgerComplete(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java new file mode 100644 index 0000000000000..22bfd70cf8883 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service.persistent; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class ShadowTopicTest extends BrokerTestBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Test() + public void testNonPartitionedShadowTopicSetup() throws Exception { + String sourceTopic = "persistent://prop/ns-abc/source"; + String shadowTopic = "persistent://prop/ns-abc/shadow"; + //1. test shadow topic setting in topic creation. + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + PersistentTopic brokerShadowTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get(); + Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); + + //2. test shadow topic could be properly loaded after unload. + admin.namespaces().unload("prop/ns-abc"); + Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty()); + Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); + brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get(); + Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + } + + @Test() + public void testPartitionedShadowTopicSetup() throws Exception { + String sourceTopic = "persistent://prop/ns-abc/source-p"; + String shadowTopic = "persistent://prop/ns-abc/shadow-p"; + String shadowTopicPartition = TopicName.get(shadowTopic).getPartition(0).toString(); + + //1. test shadow topic setting in topic creation. + admin.topics().createPartitionedTopic(sourceTopic, 2); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + pulsarClient.newProducer().topic(shadowTopic).create().close();//trigger loading partitions. + PersistentTopic brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicIfExists(shadowTopicPartition).get().get(); + Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); + + //2. test shadow topic could be properly loaded after unload. + admin.namespaces().unload("prop/ns-abc"); + Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty()); + + Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); + brokerShadowTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get(); + Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + } + + +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 177cae9a9a438..d2d0754f1d112 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -4397,4 +4397,58 @@ CompletableFuture> examineMessageAsync(String topic, String init * @param sourceTopic source topic name */ CompletableFuture> getShadowTopicsAsync(String sourceTopic); + + /** + * Get the shadow source topic name of the given shadow topic. + * @param shadowTopic shadow topic name. + * @return The topic name of the source of the shadow topic. + */ + String getShadowSource(String shadowTopic) throws PulsarAdminException; + + /** + * Get the shadow source topic name of the given shadow topic asynchronously. + * @param shadowTopic shadow topic name. + * @return The topic name of the source of the shadow topic. + */ + CompletableFuture getShadowSourceAsync(String shadowTopic); + + /** + * Create a new shadow topic as the shadow of the source topic. + * The source topic must exist before call this method. + *

+ * For partitioned source topic, the partition number of shadow topic follows the source topic at creation. If + * the partition number of the source topic changes, the shadow topic needs to update its partition number + * manually. + * For non-partitioned source topic, the shadow topic will be created as non-partitioned topic. + *

+ * + * NOTE: This is still WIP until PIP-180 is finished. + * + * @param shadowTopic shadow topic name, and it must be a persistent topic name. + * @param sourceTopic source topic name, and it must be a persistent topic name. + * @param properties properties to be created with in the shadow topic. + * @throws PulsarAdminException + */ + void createShadowTopic(String shadowTopic, String sourceTopic, Map properties) + throws PulsarAdminException; + + /** + * Create a new shadow topic, see #{@link #createShadowTopic(String, String, Map)} for details. + */ + CompletableFuture createShadowTopicAsync(String shadowTopic, String sourceTopic, + Map properties); + + /** + * Create a new shadow topic, see #{@link #createShadowTopic(String, String, Map)} for details. + */ + default void createShadowTopic(String shadowTopic, String sourceTopic) throws PulsarAdminException { + createShadowTopic(shadowTopic, sourceTopic, null); + } + + /** + * Create a new shadow topic, see #{@link #createShadowTopic(String, String, Map)} for details. + */ + default CompletableFuture createShadowTopicAsync(String shadowTopic, String sourceTopic) { + return createShadowTopicAsync(shadowTopic, sourceTopic, null); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index e3b51accdfd7c..4312080ac2233 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -131,6 +131,8 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys"; // CHECKSTYLE.ON: MemberName + public static final String PROPERTY_SHADOW_SOURCE_KEY = "PULSAR.SHADOW_SOURCE"; + public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { super(auth, readTimeoutMs); adminTopics = web.path("/admin"); @@ -2705,7 +2707,43 @@ public CompletableFuture removeShadowTopicsAsync(String sourceTopic) { public CompletableFuture> getShadowTopicsAsync(String sourceTopic) { TopicName tn = validateTopic(sourceTopic); WebTarget path = topicPath(tn, "shadowTopics"); - return asyncGetRequest(path, new FutureCallback>(){}); + return asyncGetRequest(path, new FutureCallback>() {}); + } + + @Override + public String getShadowSource(String shadowTopic) throws PulsarAdminException { + return sync(() -> getShadowSourceAsync(shadowTopic)); + } + + @Override + public CompletableFuture getShadowSourceAsync(String shadowTopic) { + return getPropertiesAsync(shadowTopic).thenApply( + properties -> properties != null ? properties.get(PROPERTY_SHADOW_SOURCE_KEY) : null); + } + + @Override + public void createShadowTopic(String shadowTopic, String sourceTopic, Map properties) + throws PulsarAdminException { + sync(() -> createShadowTopicAsync(shadowTopic, sourceTopic, properties)); + } + + @Override + public CompletableFuture createShadowTopicAsync(String shadowTopic, String sourceTopic, + Map properties) { + checkArgument(TopicName.get(shadowTopic).isPersistent(), "Shadow topic must be persistent"); + checkArgument(TopicName.get(sourceTopic).isPersistent(), "Source topic must be persistent"); + return getPartitionedTopicMetadataAsync(sourceTopic).thenCompose(sourceTopicMeta -> { + HashMap shadowProperties = new HashMap<>(); + if (properties != null) { + shadowProperties.putAll(properties); + } + shadowProperties.put(PROPERTY_SHADOW_SOURCE_KEY, sourceTopic); + if (sourceTopicMeta.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { + return createNonPartitionedTopicAsync(shadowTopic, shadowProperties); + } else { + return createPartitionedTopicAsync(shadowTopic, sourceTopicMeta.partitions, shadowProperties); + } + }); } private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index b9cfbe8c97eab..bcb1fca1705e5 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -34,7 +34,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; @@ -51,7 +50,6 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.admin.cli.utils.SchemaExtractor; import org.apache.pulsar.client.admin.Bookies; @@ -74,7 +72,6 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl; -import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; @@ -1946,6 +1943,22 @@ public boolean matches(Long timestamp) { cmdTopics.run(split("remove-shadow-topics persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeShadowTopics("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("create-shadow-topic -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", null); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("create-shadow-topic -p a=aa,b=bb,c=cc -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1")); + HashMap p = new HashMap<>(); + p.put("a","aa"); + p.put("b","bb"); + p.put("c","cc"); + verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", p); + + cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1"); + + + } private static LedgerInfo newLedger(long id, long entries, long size) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index ae37a591bae3a..245cfc1b85506 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -255,6 +255,8 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("get-shadow-topics", new GetShadowTopics()); jcommander.addCommand("set-shadow-topics", new SetShadowTopics()); jcommander.addCommand("remove-shadow-topics", new RemoveShadowTopics()); + jcommander.addCommand("create-shadow-topic", new CreateShadowTopic()); + jcommander.addCommand("get-shadow-source", new GetShadowSource()); jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced()); jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced()); @@ -1714,6 +1716,38 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Create a shadow topic for an existing source topic.") + private class CreateShadowTopic extends CliCommand { + + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--source", "-s"}, description = "source topic name", required = true) + private String sourceTopic; + + @Parameter(names = {"--properties", "-p"}, description = "key value pair properties(eg: a=a b=b c=c)") + private java.util.List propertyList; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map properties = parseListKeyValueMap(propertyList); + getTopics().createShadowTopic(topic, TopicName.get(sourceTopic).toString(), properties); + } + } + + @Parameters(commandDescription = "Get the source topic for a shadow topic") + private class GetShadowSource extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String shadowTopic = validatePersistentTopic(params); + print(getTopics().getShadowSource(shadowTopic)); + } + } + @Parameters(commandDescription = "Get the delayed delivery policy for a topic") private class GetDelayedDelivery extends CliCommand { @Parameter(description = "tenant/namespace/topic", required = true)