Skip to content

Commit

Permalink
Fixing PagedTopic ensureTopic
Browse files Browse the repository at this point in the history
Fixing ConverterNamedTopic
Add debugging to TopicSubscriberManagementTests
  • Loading branch information
thegridman committed Dec 1, 2024
1 parent c7c228b commit ec0b3cd
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1245,13 +1245,26 @@ public NamedTopicView ensureTopic(String sName, ClassLoader loader)
topic = (NamedTopicView) store.get(sName, loader);
if (topic != null)
{
return topic;
if (topic.isActive())
{
return topic;
}
}

ReentrantLock lock = getTopicStoreLock();
lock.lock();
try
{
topic = (NamedTopicView) store.get(sName, loader);
if (topic != null)
{
if (topic.isActive())
{
return topic;
}
store.releaseTopic(topic, loader);
}

PagedTopicCaches topicCaches = new PagedTopicCaches(sName, this);
NamedTopicView pagedTopic = new NamedTopicView(new PagedTopicConnector(topicCaches));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import com.tangosol.net.Service;

import com.tangosol.net.TopicService;

import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.NamedTopicEvent;
import com.tangosol.net.topic.NamedTopicListener;
Expand Down Expand Up @@ -60,6 +62,12 @@ public ConverterNamedTopic(NamedTopic<F> topic, Converter<F, T> convUp, Converte

// ----- NamedTopic methods -----------------------------------------

@Override
public Publisher<T> createPublisher()
{
return new ConverterPublisher<>(f_topic.createPublisher(), this);
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Publisher<T> createPublisher(Publisher.Option<? super T>... options)
Expand All @@ -68,6 +76,12 @@ public Publisher<T> createPublisher(Publisher.Option<? super T>... options)
return new ConverterPublisher<>(f_topic.createPublisher(opts), this);
}

@Override
public Subscriber<T> createSubscriber()
{
return new ConverterSubscriber<>(f_topic.createSubscriber(), this);
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public <U> Subscriber<U> createSubscriber(Subscriber.Option<? super T, U>... options)
Expand All @@ -76,6 +90,18 @@ public <U> Subscriber<U> createSubscriber(Subscriber.Option<? super T, U>... opt
return new ConverterSubscriber<>(f_topic.createSubscriber(opts), this);
}

@Override
public void ensureSubscriberGroup(String sName)
{
f_topic.ensureSubscriberGroup(sName);
}

@Override
public TopicService getTopicService()
{
return f_topic.getTopicService();
}

@Override
public void ensureSubscriberGroup(String sGroup, Filter<?> filter, ValueExtractor<?, ?> extractor)
{
Expand Down Expand Up @@ -136,6 +162,30 @@ public void destroy()
f_topic.destroy();
}

@Override
public boolean isDestroyed()
{
return f_topic.isDestroyed();
}

@Override
public boolean isReleased()
{
return f_topic.isReleased();
}

@Override
public int getRemainingMessages(String sSubscriberGroup)
{
return f_topic.getRemainingMessages(sSubscriberGroup);
}

@Override
public void close()
{
f_topic.close();
}

@Override
public boolean isActive()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.junit.rules.TestName;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -220,16 +221,25 @@ public void shouldDisconnectSingleSubscriberByKey() throws Exception
assertThat(futureLostThree.isDone(), is(false));

// all channels should eventually be allocated to subscriberTwo
Eventually.assertDeferred(() -> subscriberTwo.getChannels().length, is(topic.getChannelCount()));
Eventually.assertDeferred(() ->
{
int[] anChannel = subscriberTwo.getChannels();
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByKey - Eventually asserting subscriberTwo channels current=" + Arrays.toString(anChannel));
return anChannel.length;
}, is(topic.getChannelCount()));
final AtomicReference<Map<Long, Set<Integer>>> ref = new AtomicReference<>();
Eventually.assertDeferred(() ->
{
Map<Long, Set<Integer>> map = caches.getChannelAllocations(sGroupOne);
ref.set(map);
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByKey - Eventually asserting subscriberTwo subscription channels current=" + map);
return map.size();
}, is(1));

assertThat(ref.get().get(subscriberTwo.getId()), is(subscriberTwo.getChannelSet()));
Map<Long, Set<Integer>> mapAllocation = ref.get();
Set<Integer> channelSet = subscriberTwo.getChannelSet();
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByGroupAndId - Asserting subscriberTwo channels == subscription channels - subscriber=" + channelSet + " map=" + mapAllocation);
assertThat(mapAllocation.get(subscriberTwo.getId()), is(subscriberTwo.getChannelSet()));

// receive futures should still be waiting
assertThat(futureOne.isDone(), is(false));
Expand Down Expand Up @@ -313,10 +323,23 @@ public void shouldDisconnectSingleSubscriberByGroupAndId() throws Exception
assertThat(futureLostThree.isDone(), is(false));

// all channels should eventually be allocated to subscriberTwo
Eventually.assertDeferred(() -> subscriberTwo.getChannels().length, is(topic.getChannelCount()));
Eventually.assertDeferred(() -> caches.getChannelAllocations(sGroupOne).size(), is(1));
Eventually.assertDeferred(() ->
{
int[] anChannel = subscriberTwo.getChannels();
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByGroupAndId - Eventually asserting subscriberTwo channels current=" + Arrays.toString(anChannel));
return anChannel.length;
}, is(topic.getChannelCount()));
Eventually.assertDeferred(() ->
{
Map<Long, Set<Integer>> map = caches.getChannelAllocations(sGroupOne);
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByGroupAndId - Eventually asserting subscriberTwo subscription channels current=" + map);
return map.size();
}, is(1));

Map<Long, Set<Integer>> mapAllocation = caches.getChannelAllocations(sGroupOne);
assertThat(mapAllocation.get(subscriberTwo.getId()), is(subscriberTwo.getChannelSet()));
Set<Integer> channelSet = subscriberTwo.getChannelSet();
System.err.println(">>>>> In shouldDisconnectSingleSubscriberByGroupAndId - Asserting subscriberTwo channels == subscription channels - subscriber=" + channelSet + " map=" + mapAllocation);
assertThat(mapAllocation.get(subscriberTwo.getId()), is(channelSet));

// receive futures should still be waiting
assertThat(futureOne.isDone(), is(false));
Expand Down

0 comments on commit ec0b3cd

Please sign in to comment.